2 Commits

Author SHA1 Message Date
zero@thinky 428140ff15 feat(backend): add job metrics
ci-agent / build (push) Failing after 3m1s
2026-04-05 00:44:57 +03:00
zero@thinky 7be99f8e91 feat: big ahh commit
- agent+proto+backend: transfer service status
- agent: fix returning empty message on nonzero exit status
- backend: refactor collector+commander and handlers dependent on them: implement agent accounting via grpc stats handler
2026-04-05 00:44:56 +03:00
13 changed files with 618 additions and 190 deletions
+1 -1
View File
@@ -3,7 +3,7 @@ module gitea.d3m0k1d.ru/d3m0k1d/HellreigN/agent
go 1.26.1
require (
gitea.d3m0k1d.ru/d3m0k1d/HellreigN/proto v0.0.0-20260403214837-94be9799f47d
gitea.d3m0k1d.ru/d3m0k1d/HellreigN/proto v0.0.0-20260404174628-3389df740c20
github.com/hpcloud/tail v1.0.0
github.com/samber/lo v1.53.0
golang.org/x/sync v0.20.0
+8 -2
View File
@@ -47,8 +47,14 @@ func (*CommandExecutor) Execute(command *proto.Command) (fc *proto.FinishedComma
_, err := io.Copy(stderrbuf, stderr)
return err
})
if err := cmd.Wait(); err != nil {
return nil, err
if waitErr := cmd.Wait(); waitErr != nil {
var exitErr *exec.ExitError
if !errors.As(waitErr, &exitErr) {
return nil, waitErr
}
fc.Status = int32(exitErr.ExitCode())
} else {
fc.Status = int32(cmd.ProcessState.ExitCode())
}
if err := eg.Wait(); err != nil {
return nil, err
+57
View File
@@ -22,6 +22,7 @@ import (
"github.com/samber/lo"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/metadata"
)
@@ -110,6 +111,13 @@ func main() {
return ccli.HandleCommands(ctx, grpcAddr, creds)
})
// Start services update stream
if len(cfg.Services) > 0 {
wg.Go(func() error {
return reportServices(ctx, grpcAddr, creds, cfg.Label, cfg.Services, lgr)
})
}
// Start log collectors
if len(cfg.Services) > 0 {
wg.Go(func() error {
@@ -301,3 +309,52 @@ func reconnectStream(
return fmt.Errorf("failed to reconnect after 5 attempts for service %s", service)
}
// reportServices periodically sends service status updates to the backend via gRPC.
// For now, all configured services are reported as "up" every 5 seconds.
func reportServices(
ctx context.Context,
grpcAddr string,
creds credentials.TransportCredentials,
label string,
services []config.ServiceConfig,
lgr *logger.Logger,
) error {
conn, err := grpc.NewClient(grpcAddr, grpc.WithTransportCredentials(creds))
if err != nil {
return fmt.Errorf("failed to connect for services report: %w", err)
}
defer conn.Close()
ccli := proto.NewCollectorClient(conn)
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
// Send immediately on start, then every 5 seconds
for {
svcUpdates := make([]*proto.ServicesUpdate_ServiceUpdate, 0, len(services))
for _, svc := range services {
svcUpdates = append(svcUpdates, &proto.ServicesUpdate_ServiceUpdate{
Name: svc.Name,
Status: "up",
})
}
md := metadata.New(map[string]string{"whoami": label})
_, err := ccli.ReportServices(
metadata.NewOutgoingContext(ctx, md),
&proto.ServicesUpdate{Services: svcUpdates},
)
if err != nil {
lgr.Warn("Failed to report services", "err", err)
} else {
lgr.Debug("Services reported successfully", "count", len(services))
}
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
}
}
}
+19 -6
View File
@@ -82,10 +82,13 @@ func main() {
}()
}
// Initialize Collector gRPC service
coll := collector.New(logRepo)
// Initialize Collector (log streaming) with its own ConnTracker
collTracker := collector.NewConnTracker()
coll := collector.New(logRepo, collTracker)
cmdr := commander.New(jobRepo)
// Initialize ConnTracker for Commander agent lifecycle
cmdTracker := commander.NewConnTracker()
cmdr := commander.New(jobRepo, cmdTracker)
// Initialize script interpreter repository and service
scriptRepo := repository.NewScriptInterpreterRepo(db)
@@ -93,8 +96,11 @@ func main() {
log.Printf("Warning: failed to initialize script interpreters table: %v", err)
}
scriptSvc := service.NewScriptServiceWithInterpreters(h.Repo, scriptRepo)
scriptHandlers := handlers.NewScriptHandlers(scriptSvc, cmdr)
jobsHandlers := handlers.NewJobsHandlers(cmdr, scriptSvc)
scriptHandlers := handlers.NewScriptHandlers(scriptSvc, cmdTracker)
jobsHandlers := handlers.NewJobsHandlers(cmdTracker, scriptSvc,
os.Getenv("WHEREAMI"), /* our address for redirects */
jobRepo,
)
// Initialize script management service and handlers
scriptManageSvc := service.NewScriptService(h.Repo)
@@ -201,6 +207,9 @@ func main() {
jobsGroup.Use(auth.AuthMiddleware(), handlers.RequireAdmin())
{
jobsGroup.POST("", jobsHandlers.AddJob)
jobsGroup.POST("/:id/wait", jobsHandlers.WaitJob)
jobsGroup.GET("/metrics", jobsHandlers.GetJobMetrics)
jobsGroup.POST("/check_cmd", jobsHandlers.CheckCmd)
}
// Agent registration
@@ -290,7 +299,11 @@ func main() {
MinVersion: tls.VersionTLS12,
}
grpcServer := grpc.NewServer(grpc.Creds(credentials.NewTLS(tlsConfig)))
grpcServer := grpc.NewServer(
grpc.Creds(credentials.NewTLS(tlsConfig)),
grpc.StatsHandler(collTracker),
grpc.StatsHandler(cmdTracker),
)
proto.RegisterCommanderServer(grpcServer, cmdr)
proto.RegisterCollectorServer(grpcServer, coll)
+1 -1
View File
@@ -3,7 +3,7 @@ module gitea.d3m0k1d.ru/d3m0k1d/HellreigN/backend
go 1.26.1
require (
gitea.d3m0k1d.ru/d3m0k1d/HellreigN/proto v0.0.0-20260404165608-1d75935a08a5
gitea.d3m0k1d.ru/d3m0k1d/HellreigN/proto v0.0.0-20260404174628-3389df740c20
github.com/ClickHouse/clickhouse-go/v2 v2.44.0
github.com/gin-gonic/gin v1.12.0
github.com/samber/lo v1.53.0
+12 -41
View File
@@ -4,7 +4,6 @@ import (
"fmt"
"io"
"log"
"sync"
"time"
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/backend/internal/repository"
@@ -13,26 +12,19 @@ import (
"google.golang.org/grpc/metadata"
)
// Collector handles log streaming from connected agents.
type Collector struct {
proto.UnimplementedCollectorServer
logRepo *repository.LogRepository
agents map[string]*Agent
mu sync.RWMutex
tracker *ConnTracker
batchSize int
flushInterval time.Duration
}
type Agent struct {
ID string
Label string
Services []string
ConnectedAt time.Time
}
func New(logRepo *repository.LogRepository) *Collector {
func New(logRepo *repository.LogRepository, tracker *ConnTracker) *Collector {
return &Collector{
logRepo: logRepo,
agents: make(map[string]*Agent),
tracker: tracker,
batchSize: 100,
flushInterval: 2 * time.Second,
}
@@ -56,27 +48,15 @@ func (c *Collector) Stream(stream proto.Collector_StreamServer) error {
}
service := serviceVals[0]
servicesVals := md["services"]
var services []string
if len(servicesVals) > 0 {
services = servicesVals
}
// Register agent
c.mu.Lock()
c.agents[agentName] = &Agent{
agent := &Agent{
ID: agentName,
Label: agentName,
Services: services,
Services: make([]Service, 0),
ConnectedAt: time.Now(),
}
c.mu.Unlock()
defer func() {
c.mu.Lock()
delete(c.agents, agentName)
c.mu.Unlock()
}()
c.tracker.Register(agent)
defer c.tracker.Unregister(agent.ID)
log.Printf("Agent %s connected, streaming logs for service: %s", agentName, service)
@@ -139,7 +119,6 @@ func (c *Collector) Stream(stream proto.Collector_StreamServer) error {
for {
select {
case <-stream.Context().Done():
// Context cancelled, flush remaining
_ = flush()
return stream.Context().Err()
case <-ticker.C:
@@ -162,7 +141,6 @@ func (c *Collector) Stream(stream proto.Collector_StreamServer) error {
}
case err := <-errCh:
if err == io.EOF {
// Client closed stream
return flush()
}
return fmt.Errorf("failed to receive: %w", err)
@@ -170,19 +148,12 @@ func (c *Collector) Stream(stream proto.Collector_StreamServer) error {
}
}
// GetAgent delegates to the tracker.
func (c *Collector) GetAgent(name string) (*Agent, bool) {
c.mu.RLock()
defer c.mu.RUnlock()
a, ok := c.agents[name]
return a, ok
return c.tracker.GetAgent(name)
}
// Agents delegates to the tracker.
func (c *Collector) Agents() []*Agent {
c.mu.RLock()
defer c.mu.RUnlock()
result := make([]*Agent, 0, len(c.agents))
for _, a := range c.agents {
result = append(result, a)
}
return result
return c.tracker.Agents()
}
@@ -0,0 +1,38 @@
package collector
import (
"context"
"fmt"
"log"
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/proto/proto"
"google.golang.org/grpc/metadata"
)
// ReportServices handles a unary service status update from an agent.
// Agents send their current services list, which is stored in the collector.
func (c *Collector) ReportServices(ctx context.Context, req *proto.ServicesUpdate) (*proto.ServicesUpdateResp, error) {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, fmt.Errorf("no metadata in context")
}
whoamiVals := md["whoami"]
if len(whoamiVals) == 0 {
return nil, fmt.Errorf("whoami metadata missing")
}
agentName := whoamiVals[0]
services := make([]Service, 0, len(req.Services))
for _, s := range req.Services {
services = append(services, Service{s.Name, s.Status})
}
if ok := c.tracker.UpdateServices(agentName, services); ok {
log.Printf("Updated services for agent %s: %v", agentName, services)
} else {
log.Printf("Warning: received services update for unknown agent %s", agentName)
}
return &proto.ServicesUpdateResp{}, nil
}
@@ -0,0 +1,111 @@
package collector
import (
"context"
"log"
"sync"
"time"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/stats"
)
// ConnTracker tracks connected Collector agents and handles cleanup on disconnect.
// It implements grpc.StatsHandler for disconnect detection.
type ConnTracker struct {
mu sync.RWMutex
agents map[string]*Agent
}
func NewConnTracker() *ConnTracker {
return &ConnTracker{
agents: make(map[string]*Agent),
}
}
// Register adds an agent to the tracker. Called by Collector.Stream().
func (t *ConnTracker) Register(agent *Agent) {
t.mu.Lock()
t.agents[agent.ID] = agent
t.mu.Unlock()
log.Printf("[collector] agent registered: %s", agent.ID)
}
// Unregister removes an agent from the tracker.
func (t *ConnTracker) Unregister(id string) {
t.mu.Lock()
delete(t.agents, id)
t.mu.Unlock()
log.Printf("[collector] agent unregistered: %s", id)
}
// GetAgent returns the agent for the given ID.
func (t *ConnTracker) GetAgent(id string) (*Agent, bool) {
t.mu.RLock()
defer t.mu.RUnlock()
a, ok := t.agents[id]
return a, ok
}
// Agents returns all connected agents.
func (t *ConnTracker) Agents() []*Agent {
t.mu.RLock()
defer t.mu.RUnlock()
result := make([]*Agent, 0, len(t.agents))
for _, a := range t.agents {
result = append(result, a)
}
return result
}
// grpc.StatsHandler implementation.
func (t *ConnTracker) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) context.Context {
return ctx
}
func (t *ConnTracker) HandleRPC(ctx context.Context, _ stats.RPCStats) {}
func (t *ConnTracker) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context {
return ctx
}
func (t *ConnTracker) HandleConn(ctx context.Context, s stats.ConnStats) {
switch s.(type) {
case *stats.ConnEnd:
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return
}
whoamiVals := md["whoami"]
if len(whoamiVals) == 0 {
return
}
t.Unregister(whoamiVals[0])
}
}
// UpdateServices updates the services list for the given agent.
func (t *ConnTracker) UpdateServices(id string, services []Service) bool {
t.mu.Lock()
defer t.mu.Unlock()
agent, ok := t.agents[id]
if !ok {
return false
}
agent.Services = services
return true
}
// Service represents a named service with its current status.
type Service struct {
Name, Status string
}
// Agent represents a connected agent streaming logs to the collector.
type Agent struct {
ID string
Label string
Services []Service
ConnectedAt time.Time
}
+119 -64
View File
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io"
"log"
"sync"
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/backend/internal/models"
@@ -11,27 +12,30 @@ import (
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/stats"
)
// Commander handles command execution on connected agents.
type Commander struct {
proto.UnimplementedCommanderServer
agents map[string]Agent
mu sync.RWMutex
tracker *ConnTracker
jobber Jobber
}
// Jobber persists job state.
type Jobber interface {
InitJob(ctx context.Context, agentID string, job models.JobForInsert) (int64, error)
UpdateJobInDB(ctx context.Context, jid int64, msg models.JobForUpdate) (models.Job, error)
}
func New(jobber Jobber) *Commander {
func New(jobber Jobber, tracker *ConnTracker) *Commander {
return &Commander{
agents: make(map[string]Agent),
jobber: jobber,
tracker: tracker,
}
}
// Agent represents a connected agent with an active bidirectional stream.
type Agent struct {
bidi grpc.BidiStreamingServer[proto.FinishedCommand, proto.Command]
in chan *proto.Command
@@ -40,10 +44,11 @@ type Agent struct {
ctx context.Context
aid string
Token string // agent id
Token string
Label string
Services []string
}
type JobOut struct {
fc models.Job
err error
@@ -53,61 +58,91 @@ type Job struct {
out chan JobOut
}
func (self *Commander) GetAgent(aid string) (agent Agent, ok bool) {
self.mu.RLock()
defer self.mu.RUnlock()
agent, ok = self.agents[aid]
return
// ConnTracker tracks connected agents and handles cleanup on disconnect.
// It implements grpc.StatsHandler for disconnect detection.
type ConnTracker struct {
mu sync.RWMutex
agents map[string]*Agent
}
// GetAgentByLabel searches for an agent by its human-readable label.
func (self *Commander) GetAgentByLabel(label string) (agent Agent, ok bool) {
func (self *ConnTracker) GetAgentByLabel(label string) (agent Agent, ok bool) {
self.mu.RLock()
defer self.mu.RUnlock()
for _, a := range self.agents {
if a.Label == label {
return a, true
return *a, true
}
}
return
}
func (self *Commander) Agents() []Agent {
self.mu.RLock()
defer self.mu.RUnlock()
result := make([]Agent, 0, len(self.agents))
for _, a := range self.agents {
func NewConnTracker() *ConnTracker {
return &ConnTracker{
agents: make(map[string]*Agent),
}
}
func (t *ConnTracker) Register(aid string, agent *Agent) {
t.mu.Lock()
t.agents[aid] = agent
t.mu.Unlock()
log.Printf("[conntracker] agent registered: %s", aid)
}
func (t *ConnTracker) Unregister(aid string) {
t.mu.Lock()
delete(t.agents, aid)
t.mu.Unlock()
log.Printf("[conntracker] agent unregistered: %s", aid)
}
func (t *ConnTracker) GetAgent(aid string) (*Agent, bool) {
t.mu.RLock()
defer t.mu.RUnlock()
a, ok := t.agents[aid]
return a, ok
}
func (t *ConnTracker) Agents() []*Agent {
t.mu.RLock()
defer t.mu.RUnlock()
result := make([]*Agent, 0, len(t.agents))
for _, a := range t.agents {
result = append(result, a)
}
return result
}
func (self *Commander) removeAgent(aid string) {
self.mu.Lock()
defer self.mu.Unlock()
delete(self.agents, aid)
// grpc.StatsHandler implementation.
func (t *ConnTracker) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) context.Context {
return ctx
}
func (self *Agent) AddJob(job models.JobForInsert) (int64, error) {
jid, err := self.jobber.InitJob(self.ctx, self.aid, job)
if err != nil {
return 0, err
}
self.jobs[jid] = newJob()
self.in <- &proto.Command{
Id: jid,
Command: job.Command,
Stdin: job.Stdin,
}
return jid, err
func (t *ConnTracker) HandleRPC(ctx context.Context, _ stats.RPCStats) {}
func (t *ConnTracker) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context {
return ctx
}
func (self *Agent) WaitJob(jid int64) (*models.Job, error) {
result := <-self.jobs[jid].out
return &result.fc, result.err
func (t *ConnTracker) HandleConn(ctx context.Context, s stats.ConnStats) {
switch s.(type) {
case *stats.ConnEnd:
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return
}
aidVals := md["agentid"]
if len(aidVals) == 0 {
return
}
t.Unregister(aidVals[0])
}
}
func (self *Commander) Stream(
// Stream handles a new agent connection and runs the send/recv loops.
func (c *Commander) Stream(
bidi grpc.BidiStreamingServer[proto.FinishedCommand, proto.Command],
) error {
md, ok := metadata.FromIncomingContext(bidi.Context())
@@ -121,35 +156,58 @@ func (self *Commander) Stream(
aid := aidVals[0]
var label string
labelVals := md["label"]
if len(labelVals) > 0 {
label = labelVals[0]
if vals := md["label"]; len(vals) > 0 {
label = vals[0]
}
agent := newAgent(bidi, self.jobber, aid, label)
self.mu.Lock()
self.agents[aid] = agent
self.mu.Unlock()
agent := NewAgent(bidi.Context(), c.jobber, aid, label)
agent.bidi = bidi
c.tracker.Register(aid, agent)
defer c.tracker.Unregister(aid)
defer self.removeAgent(aid)
return agent.run()
}
func (self *Agent) run() error {
// GetAgent returns the agent by ID. Delegates to the tracker.
func (c *Commander) GetAgent(aid string) (*Agent, bool) {
return c.tracker.GetAgent(aid)
}
func (a *Agent) AddJob(job models.JobForInsert) (int64, error) {
jid, err := a.jobber.InitJob(a.ctx, a.aid, job)
if err != nil {
return 0, err
}
a.jobs[jid] = newJob()
a.in <- &proto.Command{
Id: jid,
Command: job.Command,
Stdin: job.Stdin,
}
return jid, nil
}
func (a *Agent) WaitJob(jid int64) (*models.Job, error) {
result := <-a.jobs[jid].out
return &result.fc, result.err
}
func (a *Agent) run() error {
wg := new(errgroup.Group)
wg.Go(self.recv)
wg.Go(self.send)
wg.Go(a.recv)
wg.Go(a.send)
return wg.Wait()
}
func (self *Agent) recv() error {
func (a *Agent) recv() error {
for {
job, err := func() (job models.Job, err error) {
msg, err := self.bidi.Recv()
msg, err := a.bidi.Recv()
if err != nil {
return
}
return self.jobber.UpdateJobInDB(self.ctx, msg.Id, models.JobForUpdate{
return a.jobber.UpdateJobInDB(a.ctx, msg.Id, models.JobForUpdate{
Stdout: msg.Stdout,
Stderr: msg.Stderr,
Status: msg.Status,
@@ -158,8 +216,7 @@ func (self *Agent) recv() error {
if err == io.EOF {
return nil
}
// TODO: that would blow up at some point
out := self.jobs[job.ID].out
out := a.jobs[job.ID].out
out <- JobOut{
fc: job,
err: err,
@@ -168,28 +225,26 @@ func (self *Agent) recv() error {
}
}
func (self *Agent) send() error {
for job := range self.in {
if err := self.bidi.Send(job); err != nil {
func (a *Agent) send() error {
for job := range a.in {
if err := a.bidi.Send(job); err != nil {
return err
}
}
return io.EOF
// self.jobs[]
}
func newAgent(
bidi grpc.BidiStreamingServer[proto.FinishedCommand, proto.Command],
func NewAgent(
ctx context.Context,
jobber Jobber,
aid string,
label string,
) Agent {
return Agent{
bidi: bidi,
in: make(chan *proto.Command),
) *Agent {
return &Agent{
in: make(chan *proto.Command, 10),
jobs: make(map[int64]Job),
jobber: jobber,
ctx: bidi.Context(),
ctx: ctx,
aid: aid,
Label: label,
Token: aid,
+6 -1
View File
@@ -1,6 +1,7 @@
package handlers
import (
"fmt"
"net/http"
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/backend/internal/grpcsrv/collector"
@@ -36,10 +37,14 @@ func (ag *AgentsGroup) List(c *gin.Context) {
agents := make([]AgentInfo, 0)
for _, agent := range ag.collector.Agents() {
services := make([]string, 0, len(agent.Services))
for _, s := range agent.Services {
services = append(services, fmt.Sprintf("%s:%s", s.Name, s.Status))
}
agents = append(agents, AgentInfo{
Token: agent.ID,
Label: agent.Label,
Services: agent.Services,
Services: services,
ConnectedAt: agent.ConnectedAt.Format("2006-01-02 15:04:05"),
})
}
+179 -34
View File
@@ -5,29 +5,44 @@ import (
"fmt"
"net/http"
"os/exec"
"strconv"
"time"
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/backend/internal/grpcsrv/commander"
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/backend/internal/models"
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/backend/internal/repository"
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/backend/internal/service"
"github.com/gin-gonic/gin"
)
type JobsHandlers struct {
cmder *commander.Commander
tracker *commander.ConnTracker
svc *service.ScriptService
whereami string
jobRepo *repository.JobRepository
}
func NewJobsHandlers(cmder *commander.Commander, svc *service.ScriptService) JobsHandlers {
return JobsHandlers{cmder: cmder, svc: svc}
func NewJobsHandlers(tracker *commander.ConnTracker, svc *service.ScriptService, whereami string, jobRepo *repository.JobRepository) JobsHandlers {
return JobsHandlers{tracker: tracker, svc: svc, whereami: whereami, jobRepo: jobRepo}
}
// AddJobIn is the request body for creating a job.
type AddJobIn struct {
Command string `json:"command" binding:"required"`
InterpreterID int64 `json:"interpreter_id"`
Stdin *string `json:"stdin"`
AgentID string `json:"agent_id" binding:"required"`
}
// AddJobOut is the response body for a submitted job.
type AddJobOut struct {
ID int64 `json:"id"`
Command []string `json:"command"`
WaitURL string `json:"wait_url"`
}
// JobResult is the response body for a completed job.
type JobResult struct {
ID int64 `json:"id"`
Command []string `json:"command"`
Stdin *string `json:"stdin"`
@@ -36,41 +51,38 @@ type AddJobOut struct {
Status int32 `json:"status"`
}
// AddJob creates and executes a job on a target agent.
// @Summary Create and run a job on an agent
// @Description Sends a command to the specified agent, waits for execution, and returns the result
// WaitJobIn is the request body for waiting on a job.
type WaitJobIn struct {
AgentID string `json:"agent_id" binding:"required"`
}
// AddJob submits a job to an agent and returns a wait_url for the result.
// @Summary Submit a job to an agent
// @Description Sends a command to the specified agent and returns a URL to wait for the result
// @Tags jobs
// @Accept json
// @Produce json
// @Param body body AddJobIn true "Job request"
// @Success 201 {object} AddJobOut
// @Security Bearer
// @Router /jobs [post]
func (self *JobsHandlers) AddJob(c *gin.Context) {
err := func() error {
func (h *JobsHandlers) AddJob(c *gin.Context) {
var in AddJobIn
if err := c.Bind(&in); err != nil {
return err
}
agent, ok := self.cmder.GetAgent(in.AgentID)
if !ok {
c.Status(http.StatusNotFound)
return fmt.Errorf("agent not found")
c.Error(err)
return
}
var command []string
if in.InterpreterID == 0 {
command = []string{"sh", "-c", in.Command}
} else {
var err error
command, err = self.svc.ResolveCommand(
c.Request.Context(),
in.InterpreterID,
in.Command,
)
if err != nil {
return err
agent, ok := h.tracker.GetAgent(in.AgentID)
if !ok {
c.Status(http.StatusNotFound)
c.Error(fmt.Errorf("agent not found"))
return
}
command, err := resolveCommand(c, h.svc, in.InterpreterID, in.Command)
if err != nil {
c.Error(err)
return
}
jid, err := agent.AddJob(models.JobForInsert{
@@ -78,13 +90,59 @@ func (self *JobsHandlers) AddJob(c *gin.Context) {
Stdin: in.Stdin,
})
if err != nil {
return err
}
job, err := agent.WaitJob(jid)
if err != nil && !errors.As(err, &exec.ExitError{}) {
return err
c.Error(err)
return
}
waitURL := fmt.Sprintf("%s/api/v1/jobs/%d/wait", h.whereami, jid)
c.JSON(http.StatusCreated, AddJobOut{
ID: jid,
Command: command,
WaitURL: waitURL,
})
}
// WaitJob waits for a submitted job to complete (long-poll).
// If the job is already done, returns immediately.
// @Summary Wait for job result
// @Description Long-polls for a job result. Returns immediately if the job is already finished.
// @Tags jobs
// @Accept json
// @Produce json
// @Param id path int true "Job ID"
// @Param body body WaitJobIn true "Agent reference"
// @Success 200 {object} JobResult
// @Failure 400 {object} map[string]string
// @Failure 404 {object} map[string]string
// @Router /jobs/{id}/wait [post]
func (h *JobsHandlers) WaitJob(c *gin.Context) {
jid, err := strconv.ParseInt(c.Param("id"), 10, 64)
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid job id"})
return
}
var in WaitJobIn
if err := c.Bind(&in); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request body"})
return
}
agent, ok := h.tracker.GetAgent(in.AgentID)
if !ok {
c.Status(http.StatusNotFound)
c.Error(fmt.Errorf("agent not found"))
return
}
job, err := agent.WaitJob(jid)
if err != nil {
c.Error(err)
return
}
c.JSON(http.StatusOK, JobResult{
ID: job.ID,
Command: job.Command,
Stdin: job.Stdin,
@@ -92,9 +150,96 @@ func (self *JobsHandlers) AddJob(c *gin.Context) {
Stderr: job.Stderr,
Status: job.Status,
})
return nil
}()
}
func resolveCommand(c *gin.Context, svc *service.ScriptService, interpID int64, cmd string) ([]string, error) {
if interpID == 0 {
return []string{"sh", "-c", cmd}, nil
}
command, err := svc.ResolveCommand(c.Request.Context(), interpID, cmd)
if err != nil {
return nil, err
}
return command, nil
}
// @Summary Check command path
// @Description Validates that a command binary exists on the system
// @Tags jobs
// @Accept json
// @Param body body CheckCmdIn true "Command to check"
// @Success 200 {object} CheckCmdOut
// @Failure 404 {object} map[string]string
// @Router /jobs/check_cmd [post]
func (h *JobsHandlers) CheckCmd(c *gin.Context) {
var in struct {
Command string `json:"command" binding:"required"`
}
if err := c.Bind(&in); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request"})
return
}
if _, err := exec.LookPath(in.Command); err != nil {
if errors.Is(err, exec.ErrNotFound) {
c.JSON(http.StatusNotFound, gin.H{"error": "command not found"})
return
}
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
return
}
c.JSON(http.StatusOK, CheckCmdOut{Exists: true})
}
type CheckCmdIn struct {
Command string `json:"command" binding:"required" example:"bash"`
}
type CheckCmdOut struct {
Exists bool `json:"exists"`
}
// JobMetricsOut is the response body for the job metrics endpoint.
type JobMetricsOut struct {
Total int `json:"total"`
Success int `json:"success"`
Failed int `json:"failed"`
Pending int `json:"pending"`
Period string `json:"period"`
}
// GetJobMetrics returns job success metrics over a parameterized period.
// @Summary Get job metrics
// @Description Returns total, successful, failed, and pending job counts over the given period
// @Tags jobs
// @Produce json
// @Param period query string false "Time period (e.g. 1h, 24h, 7d)" default(24h)
// @Success 200 {object} JobMetricsOut
// @Failure 400 {object} map[string]string
// @Security Bearer
// @Router /jobs/metrics [get]
func (h *JobsHandlers) GetJobMetrics(c *gin.Context) {
periodStr := c.DefaultQuery("period", "24h")
period, err := time.ParseDuration(periodStr)
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid period, use Go duration format (e.g. 1h, 24h, 7d)"})
return
}
since := time.Now().Add(-period)
metrics, err := h.jobRepo.GetJobMetrics(c.Request.Context(), since)
if err != nil {
c.Error(err)
return
}
c.JSON(http.StatusOK, JobMetricsOut{
Total: metrics.Total,
Success: metrics.Success,
Failed: metrics.Failed,
Pending: metrics.Pending,
Period: periodStr,
})
}
+16 -16
View File
@@ -14,11 +14,11 @@ import (
type ScriptHandlers struct {
svc *service.ScriptService
cmder *commander.Commander
tracker *commander.ConnTracker
}
func NewScriptHandlers(svc *service.ScriptService, cmder *commander.Commander) ScriptHandlers {
return ScriptHandlers{svc: svc, cmder: cmder}
func NewScriptHandlers(svc *service.ScriptService, tracker *commander.ConnTracker) ScriptHandlers {
return ScriptHandlers{svc: svc, tracker: tracker}
}
type RunScriptIn struct {
@@ -47,14 +47,14 @@ type RunScriptOut struct {
// @Success 201 {object} RunScriptOut
// @Security Bearer
// @Router /scripts/run [post]
func (self *ScriptHandlers) RunScript(c *gin.Context) {
func (h *ScriptHandlers) RunScript(c *gin.Context) {
err := func() error {
var in RunScriptIn
if err := c.Bind(&in); err != nil {
return err
}
command, err := self.svc.ResolveCommand(
command, err := h.svc.ResolveCommand(
c.Request.Context(),
in.InterpreterID,
in.ScriptText,
@@ -63,7 +63,7 @@ func (self *ScriptHandlers) RunScript(c *gin.Context) {
return err
}
agent, ok := self.cmder.GetAgent(in.AgentID)
agent, ok := h.tracker.GetAgent(in.AgentID)
if !ok {
c.Status(http.StatusNotFound)
return fmt.Errorf("agent not found")
@@ -105,8 +105,8 @@ func (self *ScriptHandlers) RunScript(c *gin.Context) {
// @Success 200 {array} repository.ScriptInterpreter
// @Security Bearer
// @Router /scripts/interpreters [get]
func (self *ScriptHandlers) ListInterpreters(c *gin.Context) {
interpreters, err := self.svc.List(c.Request.Context())
func (h *ScriptHandlers) ListInterpreters(c *gin.Context) {
interpreters, err := h.svc.List(c.Request.Context())
if err != nil {
c.Error(err)
return
@@ -124,14 +124,14 @@ func (self *ScriptHandlers) ListInterpreters(c *gin.Context) {
// @Success 201 {object} repository.ScriptInterpreter
// @Security Bearer
// @Router /scripts/interpreters [post]
func (self *ScriptHandlers) CreateInterpreter(c *gin.Context) {
func (h *ScriptHandlers) CreateInterpreter(c *gin.Context) {
var in repository.ScriptInterpreterCreate
if err := c.BindJSON(&in); err != nil {
c.Error(err)
return
}
si, err := self.svc.Create(c.Request.Context(), in)
si, err := h.svc.Create(c.Request.Context(), in)
if err != nil {
c.Error(err)
return
@@ -148,14 +148,14 @@ func (self *ScriptHandlers) CreateInterpreter(c *gin.Context) {
// @Success 200 {object} repository.ScriptInterpreter
// @Security Bearer
// @Router /scripts/interpreters/:id [get]
func (self *ScriptHandlers) GetInterpreter(c *gin.Context) {
func (h *ScriptHandlers) GetInterpreter(c *gin.Context) {
id, err := strconv.ParseInt(c.Param("id"), 10, 64)
if err != nil {
c.Error(err)
return
}
si, err := self.svc.GetByID(c.Request.Context(), id)
si, err := h.svc.GetByID(c.Request.Context(), id)
if err != nil {
c.Error(err)
return
@@ -174,7 +174,7 @@ func (self *ScriptHandlers) GetInterpreter(c *gin.Context) {
// @Success 200 {object} repository.ScriptInterpreter
// @Security Bearer
// @Router /scripts/interpreters/:id [put]
func (self *ScriptHandlers) UpdateInterpreter(c *gin.Context) {
func (h *ScriptHandlers) UpdateInterpreter(c *gin.Context) {
id, err := strconv.ParseInt(c.Param("id"), 10, 64)
if err != nil {
c.Error(err)
@@ -187,7 +187,7 @@ func (self *ScriptHandlers) UpdateInterpreter(c *gin.Context) {
return
}
si, err := self.svc.Update(c.Request.Context(), id, in)
si, err := h.svc.Update(c.Request.Context(), id, in)
if err != nil {
c.Error(err)
return
@@ -203,14 +203,14 @@ func (self *ScriptHandlers) UpdateInterpreter(c *gin.Context) {
// @Success 204
// @Security Bearer
// @Router /scripts/interpreters/:id [delete]
func (self *ScriptHandlers) DeleteInterpreter(c *gin.Context) {
func (h *ScriptHandlers) DeleteInterpreter(c *gin.Context) {
id, err := strconv.ParseInt(c.Param("id"), 10, 64)
if err != nil {
c.Error(err)
return
}
if err := self.svc.Delete(c.Request.Context(), id); err != nil {
if err := h.svc.Delete(c.Request.Context(), id); err != nil {
c.Error(err)
return
}
@@ -5,6 +5,7 @@ import (
"database/sql"
"encoding/json"
"fmt"
"time"
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/backend/internal/models"
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/backend/internal/storage"
@@ -103,3 +104,29 @@ func (r *JobRepository) GetJobByID(ctx context.Context, jid int64) (models.Job,
job.Stdin = stdinVal
return job, nil
}
type JobMetrics struct {
Total int
Success int
Failed int
Pending int
}
// GetJobMetrics returns job success metrics for jobs updated since the given time.
// A successful job has status == 0, failed has status != 0, pending has status == 0 with empty stdout/stderr.
func (r *JobRepository) GetJobMetrics(ctx context.Context, since time.Time) (JobMetrics, error) {
var m JobMetrics
err := r.DB.QueryRowContext(ctx,
`SELECT
COUNT(*),
SUM(CASE WHEN status = 0 AND (stdout != '' OR stderr != '') THEN 1 ELSE 0 END),
SUM(CASE WHEN status != 0 THEN 1 ELSE 0 END),
SUM(CASE WHEN status = 0 AND stdout = '' AND stderr = '' THEN 1 ELSE 0 END)
FROM jobs WHERE updated_at >= ?`,
since,
).Scan(&m.Total, &m.Success, &m.Failed, &m.Pending)
if err != nil {
return JobMetrics{}, err
}
return m, nil
}