fix(backend/commander): synchronize; add cleanup
ci-agent / build (push) Failing after 1m33s

This commit is contained in:
2026-04-04 05:33:49 +03:00
parent 24cc11bc8d
commit f1fc52bd6b
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io"
"sync"
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/backend/internal/models"
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/proto/proto"
@@ -15,6 +16,7 @@ import (
type Commander struct {
proto.UnimplementedCommanderServer
agents map[string]Agent
mu sync.RWMutex
jobber Jobber
}
@@ -52,11 +54,15 @@ type Job struct {
}
func (self *Commander) GetAgent(aid string) (agent Agent, ok bool) {
self.mu.RLock()
defer self.mu.RUnlock()
agent, ok = self.agents[aid]
return
}
func (self *Commander) Agents() []Agent {
self.mu.RLock()
defer self.mu.RUnlock()
result := make([]Agent, 0, len(self.agents))
for _, a := range self.agents {
result = append(result, a)
@@ -64,6 +70,12 @@ func (self *Commander) Agents() []Agent {
return result
}
func (self *Commander) removeAgent(aid string) {
self.mu.Lock()
defer self.mu.Unlock()
delete(self.agents, aid)
}
func (self *Agent) AddJob(job models.JobForInsert) (int64, error) {
jid, err := self.jobber.InitJob(self.ctx, self.aid, job)
if err != nil {
@@ -100,7 +112,11 @@ func (self *Commander) Stream(bidi grpc.BidiStreamingServer[proto.FinishedComman
}
agent := newAgent(bidi, self.jobber, aid, label)
self.mu.Lock()
self.agents[aid] = agent
self.mu.Unlock()
defer self.removeAgent(aid)
return agent.run()
}
@@ -124,6 +140,9 @@ func (self *Agent) recv() error {
Status: msg.Status,
})
}()
if err == io.EOF {
return nil
}
// TODO: that would blow up at some point
out := self.jobs[job.ID].out
out <- JobOut{