feat(backend): implement job storage; tie everything up
ci-agent / build (push) Failing after 1m55s

This commit is contained in:
2026-04-04 05:09:16 +03:00
parent f578b6eb51
commit c5e35b4c12
5 changed files with 209 additions and 33 deletions
+45 -15
View File
@@ -16,10 +16,11 @@ import (
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/backend/internal/storage"
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/proto/proto"
"github.com/gin-gonic/gin"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
swaggerFiles "github.com/swaggo/files"
ginSwagger "github.com/swaggo/gin-swagger"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
// @securityDefinitions.apikey Bearer
@@ -44,15 +45,24 @@ func main() {
defer db.Close()
h := handlers.New(db)
agents := handlers.AgentsGroup{Handlers: h}
auth := handlers.AuthGroup{Handlers: h}
agentReg := handlers.NewAgentRegistrationGroup(h)
// Initialize registration tokens table
if err := h.Repo.InitRegistrationTokens(); err != nil {
log.Printf("Warning: failed to initialize registration tokens table: %v", err)
}
// Initialize jobs table
jobRepo := repository.NewJobRepository(db)
if err := jobRepo.Init(context.Background()); err != nil {
log.Printf("Warning: failed to initialize jobs table: %v", err)
}
cmdr := commander.New(jobRepo)
agents := handlers.NewAgentsGroup(h, cmdr)
auth := handlers.AuthGroup{Handlers: h}
agentReg := handlers.NewAgentRegistrationGroup(h)
// Create admin user from config if not exists
if cfg.Admin.Admin_login != "" && cfg.Admin.Admin_password != "" {
if !h.Repo.ExistsByLogin(cfg.Admin.Admin_login) {
@@ -188,7 +198,6 @@ func main() {
}
grpcServer := grpc.NewServer(grpc.Creds(credentials.NewTLS(tlsConfig)))
cmdr := commander.New()
proto.RegisterCommanderServer(grpcServer, cmdr)
lis, err := net.Listen("tcp", ":"+grpcPort)
@@ -196,13 +205,34 @@ func main() {
log.Fatalf("Failed to listen on gRPC port %s: %v", grpcPort, err)
}
go func() {
log.Printf("gRPC server starting on port %s with mTLS", grpcPort)
if err := grpcServer.Serve(lis); err != nil {
log.Fatalf("gRPC server error: %v", err)
}
}()
defer grpcServer.GracefulStop()
g, ctx := errgroup.WithContext(context.Background())
log.Fatal(router.Run(":8080"))
}
g.Go(func() error {
log.Printf("gRPC server starting on port %s with mTLS", grpcPort)
errCh := make(chan error, 1)
go func() { errCh <- grpcServer.Serve(lis) }()
select {
case err := <-errCh:
return err
case <-ctx.Done():
grpcServer.GracefulStop()
return nil
}
})
g.Go(func() error {
log.Printf("HTTP server starting on :8080")
errCh := make(chan error, 1)
go func() { errCh <- router.Run(":8080") }()
select {
case err := <-errCh:
return err
case <-ctx.Done():
return nil
}
})
if err := g.Wait(); err != nil {
log.Fatalf("Server error: %v", err)
}
}
+44 -17
View File
@@ -15,11 +15,18 @@ import (
type Commander struct {
proto.UnimplementedCommanderServer
agents map[string]Agent
jobber Jobber
}
func New() *Commander {
type Jobber interface {
InitJob(ctx context.Context, agentID string, job models.JobForInsert) (int64, error)
UpdateJobInDB(ctx context.Context, jid int64, msg models.JobForUpdate) (models.Job, error)
}
func New(jobber Jobber) *Commander {
return &Commander{
agents: make(map[string]Agent),
jobber: jobber,
}
}
@@ -27,11 +34,13 @@ type Agent struct {
bidi grpc.BidiStreamingServer[proto.FinishedCommand, proto.Command]
in chan *proto.Command
jobs map[int64]Job
jobber interface {
InitJob(ctx context.Context) (int64, error)
UpdateJobInDB(ctx context.Context, jid int64, msg models.JobForUpdate) (models.Job, error)
}
ctx context.Context
jobber Jobber
ctx context.Context
aid string
Token string // agent id
Label string
Services []string
}
type JobOut struct {
fc models.Job
@@ -47,15 +56,23 @@ func (self *Commander) GetAgent(aid string) (agent Agent, ok bool) {
return
}
func (self *Commander) Agents() []Agent {
result := make([]Agent, 0, len(self.agents))
for _, a := range self.agents {
result = append(result, a)
}
return result
}
func (self *Agent) AddJob(job models.JobForInsert) (int64, error) {
jid, err := self.jobber.InitJob(self.ctx)
jid, err := self.jobber.InitJob(self.ctx, self.aid, job)
if err != nil {
return 0, err
}
self.in <- &proto.Command{
Id: 0,
Command: []string{},
Stdin: new(string),
Id: jid,
Command: job.Command,
Stdin: job.Stdin,
}
return jid, err
}
@@ -75,7 +92,14 @@ func (self *Commander) Stream(bidi grpc.BidiStreamingServer[proto.FinishedComman
return fmt.Errorf("agentid metadata missing")
}
aid := aidVals[0]
agent := newAgent(bidi)
var label string
labelVals := md["label"]
if len(labelVals) > 0 {
label = labelVals[0]
}
agent := newAgent(bidi, self.jobber, aid, label)
self.agents[aid] = agent
return agent.run()
}
@@ -121,13 +145,16 @@ func (self *Agent) send() error {
// self.jobs[]
}
func newAgent(bidi grpc.BidiStreamingServer[proto.FinishedCommand, proto.Command]) Agent {
func newAgent(bidi grpc.BidiStreamingServer[proto.FinishedCommand, proto.Command], jobber Jobber, aid string, label string) Agent {
return Agent{
bidi,
make(chan *proto.Command),
make(map[int64]Job),
nil,
bidi.Context(),
bidi: bidi,
in: make(chan *proto.Command),
jobs: make(map[int64]Job),
jobber: jobber,
ctx: bidi.Context(),
aid: aid,
Label: label,
Token: aid,
}
}
+16 -1
View File
@@ -1,12 +1,18 @@
package handlers
import (
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/backend/internal/grpcsrv/commander"
"github.com/gin-gonic/gin"
"net/http"
)
type AgentsGroup struct {
*Handlers
cmder *commander.Commander
}
func NewAgentsGroup(h *Handlers, cmder *commander.Commander) AgentsGroup {
return AgentsGroup{Handlers: h, cmder: cmder}
}
type AgentInfo struct {
@@ -22,5 +28,14 @@ type AgentInfo struct {
// @Success 200 {array} AgentInfo
// @Router /agents [get]
func (ag *AgentsGroup) List(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{"message": "Agents list"})
agents := make([]AgentInfo, 0)
// iterate over the commander's agents map
for _, agent := range ag.cmder.Agents() {
agents = append(agents, AgentInfo{
Token: agent.Token,
Label: agent.Label,
Services: agent.Services,
})
}
c.JSON(http.StatusOK, agents)
}
@@ -0,0 +1,90 @@
package repository
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/backend/internal/models"
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/backend/internal/storage"
)
type JobRepository struct {
DB *sql.DB
}
func NewJobRepository(db *sql.DB) *JobRepository {
return &JobRepository{DB: db}
}
func (r *JobRepository) Init(ctx context.Context) error {
_, err := r.DB.ExecContext(ctx, storage.CreateJobsTable)
return err
}
func (r *JobRepository) InitJob(ctx context.Context, agentID string, job models.JobForInsert) (int64, error) {
commandJSON, err := json.Marshal(job.Command)
if err != nil {
return 0, fmt.Errorf("marshal command: %w", err)
}
var stdinVal *string
if job.Stdin != nil {
stdinVal = job.Stdin
}
result, err := r.DB.ExecContext(ctx,
`INSERT INTO jobs (agent_id, command, stdin, stdout, stderr, status) VALUES (?, ?, ?, '', '', 0)`,
agentID, string(commandJSON), stdinVal,
)
if err != nil {
return 0, err
}
return result.LastInsertId()
}
func (r *JobRepository) UpdateJobInDB(ctx context.Context, jid int64, msg models.JobForUpdate) (models.Job, error) {
result, err := r.DB.ExecContext(ctx,
`UPDATE jobs SET stdout = ?, stderr = ?, status = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ?`,
msg.Stdout, msg.Stderr, msg.Status, jid,
)
if err != nil {
return models.Job{}, err
}
affected, err := result.RowsAffected()
if err != nil {
return models.Job{}, err
}
if affected == 0 {
return models.Job{}, ErrNotFound
}
return r.GetJobByID(ctx, jid)
}
func (r *JobRepository) GetJobByID(ctx context.Context, jid int64) (models.Job, error) {
var job models.Job
var commandJSON string
var stdinVal *string
err := r.DB.QueryRowContext(ctx,
`SELECT id, command, stdin, stdout, stderr, status FROM jobs WHERE id = ?`,
jid,
).Scan(&job.ID, &commandJSON, &stdinVal, &job.Stdout, &job.Stderr, &job.Status)
if err != nil {
if err == sql.ErrNoRows {
return models.Job{}, ErrNotFound
}
return models.Job{}, err
}
if err := json.Unmarshal([]byte(commandJSON), &job.JobForInsert.Command); err != nil {
return models.Job{}, fmt.Errorf("unmarshal command: %w", err)
}
job.JobForInsert.Stdin = stdinVal
return job, nil
}
+14
View File
@@ -25,6 +25,20 @@ CREATE TABLE IF NOT EXISTS registration_tokens (
);
`
const CreateJobsTable = `
CREATE TABLE IF NOT EXISTS jobs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
agent_id TEXT NOT NULL,
command TEXT NOT NULL,
stdin TEXT,
stdout TEXT DEFAULT '',
stderr TEXT DEFAULT '',
status INTEGER DEFAULT 0,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
`
const CreateLogsTable = `
CREATE TABLE IF NOT EXISTS logs (
timestamp DateTime64(3) DEFAULT now(),