diff --git a/backend/internal/grpcsrv/commander/commander.go b/backend/internal/grpcsrv/commander/commander.go index 02befa3..ba0cec0 100644 --- a/backend/internal/grpcsrv/commander/commander.go +++ b/backend/internal/grpcsrv/commander/commander.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "io" + "sync" "gitea.d3m0k1d.ru/d3m0k1d/HellreigN/backend/internal/models" "gitea.d3m0k1d.ru/d3m0k1d/HellreigN/proto/proto" @@ -15,6 +16,7 @@ import ( type Commander struct { proto.UnimplementedCommanderServer agents map[string]Agent + mu sync.RWMutex jobber Jobber } @@ -52,11 +54,15 @@ type Job struct { } func (self *Commander) GetAgent(aid string) (agent Agent, ok bool) { + self.mu.RLock() + defer self.mu.RUnlock() agent, ok = self.agents[aid] return } func (self *Commander) Agents() []Agent { + self.mu.RLock() + defer self.mu.RUnlock() result := make([]Agent, 0, len(self.agents)) for _, a := range self.agents { result = append(result, a) @@ -64,6 +70,12 @@ func (self *Commander) Agents() []Agent { return result } +func (self *Commander) removeAgent(aid string) { + self.mu.Lock() + defer self.mu.Unlock() + delete(self.agents, aid) +} + func (self *Agent) AddJob(job models.JobForInsert) (int64, error) { jid, err := self.jobber.InitJob(self.ctx, self.aid, job) if err != nil { @@ -100,7 +112,11 @@ func (self *Commander) Stream(bidi grpc.BidiStreamingServer[proto.FinishedComman } agent := newAgent(bidi, self.jobber, aid, label) + self.mu.Lock() self.agents[aid] = agent + self.mu.Unlock() + + defer self.removeAgent(aid) return agent.run() } @@ -124,6 +140,9 @@ func (self *Agent) recv() error { Status: msg.Status, }) }() + if err == io.EOF { + return nil + } // TODO: that would blow up at some point out := self.jobs[job.ID].out out <- JobOut{