From c5e35b4c12b48e82e1a54cb1776e41d9f3b00b0c Mon Sep 17 00:00:00 2001 From: "zero@thinky" Date: Sat, 4 Apr 2026 05:09:16 +0300 Subject: [PATCH] feat(backend): implement job storage; tie everything up --- backend/cmd/main.go | 60 +++++++++---- .../internal/grpcsrv/commander/commander.go | 61 +++++++++---- backend/internal/handlers/agents.go | 17 +++- backend/internal/repository/job_repository.go | 90 +++++++++++++++++++ backend/internal/storage/migrations.go | 14 +++ 5 files changed, 209 insertions(+), 33 deletions(-) create mode 100644 backend/internal/repository/job_repository.go diff --git a/backend/cmd/main.go b/backend/cmd/main.go index 93b950e..a45ab83 100644 --- a/backend/cmd/main.go +++ b/backend/cmd/main.go @@ -16,10 +16,11 @@ import ( "gitea.d3m0k1d.ru/d3m0k1d/HellreigN/backend/internal/storage" "gitea.d3m0k1d.ru/d3m0k1d/HellreigN/proto/proto" "github.com/gin-gonic/gin" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials" swaggerFiles "github.com/swaggo/files" ginSwagger "github.com/swaggo/gin-swagger" + "golang.org/x/sync/errgroup" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" ) // @securityDefinitions.apikey Bearer @@ -44,15 +45,24 @@ func main() { defer db.Close() h := handlers.New(db) - agents := handlers.AgentsGroup{Handlers: h} - auth := handlers.AuthGroup{Handlers: h} - agentReg := handlers.NewAgentRegistrationGroup(h) // Initialize registration tokens table if err := h.Repo.InitRegistrationTokens(); err != nil { log.Printf("Warning: failed to initialize registration tokens table: %v", err) } + // Initialize jobs table + jobRepo := repository.NewJobRepository(db) + if err := jobRepo.Init(context.Background()); err != nil { + log.Printf("Warning: failed to initialize jobs table: %v", err) + } + + cmdr := commander.New(jobRepo) + + agents := handlers.NewAgentsGroup(h, cmdr) + auth := handlers.AuthGroup{Handlers: h} + agentReg := handlers.NewAgentRegistrationGroup(h) + // Create admin user from config if not exists if cfg.Admin.Admin_login != "" && cfg.Admin.Admin_password != "" { if !h.Repo.ExistsByLogin(cfg.Admin.Admin_login) { @@ -188,7 +198,6 @@ func main() { } grpcServer := grpc.NewServer(grpc.Creds(credentials.NewTLS(tlsConfig))) - cmdr := commander.New() proto.RegisterCommanderServer(grpcServer, cmdr) lis, err := net.Listen("tcp", ":"+grpcPort) @@ -196,13 +205,34 @@ func main() { log.Fatalf("Failed to listen on gRPC port %s: %v", grpcPort, err) } - go func() { - log.Printf("gRPC server starting on port %s with mTLS", grpcPort) - if err := grpcServer.Serve(lis); err != nil { - log.Fatalf("gRPC server error: %v", err) - } - }() - defer grpcServer.GracefulStop() + g, ctx := errgroup.WithContext(context.Background()) - log.Fatal(router.Run(":8080")) -} + g.Go(func() error { + log.Printf("gRPC server starting on port %s with mTLS", grpcPort) + errCh := make(chan error, 1) + go func() { errCh <- grpcServer.Serve(lis) }() + select { + case err := <-errCh: + return err + case <-ctx.Done(): + grpcServer.GracefulStop() + return nil + } + }) + + g.Go(func() error { + log.Printf("HTTP server starting on :8080") + errCh := make(chan error, 1) + go func() { errCh <- router.Run(":8080") }() + select { + case err := <-errCh: + return err + case <-ctx.Done(): + return nil + } + }) + + if err := g.Wait(); err != nil { + log.Fatalf("Server error: %v", err) + } +} \ No newline at end of file diff --git a/backend/internal/grpcsrv/commander/commander.go b/backend/internal/grpcsrv/commander/commander.go index d809ad0..02befa3 100644 --- a/backend/internal/grpcsrv/commander/commander.go +++ b/backend/internal/grpcsrv/commander/commander.go @@ -15,11 +15,18 @@ import ( type Commander struct { proto.UnimplementedCommanderServer agents map[string]Agent + jobber Jobber } -func New() *Commander { +type Jobber interface { + InitJob(ctx context.Context, agentID string, job models.JobForInsert) (int64, error) + UpdateJobInDB(ctx context.Context, jid int64, msg models.JobForUpdate) (models.Job, error) +} + +func New(jobber Jobber) *Commander { return &Commander{ agents: make(map[string]Agent), + jobber: jobber, } } @@ -27,11 +34,13 @@ type Agent struct { bidi grpc.BidiStreamingServer[proto.FinishedCommand, proto.Command] in chan *proto.Command jobs map[int64]Job - jobber interface { - InitJob(ctx context.Context) (int64, error) - UpdateJobInDB(ctx context.Context, jid int64, msg models.JobForUpdate) (models.Job, error) - } - ctx context.Context + jobber Jobber + ctx context.Context + aid string + + Token string // agent id + Label string + Services []string } type JobOut struct { fc models.Job @@ -47,15 +56,23 @@ func (self *Commander) GetAgent(aid string) (agent Agent, ok bool) { return } +func (self *Commander) Agents() []Agent { + result := make([]Agent, 0, len(self.agents)) + for _, a := range self.agents { + result = append(result, a) + } + return result +} + func (self *Agent) AddJob(job models.JobForInsert) (int64, error) { - jid, err := self.jobber.InitJob(self.ctx) + jid, err := self.jobber.InitJob(self.ctx, self.aid, job) if err != nil { return 0, err } self.in <- &proto.Command{ - Id: 0, - Command: []string{}, - Stdin: new(string), + Id: jid, + Command: job.Command, + Stdin: job.Stdin, } return jid, err } @@ -75,7 +92,14 @@ func (self *Commander) Stream(bidi grpc.BidiStreamingServer[proto.FinishedComman return fmt.Errorf("agentid metadata missing") } aid := aidVals[0] - agent := newAgent(bidi) + + var label string + labelVals := md["label"] + if len(labelVals) > 0 { + label = labelVals[0] + } + + agent := newAgent(bidi, self.jobber, aid, label) self.agents[aid] = agent return agent.run() } @@ -121,13 +145,16 @@ func (self *Agent) send() error { // self.jobs[] } -func newAgent(bidi grpc.BidiStreamingServer[proto.FinishedCommand, proto.Command]) Agent { +func newAgent(bidi grpc.BidiStreamingServer[proto.FinishedCommand, proto.Command], jobber Jobber, aid string, label string) Agent { return Agent{ - bidi, - make(chan *proto.Command), - make(map[int64]Job), - nil, - bidi.Context(), + bidi: bidi, + in: make(chan *proto.Command), + jobs: make(map[int64]Job), + jobber: jobber, + ctx: bidi.Context(), + aid: aid, + Label: label, + Token: aid, } } diff --git a/backend/internal/handlers/agents.go b/backend/internal/handlers/agents.go index 4848346..34c16a4 100644 --- a/backend/internal/handlers/agents.go +++ b/backend/internal/handlers/agents.go @@ -1,12 +1,18 @@ package handlers import ( + "gitea.d3m0k1d.ru/d3m0k1d/HellreigN/backend/internal/grpcsrv/commander" "github.com/gin-gonic/gin" "net/http" ) type AgentsGroup struct { *Handlers + cmder *commander.Commander +} + +func NewAgentsGroup(h *Handlers, cmder *commander.Commander) AgentsGroup { + return AgentsGroup{Handlers: h, cmder: cmder} } type AgentInfo struct { @@ -22,5 +28,14 @@ type AgentInfo struct { // @Success 200 {array} AgentInfo // @Router /agents [get] func (ag *AgentsGroup) List(c *gin.Context) { - c.JSON(http.StatusOK, gin.H{"message": "Agents list"}) + agents := make([]AgentInfo, 0) + // iterate over the commander's agents map + for _, agent := range ag.cmder.Agents() { + agents = append(agents, AgentInfo{ + Token: agent.Token, + Label: agent.Label, + Services: agent.Services, + }) + } + c.JSON(http.StatusOK, agents) } diff --git a/backend/internal/repository/job_repository.go b/backend/internal/repository/job_repository.go new file mode 100644 index 0000000..84015e8 --- /dev/null +++ b/backend/internal/repository/job_repository.go @@ -0,0 +1,90 @@ +package repository + +import ( + "context" + "database/sql" + "encoding/json" + "fmt" + + "gitea.d3m0k1d.ru/d3m0k1d/HellreigN/backend/internal/models" + "gitea.d3m0k1d.ru/d3m0k1d/HellreigN/backend/internal/storage" +) + +type JobRepository struct { + DB *sql.DB +} + +func NewJobRepository(db *sql.DB) *JobRepository { + return &JobRepository{DB: db} +} + +func (r *JobRepository) Init(ctx context.Context) error { + _, err := r.DB.ExecContext(ctx, storage.CreateJobsTable) + return err +} + +func (r *JobRepository) InitJob(ctx context.Context, agentID string, job models.JobForInsert) (int64, error) { + commandJSON, err := json.Marshal(job.Command) + if err != nil { + return 0, fmt.Errorf("marshal command: %w", err) + } + + var stdinVal *string + if job.Stdin != nil { + stdinVal = job.Stdin + } + + result, err := r.DB.ExecContext(ctx, + `INSERT INTO jobs (agent_id, command, stdin, stdout, stderr, status) VALUES (?, ?, ?, '', '', 0)`, + agentID, string(commandJSON), stdinVal, + ) + if err != nil { + return 0, err + } + + return result.LastInsertId() +} + +func (r *JobRepository) UpdateJobInDB(ctx context.Context, jid int64, msg models.JobForUpdate) (models.Job, error) { + result, err := r.DB.ExecContext(ctx, + `UPDATE jobs SET stdout = ?, stderr = ?, status = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ?`, + msg.Stdout, msg.Stderr, msg.Status, jid, + ) + if err != nil { + return models.Job{}, err + } + + affected, err := result.RowsAffected() + if err != nil { + return models.Job{}, err + } + if affected == 0 { + return models.Job{}, ErrNotFound + } + + return r.GetJobByID(ctx, jid) +} + +func (r *JobRepository) GetJobByID(ctx context.Context, jid int64) (models.Job, error) { + var job models.Job + var commandJSON string + var stdinVal *string + + err := r.DB.QueryRowContext(ctx, + `SELECT id, command, stdin, stdout, stderr, status FROM jobs WHERE id = ?`, + jid, + ).Scan(&job.ID, &commandJSON, &stdinVal, &job.Stdout, &job.Stderr, &job.Status) + if err != nil { + if err == sql.ErrNoRows { + return models.Job{}, ErrNotFound + } + return models.Job{}, err + } + + if err := json.Unmarshal([]byte(commandJSON), &job.JobForInsert.Command); err != nil { + return models.Job{}, fmt.Errorf("unmarshal command: %w", err) + } + + job.JobForInsert.Stdin = stdinVal + return job, nil +} diff --git a/backend/internal/storage/migrations.go b/backend/internal/storage/migrations.go index 06d1989..1698570 100644 --- a/backend/internal/storage/migrations.go +++ b/backend/internal/storage/migrations.go @@ -25,6 +25,20 @@ CREATE TABLE IF NOT EXISTS registration_tokens ( ); ` +const CreateJobsTable = ` +CREATE TABLE IF NOT EXISTS jobs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + agent_id TEXT NOT NULL, + command TEXT NOT NULL, + stdin TEXT, + stdout TEXT DEFAULT '', + stderr TEXT DEFAULT '', + status INTEGER DEFAULT 0, + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + updated_at DATETIME DEFAULT CURRENT_TIMESTAMP +); +` + const CreateLogsTable = ` CREATE TABLE IF NOT EXISTS logs ( timestamp DateTime64(3) DEFAULT now(),