feat: big ahh commit

- agent+proto+backend: transfer service status
- agent: fix returning empty message on nonzero exit status
- backend: refactor collector+commander and handlers dependent on them: implement agent accounting via grpc stats handler
This commit is contained in:
2026-04-04 22:55:39 +03:00
parent b516a54c17
commit 7be99f8e91
12 changed files with 541 additions and 190 deletions
+12 -41
View File
@@ -4,7 +4,6 @@ import (
"fmt"
"io"
"log"
"sync"
"time"
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/backend/internal/repository"
@@ -13,26 +12,19 @@ import (
"google.golang.org/grpc/metadata"
)
// Collector handles log streaming from connected agents.
type Collector struct {
proto.UnimplementedCollectorServer
logRepo *repository.LogRepository
agents map[string]*Agent
mu sync.RWMutex
tracker *ConnTracker
batchSize int
flushInterval time.Duration
}
type Agent struct {
ID string
Label string
Services []string
ConnectedAt time.Time
}
func New(logRepo *repository.LogRepository) *Collector {
func New(logRepo *repository.LogRepository, tracker *ConnTracker) *Collector {
return &Collector{
logRepo: logRepo,
agents: make(map[string]*Agent),
tracker: tracker,
batchSize: 100,
flushInterval: 2 * time.Second,
}
@@ -56,27 +48,15 @@ func (c *Collector) Stream(stream proto.Collector_StreamServer) error {
}
service := serviceVals[0]
servicesVals := md["services"]
var services []string
if len(servicesVals) > 0 {
services = servicesVals
}
// Register agent
c.mu.Lock()
c.agents[agentName] = &Agent{
agent := &Agent{
ID: agentName,
Label: agentName,
Services: services,
Services: make([]Service, 0),
ConnectedAt: time.Now(),
}
c.mu.Unlock()
defer func() {
c.mu.Lock()
delete(c.agents, agentName)
c.mu.Unlock()
}()
c.tracker.Register(agent)
defer c.tracker.Unregister(agent.ID)
log.Printf("Agent %s connected, streaming logs for service: %s", agentName, service)
@@ -139,7 +119,6 @@ func (c *Collector) Stream(stream proto.Collector_StreamServer) error {
for {
select {
case <-stream.Context().Done():
// Context cancelled, flush remaining
_ = flush()
return stream.Context().Err()
case <-ticker.C:
@@ -162,7 +141,6 @@ func (c *Collector) Stream(stream proto.Collector_StreamServer) error {
}
case err := <-errCh:
if err == io.EOF {
// Client closed stream
return flush()
}
return fmt.Errorf("failed to receive: %w", err)
@@ -170,19 +148,12 @@ func (c *Collector) Stream(stream proto.Collector_StreamServer) error {
}
}
// GetAgent delegates to the tracker.
func (c *Collector) GetAgent(name string) (*Agent, bool) {
c.mu.RLock()
defer c.mu.RUnlock()
a, ok := c.agents[name]
return a, ok
return c.tracker.GetAgent(name)
}
// Agents delegates to the tracker.
func (c *Collector) Agents() []*Agent {
c.mu.RLock()
defer c.mu.RUnlock()
result := make([]*Agent, 0, len(c.agents))
for _, a := range c.agents {
result = append(result, a)
}
return result
return c.tracker.Agents()
}
@@ -0,0 +1,38 @@
package collector
import (
"context"
"fmt"
"log"
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/proto/proto"
"google.golang.org/grpc/metadata"
)
// ReportServices is the unary RPC through which an agent reports the current
// status of its services.
//
// The agent is identified by the first value of the "whoami" key in the
// incoming gRPC metadata. Every entry of req.Services is converted into a
// Service and the resulting list is handed to the tracker's UpdateServices.
// An update for an agent the tracker does not know is only logged as a
// warning; the RPC still succeeds with an empty response. An error is
// returned only when the context has no metadata or no "whoami" value.
func (c *Collector) ReportServices(ctx context.Context, req *proto.ServicesUpdate) (*proto.ServicesUpdateResp, error) {
	md, hasMD := metadata.FromIncomingContext(ctx)
	if !hasMD {
		return nil, fmt.Errorf("no metadata in context")
	}

	identities := md["whoami"]
	if len(identities) == 0 {
		return nil, fmt.Errorf("whoami metadata missing")
	}
	agentName := identities[0]

	// Convert the wire representation into the tracker's own Service type.
	services := make([]Service, len(req.Services))
	for i, s := range req.Services {
		services[i] = Service{Name: s.Name, Status: s.Status}
	}

	if !c.tracker.UpdateServices(agentName, services) {
		log.Printf("Warning: received services update for unknown agent %s", agentName)
		return &proto.ServicesUpdateResp{}, nil
	}

	log.Printf("Updated services for agent %s: %v", agentName, services)
	return &proto.ServicesUpdateResp{}, nil
}
@@ -0,0 +1,111 @@
package collector
import (
"context"
"log"
"sync"
"time"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/stats"
)
// ConnTracker is the registry of agents currently connected to the Collector.
//
// Besides the registry operations it carries the method set of a gRPC
// stats.Handler (TagRPC, HandleRPC, TagConn, HandleConn); HandleConn uses the
// connection-end event to unregister the agent named in the context metadata.
type ConnTracker struct {
	mu     sync.RWMutex      // guards agents
	agents map[string]*Agent // connected agents keyed by Agent.ID
}
// NewConnTracker returns a ConnTracker whose agent map is allocated and empty.
func NewConnTracker() *ConnTracker {
	tracker := new(ConnTracker)
	tracker.agents = map[string]*Agent{}
	return tracker
}
// Register stores agent under agent.ID, replacing any agent already stored
// under that ID, and logs the registration. Collector.Stream calls it when an
// agent opens its log stream.
func (t *ConnTracker) Register(agent *Agent) {
	t.mu.Lock()
	t.agents[agent.ID] = agent
	t.mu.Unlock()

	log.Printf("[collector] agent registered: %s", agent.ID)
}
// Unregister deletes the agent stored under id and logs the removal. The log
// line is written whether or not an agent was stored under id.
func (t *ConnTracker) Unregister(id string) {
	func() {
		t.mu.Lock()
		defer t.mu.Unlock()
		delete(t.agents, id)
	}()

	log.Printf("[collector] agent unregistered: %s", id)
}
// GetAgent looks up the agent stored under id. The boolean reports whether
// such an agent exists; the returned pointer is the stored one, not a copy.
func (t *ConnTracker) GetAgent(id string) (*Agent, bool) {
	t.mu.RLock()
	agent, found := t.agents[id]
	t.mu.RUnlock()

	return agent, found
}
// Agents returns a freshly allocated slice holding a pointer to every
// registered agent. The slice is never nil and its order follows map
// iteration, so it is not stable between calls.
func (t *ConnTracker) Agents() []*Agent {
	t.mu.RLock()
	defer t.mu.RUnlock()

	snapshot := make([]*Agent, len(t.agents))
	next := 0
	for _, agent := range t.agents {
		snapshot[next] = agent
		next++
	}
	return snapshot
}
// grpc.StatsHandler implementation.

// TagRPC returns ctx unchanged; no per-RPC tag is attached.
func (*ConnTracker) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) context.Context {
	return ctx
}
// HandleRPC ignores all per-RPC stats events.
func (*ConnTracker) HandleRPC(context.Context, stats.RPCStats) {}
// TagConn returns ctx unchanged; no per-connection tag is attached.
func (*ConnTracker) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context {
	return ctx
}
// HandleConn reacts to connection-level stats events. Every event other than
// *stats.ConnEnd is ignored. On ConnEnd it reads the "whoami" key from the
// incoming metadata of ctx and unregisters the agent named by its first value;
// when ctx has no metadata or no "whoami" value it does nothing.
//
// NOTE(review): request metadata is normally attached per RPC rather than per
// connection — confirm that the ctx delivered here really carries "whoami",
// otherwise this path never unregisters anything.
func (t *ConnTracker) HandleConn(ctx context.Context, s stats.ConnStats) {
	if _, ended := s.(*stats.ConnEnd); !ended {
		return
	}

	md, hasMD := metadata.FromIncomingContext(ctx)
	if !hasMD {
		return
	}

	if names := md["whoami"]; len(names) > 0 {
		t.Unregister(names[0])
	}
}
// UpdateServices replaces the Services slice of the agent stored under id with
// services (the slice is stored as given, not copied). It reports whether an
// agent with that ID was registered; when it returns false nothing is changed.
func (t *ConnTracker) UpdateServices(id string, services []Service) bool {
	t.mu.Lock()
	defer t.mu.Unlock()

	if agent, known := t.agents[id]; known {
		agent.Services = services
		return true
	}
	return false
}
// Service is one service reported by an agent: its name together with the
// status string reported for it. Field order matters to callers that build
// the value with a positional literal.
type Service struct {
	Name   string // service name as reported by the agent
	Status string // status text as reported by the agent
}
// Agent describes one agent that has an active log stream to the collector.
type Agent struct {
	ID          string    // key under which ConnTracker stores the agent
	Label       string    // display label; Collector.Stream sets it to the agent name
	Services    []Service // last reported services; replaced by ConnTracker.UpdateServices
	ConnectedAt time.Time // set to time.Now() when the stream is opened
}
+122 -67
View File
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io"
"log"
"sync"
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/backend/internal/models"
@@ -11,27 +12,30 @@ import (
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/stats"
)
// Commander handles command execution on connected agents.
type Commander struct {
proto.UnimplementedCommanderServer
agents map[string]Agent
mu sync.RWMutex
jobber Jobber
tracker *ConnTracker
jobber Jobber
}
// Jobber persists job state.
type Jobber interface {
InitJob(ctx context.Context, agentID string, job models.JobForInsert) (int64, error)
UpdateJobInDB(ctx context.Context, jid int64, msg models.JobForUpdate) (models.Job, error)
}
func New(jobber Jobber) *Commander {
func New(jobber Jobber, tracker *ConnTracker) *Commander {
return &Commander{
agents: make(map[string]Agent),
jobber: jobber,
jobber: jobber,
tracker: tracker,
}
}
// Agent represents a connected agent with an active bidirectional stream.
type Agent struct {
bidi grpc.BidiStreamingServer[proto.FinishedCommand, proto.Command]
in chan *proto.Command
@@ -40,10 +44,11 @@ type Agent struct {
ctx context.Context
aid string
Token string // agent id
Token string
Label string
Services []string
}
type JobOut struct {
fc models.Job
err error
@@ -53,61 +58,91 @@ type Job struct {
out chan JobOut
}
func (self *Commander) GetAgent(aid string) (agent Agent, ok bool) {
self.mu.RLock()
defer self.mu.RUnlock()
agent, ok = self.agents[aid]
return
// ConnTracker is the registry of agents that currently hold a command stream.
//
// Besides the registry operations it carries the method set of a gRPC
// stats.Handler so that a connection-end event can remove an agent.
type ConnTracker struct {
	mu     sync.RWMutex      // guards agents
	agents map[string]*Agent // connected agents keyed by agent ID
}
// GetAgentByLabel searches for an agent by its human-readable label.
func (self *Commander) GetAgentByLabel(label string) (agent Agent, ok bool) {
func (self *ConnTracker) GetAgentByLabel(label string) (agent Agent, ok bool) {
self.mu.RLock()
defer self.mu.RUnlock()
for _, a := range self.agents {
if a.Label == label {
return a, true
return *a, true
}
}
return
}
func (self *Commander) Agents() []Agent {
self.mu.RLock()
defer self.mu.RUnlock()
result := make([]Agent, 0, len(self.agents))
for _, a := range self.agents {
// NewConnTracker returns a ConnTracker whose agent map is allocated and empty.
func NewConnTracker() *ConnTracker {
	tracker := new(ConnTracker)
	tracker.agents = map[string]*Agent{}
	return tracker
}
// Register stores agent under aid, replacing any agent already stored under
// that ID, and logs the registration.
func (t *ConnTracker) Register(aid string, agent *Agent) {
	t.mu.Lock()
	t.agents[aid] = agent
	t.mu.Unlock()

	log.Printf("[conntracker] agent registered: %s", aid)
}
// Unregister deletes the agent stored under aid and logs the removal. The log
// line is written whether or not an agent was stored under aid.
func (t *ConnTracker) Unregister(aid string) {
	func() {
		t.mu.Lock()
		defer t.mu.Unlock()
		delete(t.agents, aid)
	}()

	log.Printf("[conntracker] agent unregistered: %s", aid)
}
// GetAgent looks up the agent stored under aid. The boolean reports whether
// such an agent exists; the returned pointer is the stored one, not a copy.
func (t *ConnTracker) GetAgent(aid string) (*Agent, bool) {
	t.mu.RLock()
	agent, found := t.agents[aid]
	t.mu.RUnlock()

	return agent, found
}
// Agents returns a freshly allocated slice holding a pointer to every
// registered agent. The slice is never nil and its order follows map
// iteration, so it is not stable between calls.
func (t *ConnTracker) Agents() []*Agent {
	t.mu.RLock()
	defer t.mu.RUnlock()

	snapshot := make([]*Agent, len(t.agents))
	next := 0
	for _, agent := range t.agents {
		snapshot[next] = agent
		next++
	}
	return snapshot
}
func (self *Commander) removeAgent(aid string) {
self.mu.Lock()
defer self.mu.Unlock()
delete(self.agents, aid)
// grpc.StatsHandler implementation.

// TagRPC returns ctx unchanged; no per-RPC tag is attached.
func (*ConnTracker) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) context.Context {
	return ctx
}
func (self *Agent) AddJob(job models.JobForInsert) (int64, error) {
jid, err := self.jobber.InitJob(self.ctx, self.aid, job)
if err != nil {
return 0, err
// HandleRPC ignores all per-RPC stats events.
func (*ConnTracker) HandleRPC(context.Context, stats.RPCStats) {}
// TagConn returns ctx unchanged; no per-connection tag is attached.
func (*ConnTracker) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context {
	return ctx
}
func (t *ConnTracker) HandleConn(ctx context.Context, s stats.ConnStats) {
switch s.(type) {
case *stats.ConnEnd:
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return
}
aidVals := md["agentid"]
if len(aidVals) == 0 {
return
}
t.Unregister(aidVals[0])
}
self.jobs[jid] = newJob()
self.in <- &proto.Command{
Id: jid,
Command: job.Command,
Stdin: job.Stdin,
}
return jid, err
}
func (self *Agent) WaitJob(jid int64) (*models.Job, error) {
result := <-self.jobs[jid].out
return &result.fc, result.err
}
func (self *Commander) Stream(
// Stream handles a new agent connection and runs the send/recv loops.
func (c *Commander) Stream(
bidi grpc.BidiStreamingServer[proto.FinishedCommand, proto.Command],
) error {
md, ok := metadata.FromIncomingContext(bidi.Context())
@@ -121,35 +156,58 @@ func (self *Commander) Stream(
aid := aidVals[0]
var label string
labelVals := md["label"]
if len(labelVals) > 0 {
label = labelVals[0]
if vals := md["label"]; len(vals) > 0 {
label = vals[0]
}
agent := newAgent(bidi, self.jobber, aid, label)
self.mu.Lock()
self.agents[aid] = agent
self.mu.Unlock()
agent := NewAgent(bidi.Context(), c.jobber, aid, label)
agent.bidi = bidi
c.tracker.Register(aid, agent)
defer c.tracker.Unregister(aid)
defer self.removeAgent(aid)
return agent.run()
}
func (self *Agent) run() error {
// GetAgent looks up a connected agent by its ID by asking the tracker. The
// boolean reports whether the agent is currently registered.
func (c *Commander) GetAgent(aid string) (*Agent, bool) {
	agent, found := c.tracker.GetAgent(aid)
	return agent, found
}
// AddJob persists job through the Jobber, records an in-memory entry for the
// returned job ID and queues the command for delivery to the agent.
//
// It returns the job ID assigned by InitJob. If InitJob fails, its error is
// returned and nothing is queued. If the agent's context is done before the
// command can be queued, the in-memory entry is dropped and the context's
// error is returned (wrapped) instead of blocking forever on a channel that
// nothing drains any more.
func (a *Agent) AddJob(job models.JobForInsert) (int64, error) {
	jid, err := a.jobber.InitJob(a.ctx, a.aid, job)
	if err != nil {
		return 0, err
	}

	// NOTE(review): a.jobs is a plain map that recv and WaitJob also read from
	// other goroutines — confirm whether access needs a mutex.
	a.jobs[jid] = newJob()

	cmd := &proto.Command{
		Id:      jid,
		Command: job.Command,
		Stdin:   job.Stdin,
	}

	// The send loop stops draining a.in once the stream ends, so a bare send
	// would hang as soon as the channel buffer is full.
	select {
	case a.in <- cmd:
		return jid, nil
	case <-a.ctx.Done():
		delete(a.jobs, jid)
		return 0, fmt.Errorf("queueing job %d for agent %s: %w", jid, a.aid, a.ctx.Err())
	}
}
// WaitJob blocks until the result of job jid has been delivered and returns
// it together with the error recorded for it.
//
// It returns a nil job and an error when jid was never added to this agent
// (a lookup miss would otherwise yield a nil channel and block forever), or
// when the agent's context is done before a result arrives. A result that is
// already available when the context ends is still returned.
func (a *Agent) WaitJob(jid int64) (*models.Job, error) {
	job, known := a.jobs[jid]
	if !known {
		return nil, fmt.Errorf("job %d is not known to agent %s", jid, a.aid)
	}

	select {
	case result := <-job.out:
		return &result.fc, result.err
	case <-a.ctx.Done():
		// select picks at random when both are ready; prefer a finished result.
		select {
		case result := <-job.out:
			return &result.fc, result.err
		default:
		}
		return nil, fmt.Errorf("waiting for job %d on agent %s: %w", jid, a.aid, a.ctx.Err())
	}
}
func (a *Agent) run() error {
wg := new(errgroup.Group)
wg.Go(self.recv)
wg.Go(self.send)
wg.Go(a.recv)
wg.Go(a.send)
return wg.Wait()
}
func (self *Agent) recv() error {
func (a *Agent) recv() error {
for {
job, err := func() (job models.Job, err error) {
msg, err := self.bidi.Recv()
msg, err := a.bidi.Recv()
if err != nil {
return
}
return self.jobber.UpdateJobInDB(self.ctx, msg.Id, models.JobForUpdate{
return a.jobber.UpdateJobInDB(a.ctx, msg.Id, models.JobForUpdate{
Stdout: msg.Stdout,
Stderr: msg.Stderr,
Status: msg.Status,
@@ -158,8 +216,7 @@ func (self *Agent) recv() error {
if err == io.EOF {
return nil
}
// TODO: that would blow up at some point
out := self.jobs[job.ID].out
out := a.jobs[job.ID].out
out <- JobOut{
fc: job,
err: err,
@@ -168,28 +225,26 @@ func (self *Agent) recv() error {
}
}
func (self *Agent) send() error {
for job := range self.in {
if err := self.bidi.Send(job); err != nil {
func (a *Agent) send() error {
for job := range a.in {
if err := a.bidi.Send(job); err != nil {
return err
}
}
return io.EOF
// self.jobs[]
}
func newAgent(
bidi grpc.BidiStreamingServer[proto.FinishedCommand, proto.Command],
func NewAgent(
ctx context.Context,
jobber Jobber,
aid string,
label string,
) Agent {
return Agent{
bidi: bidi,
in: make(chan *proto.Command),
) *Agent {
return &Agent{
in: make(chan *proto.Command, 10),
jobs: make(map[int64]Job),
jobber: jobber,
ctx: bidi.Context(),
ctx: ctx,
aid: aid,
Label: label,
Token: aid,