// Package commander implements the Commander gRPC service. Agents attach over
// a bidirectional stream, receive proto.Command messages and report
// proto.FinishedCommand messages back.
package commander

import (
	"context"
	"errors"
	"fmt"
	"io"
	"sync"

	"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/backend/internal/models"
	"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/proto/proto"
	"golang.org/x/sync/errgroup"
	"google.golang.org/grpc"
	"google.golang.org/grpc/metadata"
)

var (
	// errNoJobber is returned when an Agent has no job store to persist to.
	errNoJobber = errors.New("commander: agent has no job store configured")
	// errUnknownJob is returned by WaitJob for an id that is not pending.
	errUnknownJob = errors.New("commander: unknown job")
)

// Commander is the gRPC server side of the command stream. It keeps one Agent
// per connected agent id.
type Commander struct {
	proto.UnimplementedCommanderServer

	mu     sync.RWMutex // guards agents; Stream and GetAgent run concurrently
	agents map[string]Agent
}

// New returns a Commander with no agents connected.
func New() *Commander {
	return &Commander{
		agents: make(map[string]Agent),
	}
}

// Agent is a handle to one connected agent stream. It is passed around by
// value; every copy shares the same channel, job table and lock.
type Agent struct {
	bidi grpc.BidiStreamingServer[proto.FinishedCommand, proto.Command]
	in   chan *proto.Command // commands queued by AddJob, drained by send

	// mu guards jobs. It is a pointer so that copies of Agent (GetAgent returns
	// one) share a single lock instead of copying it.
	mu   *sync.Mutex
	jobs map[int64]Job

	jobber interface {
		InitJob(ctx context.Context) (int64, error)
		UpdateJobInDB(ctx context.Context, jid int64, msg models.JobForUpdate) (models.Job, error)
	}

	// ctx is the stream's context; it is used to abandon blocked operations
	// once the stream is gone.
	ctx context.Context
}

// JobOut is the outcome of one command: the job returned by the store update
// and the error from that update, if any.
type JobOut struct {
	fc  models.Job
	err error
}

// Job is a pending command. Its result is delivered on out.
type Job struct {
	out chan JobOut
}

// GetAgent returns the agent registered under aid. ok is false when no such
// agent is currently connected.
func (c *Commander) GetAgent(aid string) (agent Agent, ok bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	agent, ok = c.agents[aid]
	return agent, ok
}

// AddJob creates a job in the store, registers it as pending and queues a
// command carrying the job id for the agent. It returns the job id, or an
// error if the store fails or the stream context ends before the command is
// picked up by the sender.
//
// NOTE(review): the payload of job is not forwarded yet; the command is sent
// with an empty Command and Stdin exactly as before. The fields of
// models.JobForInsert are not visible from this file, so the mapping has to be
// added by someone who can confirm them.
func (a *Agent) AddJob(job models.JobForInsert) (int64, error) {
	if a.jobber == nil {
		return 0, errNoJobber
	}
	jid, err := a.jobber.InitJob(a.ctx)
	if err != nil {
		return 0, err
	}

	// Register before enqueueing so that a WaitJob issued right after AddJob
	// returns always finds the entry, and so that a fast reply is not dropped.
	a.mu.Lock()
	a.jobs[jid] = newJob()
	a.mu.Unlock()

	cmd := &proto.Command{
		Id:      jid, // the agent echoes this id; recv uses it to find the waiter
		Command: []string{},
		Stdin:   new(string),
	}
	select {
	case a.in <- cmd:
		return jid, nil
	case <-a.ctx.Done():
		// Nobody will drain a.in any more; do not block the caller forever.
		a.forget(jid)
		return 0, a.ctx.Err()
	}
}

// WaitJob blocks until the result for jid arrives or the stream context ends.
// The pending entry is removed before returning, so a second WaitJob for the
// same id reports an unknown job.
func (a *Agent) WaitJob(jid int64) (*models.Job, error) {
	a.mu.Lock()
	job, ok := a.jobs[jid]
	a.mu.Unlock()
	if !ok {
		return nil, fmt.Errorf("job %d: %w", jid, errUnknownJob)
	}
	defer a.forget(jid)

	select {
	case res := <-job.out:
		return &res.fc, res.err
	case <-a.ctx.Done():
		// Prefer a result that raced with the disconnect.
		select {
		case res := <-job.out:
			return &res.fc, res.err
		default:
			return nil, a.ctx.Err()
		}
	}
}

// forget drops the pending entry for jid, if any.
func (a *Agent) forget(jid int64) {
	a.mu.Lock()
	delete(a.jobs, jid)
	a.mu.Unlock()
}

// Stream handles one agent connection. The agent id is read from the
// "agentid" entry of the incoming metadata; the agent is registered under it
// for the lifetime of the stream and removed when the stream ends.
func (c *Commander) Stream(bidi grpc.BidiStreamingServer[proto.FinishedCommand, proto.Command]) error {
	md, ok := metadata.FromIncomingContext(bidi.Context())
	if !ok {
		return fmt.Errorf("no metadata in context")
	}
	aidVals := md["agentid"]
	if len(aidVals) == 0 {
		return fmt.Errorf("agentid metadata missing")
	}
	aid := aidVals[0]

	agent := newAgent(bidi)
	c.register(aid, agent)
	defer c.unregister(aid, agent)

	return agent.run()
}

// register stores agent under aid, replacing any previous entry.
func (c *Commander) register(aid string, agent Agent) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.agents == nil {
		// Tolerate a Commander that was not built with New.
		c.agents = make(map[string]Agent)
	}
	c.agents[aid] = agent
}

// unregister removes the entry for aid, but only if it still refers to agent.
// A newer stream that reconnected under the same id must not be evicted by the
// old stream shutting down.
func (c *Commander) unregister(aid string, agent Agent) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if cur, ok := c.agents[aid]; ok && cur.in == agent.in {
		delete(c.agents, aid)
	}
}

// run drives the receive and send loops until both have stopped and returns
// the first error either of them reported.
func (a *Agent) run() error {
	ctx, cancel := context.WithCancel(a.ctx)
	defer cancel()

	var g errgroup.Group
	g.Go(func() error {
		// Once the agent stops talking there is nothing left to send to.
		defer cancel()
		return a.recv()
	})
	g.Go(func() error {
		// If Send fails, recv stays in Recv until the stream itself breaks;
		// it cannot be interrupted from here.
		return a.send(ctx)
	})
	return g.Wait()
}

// recv reads finished commands from the agent, persists each one and hands the
// outcome to whoever is waiting on that command id. It returns nil when the
// agent closes its side of the stream and the Recv error otherwise.
func (a *Agent) recv() error {
	for {
		msg, err := a.bidi.Recv()
		if errors.Is(err, io.EOF) {
			return nil
		}
		if err != nil {
			return err
		}

		job, err := a.update(msg)
		// Key by the id the agent echoed, not by job.ID: on a failed update job
		// is the zero value and its ID would point at the wrong waiter.
		a.deliver(msg.Id, JobOut{fc: job, err: err})
	}
}

// update persists the output and status reported in msg.
func (a *Agent) update(msg *proto.FinishedCommand) (models.Job, error) {
	if a.jobber == nil {
		var zero models.Job
		return zero, errNoJobber
	}
	return a.jobber.UpdateJobInDB(a.ctx, msg.Id, models.JobForUpdate{
		Stdout: msg.Stdout,
		Stderr: msg.Stderr,
		Status: msg.Status,
	})
}

// deliver hands res to the waiter registered for jid. Results for ids that are
// not pending, and repeated results for the same id, are dropped.
func (a *Agent) deliver(jid int64, res JobOut) {
	a.mu.Lock()
	job, ok := a.jobs[jid]
	a.mu.Unlock()
	if !ok {
		return
	}
	// out has capacity 1, so the first result never blocks. The send is
	// non-blocking and the channel is never closed, so a duplicate result can
	// neither stall this loop nor panic on a closed channel.
	select {
	case job.out <- res:
	default:
	}
}

// send forwards queued commands to the agent until ctx is done, the queue is
// closed or a Send fails.
func (a *Agent) send(ctx context.Context) error {
	for {
		select {
		case <-ctx.Done():
			// The receive side reports the reason the stream ended.
			return nil
		case cmd, ok := <-a.in:
			if !ok {
				return io.EOF
			}
			if err := a.bidi.Send(cmd); err != nil {
				return err
			}
		}
	}
}

// newAgent builds the Agent for a freshly accepted stream.
//
// NOTE(review): jobber is left nil here, and nothing in this file sets it, so
// AddJob and result persistence report errNoJobber until a store is wired in.
func newAgent(bidi grpc.BidiStreamingServer[proto.FinishedCommand, proto.Command]) Agent {
	return Agent{
		bidi:   bidi,
		in:     make(chan *proto.Command),
		mu:     new(sync.Mutex),
		jobs:   make(map[int64]Job),
		jobber: nil,
		ctx:    bidi.Context(),
	}
}

// newJob returns a pending job whose result channel can hold one result.
func newJob() Job {
	return Job{out: make(chan JobOut, 1)}
}