diff --git a/backend/go.mod b/backend/go.mod index a3f5887..e73c1ce 100644 --- a/backend/go.mod +++ b/backend/go.mod @@ -3,11 +3,14 @@ module gitea.d3m0k1d.ru/d3m0k1d/HellreigN/backend go 1.26.1 require ( + gitea.d3m0k1d.ru/d3m0k1d/HellreigN/proto v0.0.0-20260403210401-a6212c89fc0e github.com/ClickHouse/clickhouse-go/v2 v2.44.0 github.com/gin-gonic/gin v1.12.0 github.com/swaggo/files v1.0.1 github.com/swaggo/gin-swagger v1.6.1 github.com/swaggo/swag v1.16.6 + golang.org/x/sync v0.20.0 + google.golang.org/grpc v1.80.0 gopkg.in/yaml.v3 v3.0.1 modernc.org/sqlite v1.48.1 ) @@ -68,10 +71,10 @@ require ( golang.org/x/crypto v0.49.0 // indirect golang.org/x/mod v0.34.0 // indirect golang.org/x/net v0.52.0 // indirect - golang.org/x/sync v0.20.0 // indirect golang.org/x/sys v0.42.0 // indirect golang.org/x/text v0.35.0 // indirect golang.org/x/tools v0.43.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20260120221211-b8f7ae30c516 // indirect google.golang.org/protobuf v1.36.11 // indirect modernc.org/libc v1.70.0 // indirect modernc.org/mathutil v1.7.1 // indirect diff --git a/backend/go.sum b/backend/go.sum index 1372e6b..e78bda7 100644 --- a/backend/go.sum +++ b/backend/go.sum @@ -1,3 +1,5 @@ +gitea.d3m0k1d.ru/d3m0k1d/HellreigN/proto v0.0.0-20260403210401-a6212c89fc0e h1:Z/2Mjc9NU0CWIduj8Wd9ClnZt4dqmQUUXl1VlyGQe6U= +gitea.d3m0k1d.ru/d3m0k1d/HellreigN/proto v0.0.0-20260403210401-a6212c89fc0e/go.mod h1:1DByetpOnW2+AjM8ZWbJ1Xfzprus8fBie2AMUP/YHHA= github.com/ClickHouse/ch-go v0.71.0 h1:bUdZ/EZj/LcVHsMqaRUP2holqygrPWQKeMjc6nZoyRM= github.com/ClickHouse/ch-go v0.71.0/go.mod h1:NwbNc+7jaqfY58dmdDUbG4Jl22vThgx1cYjBw0vtgXw= github.com/ClickHouse/clickhouse-go/v2 v2.44.0 h1:9pxs5pRwIvhni5BDRPn/n5A8DeUod5TnBaeulFBX8EQ= @@ -33,6 +35,10 @@ github.com/go-faster/city v1.0.1 h1:4WAxSZ3V2Ws4QRDrscLEDcibJY8uf41H6AhXDrNDcGw= github.com/go-faster/city v1.0.1/go.mod h1:jKcUJId49qdW3L1qKHH/3wPeUstCVpVSXTM6vO3VcTw= github.com/go-faster/errors v0.7.1 h1:MkJTnDoEdi9pDabt1dpWf7AA8/BaSYZqibYyhZ20AYg= github.com/go-faster/errors v0.7.1/go.mod h1:5ySTjWFiphBs07IKuiL69nxdfd5+fzh1u7FPGZP2quo= +github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= +github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-openapi/jsonpointer v0.22.5 h1:8on/0Yp4uTb9f4XvTrM2+1CPrV05QPZXu+rvu2o9jcA= github.com/go-openapi/jsonpointer v0.22.5/go.mod h1:gyUR3sCvGSWchA2sUBJGluYMbe1zazrYWIkWPjjMUY0= github.com/go-openapi/jsonreference v0.21.5 h1:6uCGVXU/aNF13AQNggxfysJ+5ZcU4nEAe+pJyVWRdiE= @@ -74,6 +80,8 @@ github.com/goccy/go-yaml v1.19.2 h1:PmFC1S6h8ljIz6gMRBopkjP1TVT7xuwrButHID66PoM= github.com/goccy/go-yaml v1.19.2/go.mod h1:XBurs7gK8ATbW4ZPGKgcbrY1Br56PdM69F7LkFRi1kA= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= @@ -171,8 +179,16 @@ github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5t go.mongodb.org/mongo-driver v1.11.4/go.mod h1:PTSz5yu21bkT/wXpkS7WR5f0ddqw5quethTUn9WM+2g= go.mongodb.org/mongo-driver/v2 v2.5.0 h1:yXUhImUjjAInNcpTcAlPHiT7bIXhshCTL3jVBkF3xaE= go.mongodb.org/mongo-driver/v2 v2.5.0/go.mod h1:yOI9kBsufol30iFsl1slpdq1I0eHPzybRWdyYUs8K/0= +go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= +go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= go.opentelemetry.io/otel v1.41.0 h1:YlEwVsGAlCvczDILpUXpIpPSL/VPugt7zHThEMLce1c= go.opentelemetry.io/otel v1.41.0/go.mod h1:Yt4UwgEKeT05QbLwbyHXEwhnjxNO6D8L5PQP51/46dE= +go.opentelemetry.io/otel/metric v1.41.0 h1:rFnDcs4gRzBcsO9tS8LCpgR0dxg4aaxWlJxCno7JlTQ= +go.opentelemetry.io/otel/metric v1.41.0/go.mod h1:xPvCwd9pU0VN8tPZYzDZV/BMj9CM9vs00GuBjeKhJps= +go.opentelemetry.io/otel/sdk v1.39.0 h1:nMLYcjVsvdui1B/4FRkwjzoRVsMK8uL/cj0OyhKzt18= +go.opentelemetry.io/otel/sdk v1.39.0/go.mod h1:vDojkC4/jsTJsE+kh+LXYQlbL8CgrEcwmt1ENZszdJE= +go.opentelemetry.io/otel/sdk/metric v1.39.0 h1:cXMVVFVgsIf2YL6QkRF4Urbr/aMInf+2WKg+sEJTtB8= +go.opentelemetry.io/otel/sdk/metric v1.39.0/go.mod h1:xq9HEVH7qeX69/JnwEfp6fVq5wosJsY1mt4lLfYdVew= go.opentelemetry.io/otel/trace v1.41.0 h1:Vbk2co6bhj8L59ZJ6/xFTskY+tGAbOnCtQGVVa9TIN0= go.opentelemetry.io/otel/trace v1.41.0/go.mod h1:U1NU4ULCoxeDKc09yCWdWe+3QoyweJcISEVa1RBzOis= go.uber.org/mock v0.6.0 h1:hyF9dfmbgIX5EfOdasqLsWD6xqpNZlXblLB/Dbnwv3Y= @@ -243,6 +259,12 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gonum.org/v1/gonum v0.17.0 h1:VbpOemQlsSMrYmn7T2OUvQ4dqxQXU+ouZFQsZOx50z4= +gonum.org/v1/gonum v0.17.0/go.mod h1:El3tOrEuMpv2UdMrbNlKEh9vd86bmQ6vqIcDwxEOc1E= +google.golang.org/genproto/googleapis/rpc v0.0.0-20260120221211-b8f7ae30c516 h1:sNrWoksmOyF5bvJUcnmbeAmQi8baNhqg5IWaI3llQqU= +google.golang.org/genproto/googleapis/rpc v0.0.0-20260120221211-b8f7ae30c516/go.mod h1:j9x/tPzZkyxcgEFkiKEEGxfvyumM01BEtsW8xzOahRQ= +google.golang.org/grpc v1.80.0 h1:Xr6m2WmWZLETvUNvIUmeD5OAagMw3FiKmMlTdViWsHM= +google.golang.org/grpc v1.80.0/go.mod h1:ho/dLnxwi3EDJA4Zghp7k2Ec1+c2jqup0bFkw07bwF4= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= diff --git a/backend/internal/grpcsrv/commander/commander.go b/backend/internal/grpcsrv/commander/commander.go new file mode 100644 index 0000000..1ef35c3 --- /dev/null +++ b/backend/internal/grpcsrv/commander/commander.go @@ -0,0 +1,130 @@ +package commander + +import ( + "context" + "fmt" + "io" + + "gitea.d3m0k1d.ru/d3m0k1d/HellreigN/backend/internal/models" + "gitea.d3m0k1d.ru/d3m0k1d/HellreigN/proto/proto" + "golang.org/x/sync/errgroup" + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" +) + +type Commander struct { + proto.UnimplementedCommanderServer + agents map[string]Agent +} + +type Agent struct { + bidi grpc.BidiStreamingServer[proto.FinishedCommand, proto.Command] + in chan *proto.Command + jobs map[int64]Job + jobber interface { + InitJob(ctx context.Context) (int64, error) + UpdateJobInDB(ctx context.Context, jid int64, msg models.JobForUpdate) (models.Job, error) + } + ctx context.Context +} +type JobOut struct { + fc models.Job + err error +} + +type Job struct { + out chan JobOut +} + +func (self *Commander) GetAgent(aid string) (agent Agent, ok bool) { + agent, ok = self.agents[aid] + return +} + +func (self *Agent) AddJob(job models.JobForInsert) (int64, error) { + jid, err := self.jobber.InitJob(self.ctx) + if err != nil { + return 0, err + } + self.in <- &proto.Command{ + Id: 0, + Command: []string{}, + Stdin: new(string), + } + return jid, err +} + +func (self *Agent) WaitJob(jid int64) (*models.Job, error) { + result := <-self.jobs[jid].out + return &result.fc, result.err +} + +func (self *Commander) Stream(bidi grpc.BidiStreamingServer[proto.FinishedCommand, proto.Command]) error { + md, ok := metadata.FromIncomingContext(bidi.Context()) + if !ok { + return fmt.Errorf("no metadata in context") + } + aidVals := md["agentid"] + if len(aidVals) == 0 { + return fmt.Errorf("agentid metadata missing") + } + aid := aidVals[0] + agent := newAgent(bidi) + self.agents[aid] = agent + return agent.run() +} + +func (self *Agent) run() error { + wg := new(errgroup.Group) + wg.Go(self.recv) + wg.Go(self.send) + return wg.Wait() +} + +func (self *Agent) recv() error { + for { + job, err := func() (job models.Job, err error) { + msg, err := self.bidi.Recv() + if err != nil { + return + } + return self.jobber.UpdateJobInDB(self.ctx, msg.Id, models.JobForUpdate{ + Stdout: msg.Stdout, + Stderr: msg.Stderr, + Status: msg.Status, + }) + }() + // TODO: that would blow up at some point + out := self.jobs[job.ID].out + out <- JobOut{ + fc: job, + err: err, + } + close(out) + } +} + +func (self *Agent) send() error { + for job := range self.in { + self.jobs[job.Id] = newJob() + if err := self.bidi.Send(job); err != nil { + return err + } + } + return io.EOF + // self.jobs[] +} + +func newAgent(bidi grpc.BidiStreamingServer[proto.FinishedCommand, proto.Command]) Agent { + return Agent{ + bidi, + make(chan *proto.Command), + make(map[int64]Job), + nil, + bidi.Context(), + } +} + +func newJob() Job { + return Job{make(chan JobOut, 1)} +} diff --git a/backend/internal/models/job.go b/backend/internal/models/job.go new file mode 100644 index 0000000..b8ab63a --- /dev/null +++ b/backend/internal/models/job.go @@ -0,0 +1,17 @@ +package models + +type Job struct { + ID int64 + + JobForInsert + JobForUpdate +} +type JobForInsert struct { + Command []string + Stdin *string +} +type JobForUpdate struct { + Stdout string + Stderr string + Status int32 +}