Files
HellreigN/backend/internal/grpcsrv/commander/commander.go
T
2026-04-04 17:33:36 +03:00

199 lines
4.7 KiB
Go

package commander
import (
"context"
"fmt"
"io"
"log"
"sync"
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/backend/internal/models"
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/proto/proto"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)
type Commander struct {
proto.UnimplementedCommanderServer
agents map[string]Agent
mu sync.RWMutex
jobber Jobber
}
type Jobber interface {
InitJob(ctx context.Context, agentID string, job models.JobForInsert) (int64, error)
UpdateJobInDB(ctx context.Context, jid int64, msg models.JobForUpdate) (models.Job, error)
}
func New(jobber Jobber) *Commander {
return &Commander{
agents: make(map[string]Agent),
jobber: jobber,
}
}
type Agent struct {
bidi grpc.BidiStreamingServer[proto.FinishedCommand, proto.Command]
in chan *proto.Command
jobs map[int64]Job
jobber Jobber
ctx context.Context
aid string
Token string // agent id
Label string
Services []string
}
type JobOut struct {
fc models.Job
err error
}
type Job struct {
out chan JobOut
}
func (self *Commander) GetAgent(aid string) (agent Agent, ok bool) {
self.mu.RLock()
defer self.mu.RUnlock()
agent, ok = self.agents[aid]
return
}
func (self *Commander) Agents() []Agent {
self.mu.RLock()
defer self.mu.RUnlock()
result := make([]Agent, 0, len(self.agents))
for _, a := range self.agents {
result = append(result, a)
}
return result
}
func (self *Commander) removeAgent(aid string) {
self.mu.Lock()
defer self.mu.Unlock()
delete(self.agents, aid)
}
func (self *Agent) AddJob(job models.JobForInsert) (int64, error) {
log.Printf("[DEBUG] AddJob: agent=%s, command=%v", self.aid, job.Command)
jid, err := self.jobber.InitJob(self.ctx, self.aid, job)
if err != nil {
log.Printf("[DEBUG] AddJob: InitJob failed: %v", err)
return 0, err
}
log.Printf("[DEBUG] AddJob: InitJob returned jid=%d, sending to self.in channel", jid)
self.in <- &proto.Command{
Id: jid,
Command: job.Command,
Stdin: job.Stdin,
}
log.Printf("[DEBUG] AddJob: sent to self.in channel successfully")
return jid, err
}
func (self *Agent) WaitJob(jid int64) (*models.Job, error) {
log.Printf("[DEBUG] WaitJob: agent=%s, jid=%d, waiting on self.jobs[%d].out", self.aid, jid, jid)
result := <-self.jobs[jid].out
log.Printf("[DEBUG] WaitJob: agent=%s, jid=%d, received result", self.aid, jid)
return &result.fc, result.err
}
func (self *Commander) Stream(bidi grpc.BidiStreamingServer[proto.FinishedCommand, proto.Command]) error {
md, ok := metadata.FromIncomingContext(bidi.Context())
if !ok {
return fmt.Errorf("no metadata in context")
}
aidVals := md["agentid"]
if len(aidVals) == 0 {
return fmt.Errorf("agentid metadata missing")
}
aid := aidVals[0]
var label string
labelVals := md["label"]
if len(labelVals) > 0 {
label = labelVals[0]
}
agent := newAgent(bidi, self.jobber, aid, label)
self.mu.Lock()
self.agents[aid] = agent
self.mu.Unlock()
defer self.removeAgent(aid)
return agent.run()
}
func (self *Agent) run() error {
wg := new(errgroup.Group)
wg.Go(self.recv)
wg.Go(self.send)
return wg.Wait()
}
func (self *Agent) recv() error {
for {
job, err := func() (job models.Job, err error) {
msg, err := self.bidi.Recv()
if err != nil {
return
}
log.Printf("[DEBUG] recv: agent=%s, received finished job id=%d", self.aid, msg.Id)
return self.jobber.UpdateJobInDB(self.ctx, msg.Id, models.JobForUpdate{
Stdout: msg.Stdout,
Stderr: msg.Stderr,
Status: msg.Status,
})
}()
if err == io.EOF {
log.Printf("[DEBUG] recv: agent=%s, EOF received", self.aid)
return nil
}
if err != nil {
log.Printf("[DEBUG] recv: agent=%s, error: %v", self.aid, err)
}
out := self.jobs[job.ID].out
out <- JobOut{
fc: job,
err: err,
}
close(out)
log.Printf("[DEBUG] recv: agent=%s, sent result for job id=%d", self.aid, job.ID)
}
}
func (self *Agent) send() error {
for job := range self.in {
log.Printf("[DEBUG] send: agent=%s, job id=%d, command=%v", self.aid, job.Id, job.Command)
self.jobs[job.Id] = newJob()
if err := self.bidi.Send(job); err != nil {
log.Printf("[DEBUG] send: agent=%s, failed to send job id=%d: %v", self.aid, job.Id, err)
return err
}
log.Printf("[DEBUG] send: agent=%s, sent job id=%d to agent", self.aid, job.Id)
}
log.Printf("[DEBUG] send: agent=%s, self.in channel closed", self.aid)
return io.EOF
// self.jobs[]
}
func newAgent(bidi grpc.BidiStreamingServer[proto.FinishedCommand, proto.Command], jobber Jobber, aid string, label string) Agent {
return Agent{
bidi: bidi,
in: make(chan *proto.Command),
jobs: make(map[int64]Job),
jobber: jobber,
ctx: bidi.Context(),
aid: aid,
Label: label,
Token: aid,
}
}
func newJob() Job {
return Job{make(chan JobOut, 1)}
}