From 7be99f8e9177bebd5e8326f97ce9b9e84656ad26 Mon Sep 17 00:00:00 2001 From: "zero@thinky" Date: Sat, 4 Apr 2026 22:55:39 +0300 Subject: [PATCH] feat: big ahh commit - agent+proto+backend: transfer service status - agent: fix returning empty message on nonzero exit status - backend: refactor collector+commander and handlers dependent on them: implement agent accounting via grpc stats handler --- agent/go.mod | 2 +- agent/internal/commander/impl.go | 10 +- agent/main.go | 57 +++++ backend/cmd/main.go | 21 +- backend/go.mod | 2 +- .../internal/grpcsrv/collector/collector.go | 53 +---- .../internal/grpcsrv/collector/services.go | 38 ++++ backend/internal/grpcsrv/collector/tracker.go | 111 ++++++++++ .../internal/grpcsrv/commander/commander.go | 189 ++++++++++------ backend/internal/handlers/agents.go | 7 +- backend/internal/handlers/jobs.go | 207 +++++++++++++----- backend/internal/handlers/scripts.go | 34 +-- 12 files changed, 541 insertions(+), 190 deletions(-) create mode 100644 backend/internal/grpcsrv/collector/services.go create mode 100644 backend/internal/grpcsrv/collector/tracker.go diff --git a/agent/go.mod b/agent/go.mod index 64b0c24..2f2c0a2 100644 --- a/agent/go.mod +++ b/agent/go.mod @@ -3,7 +3,7 @@ module gitea.d3m0k1d.ru/d3m0k1d/HellreigN/agent go 1.26.1 require ( - gitea.d3m0k1d.ru/d3m0k1d/HellreigN/proto v0.0.0-20260403214837-94be9799f47d + gitea.d3m0k1d.ru/d3m0k1d/HellreigN/proto v0.0.0-20260404174628-3389df740c20 github.com/hpcloud/tail v1.0.0 github.com/samber/lo v1.53.0 golang.org/x/sync v0.20.0 diff --git a/agent/internal/commander/impl.go b/agent/internal/commander/impl.go index 1a61e3a..a81e336 100644 --- a/agent/internal/commander/impl.go +++ b/agent/internal/commander/impl.go @@ -47,8 +47,14 @@ func (*CommandExecutor) Execute(command *proto.Command) (fc *proto.FinishedComma _, err := io.Copy(stderrbuf, stderr) return err }) - if err := cmd.Wait(); err != nil { - return nil, err + if waitErr := cmd.Wait(); waitErr != nil { + var exitErr *exec.ExitError + if !errors.As(waitErr, &exitErr) { + return nil, waitErr + } + fc.Status = int32(exitErr.ExitCode()) + } else { + fc.Status = int32(cmd.ProcessState.ExitCode()) } if err := eg.Wait(); err != nil { return nil, err diff --git a/agent/main.go b/agent/main.go index 18f72ff..d91baf6 100644 --- a/agent/main.go +++ b/agent/main.go @@ -22,6 +22,7 @@ import ( "github.com/samber/lo" "golang.org/x/sync/errgroup" "google.golang.org/grpc" + "google.golang.org/grpc/credentials" "google.golang.org/grpc/metadata" ) @@ -110,6 +111,13 @@ func main() { return ccli.HandleCommands(ctx, grpcAddr, creds) }) + // Start services update stream + if len(cfg.Services) > 0 { + wg.Go(func() error { + return reportServices(ctx, grpcAddr, creds, cfg.Label, cfg.Services, lgr) + }) + } + // Start log collectors if len(cfg.Services) > 0 { wg.Go(func() error { @@ -301,3 +309,52 @@ func reconnectStream( return fmt.Errorf("failed to reconnect after 5 attempts for service %s", service) } + +// reportServices periodically sends service status updates to the backend via gRPC. +// For now, all configured services are reported as "up" every 5 seconds. +func reportServices( + ctx context.Context, + grpcAddr string, + creds credentials.TransportCredentials, + label string, + services []config.ServiceConfig, + lgr *logger.Logger, +) error { + conn, err := grpc.NewClient(grpcAddr, grpc.WithTransportCredentials(creds)) + if err != nil { + return fmt.Errorf("failed to connect for services report: %w", err) + } + defer conn.Close() + + ccli := proto.NewCollectorClient(conn) + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + + // Send immediately on start, then every 5 seconds + for { + svcUpdates := make([]*proto.ServicesUpdate_ServiceUpdate, 0, len(services)) + for _, svc := range services { + svcUpdates = append(svcUpdates, &proto.ServicesUpdate_ServiceUpdate{ + Name: svc.Name, + Status: "up", + }) + } + + md := metadata.New(map[string]string{"whoami": label}) + _, err := ccli.ReportServices( + metadata.NewOutgoingContext(ctx, md), + &proto.ServicesUpdate{Services: svcUpdates}, + ) + if err != nil { + lgr.Warn("Failed to report services", "err", err) + } else { + lgr.Debug("Services reported successfully", "count", len(services)) + } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + } + } +} diff --git a/backend/cmd/main.go b/backend/cmd/main.go index 45d04fc..3d08249 100644 --- a/backend/cmd/main.go +++ b/backend/cmd/main.go @@ -82,10 +82,13 @@ func main() { }() } - // Initialize Collector gRPC service - coll := collector.New(logRepo) + // Initialize Collector (log streaming) with its own ConnTracker + collTracker := collector.NewConnTracker() + coll := collector.New(logRepo, collTracker) - cmdr := commander.New(jobRepo) + // Initialize ConnTracker for Commander agent lifecycle + cmdTracker := commander.NewConnTracker() + cmdr := commander.New(jobRepo, cmdTracker) // Initialize script interpreter repository and service scriptRepo := repository.NewScriptInterpreterRepo(db) @@ -93,8 +96,9 @@ func main() { log.Printf("Warning: failed to initialize script interpreters table: %v", err) } scriptSvc := service.NewScriptServiceWithInterpreters(h.Repo, scriptRepo) - scriptHandlers := handlers.NewScriptHandlers(scriptSvc, cmdr) - jobsHandlers := handlers.NewJobsHandlers(cmdr, scriptSvc) + scriptHandlers := handlers.NewScriptHandlers(scriptSvc, cmdTracker) + jobsHandlers := handlers.NewJobsHandlers(cmdTracker, scriptSvc, + os.Getenv("WHEREAMI") /* our address for redirects */) // Initialize script management service and handlers scriptManageSvc := service.NewScriptService(h.Repo) @@ -201,6 +205,7 @@ func main() { jobsGroup.Use(auth.AuthMiddleware(), handlers.RequireAdmin()) { jobsGroup.POST("", jobsHandlers.AddJob) + jobsGroup.POST("/:id/wait", jobsHandlers.WaitJob) } // Agent registration @@ -290,7 +295,11 @@ func main() { MinVersion: tls.VersionTLS12, } - grpcServer := grpc.NewServer(grpc.Creds(credentials.NewTLS(tlsConfig))) + grpcServer := grpc.NewServer( + grpc.Creds(credentials.NewTLS(tlsConfig)), + grpc.StatsHandler(collTracker), + grpc.StatsHandler(cmdTracker), + ) proto.RegisterCommanderServer(grpcServer, cmdr) proto.RegisterCollectorServer(grpcServer, coll) diff --git a/backend/go.mod b/backend/go.mod index 4605051..1c2c254 100644 --- a/backend/go.mod +++ b/backend/go.mod @@ -3,7 +3,7 @@ module gitea.d3m0k1d.ru/d3m0k1d/HellreigN/backend go 1.26.1 require ( - gitea.d3m0k1d.ru/d3m0k1d/HellreigN/proto v0.0.0-20260404165608-1d75935a08a5 + gitea.d3m0k1d.ru/d3m0k1d/HellreigN/proto v0.0.0-20260404174628-3389df740c20 github.com/ClickHouse/clickhouse-go/v2 v2.44.0 github.com/gin-gonic/gin v1.12.0 github.com/samber/lo v1.53.0 diff --git a/backend/internal/grpcsrv/collector/collector.go b/backend/internal/grpcsrv/collector/collector.go index ab711ea..ab738af 100644 --- a/backend/internal/grpcsrv/collector/collector.go +++ b/backend/internal/grpcsrv/collector/collector.go @@ -4,7 +4,6 @@ import ( "fmt" "io" "log" - "sync" "time" "gitea.d3m0k1d.ru/d3m0k1d/HellreigN/backend/internal/repository" @@ -13,26 +12,19 @@ import ( "google.golang.org/grpc/metadata" ) +// Collector handles log streaming from connected agents. type Collector struct { proto.UnimplementedCollectorServer logRepo *repository.LogRepository - agents map[string]*Agent - mu sync.RWMutex + tracker *ConnTracker batchSize int flushInterval time.Duration } -type Agent struct { - ID string - Label string - Services []string - ConnectedAt time.Time -} - -func New(logRepo *repository.LogRepository) *Collector { +func New(logRepo *repository.LogRepository, tracker *ConnTracker) *Collector { return &Collector{ logRepo: logRepo, - agents: make(map[string]*Agent), + tracker: tracker, batchSize: 100, flushInterval: 2 * time.Second, } @@ -56,27 +48,15 @@ func (c *Collector) Stream(stream proto.Collector_StreamServer) error { } service := serviceVals[0] - servicesVals := md["services"] - var services []string - if len(servicesVals) > 0 { - services = servicesVals - } - - // Register agent - c.mu.Lock() - c.agents[agentName] = &Agent{ + agent := &Agent{ ID: agentName, Label: agentName, - Services: services, + Services: make([]Service, 0), ConnectedAt: time.Now(), } - c.mu.Unlock() - defer func() { - c.mu.Lock() - delete(c.agents, agentName) - c.mu.Unlock() - }() + c.tracker.Register(agent) + defer c.tracker.Unregister(agent.ID) log.Printf("Agent %s connected, streaming logs for service: %s", agentName, service) @@ -139,7 +119,6 @@ func (c *Collector) Stream(stream proto.Collector_StreamServer) error { for { select { case <-stream.Context().Done(): - // Context cancelled, flush remaining _ = flush() return stream.Context().Err() case <-ticker.C: @@ -162,7 +141,6 @@ func (c *Collector) Stream(stream proto.Collector_StreamServer) error { } case err := <-errCh: if err == io.EOF { - // Client closed stream return flush() } return fmt.Errorf("failed to receive: %w", err) @@ -170,19 +148,12 @@ func (c *Collector) Stream(stream proto.Collector_StreamServer) error { } } +// GetAgent delegates to the tracker. func (c *Collector) GetAgent(name string) (*Agent, bool) { - c.mu.RLock() - defer c.mu.RUnlock() - a, ok := c.agents[name] - return a, ok + return c.tracker.GetAgent(name) } +// Agents delegates to the tracker. func (c *Collector) Agents() []*Agent { - c.mu.RLock() - defer c.mu.RUnlock() - result := make([]*Agent, 0, len(c.agents)) - for _, a := range c.agents { - result = append(result, a) - } - return result + return c.tracker.Agents() } diff --git a/backend/internal/grpcsrv/collector/services.go b/backend/internal/grpcsrv/collector/services.go new file mode 100644 index 0000000..ca3c0a9 --- /dev/null +++ b/backend/internal/grpcsrv/collector/services.go @@ -0,0 +1,38 @@ +package collector + +import ( + "context" + "fmt" + "log" + + "gitea.d3m0k1d.ru/d3m0k1d/HellreigN/proto/proto" + "google.golang.org/grpc/metadata" +) + +// ReportServices handles a unary service status update from an agent. +// Agents send their current services list, which is stored in the collector. +func (c *Collector) ReportServices(ctx context.Context, req *proto.ServicesUpdate) (*proto.ServicesUpdateResp, error) { + md, ok := metadata.FromIncomingContext(ctx) + if !ok { + return nil, fmt.Errorf("no metadata in context") + } + + whoamiVals := md["whoami"] + if len(whoamiVals) == 0 { + return nil, fmt.Errorf("whoami metadata missing") + } + agentName := whoamiVals[0] + + services := make([]Service, 0, len(req.Services)) + for _, s := range req.Services { + services = append(services, Service{s.Name, s.Status}) + } + + if ok := c.tracker.UpdateServices(agentName, services); ok { + log.Printf("Updated services for agent %s: %v", agentName, services) + } else { + log.Printf("Warning: received services update for unknown agent %s", agentName) + } + + return &proto.ServicesUpdateResp{}, nil +} diff --git a/backend/internal/grpcsrv/collector/tracker.go b/backend/internal/grpcsrv/collector/tracker.go new file mode 100644 index 0000000..7998c6b --- /dev/null +++ b/backend/internal/grpcsrv/collector/tracker.go @@ -0,0 +1,111 @@ +package collector + +import ( + "context" + "log" + "sync" + "time" + + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/stats" +) + +// ConnTracker tracks connected Collector agents and handles cleanup on disconnect. +// It implements grpc.StatsHandler for disconnect detection. +type ConnTracker struct { + mu sync.RWMutex + agents map[string]*Agent +} + +func NewConnTracker() *ConnTracker { + return &ConnTracker{ + agents: make(map[string]*Agent), + } +} + +// Register adds an agent to the tracker. Called by Collector.Stream(). +func (t *ConnTracker) Register(agent *Agent) { + t.mu.Lock() + t.agents[agent.ID] = agent + t.mu.Unlock() + log.Printf("[collector] agent registered: %s", agent.ID) +} + +// Unregister removes an agent from the tracker. +func (t *ConnTracker) Unregister(id string) { + t.mu.Lock() + delete(t.agents, id) + t.mu.Unlock() + log.Printf("[collector] agent unregistered: %s", id) +} + +// GetAgent returns the agent for the given ID. +func (t *ConnTracker) GetAgent(id string) (*Agent, bool) { + t.mu.RLock() + defer t.mu.RUnlock() + a, ok := t.agents[id] + return a, ok +} + +// Agents returns all connected agents. +func (t *ConnTracker) Agents() []*Agent { + t.mu.RLock() + defer t.mu.RUnlock() + result := make([]*Agent, 0, len(t.agents)) + for _, a := range t.agents { + result = append(result, a) + } + return result +} + +// grpc.StatsHandler implementation. + +func (t *ConnTracker) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) context.Context { + return ctx +} + +func (t *ConnTracker) HandleRPC(ctx context.Context, _ stats.RPCStats) {} + +func (t *ConnTracker) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context { + return ctx +} + +func (t *ConnTracker) HandleConn(ctx context.Context, s stats.ConnStats) { + switch s.(type) { + case *stats.ConnEnd: + md, ok := metadata.FromIncomingContext(ctx) + if !ok { + return + } + whoamiVals := md["whoami"] + if len(whoamiVals) == 0 { + return + } + t.Unregister(whoamiVals[0]) + } +} + +// UpdateServices updates the services list for the given agent. +func (t *ConnTracker) UpdateServices(id string, services []Service) bool { + t.mu.Lock() + defer t.mu.Unlock() + agent, ok := t.agents[id] + if !ok { + return false + } + agent.Services = services + return true +} + +// Service represents a named service with its current status. +type Service struct { + Name, Status string +} + +// Agent represents a connected agent streaming logs to the collector. +type Agent struct { + ID string + Label string + Services []Service + ConnectedAt time.Time +} diff --git a/backend/internal/grpcsrv/commander/commander.go b/backend/internal/grpcsrv/commander/commander.go index 4e08924..4e09455 100644 --- a/backend/internal/grpcsrv/commander/commander.go +++ b/backend/internal/grpcsrv/commander/commander.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "io" + "log" "sync" "gitea.d3m0k1d.ru/d3m0k1d/HellreigN/backend/internal/models" @@ -11,27 +12,30 @@ import ( "golang.org/x/sync/errgroup" "google.golang.org/grpc" "google.golang.org/grpc/metadata" + "google.golang.org/grpc/stats" ) +// Commander handles command execution on connected agents. type Commander struct { proto.UnimplementedCommanderServer - agents map[string]Agent - mu sync.RWMutex - jobber Jobber + tracker *ConnTracker + jobber Jobber } +// Jobber persists job state. type Jobber interface { InitJob(ctx context.Context, agentID string, job models.JobForInsert) (int64, error) UpdateJobInDB(ctx context.Context, jid int64, msg models.JobForUpdate) (models.Job, error) } -func New(jobber Jobber) *Commander { +func New(jobber Jobber, tracker *ConnTracker) *Commander { return &Commander{ - agents: make(map[string]Agent), - jobber: jobber, + jobber: jobber, + tracker: tracker, } } +// Agent represents a connected agent with an active bidirectional stream. type Agent struct { bidi grpc.BidiStreamingServer[proto.FinishedCommand, proto.Command] in chan *proto.Command @@ -40,10 +44,11 @@ type Agent struct { ctx context.Context aid string - Token string // agent id + Token string Label string Services []string } + type JobOut struct { fc models.Job err error @@ -53,61 +58,91 @@ type Job struct { out chan JobOut } -func (self *Commander) GetAgent(aid string) (agent Agent, ok bool) { - self.mu.RLock() - defer self.mu.RUnlock() - agent, ok = self.agents[aid] - return +// ConnTracker tracks connected agents and handles cleanup on disconnect. +// It implements grpc.StatsHandler for disconnect detection. +type ConnTracker struct { + mu sync.RWMutex + agents map[string]*Agent } // GetAgentByLabel searches for an agent by its human-readable label. -func (self *Commander) GetAgentByLabel(label string) (agent Agent, ok bool) { +func (self *ConnTracker) GetAgentByLabel(label string) (agent Agent, ok bool) { self.mu.RLock() defer self.mu.RUnlock() for _, a := range self.agents { if a.Label == label { - return a, true + return *a, true } } return } -func (self *Commander) Agents() []Agent { - self.mu.RLock() - defer self.mu.RUnlock() - result := make([]Agent, 0, len(self.agents)) - for _, a := range self.agents { +func NewConnTracker() *ConnTracker { + return &ConnTracker{ + agents: make(map[string]*Agent), + } +} + +func (t *ConnTracker) Register(aid string, agent *Agent) { + t.mu.Lock() + t.agents[aid] = agent + t.mu.Unlock() + log.Printf("[conntracker] agent registered: %s", aid) +} + +func (t *ConnTracker) Unregister(aid string) { + t.mu.Lock() + delete(t.agents, aid) + t.mu.Unlock() + log.Printf("[conntracker] agent unregistered: %s", aid) +} + +func (t *ConnTracker) GetAgent(aid string) (*Agent, bool) { + t.mu.RLock() + defer t.mu.RUnlock() + a, ok := t.agents[aid] + return a, ok +} + +func (t *ConnTracker) Agents() []*Agent { + t.mu.RLock() + defer t.mu.RUnlock() + result := make([]*Agent, 0, len(t.agents)) + for _, a := range t.agents { result = append(result, a) } return result } -func (self *Commander) removeAgent(aid string) { - self.mu.Lock() - defer self.mu.Unlock() - delete(self.agents, aid) +// grpc.StatsHandler implementation. + +func (t *ConnTracker) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) context.Context { + return ctx } -func (self *Agent) AddJob(job models.JobForInsert) (int64, error) { - jid, err := self.jobber.InitJob(self.ctx, self.aid, job) - if err != nil { - return 0, err +func (t *ConnTracker) HandleRPC(ctx context.Context, _ stats.RPCStats) {} + +func (t *ConnTracker) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context { + return ctx +} + +func (t *ConnTracker) HandleConn(ctx context.Context, s stats.ConnStats) { + switch s.(type) { + case *stats.ConnEnd: + md, ok := metadata.FromIncomingContext(ctx) + if !ok { + return + } + aidVals := md["agentid"] + if len(aidVals) == 0 { + return + } + t.Unregister(aidVals[0]) } - self.jobs[jid] = newJob() - self.in <- &proto.Command{ - Id: jid, - Command: job.Command, - Stdin: job.Stdin, - } - return jid, err } -func (self *Agent) WaitJob(jid int64) (*models.Job, error) { - result := <-self.jobs[jid].out - return &result.fc, result.err -} - -func (self *Commander) Stream( +// Stream handles a new agent connection and runs the send/recv loops. +func (c *Commander) Stream( bidi grpc.BidiStreamingServer[proto.FinishedCommand, proto.Command], ) error { md, ok := metadata.FromIncomingContext(bidi.Context()) @@ -121,35 +156,58 @@ func (self *Commander) Stream( aid := aidVals[0] var label string - labelVals := md["label"] - if len(labelVals) > 0 { - label = labelVals[0] + if vals := md["label"]; len(vals) > 0 { + label = vals[0] } - agent := newAgent(bidi, self.jobber, aid, label) - self.mu.Lock() - self.agents[aid] = agent - self.mu.Unlock() + agent := NewAgent(bidi.Context(), c.jobber, aid, label) + agent.bidi = bidi + + c.tracker.Register(aid, agent) + defer c.tracker.Unregister(aid) - defer self.removeAgent(aid) return agent.run() } -func (self *Agent) run() error { +// GetAgent returns the agent by ID. Delegates to the tracker. +func (c *Commander) GetAgent(aid string) (*Agent, bool) { + return c.tracker.GetAgent(aid) +} + +func (a *Agent) AddJob(job models.JobForInsert) (int64, error) { + jid, err := a.jobber.InitJob(a.ctx, a.aid, job) + if err != nil { + return 0, err + } + a.jobs[jid] = newJob() + a.in <- &proto.Command{ + Id: jid, + Command: job.Command, + Stdin: job.Stdin, + } + return jid, nil +} + +func (a *Agent) WaitJob(jid int64) (*models.Job, error) { + result := <-a.jobs[jid].out + return &result.fc, result.err +} + +func (a *Agent) run() error { wg := new(errgroup.Group) - wg.Go(self.recv) - wg.Go(self.send) + wg.Go(a.recv) + wg.Go(a.send) return wg.Wait() } -func (self *Agent) recv() error { +func (a *Agent) recv() error { for { job, err := func() (job models.Job, err error) { - msg, err := self.bidi.Recv() + msg, err := a.bidi.Recv() if err != nil { return } - return self.jobber.UpdateJobInDB(self.ctx, msg.Id, models.JobForUpdate{ + return a.jobber.UpdateJobInDB(a.ctx, msg.Id, models.JobForUpdate{ Stdout: msg.Stdout, Stderr: msg.Stderr, Status: msg.Status, @@ -158,8 +216,7 @@ func (self *Agent) recv() error { if err == io.EOF { return nil } - // TODO: that would blow up at some point - out := self.jobs[job.ID].out + out := a.jobs[job.ID].out out <- JobOut{ fc: job, err: err, @@ -168,28 +225,26 @@ func (self *Agent) recv() error { } } -func (self *Agent) send() error { - for job := range self.in { - if err := self.bidi.Send(job); err != nil { +func (a *Agent) send() error { + for job := range a.in { + if err := a.bidi.Send(job); err != nil { return err } } return io.EOF - // self.jobs[] } -func newAgent( - bidi grpc.BidiStreamingServer[proto.FinishedCommand, proto.Command], +func NewAgent( + ctx context.Context, jobber Jobber, aid string, label string, -) Agent { - return Agent{ - bidi: bidi, - in: make(chan *proto.Command), +) *Agent { + return &Agent{ + in: make(chan *proto.Command, 10), jobs: make(map[int64]Job), jobber: jobber, - ctx: bidi.Context(), + ctx: ctx, aid: aid, Label: label, Token: aid, diff --git a/backend/internal/handlers/agents.go b/backend/internal/handlers/agents.go index 8f6cc2c..4850c90 100644 --- a/backend/internal/handlers/agents.go +++ b/backend/internal/handlers/agents.go @@ -1,6 +1,7 @@ package handlers import ( + "fmt" "net/http" "gitea.d3m0k1d.ru/d3m0k1d/HellreigN/backend/internal/grpcsrv/collector" @@ -36,10 +37,14 @@ func (ag *AgentsGroup) List(c *gin.Context) { agents := make([]AgentInfo, 0) for _, agent := range ag.collector.Agents() { + services := make([]string, 0, len(agent.Services)) + for _, s := range agent.Services { + services = append(services, fmt.Sprintf("%s:%s", s.Name, s.Status)) + } agents = append(agents, AgentInfo{ Token: agent.ID, Label: agent.Label, - Services: agent.Services, + Services: services, ConnectedAt: agent.ConnectedAt.Format("2006-01-02 15:04:05"), }) } diff --git a/backend/internal/handlers/jobs.go b/backend/internal/handlers/jobs.go index 63da467..1b371ff 100644 --- a/backend/internal/handlers/jobs.go +++ b/backend/internal/handlers/jobs.go @@ -5,6 +5,7 @@ import ( "fmt" "net/http" "os/exec" + "strconv" "gitea.d3m0k1d.ru/d3m0k1d/HellreigN/backend/internal/grpcsrv/commander" "gitea.d3m0k1d.ru/d3m0k1d/HellreigN/backend/internal/models" @@ -13,21 +14,32 @@ import ( ) type JobsHandlers struct { - cmder *commander.Commander - svc *service.ScriptService + tracker *commander.ConnTracker + svc *service.ScriptService + whereami string } -func NewJobsHandlers(cmder *commander.Commander, svc *service.ScriptService) JobsHandlers { - return JobsHandlers{cmder: cmder, svc: svc} +func NewJobsHandlers(tracker *commander.ConnTracker, svc *service.ScriptService, whereami string) JobsHandlers { + return JobsHandlers{tracker: tracker, svc: svc, whereami: whereami} } +// AddJobIn is the request body for creating a job. type AddJobIn struct { Command string `json:"command" binding:"required"` InterpreterID int64 `json:"interpreter_id"` Stdin *string `json:"stdin"` AgentID string `json:"agent_id" binding:"required"` } + +// AddJobOut is the response body for a submitted job. type AddJobOut struct { + ID int64 `json:"id"` + Command []string `json:"command"` + WaitURL string `json:"wait_url"` +} + +// JobResult is the response body for a completed job. +type JobResult struct { ID int64 `json:"id"` Command []string `json:"command"` Stdin *string `json:"stdin"` @@ -36,65 +48,152 @@ type AddJobOut struct { Status int32 `json:"status"` } -// AddJob creates and executes a job on a target agent. -// @Summary Create and run a job on an agent -// @Description Sends a command to the specified agent, waits for execution, and returns the result +// WaitJobIn is the request body for waiting on a job. +type WaitJobIn struct { + AgentID string `json:"agent_id" binding:"required"` +} + +// AddJob submits a job to an agent and returns a wait_url for the result. +// @Summary Submit a job to an agent +// @Description Sends a command to the specified agent and returns a URL to wait for the result // @Tags jobs // @Accept json // @Produce json // @Param body body AddJobIn true "Job request" // @Success 201 {object} AddJobOut -// @Security Bearer // @Router /jobs [post] -func (self *JobsHandlers) AddJob(c *gin.Context) { - err := func() error { - var in AddJobIn - if err := c.Bind(&in); err != nil { - return err - } - agent, ok := self.cmder.GetAgent(in.AgentID) - if !ok { - c.Status(http.StatusNotFound) - return fmt.Errorf("agent not found") - } +func (h *JobsHandlers) AddJob(c *gin.Context) { + var in AddJobIn + if err := c.Bind(&in); err != nil { + c.Error(err) + return + } - var command []string - if in.InterpreterID == 0 { - command = []string{"sh", "-c", in.Command} - } else { - var err error - command, err = self.svc.ResolveCommand( - c.Request.Context(), - in.InterpreterID, - in.Command, - ) - if err != nil { - return err - } - } + agent, ok := h.tracker.GetAgent(in.AgentID) + if !ok { + c.Status(http.StatusNotFound) + c.Error(fmt.Errorf("agent not found")) + return + } - jid, err := agent.AddJob(models.JobForInsert{ - Command: command, - Stdin: in.Stdin, - }) - if err != nil { - return err - } - job, err := agent.WaitJob(jid) - if err != nil && !errors.As(err, &exec.ExitError{}) { - return err - } - c.JSON(http.StatusCreated, AddJobOut{ - ID: job.ID, - Command: job.Command, - Stdin: job.Stdin, - Stdout: job.Stdout, - Stderr: job.Stderr, - Status: job.Status, - }) - return nil - }() + command, err := resolveCommand(c, h.svc, in.InterpreterID, in.Command) if err != nil { c.Error(err) + return } + + jid, err := agent.AddJob(models.JobForInsert{ + Command: command, + Stdin: in.Stdin, + }) + if err != nil { + c.Error(err) + return + } + + waitURL := fmt.Sprintf("%s/api/v1/jobs/%d/wait", h.whereami, jid) + + c.JSON(http.StatusCreated, AddJobOut{ + ID: jid, + Command: command, + WaitURL: waitURL, + }) +} + +// WaitJob waits for a submitted job to complete (long-poll). +// If the job is already done, returns immediately. +// @Summary Wait for job result +// @Description Long-polls for a job result. Returns immediately if the job is already finished. +// @Tags jobs +// @Accept json +// @Produce json +// @Param id path int true "Job ID" +// @Param body body WaitJobIn true "Agent reference" +// @Success 200 {object} JobResult +// @Failure 400 {object} map[string]string +// @Failure 404 {object} map[string]string +// @Router /jobs/{id}/wait [post] +func (h *JobsHandlers) WaitJob(c *gin.Context) { + jid, err := strconv.ParseInt(c.Param("id"), 10, 64) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid job id"}) + return + } + + var in WaitJobIn + if err := c.Bind(&in); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request body"}) + return + } + + agent, ok := h.tracker.GetAgent(in.AgentID) + if !ok { + c.Status(http.StatusNotFound) + c.Error(fmt.Errorf("agent not found")) + return + } + + job, err := agent.WaitJob(jid) + if err != nil { + c.Error(err) + return + } + + c.JSON(http.StatusOK, JobResult{ + ID: job.ID, + Command: job.Command, + Stdin: job.Stdin, + Stdout: job.Stdout, + Stderr: job.Stderr, + Status: job.Status, + }) +} + +func resolveCommand(c *gin.Context, svc *service.ScriptService, interpID int64, cmd string) ([]string, error) { + if interpID == 0 { + return []string{"sh", "-c", cmd}, nil + } + + command, err := svc.ResolveCommand(c.Request.Context(), interpID, cmd) + if err != nil { + return nil, err + } + + return command, nil +} + +// @Summary Check command path +// @Description Validates that a command binary exists on the system +// @Tags jobs +// @Accept json +// @Param body body CheckCmdIn true "Command to check" +// @Success 200 {object} CheckCmdOut +// @Failure 404 {object} map[string]string +// @Router /jobs/check_cmd [post] +func (h *JobsHandlers) CheckCmd(c *gin.Context) { + var in struct { + Command string `json:"command" binding:"required"` + } + if err := c.Bind(&in); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request"}) + return + } + + if _, err := exec.LookPath(in.Command); err != nil { + if errors.Is(err, exec.ErrNotFound) { + c.JSON(http.StatusNotFound, gin.H{"error": "command not found"}) + return + } + c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + return + } + + c.JSON(http.StatusOK, CheckCmdOut{Exists: true}) +} + +type CheckCmdIn struct { + Command string `json:"command" binding:"required" example:"bash"` +} +type CheckCmdOut struct { + Exists bool `json:"exists"` } diff --git a/backend/internal/handlers/scripts.go b/backend/internal/handlers/scripts.go index 4bae34d..30299e5 100644 --- a/backend/internal/handlers/scripts.go +++ b/backend/internal/handlers/scripts.go @@ -13,12 +13,12 @@ import ( ) type ScriptHandlers struct { - svc *service.ScriptService - cmder *commander.Commander + svc *service.ScriptService + tracker *commander.ConnTracker } -func NewScriptHandlers(svc *service.ScriptService, cmder *commander.Commander) ScriptHandlers { - return ScriptHandlers{svc: svc, cmder: cmder} +func NewScriptHandlers(svc *service.ScriptService, tracker *commander.ConnTracker) ScriptHandlers { + return ScriptHandlers{svc: svc, tracker: tracker} } type RunScriptIn struct { @@ -47,14 +47,14 @@ type RunScriptOut struct { // @Success 201 {object} RunScriptOut // @Security Bearer // @Router /scripts/run [post] -func (self *ScriptHandlers) RunScript(c *gin.Context) { +func (h *ScriptHandlers) RunScript(c *gin.Context) { err := func() error { var in RunScriptIn if err := c.Bind(&in); err != nil { return err } - command, err := self.svc.ResolveCommand( + command, err := h.svc.ResolveCommand( c.Request.Context(), in.InterpreterID, in.ScriptText, @@ -63,7 +63,7 @@ func (self *ScriptHandlers) RunScript(c *gin.Context) { return err } - agent, ok := self.cmder.GetAgent(in.AgentID) + agent, ok := h.tracker.GetAgent(in.AgentID) if !ok { c.Status(http.StatusNotFound) return fmt.Errorf("agent not found") @@ -105,8 +105,8 @@ func (self *ScriptHandlers) RunScript(c *gin.Context) { // @Success 200 {array} repository.ScriptInterpreter // @Security Bearer // @Router /scripts/interpreters [get] -func (self *ScriptHandlers) ListInterpreters(c *gin.Context) { - interpreters, err := self.svc.List(c.Request.Context()) +func (h *ScriptHandlers) ListInterpreters(c *gin.Context) { + interpreters, err := h.svc.List(c.Request.Context()) if err != nil { c.Error(err) return @@ -124,14 +124,14 @@ func (self *ScriptHandlers) ListInterpreters(c *gin.Context) { // @Success 201 {object} repository.ScriptInterpreter // @Security Bearer // @Router /scripts/interpreters [post] -func (self *ScriptHandlers) CreateInterpreter(c *gin.Context) { +func (h *ScriptHandlers) CreateInterpreter(c *gin.Context) { var in repository.ScriptInterpreterCreate if err := c.BindJSON(&in); err != nil { c.Error(err) return } - si, err := self.svc.Create(c.Request.Context(), in) + si, err := h.svc.Create(c.Request.Context(), in) if err != nil { c.Error(err) return @@ -148,14 +148,14 @@ func (self *ScriptHandlers) CreateInterpreter(c *gin.Context) { // @Success 200 {object} repository.ScriptInterpreter // @Security Bearer // @Router /scripts/interpreters/:id [get] -func (self *ScriptHandlers) GetInterpreter(c *gin.Context) { +func (h *ScriptHandlers) GetInterpreter(c *gin.Context) { id, err := strconv.ParseInt(c.Param("id"), 10, 64) if err != nil { c.Error(err) return } - si, err := self.svc.GetByID(c.Request.Context(), id) + si, err := h.svc.GetByID(c.Request.Context(), id) if err != nil { c.Error(err) return @@ -174,7 +174,7 @@ func (self *ScriptHandlers) GetInterpreter(c *gin.Context) { // @Success 200 {object} repository.ScriptInterpreter // @Security Bearer // @Router /scripts/interpreters/:id [put] -func (self *ScriptHandlers) UpdateInterpreter(c *gin.Context) { +func (h *ScriptHandlers) UpdateInterpreter(c *gin.Context) { id, err := strconv.ParseInt(c.Param("id"), 10, 64) if err != nil { c.Error(err) @@ -187,7 +187,7 @@ func (self *ScriptHandlers) UpdateInterpreter(c *gin.Context) { return } - si, err := self.svc.Update(c.Request.Context(), id, in) + si, err := h.svc.Update(c.Request.Context(), id, in) if err != nil { c.Error(err) return @@ -203,14 +203,14 @@ func (self *ScriptHandlers) UpdateInterpreter(c *gin.Context) { // @Success 204 // @Security Bearer // @Router /scripts/interpreters/:id [delete] -func (self *ScriptHandlers) DeleteInterpreter(c *gin.Context) { +func (h *ScriptHandlers) DeleteInterpreter(c *gin.Context) { id, err := strconv.ParseInt(c.Param("id"), 10, 64) if err != nil { c.Error(err) return } - if err := self.svc.Delete(c.Request.Context(), id); err != nil { + if err := h.svc.Delete(c.Request.Context(), id); err != nil { c.Error(err) return }