package commander import ( "context" "fmt" "io" "log" "sync" "gitea.d3m0k1d.ru/d3m0k1d/HellreigN/backend/internal/models" "gitea.d3m0k1d.ru/d3m0k1d/HellreigN/proto/proto" "golang.org/x/sync/errgroup" "google.golang.org/grpc" "google.golang.org/grpc/metadata" ) type Commander struct { proto.UnimplementedCommanderServer agents map[string]Agent mu sync.RWMutex jobber Jobber } type Jobber interface { InitJob(ctx context.Context, agentID string, job models.JobForInsert) (int64, error) UpdateJobInDB(ctx context.Context, jid int64, msg models.JobForUpdate) (models.Job, error) } func New(jobber Jobber) *Commander { return &Commander{ agents: make(map[string]Agent), jobber: jobber, } } type Agent struct { bidi grpc.BidiStreamingServer[proto.FinishedCommand, proto.Command] in chan *proto.Command jobs map[int64]Job jobber Jobber ctx context.Context aid string Token string // agent id Label string Services []string } type JobOut struct { fc models.Job err error } type Job struct { out chan JobOut } func (self *Commander) GetAgent(aid string) (agent Agent, ok bool) { self.mu.RLock() defer self.mu.RUnlock() agent, ok = self.agents[aid] return } func (self *Commander) Agents() []Agent { self.mu.RLock() defer self.mu.RUnlock() result := make([]Agent, 0, len(self.agents)) for _, a := range self.agents { result = append(result, a) } return result } func (self *Commander) removeAgent(aid string) { self.mu.Lock() defer self.mu.Unlock() delete(self.agents, aid) } func (self *Agent) AddJob(job models.JobForInsert) (int64, error) { log.Printf("[DEBUG] AddJob: agent=%s, command=%v", self.aid, job.Command) jid, err := self.jobber.InitJob(self.ctx, self.aid, job) if err != nil { log.Printf("[DEBUG] AddJob: InitJob failed: %v", err) return 0, err } log.Printf("[DEBUG] AddJob: InitJob returned jid=%d, sending to self.in channel", jid) self.in <- &proto.Command{ Id: jid, Command: job.Command, Stdin: job.Stdin, } log.Printf("[DEBUG] AddJob: sent to self.in channel successfully") return jid, err } func (self *Agent) WaitJob(jid int64) (*models.Job, error) { log.Printf("[DEBUG] WaitJob: agent=%s, jid=%d, waiting on self.jobs[%d].out", self.aid, jid, jid) result := <-self.jobs[jid].out log.Printf("[DEBUG] WaitJob: agent=%s, jid=%d, received result", self.aid, jid) return &result.fc, result.err } func (self *Commander) Stream(bidi grpc.BidiStreamingServer[proto.FinishedCommand, proto.Command]) error { md, ok := metadata.FromIncomingContext(bidi.Context()) if !ok { return fmt.Errorf("no metadata in context") } aidVals := md["agentid"] if len(aidVals) == 0 { return fmt.Errorf("agentid metadata missing") } aid := aidVals[0] var label string labelVals := md["label"] if len(labelVals) > 0 { label = labelVals[0] } agent := newAgent(bidi, self.jobber, aid, label) self.mu.Lock() self.agents[aid] = agent self.mu.Unlock() defer self.removeAgent(aid) return agent.run() } func (self *Agent) run() error { wg := new(errgroup.Group) wg.Go(self.recv) wg.Go(self.send) return wg.Wait() } func (self *Agent) recv() error { for { job, err := func() (job models.Job, err error) { msg, err := self.bidi.Recv() if err != nil { return } log.Printf("[DEBUG] recv: agent=%s, received finished job id=%d", self.aid, msg.Id) return self.jobber.UpdateJobInDB(self.ctx, msg.Id, models.JobForUpdate{ Stdout: msg.Stdout, Stderr: msg.Stderr, Status: msg.Status, }) }() if err == io.EOF { log.Printf("[DEBUG] recv: agent=%s, EOF received", self.aid) return nil } if err != nil { log.Printf("[DEBUG] recv: agent=%s, error: %v", self.aid, err) } out := self.jobs[job.ID].out out <- JobOut{ fc: job, err: err, } close(out) log.Printf("[DEBUG] recv: agent=%s, sent result for job id=%d", self.aid, job.ID) } } func (self *Agent) send() error { for job := range self.in { log.Printf("[DEBUG] send: agent=%s, job id=%d, command=%v", self.aid, job.Id, job.Command) self.jobs[job.Id] = newJob() if err := self.bidi.Send(job); err != nil { log.Printf("[DEBUG] send: agent=%s, failed to send job id=%d: %v", self.aid, job.Id, err) return err } log.Printf("[DEBUG] send: agent=%s, sent job id=%d to agent", self.aid, job.Id) } log.Printf("[DEBUG] send: agent=%s, self.in channel closed", self.aid) return io.EOF // self.jobs[] } func newAgent(bidi grpc.BidiStreamingServer[proto.FinishedCommand, proto.Command], jobber Jobber, aid string, label string) Agent { return Agent{ bidi: bidi, in: make(chan *proto.Command), jobs: make(map[int64]Job), jobber: jobber, ctx: bidi.Context(), aid: aid, Label: label, Token: aid, } } func newJob() Job { return Job{make(chan JobOut, 1)} }