2 Commits

Author SHA1 Message Date
zero@thinky 428140ff15 feat(backend): add job metrics
ci-agent / build (push) Failing after 3m1s
2026-04-05 00:44:57 +03:00
zero@thinky 7be99f8e91 feat: big ahh commit
- agent+proto+backend: transfer service status
- agent: fix returning empty message on nonzero exit status
- backend: refactor collector+commander and handlers dependent on them: implement agent accounting via grpc stats handler
2026-04-05 00:44:56 +03:00
13 changed files with 618 additions and 190 deletions
+1 -1
View File
@@ -3,7 +3,7 @@ module gitea.d3m0k1d.ru/d3m0k1d/HellreigN/agent
go 1.26.1 go 1.26.1
require ( require (
gitea.d3m0k1d.ru/d3m0k1d/HellreigN/proto v0.0.0-20260403214837-94be9799f47d gitea.d3m0k1d.ru/d3m0k1d/HellreigN/proto v0.0.0-20260404174628-3389df740c20
github.com/hpcloud/tail v1.0.0 github.com/hpcloud/tail v1.0.0
github.com/samber/lo v1.53.0 github.com/samber/lo v1.53.0
golang.org/x/sync v0.20.0 golang.org/x/sync v0.20.0
+8 -2
View File
@@ -47,8 +47,14 @@ func (*CommandExecutor) Execute(command *proto.Command) (fc *proto.FinishedComma
_, err := io.Copy(stderrbuf, stderr) _, err := io.Copy(stderrbuf, stderr)
return err return err
}) })
if err := cmd.Wait(); err != nil { if waitErr := cmd.Wait(); waitErr != nil {
return nil, err var exitErr *exec.ExitError
if !errors.As(waitErr, &exitErr) {
return nil, waitErr
}
fc.Status = int32(exitErr.ExitCode())
} else {
fc.Status = int32(cmd.ProcessState.ExitCode())
} }
if err := eg.Wait(); err != nil { if err := eg.Wait(); err != nil {
return nil, err return nil, err
+57
View File
@@ -22,6 +22,7 @@ import (
"github.com/samber/lo" "github.com/samber/lo"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/metadata" "google.golang.org/grpc/metadata"
) )
@@ -110,6 +111,13 @@ func main() {
return ccli.HandleCommands(ctx, grpcAddr, creds) return ccli.HandleCommands(ctx, grpcAddr, creds)
}) })
// Start services update stream
if len(cfg.Services) > 0 {
wg.Go(func() error {
return reportServices(ctx, grpcAddr, creds, cfg.Label, cfg.Services, lgr)
})
}
// Start log collectors // Start log collectors
if len(cfg.Services) > 0 { if len(cfg.Services) > 0 {
wg.Go(func() error { wg.Go(func() error {
@@ -301,3 +309,52 @@ func reconnectStream(
return fmt.Errorf("failed to reconnect after 5 attempts for service %s", service) return fmt.Errorf("failed to reconnect after 5 attempts for service %s", service)
} }
// reportServices periodically sends service status updates to the backend via gRPC.
// For now, all configured services are reported as "up" every 5 seconds.
func reportServices(
ctx context.Context,
grpcAddr string,
creds credentials.TransportCredentials,
label string,
services []config.ServiceConfig,
lgr *logger.Logger,
) error {
conn, err := grpc.NewClient(grpcAddr, grpc.WithTransportCredentials(creds))
if err != nil {
return fmt.Errorf("failed to connect for services report: %w", err)
}
defer conn.Close()
ccli := proto.NewCollectorClient(conn)
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
// Send immediately on start, then every 5 seconds
for {
svcUpdates := make([]*proto.ServicesUpdate_ServiceUpdate, 0, len(services))
for _, svc := range services {
svcUpdates = append(svcUpdates, &proto.ServicesUpdate_ServiceUpdate{
Name: svc.Name,
Status: "up",
})
}
md := metadata.New(map[string]string{"whoami": label})
_, err := ccli.ReportServices(
metadata.NewOutgoingContext(ctx, md),
&proto.ServicesUpdate{Services: svcUpdates},
)
if err != nil {
lgr.Warn("Failed to report services", "err", err)
} else {
lgr.Debug("Services reported successfully", "count", len(services))
}
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
}
}
}
+19 -6
View File
@@ -82,10 +82,13 @@ func main() {
}() }()
} }
// Initialize Collector gRPC service // Initialize Collector (log streaming) with its own ConnTracker
coll := collector.New(logRepo) collTracker := collector.NewConnTracker()
coll := collector.New(logRepo, collTracker)
cmdr := commander.New(jobRepo) // Initialize ConnTracker for Commander agent lifecycle
cmdTracker := commander.NewConnTracker()
cmdr := commander.New(jobRepo, cmdTracker)
// Initialize script interpreter repository and service // Initialize script interpreter repository and service
scriptRepo := repository.NewScriptInterpreterRepo(db) scriptRepo := repository.NewScriptInterpreterRepo(db)
@@ -93,8 +96,11 @@ func main() {
log.Printf("Warning: failed to initialize script interpreters table: %v", err) log.Printf("Warning: failed to initialize script interpreters table: %v", err)
} }
scriptSvc := service.NewScriptServiceWithInterpreters(h.Repo, scriptRepo) scriptSvc := service.NewScriptServiceWithInterpreters(h.Repo, scriptRepo)
scriptHandlers := handlers.NewScriptHandlers(scriptSvc, cmdr) scriptHandlers := handlers.NewScriptHandlers(scriptSvc, cmdTracker)
jobsHandlers := handlers.NewJobsHandlers(cmdr, scriptSvc) jobsHandlers := handlers.NewJobsHandlers(cmdTracker, scriptSvc,
os.Getenv("WHEREAMI"), /* our address for redirects */
jobRepo,
)
// Initialize script management service and handlers // Initialize script management service and handlers
scriptManageSvc := service.NewScriptService(h.Repo) scriptManageSvc := service.NewScriptService(h.Repo)
@@ -201,6 +207,9 @@ func main() {
jobsGroup.Use(auth.AuthMiddleware(), handlers.RequireAdmin()) jobsGroup.Use(auth.AuthMiddleware(), handlers.RequireAdmin())
{ {
jobsGroup.POST("", jobsHandlers.AddJob) jobsGroup.POST("", jobsHandlers.AddJob)
jobsGroup.POST("/:id/wait", jobsHandlers.WaitJob)
jobsGroup.GET("/metrics", jobsHandlers.GetJobMetrics)
jobsGroup.POST("/check_cmd", jobsHandlers.CheckCmd)
} }
// Agent registration // Agent registration
@@ -290,7 +299,11 @@ func main() {
MinVersion: tls.VersionTLS12, MinVersion: tls.VersionTLS12,
} }
grpcServer := grpc.NewServer(grpc.Creds(credentials.NewTLS(tlsConfig))) grpcServer := grpc.NewServer(
grpc.Creds(credentials.NewTLS(tlsConfig)),
grpc.StatsHandler(collTracker),
grpc.StatsHandler(cmdTracker),
)
proto.RegisterCommanderServer(grpcServer, cmdr) proto.RegisterCommanderServer(grpcServer, cmdr)
proto.RegisterCollectorServer(grpcServer, coll) proto.RegisterCollectorServer(grpcServer, coll)
+1 -1
View File
@@ -3,7 +3,7 @@ module gitea.d3m0k1d.ru/d3m0k1d/HellreigN/backend
go 1.26.1 go 1.26.1
require ( require (
gitea.d3m0k1d.ru/d3m0k1d/HellreigN/proto v0.0.0-20260404165608-1d75935a08a5 gitea.d3m0k1d.ru/d3m0k1d/HellreigN/proto v0.0.0-20260404174628-3389df740c20
github.com/ClickHouse/clickhouse-go/v2 v2.44.0 github.com/ClickHouse/clickhouse-go/v2 v2.44.0
github.com/gin-gonic/gin v1.12.0 github.com/gin-gonic/gin v1.12.0
github.com/samber/lo v1.53.0 github.com/samber/lo v1.53.0
+12 -41
View File
@@ -4,7 +4,6 @@ import (
"fmt" "fmt"
"io" "io"
"log" "log"
"sync"
"time" "time"
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/backend/internal/repository" "gitea.d3m0k1d.ru/d3m0k1d/HellreigN/backend/internal/repository"
@@ -13,26 +12,19 @@ import (
"google.golang.org/grpc/metadata" "google.golang.org/grpc/metadata"
) )
// Collector handles log streaming from connected agents.
type Collector struct { type Collector struct {
proto.UnimplementedCollectorServer proto.UnimplementedCollectorServer
logRepo *repository.LogRepository logRepo *repository.LogRepository
agents map[string]*Agent tracker *ConnTracker
mu sync.RWMutex
batchSize int batchSize int
flushInterval time.Duration flushInterval time.Duration
} }
type Agent struct { func New(logRepo *repository.LogRepository, tracker *ConnTracker) *Collector {
ID string
Label string
Services []string
ConnectedAt time.Time
}
func New(logRepo *repository.LogRepository) *Collector {
return &Collector{ return &Collector{
logRepo: logRepo, logRepo: logRepo,
agents: make(map[string]*Agent), tracker: tracker,
batchSize: 100, batchSize: 100,
flushInterval: 2 * time.Second, flushInterval: 2 * time.Second,
} }
@@ -56,27 +48,15 @@ func (c *Collector) Stream(stream proto.Collector_StreamServer) error {
} }
service := serviceVals[0] service := serviceVals[0]
servicesVals := md["services"] agent := &Agent{
var services []string
if len(servicesVals) > 0 {
services = servicesVals
}
// Register agent
c.mu.Lock()
c.agents[agentName] = &Agent{
ID: agentName, ID: agentName,
Label: agentName, Label: agentName,
Services: services, Services: make([]Service, 0),
ConnectedAt: time.Now(), ConnectedAt: time.Now(),
} }
c.mu.Unlock()
defer func() { c.tracker.Register(agent)
c.mu.Lock() defer c.tracker.Unregister(agent.ID)
delete(c.agents, agentName)
c.mu.Unlock()
}()
log.Printf("Agent %s connected, streaming logs for service: %s", agentName, service) log.Printf("Agent %s connected, streaming logs for service: %s", agentName, service)
@@ -139,7 +119,6 @@ func (c *Collector) Stream(stream proto.Collector_StreamServer) error {
for { for {
select { select {
case <-stream.Context().Done(): case <-stream.Context().Done():
// Context cancelled, flush remaining
_ = flush() _ = flush()
return stream.Context().Err() return stream.Context().Err()
case <-ticker.C: case <-ticker.C:
@@ -162,7 +141,6 @@ func (c *Collector) Stream(stream proto.Collector_StreamServer) error {
} }
case err := <-errCh: case err := <-errCh:
if err == io.EOF { if err == io.EOF {
// Client closed stream
return flush() return flush()
} }
return fmt.Errorf("failed to receive: %w", err) return fmt.Errorf("failed to receive: %w", err)
@@ -170,19 +148,12 @@ func (c *Collector) Stream(stream proto.Collector_StreamServer) error {
} }
} }
// GetAgent delegates to the tracker.
func (c *Collector) GetAgent(name string) (*Agent, bool) { func (c *Collector) GetAgent(name string) (*Agent, bool) {
c.mu.RLock() return c.tracker.GetAgent(name)
defer c.mu.RUnlock()
a, ok := c.agents[name]
return a, ok
} }
// Agents delegates to the tracker.
func (c *Collector) Agents() []*Agent { func (c *Collector) Agents() []*Agent {
c.mu.RLock() return c.tracker.Agents()
defer c.mu.RUnlock()
result := make([]*Agent, 0, len(c.agents))
for _, a := range c.agents {
result = append(result, a)
}
return result
} }
@@ -0,0 +1,38 @@
package collector
import (
"context"
"fmt"
"log"
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/proto/proto"
"google.golang.org/grpc/metadata"
)
// ReportServices handles a unary service status update from an agent.
// Agents send their current services list, which is stored in the collector.
func (c *Collector) ReportServices(ctx context.Context, req *proto.ServicesUpdate) (*proto.ServicesUpdateResp, error) {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, fmt.Errorf("no metadata in context")
}
whoamiVals := md["whoami"]
if len(whoamiVals) == 0 {
return nil, fmt.Errorf("whoami metadata missing")
}
agentName := whoamiVals[0]
services := make([]Service, 0, len(req.Services))
for _, s := range req.Services {
services = append(services, Service{s.Name, s.Status})
}
if ok := c.tracker.UpdateServices(agentName, services); ok {
log.Printf("Updated services for agent %s: %v", agentName, services)
} else {
log.Printf("Warning: received services update for unknown agent %s", agentName)
}
return &proto.ServicesUpdateResp{}, nil
}
@@ -0,0 +1,111 @@
package collector
import (
"context"
"log"
"sync"
"time"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/stats"
)
// ConnTracker tracks connected Collector agents and handles cleanup on disconnect.
// It implements grpc.StatsHandler for disconnect detection.
type ConnTracker struct {
mu sync.RWMutex
agents map[string]*Agent
}
func NewConnTracker() *ConnTracker {
return &ConnTracker{
agents: make(map[string]*Agent),
}
}
// Register adds an agent to the tracker. Called by Collector.Stream().
func (t *ConnTracker) Register(agent *Agent) {
t.mu.Lock()
t.agents[agent.ID] = agent
t.mu.Unlock()
log.Printf("[collector] agent registered: %s", agent.ID)
}
// Unregister removes an agent from the tracker.
func (t *ConnTracker) Unregister(id string) {
t.mu.Lock()
delete(t.agents, id)
t.mu.Unlock()
log.Printf("[collector] agent unregistered: %s", id)
}
// GetAgent returns the agent for the given ID.
func (t *ConnTracker) GetAgent(id string) (*Agent, bool) {
t.mu.RLock()
defer t.mu.RUnlock()
a, ok := t.agents[id]
return a, ok
}
// Agents returns all connected agents.
func (t *ConnTracker) Agents() []*Agent {
t.mu.RLock()
defer t.mu.RUnlock()
result := make([]*Agent, 0, len(t.agents))
for _, a := range t.agents {
result = append(result, a)
}
return result
}
// grpc.StatsHandler implementation.
func (t *ConnTracker) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) context.Context {
return ctx
}
func (t *ConnTracker) HandleRPC(ctx context.Context, _ stats.RPCStats) {}
func (t *ConnTracker) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context {
return ctx
}
func (t *ConnTracker) HandleConn(ctx context.Context, s stats.ConnStats) {
switch s.(type) {
case *stats.ConnEnd:
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return
}
whoamiVals := md["whoami"]
if len(whoamiVals) == 0 {
return
}
t.Unregister(whoamiVals[0])
}
}
// UpdateServices updates the services list for the given agent.
func (t *ConnTracker) UpdateServices(id string, services []Service) bool {
t.mu.Lock()
defer t.mu.Unlock()
agent, ok := t.agents[id]
if !ok {
return false
}
agent.Services = services
return true
}
// Service represents a named service with its current status.
type Service struct {
Name, Status string
}
// Agent represents a connected agent streaming logs to the collector.
type Agent struct {
ID string
Label string
Services []Service
ConnectedAt time.Time
}
+119 -64
View File
@@ -4,6 +4,7 @@ import (
"context" "context"
"fmt" "fmt"
"io" "io"
"log"
"sync" "sync"
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/backend/internal/models" "gitea.d3m0k1d.ru/d3m0k1d/HellreigN/backend/internal/models"
@@ -11,27 +12,30 @@ import (
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/metadata" "google.golang.org/grpc/metadata"
"google.golang.org/grpc/stats"
) )
// Commander handles command execution on connected agents.
type Commander struct { type Commander struct {
proto.UnimplementedCommanderServer proto.UnimplementedCommanderServer
agents map[string]Agent tracker *ConnTracker
mu sync.RWMutex
jobber Jobber jobber Jobber
} }
// Jobber persists job state.
type Jobber interface { type Jobber interface {
InitJob(ctx context.Context, agentID string, job models.JobForInsert) (int64, error) InitJob(ctx context.Context, agentID string, job models.JobForInsert) (int64, error)
UpdateJobInDB(ctx context.Context, jid int64, msg models.JobForUpdate) (models.Job, error) UpdateJobInDB(ctx context.Context, jid int64, msg models.JobForUpdate) (models.Job, error)
} }
func New(jobber Jobber) *Commander { func New(jobber Jobber, tracker *ConnTracker) *Commander {
return &Commander{ return &Commander{
agents: make(map[string]Agent),
jobber: jobber, jobber: jobber,
tracker: tracker,
} }
} }
// Agent represents a connected agent with an active bidirectional stream.
type Agent struct { type Agent struct {
bidi grpc.BidiStreamingServer[proto.FinishedCommand, proto.Command] bidi grpc.BidiStreamingServer[proto.FinishedCommand, proto.Command]
in chan *proto.Command in chan *proto.Command
@@ -40,10 +44,11 @@ type Agent struct {
ctx context.Context ctx context.Context
aid string aid string
Token string // agent id Token string
Label string Label string
Services []string Services []string
} }
type JobOut struct { type JobOut struct {
fc models.Job fc models.Job
err error err error
@@ -53,61 +58,91 @@ type Job struct {
out chan JobOut out chan JobOut
} }
func (self *Commander) GetAgent(aid string) (agent Agent, ok bool) { // ConnTracker tracks connected agents and handles cleanup on disconnect.
self.mu.RLock() // It implements grpc.StatsHandler for disconnect detection.
defer self.mu.RUnlock() type ConnTracker struct {
agent, ok = self.agents[aid] mu sync.RWMutex
return agents map[string]*Agent
} }
// GetAgentByLabel searches for an agent by its human-readable label. // GetAgentByLabel searches for an agent by its human-readable label.
func (self *Commander) GetAgentByLabel(label string) (agent Agent, ok bool) { func (self *ConnTracker) GetAgentByLabel(label string) (agent Agent, ok bool) {
self.mu.RLock() self.mu.RLock()
defer self.mu.RUnlock() defer self.mu.RUnlock()
for _, a := range self.agents { for _, a := range self.agents {
if a.Label == label { if a.Label == label {
return a, true return *a, true
} }
} }
return return
} }
func (self *Commander) Agents() []Agent { func NewConnTracker() *ConnTracker {
self.mu.RLock() return &ConnTracker{
defer self.mu.RUnlock() agents: make(map[string]*Agent),
result := make([]Agent, 0, len(self.agents)) }
for _, a := range self.agents { }
func (t *ConnTracker) Register(aid string, agent *Agent) {
t.mu.Lock()
t.agents[aid] = agent
t.mu.Unlock()
log.Printf("[conntracker] agent registered: %s", aid)
}
func (t *ConnTracker) Unregister(aid string) {
t.mu.Lock()
delete(t.agents, aid)
t.mu.Unlock()
log.Printf("[conntracker] agent unregistered: %s", aid)
}
func (t *ConnTracker) GetAgent(aid string) (*Agent, bool) {
t.mu.RLock()
defer t.mu.RUnlock()
a, ok := t.agents[aid]
return a, ok
}
func (t *ConnTracker) Agents() []*Agent {
t.mu.RLock()
defer t.mu.RUnlock()
result := make([]*Agent, 0, len(t.agents))
for _, a := range t.agents {
result = append(result, a) result = append(result, a)
} }
return result return result
} }
func (self *Commander) removeAgent(aid string) { // grpc.StatsHandler implementation.
self.mu.Lock()
defer self.mu.Unlock() func (t *ConnTracker) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) context.Context {
delete(self.agents, aid) return ctx
} }
func (self *Agent) AddJob(job models.JobForInsert) (int64, error) { func (t *ConnTracker) HandleRPC(ctx context.Context, _ stats.RPCStats) {}
jid, err := self.jobber.InitJob(self.ctx, self.aid, job)
if err != nil { func (t *ConnTracker) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context {
return 0, err return ctx
}
self.jobs[jid] = newJob()
self.in <- &proto.Command{
Id: jid,
Command: job.Command,
Stdin: job.Stdin,
}
return jid, err
} }
func (self *Agent) WaitJob(jid int64) (*models.Job, error) { func (t *ConnTracker) HandleConn(ctx context.Context, s stats.ConnStats) {
result := <-self.jobs[jid].out switch s.(type) {
return &result.fc, result.err case *stats.ConnEnd:
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return
}
aidVals := md["agentid"]
if len(aidVals) == 0 {
return
}
t.Unregister(aidVals[0])
}
} }
func (self *Commander) Stream( // Stream handles a new agent connection and runs the send/recv loops.
func (c *Commander) Stream(
bidi grpc.BidiStreamingServer[proto.FinishedCommand, proto.Command], bidi grpc.BidiStreamingServer[proto.FinishedCommand, proto.Command],
) error { ) error {
md, ok := metadata.FromIncomingContext(bidi.Context()) md, ok := metadata.FromIncomingContext(bidi.Context())
@@ -121,35 +156,58 @@ func (self *Commander) Stream(
aid := aidVals[0] aid := aidVals[0]
var label string var label string
labelVals := md["label"] if vals := md["label"]; len(vals) > 0 {
if len(labelVals) > 0 { label = vals[0]
label = labelVals[0]
} }
agent := newAgent(bidi, self.jobber, aid, label) agent := NewAgent(bidi.Context(), c.jobber, aid, label)
self.mu.Lock() agent.bidi = bidi
self.agents[aid] = agent
self.mu.Unlock() c.tracker.Register(aid, agent)
defer c.tracker.Unregister(aid)
defer self.removeAgent(aid)
return agent.run() return agent.run()
} }
func (self *Agent) run() error { // GetAgent returns the agent by ID. Delegates to the tracker.
func (c *Commander) GetAgent(aid string) (*Agent, bool) {
return c.tracker.GetAgent(aid)
}
func (a *Agent) AddJob(job models.JobForInsert) (int64, error) {
jid, err := a.jobber.InitJob(a.ctx, a.aid, job)
if err != nil {
return 0, err
}
a.jobs[jid] = newJob()
a.in <- &proto.Command{
Id: jid,
Command: job.Command,
Stdin: job.Stdin,
}
return jid, nil
}
func (a *Agent) WaitJob(jid int64) (*models.Job, error) {
result := <-a.jobs[jid].out
return &result.fc, result.err
}
func (a *Agent) run() error {
wg := new(errgroup.Group) wg := new(errgroup.Group)
wg.Go(self.recv) wg.Go(a.recv)
wg.Go(self.send) wg.Go(a.send)
return wg.Wait() return wg.Wait()
} }
func (self *Agent) recv() error { func (a *Agent) recv() error {
for { for {
job, err := func() (job models.Job, err error) { job, err := func() (job models.Job, err error) {
msg, err := self.bidi.Recv() msg, err := a.bidi.Recv()
if err != nil { if err != nil {
return return
} }
return self.jobber.UpdateJobInDB(self.ctx, msg.Id, models.JobForUpdate{ return a.jobber.UpdateJobInDB(a.ctx, msg.Id, models.JobForUpdate{
Stdout: msg.Stdout, Stdout: msg.Stdout,
Stderr: msg.Stderr, Stderr: msg.Stderr,
Status: msg.Status, Status: msg.Status,
@@ -158,8 +216,7 @@ func (self *Agent) recv() error {
if err == io.EOF { if err == io.EOF {
return nil return nil
} }
// TODO: that would blow up at some point out := a.jobs[job.ID].out
out := self.jobs[job.ID].out
out <- JobOut{ out <- JobOut{
fc: job, fc: job,
err: err, err: err,
@@ -168,28 +225,26 @@ func (self *Agent) recv() error {
} }
} }
func (self *Agent) send() error { func (a *Agent) send() error {
for job := range self.in { for job := range a.in {
if err := self.bidi.Send(job); err != nil { if err := a.bidi.Send(job); err != nil {
return err return err
} }
} }
return io.EOF return io.EOF
// self.jobs[]
} }
func newAgent( func NewAgent(
bidi grpc.BidiStreamingServer[proto.FinishedCommand, proto.Command], ctx context.Context,
jobber Jobber, jobber Jobber,
aid string, aid string,
label string, label string,
) Agent { ) *Agent {
return Agent{ return &Agent{
bidi: bidi, in: make(chan *proto.Command, 10),
in: make(chan *proto.Command),
jobs: make(map[int64]Job), jobs: make(map[int64]Job),
jobber: jobber, jobber: jobber,
ctx: bidi.Context(), ctx: ctx,
aid: aid, aid: aid,
Label: label, Label: label,
Token: aid, Token: aid,
+6 -1
View File
@@ -1,6 +1,7 @@
package handlers package handlers
import ( import (
"fmt"
"net/http" "net/http"
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/backend/internal/grpcsrv/collector" "gitea.d3m0k1d.ru/d3m0k1d/HellreigN/backend/internal/grpcsrv/collector"
@@ -36,10 +37,14 @@ func (ag *AgentsGroup) List(c *gin.Context) {
agents := make([]AgentInfo, 0) agents := make([]AgentInfo, 0)
for _, agent := range ag.collector.Agents() { for _, agent := range ag.collector.Agents() {
services := make([]string, 0, len(agent.Services))
for _, s := range agent.Services {
services = append(services, fmt.Sprintf("%s:%s", s.Name, s.Status))
}
agents = append(agents, AgentInfo{ agents = append(agents, AgentInfo{
Token: agent.ID, Token: agent.ID,
Label: agent.Label, Label: agent.Label,
Services: agent.Services, Services: services,
ConnectedAt: agent.ConnectedAt.Format("2006-01-02 15:04:05"), ConnectedAt: agent.ConnectedAt.Format("2006-01-02 15:04:05"),
}) })
} }
+179 -34
View File
@@ -5,29 +5,44 @@ import (
"fmt" "fmt"
"net/http" "net/http"
"os/exec" "os/exec"
"strconv"
"time"
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/backend/internal/grpcsrv/commander" "gitea.d3m0k1d.ru/d3m0k1d/HellreigN/backend/internal/grpcsrv/commander"
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/backend/internal/models" "gitea.d3m0k1d.ru/d3m0k1d/HellreigN/backend/internal/models"
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/backend/internal/repository"
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/backend/internal/service" "gitea.d3m0k1d.ru/d3m0k1d/HellreigN/backend/internal/service"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
) )
type JobsHandlers struct { type JobsHandlers struct {
cmder *commander.Commander tracker *commander.ConnTracker
svc *service.ScriptService svc *service.ScriptService
whereami string
jobRepo *repository.JobRepository
} }
func NewJobsHandlers(cmder *commander.Commander, svc *service.ScriptService) JobsHandlers { func NewJobsHandlers(tracker *commander.ConnTracker, svc *service.ScriptService, whereami string, jobRepo *repository.JobRepository) JobsHandlers {
return JobsHandlers{cmder: cmder, svc: svc} return JobsHandlers{tracker: tracker, svc: svc, whereami: whereami, jobRepo: jobRepo}
} }
// AddJobIn is the request body for creating a job.
type AddJobIn struct { type AddJobIn struct {
Command string `json:"command" binding:"required"` Command string `json:"command" binding:"required"`
InterpreterID int64 `json:"interpreter_id"` InterpreterID int64 `json:"interpreter_id"`
Stdin *string `json:"stdin"` Stdin *string `json:"stdin"`
AgentID string `json:"agent_id" binding:"required"` AgentID string `json:"agent_id" binding:"required"`
} }
// AddJobOut is the response body for a submitted job.
type AddJobOut struct { type AddJobOut struct {
ID int64 `json:"id"`
Command []string `json:"command"`
WaitURL string `json:"wait_url"`
}
// JobResult is the response body for a completed job.
type JobResult struct {
ID int64 `json:"id"` ID int64 `json:"id"`
Command []string `json:"command"` Command []string `json:"command"`
Stdin *string `json:"stdin"` Stdin *string `json:"stdin"`
@@ -36,41 +51,38 @@ type AddJobOut struct {
Status int32 `json:"status"` Status int32 `json:"status"`
} }
// AddJob creates and executes a job on a target agent. // WaitJobIn is the request body for waiting on a job.
// @Summary Create and run a job on an agent type WaitJobIn struct {
// @Description Sends a command to the specified agent, waits for execution, and returns the result AgentID string `json:"agent_id" binding:"required"`
}
// AddJob submits a job to an agent and returns a wait_url for the result.
// @Summary Submit a job to an agent
// @Description Sends a command to the specified agent and returns a URL to wait for the result
// @Tags jobs // @Tags jobs
// @Accept json // @Accept json
// @Produce json // @Produce json
// @Param body body AddJobIn true "Job request" // @Param body body AddJobIn true "Job request"
// @Success 201 {object} AddJobOut // @Success 201 {object} AddJobOut
// @Security Bearer
// @Router /jobs [post] // @Router /jobs [post]
func (self *JobsHandlers) AddJob(c *gin.Context) { func (h *JobsHandlers) AddJob(c *gin.Context) {
err := func() error {
var in AddJobIn var in AddJobIn
if err := c.Bind(&in); err != nil { if err := c.Bind(&in); err != nil {
return err c.Error(err)
} return
agent, ok := self.cmder.GetAgent(in.AgentID)
if !ok {
c.Status(http.StatusNotFound)
return fmt.Errorf("agent not found")
} }
var command []string agent, ok := h.tracker.GetAgent(in.AgentID)
if in.InterpreterID == 0 { if !ok {
command = []string{"sh", "-c", in.Command} c.Status(http.StatusNotFound)
} else { c.Error(fmt.Errorf("agent not found"))
var err error return
command, err = self.svc.ResolveCommand(
c.Request.Context(),
in.InterpreterID,
in.Command,
)
if err != nil {
return err
} }
command, err := resolveCommand(c, h.svc, in.InterpreterID, in.Command)
if err != nil {
c.Error(err)
return
} }
jid, err := agent.AddJob(models.JobForInsert{ jid, err := agent.AddJob(models.JobForInsert{
@@ -78,13 +90,59 @@ func (self *JobsHandlers) AddJob(c *gin.Context) {
Stdin: in.Stdin, Stdin: in.Stdin,
}) })
if err != nil { if err != nil {
return err c.Error(err)
} return
job, err := agent.WaitJob(jid)
if err != nil && !errors.As(err, &exec.ExitError{}) {
return err
} }
waitURL := fmt.Sprintf("%s/api/v1/jobs/%d/wait", h.whereami, jid)
c.JSON(http.StatusCreated, AddJobOut{ c.JSON(http.StatusCreated, AddJobOut{
ID: jid,
Command: command,
WaitURL: waitURL,
})
}
// WaitJob waits for a submitted job to complete (long-poll).
// If the job is already done, returns immediately.
// @Summary Wait for job result
// @Description Long-polls for a job result. Returns immediately if the job is already finished.
// @Tags jobs
// @Accept json
// @Produce json
// @Param id path int true "Job ID"
// @Param body body WaitJobIn true "Agent reference"
// @Success 200 {object} JobResult
// @Failure 400 {object} map[string]string
// @Failure 404 {object} map[string]string
// @Router /jobs/{id}/wait [post]
func (h *JobsHandlers) WaitJob(c *gin.Context) {
jid, err := strconv.ParseInt(c.Param("id"), 10, 64)
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid job id"})
return
}
var in WaitJobIn
if err := c.Bind(&in); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request body"})
return
}
agent, ok := h.tracker.GetAgent(in.AgentID)
if !ok {
c.Status(http.StatusNotFound)
c.Error(fmt.Errorf("agent not found"))
return
}
job, err := agent.WaitJob(jid)
if err != nil {
c.Error(err)
return
}
c.JSON(http.StatusOK, JobResult{
ID: job.ID, ID: job.ID,
Command: job.Command, Command: job.Command,
Stdin: job.Stdin, Stdin: job.Stdin,
@@ -92,9 +150,96 @@ func (self *JobsHandlers) AddJob(c *gin.Context) {
Stderr: job.Stderr, Stderr: job.Stderr,
Status: job.Status, Status: job.Status,
}) })
return nil }
}()
func resolveCommand(c *gin.Context, svc *service.ScriptService, interpID int64, cmd string) ([]string, error) {
if interpID == 0 {
return []string{"sh", "-c", cmd}, nil
}
command, err := svc.ResolveCommand(c.Request.Context(), interpID, cmd)
if err != nil {
return nil, err
}
return command, nil
}
// @Summary Check command path
// @Description Validates that a command binary exists on the system
// @Tags jobs
// @Accept json
// @Param body body CheckCmdIn true "Command to check"
// @Success 200 {object} CheckCmdOut
// @Failure 404 {object} map[string]string
// @Router /jobs/check_cmd [post]
func (h *JobsHandlers) CheckCmd(c *gin.Context) {
var in struct {
Command string `json:"command" binding:"required"`
}
if err := c.Bind(&in); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request"})
return
}
if _, err := exec.LookPath(in.Command); err != nil {
if errors.Is(err, exec.ErrNotFound) {
c.JSON(http.StatusNotFound, gin.H{"error": "command not found"})
return
}
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
return
}
c.JSON(http.StatusOK, CheckCmdOut{Exists: true})
}
type CheckCmdIn struct {
Command string `json:"command" binding:"required" example:"bash"`
}
type CheckCmdOut struct {
Exists bool `json:"exists"`
}
// JobMetricsOut is the response body for the job metrics endpoint.
type JobMetricsOut struct {
Total int `json:"total"`
Success int `json:"success"`
Failed int `json:"failed"`
Pending int `json:"pending"`
Period string `json:"period"`
}
// GetJobMetrics returns job success metrics over a parameterized period.
// @Summary Get job metrics
// @Description Returns total, successful, failed, and pending job counts over the given period
// @Tags jobs
// @Produce json
// @Param period query string false "Time period (e.g. 1h, 24h, 7d)" default(24h)
// @Success 200 {object} JobMetricsOut
// @Failure 400 {object} map[string]string
// @Security Bearer
// @Router /jobs/metrics [get]
func (h *JobsHandlers) GetJobMetrics(c *gin.Context) {
periodStr := c.DefaultQuery("period", "24h")
period, err := time.ParseDuration(periodStr)
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid period, use Go duration format (e.g. 1h, 24h, 7d)"})
return
}
since := time.Now().Add(-period)
metrics, err := h.jobRepo.GetJobMetrics(c.Request.Context(), since)
if err != nil { if err != nil {
c.Error(err) c.Error(err)
return
} }
c.JSON(http.StatusOK, JobMetricsOut{
Total: metrics.Total,
Success: metrics.Success,
Failed: metrics.Failed,
Pending: metrics.Pending,
Period: periodStr,
})
} }
+16 -16
View File
@@ -14,11 +14,11 @@ import (
type ScriptHandlers struct { type ScriptHandlers struct {
svc *service.ScriptService svc *service.ScriptService
cmder *commander.Commander tracker *commander.ConnTracker
} }
func NewScriptHandlers(svc *service.ScriptService, cmder *commander.Commander) ScriptHandlers { func NewScriptHandlers(svc *service.ScriptService, tracker *commander.ConnTracker) ScriptHandlers {
return ScriptHandlers{svc: svc, cmder: cmder} return ScriptHandlers{svc: svc, tracker: tracker}
} }
type RunScriptIn struct { type RunScriptIn struct {
@@ -47,14 +47,14 @@ type RunScriptOut struct {
// @Success 201 {object} RunScriptOut // @Success 201 {object} RunScriptOut
// @Security Bearer // @Security Bearer
// @Router /scripts/run [post] // @Router /scripts/run [post]
func (self *ScriptHandlers) RunScript(c *gin.Context) { func (h *ScriptHandlers) RunScript(c *gin.Context) {
err := func() error { err := func() error {
var in RunScriptIn var in RunScriptIn
if err := c.Bind(&in); err != nil { if err := c.Bind(&in); err != nil {
return err return err
} }
command, err := self.svc.ResolveCommand( command, err := h.svc.ResolveCommand(
c.Request.Context(), c.Request.Context(),
in.InterpreterID, in.InterpreterID,
in.ScriptText, in.ScriptText,
@@ -63,7 +63,7 @@ func (self *ScriptHandlers) RunScript(c *gin.Context) {
return err return err
} }
agent, ok := self.cmder.GetAgent(in.AgentID) agent, ok := h.tracker.GetAgent(in.AgentID)
if !ok { if !ok {
c.Status(http.StatusNotFound) c.Status(http.StatusNotFound)
return fmt.Errorf("agent not found") return fmt.Errorf("agent not found")
@@ -105,8 +105,8 @@ func (self *ScriptHandlers) RunScript(c *gin.Context) {
// @Success 200 {array} repository.ScriptInterpreter // @Success 200 {array} repository.ScriptInterpreter
// @Security Bearer // @Security Bearer
// @Router /scripts/interpreters [get] // @Router /scripts/interpreters [get]
func (self *ScriptHandlers) ListInterpreters(c *gin.Context) { func (h *ScriptHandlers) ListInterpreters(c *gin.Context) {
interpreters, err := self.svc.List(c.Request.Context()) interpreters, err := h.svc.List(c.Request.Context())
if err != nil { if err != nil {
c.Error(err) c.Error(err)
return return
@@ -124,14 +124,14 @@ func (self *ScriptHandlers) ListInterpreters(c *gin.Context) {
// @Success 201 {object} repository.ScriptInterpreter // @Success 201 {object} repository.ScriptInterpreter
// @Security Bearer // @Security Bearer
// @Router /scripts/interpreters [post] // @Router /scripts/interpreters [post]
func (self *ScriptHandlers) CreateInterpreter(c *gin.Context) { func (h *ScriptHandlers) CreateInterpreter(c *gin.Context) {
var in repository.ScriptInterpreterCreate var in repository.ScriptInterpreterCreate
if err := c.BindJSON(&in); err != nil { if err := c.BindJSON(&in); err != nil {
c.Error(err) c.Error(err)
return return
} }
si, err := self.svc.Create(c.Request.Context(), in) si, err := h.svc.Create(c.Request.Context(), in)
if err != nil { if err != nil {
c.Error(err) c.Error(err)
return return
@@ -148,14 +148,14 @@ func (self *ScriptHandlers) CreateInterpreter(c *gin.Context) {
// @Success 200 {object} repository.ScriptInterpreter // @Success 200 {object} repository.ScriptInterpreter
// @Security Bearer // @Security Bearer
// @Router /scripts/interpreters/:id [get] // @Router /scripts/interpreters/:id [get]
func (self *ScriptHandlers) GetInterpreter(c *gin.Context) { func (h *ScriptHandlers) GetInterpreter(c *gin.Context) {
id, err := strconv.ParseInt(c.Param("id"), 10, 64) id, err := strconv.ParseInt(c.Param("id"), 10, 64)
if err != nil { if err != nil {
c.Error(err) c.Error(err)
return return
} }
si, err := self.svc.GetByID(c.Request.Context(), id) si, err := h.svc.GetByID(c.Request.Context(), id)
if err != nil { if err != nil {
c.Error(err) c.Error(err)
return return
@@ -174,7 +174,7 @@ func (self *ScriptHandlers) GetInterpreter(c *gin.Context) {
// @Success 200 {object} repository.ScriptInterpreter // @Success 200 {object} repository.ScriptInterpreter
// @Security Bearer // @Security Bearer
// @Router /scripts/interpreters/:id [put] // @Router /scripts/interpreters/:id [put]
func (self *ScriptHandlers) UpdateInterpreter(c *gin.Context) { func (h *ScriptHandlers) UpdateInterpreter(c *gin.Context) {
id, err := strconv.ParseInt(c.Param("id"), 10, 64) id, err := strconv.ParseInt(c.Param("id"), 10, 64)
if err != nil { if err != nil {
c.Error(err) c.Error(err)
@@ -187,7 +187,7 @@ func (self *ScriptHandlers) UpdateInterpreter(c *gin.Context) {
return return
} }
si, err := self.svc.Update(c.Request.Context(), id, in) si, err := h.svc.Update(c.Request.Context(), id, in)
if err != nil { if err != nil {
c.Error(err) c.Error(err)
return return
@@ -203,14 +203,14 @@ func (self *ScriptHandlers) UpdateInterpreter(c *gin.Context) {
// @Success 204 // @Success 204
// @Security Bearer // @Security Bearer
// @Router /scripts/interpreters/:id [delete] // @Router /scripts/interpreters/:id [delete]
func (self *ScriptHandlers) DeleteInterpreter(c *gin.Context) { func (h *ScriptHandlers) DeleteInterpreter(c *gin.Context) {
id, err := strconv.ParseInt(c.Param("id"), 10, 64) id, err := strconv.ParseInt(c.Param("id"), 10, 64)
if err != nil { if err != nil {
c.Error(err) c.Error(err)
return return
} }
if err := self.svc.Delete(c.Request.Context(), id); err != nil { if err := h.svc.Delete(c.Request.Context(), id); err != nil {
c.Error(err) c.Error(err)
return return
} }
@@ -5,6 +5,7 @@ import (
"database/sql" "database/sql"
"encoding/json" "encoding/json"
"fmt" "fmt"
"time"
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/backend/internal/models" "gitea.d3m0k1d.ru/d3m0k1d/HellreigN/backend/internal/models"
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/backend/internal/storage" "gitea.d3m0k1d.ru/d3m0k1d/HellreigN/backend/internal/storage"
@@ -103,3 +104,29 @@ func (r *JobRepository) GetJobByID(ctx context.Context, jid int64) (models.Job,
job.Stdin = stdinVal job.Stdin = stdinVal
return job, nil return job, nil
} }
type JobMetrics struct {
Total int
Success int
Failed int
Pending int
}
// GetJobMetrics returns job success metrics for jobs updated since the given time.
// A successful job has status == 0, failed has status != 0, pending has status == 0 with empty stdout/stderr.
func (r *JobRepository) GetJobMetrics(ctx context.Context, since time.Time) (JobMetrics, error) {
var m JobMetrics
err := r.DB.QueryRowContext(ctx,
`SELECT
COUNT(*),
SUM(CASE WHEN status = 0 AND (stdout != '' OR stderr != '') THEN 1 ELSE 0 END),
SUM(CASE WHEN status != 0 THEN 1 ELSE 0 END),
SUM(CASE WHEN status = 0 AND stdout = '' AND stderr = '' THEN 1 ELSE 0 END)
FROM jobs WHERE updated_at >= ?`,
since,
).Scan(&m.Total, &m.Success, &m.Failed, &m.Pending)
if err != nil {
return JobMetrics{}, err
}
return m, nil
}