feat(backend): implement grpc commander, add job dispatcher
This commit is contained in:
@@ -0,0 +1,130 @@
|
||||
package commander
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
|
||||
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/backend/internal/models"
|
||||
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/proto/proto"
|
||||
"golang.org/x/sync/errgroup"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/metadata"
|
||||
)
|
||||
|
||||
type Commander struct {
|
||||
proto.UnimplementedCommanderServer
|
||||
agents map[string]Agent
|
||||
}
|
||||
|
||||
type Agent struct {
|
||||
bidi grpc.BidiStreamingServer[proto.FinishedCommand, proto.Command]
|
||||
in chan *proto.Command
|
||||
jobs map[int64]Job
|
||||
jobber interface {
|
||||
InitJob(ctx context.Context) (int64, error)
|
||||
UpdateJobInDB(ctx context.Context, jid int64, msg models.JobForUpdate) (models.Job, error)
|
||||
}
|
||||
ctx context.Context
|
||||
}
|
||||
type JobOut struct {
|
||||
fc models.Job
|
||||
err error
|
||||
}
|
||||
|
||||
type Job struct {
|
||||
out chan JobOut
|
||||
}
|
||||
|
||||
func (self *Commander) GetAgent(aid string) (agent Agent, ok bool) {
|
||||
agent, ok = self.agents[aid]
|
||||
return
|
||||
}
|
||||
|
||||
func (self *Agent) AddJob(job models.JobForInsert) (int64, error) {
|
||||
jid, err := self.jobber.InitJob(self.ctx)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
self.in <- &proto.Command{
|
||||
Id: 0,
|
||||
Command: []string{},
|
||||
Stdin: new(string),
|
||||
}
|
||||
return jid, err
|
||||
}
|
||||
|
||||
func (self *Agent) WaitJob(jid int64) (*models.Job, error) {
|
||||
result := <-self.jobs[jid].out
|
||||
return &result.fc, result.err
|
||||
}
|
||||
|
||||
func (self *Commander) Stream(bidi grpc.BidiStreamingServer[proto.FinishedCommand, proto.Command]) error {
|
||||
md, ok := metadata.FromIncomingContext(bidi.Context())
|
||||
if !ok {
|
||||
return fmt.Errorf("no metadata in context")
|
||||
}
|
||||
aidVals := md["agentid"]
|
||||
if len(aidVals) == 0 {
|
||||
return fmt.Errorf("agentid metadata missing")
|
||||
}
|
||||
aid := aidVals[0]
|
||||
agent := newAgent(bidi)
|
||||
self.agents[aid] = agent
|
||||
return agent.run()
|
||||
}
|
||||
|
||||
func (self *Agent) run() error {
|
||||
wg := new(errgroup.Group)
|
||||
wg.Go(self.recv)
|
||||
wg.Go(self.send)
|
||||
return wg.Wait()
|
||||
}
|
||||
|
||||
func (self *Agent) recv() error {
|
||||
for {
|
||||
job, err := func() (job models.Job, err error) {
|
||||
msg, err := self.bidi.Recv()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
return self.jobber.UpdateJobInDB(self.ctx, msg.Id, models.JobForUpdate{
|
||||
Stdout: msg.Stdout,
|
||||
Stderr: msg.Stderr,
|
||||
Status: msg.Status,
|
||||
})
|
||||
}()
|
||||
// TODO: that would blow up at some point
|
||||
out := self.jobs[job.ID].out
|
||||
out <- JobOut{
|
||||
fc: job,
|
||||
err: err,
|
||||
}
|
||||
close(out)
|
||||
}
|
||||
}
|
||||
|
||||
func (self *Agent) send() error {
|
||||
for job := range self.in {
|
||||
self.jobs[job.Id] = newJob()
|
||||
if err := self.bidi.Send(job); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return io.EOF
|
||||
// self.jobs[]
|
||||
}
|
||||
|
||||
func newAgent(bidi grpc.BidiStreamingServer[proto.FinishedCommand, proto.Command]) Agent {
|
||||
return Agent{
|
||||
bidi,
|
||||
make(chan *proto.Command),
|
||||
make(map[int64]Job),
|
||||
nil,
|
||||
bidi.Context(),
|
||||
}
|
||||
}
|
||||
|
||||
func newJob() Job {
|
||||
return Job{make(chan JobOut, 1)}
|
||||
}
|
||||
Reference in New Issue
Block a user