2 Commits

Author SHA1 Message Date
zero@thinky b88245e7d9 feat(backend/jobs): add agent_id parameter
ci-agent / build (push) Failing after 5m27s
2026-04-05 04:17:55 +03:00
zero@thinky fd01eecfcc feat!(backend): unify script run and ad-hoc job run 2026-04-05 04:17:43 +03:00
8 changed files with 110 additions and 162 deletions
+2 -1
View File
@@ -96,7 +96,8 @@ func main() {
log.Printf("Warning: failed to initialize script interpreters table: %v", err) log.Printf("Warning: failed to initialize script interpreters table: %v", err)
} }
scriptSvc := service.NewScriptServiceWithInterpreters(h.Repo, scriptRepo) scriptSvc := service.NewScriptServiceWithInterpreters(h.Repo, scriptRepo)
scriptHandlers := handlers.NewScriptHandlers(scriptSvc, cmdTracker) scriptHandlers := handlers.NewScriptHandlers(scriptSvc, cmdTracker,
os.Getenv("WHEREAMI"))
jobsHandlers := handlers.NewJobsHandlers(cmdTracker, scriptSvc, jobsHandlers := handlers.NewJobsHandlers(cmdTracker, scriptSvc,
os.Getenv("WHEREAMI"), /* our address for redirects */ os.Getenv("WHEREAMI"), /* our address for redirects */
jobRepo, jobRepo,
+8 -28
View File
@@ -1059,6 +1059,12 @@ const docTemplate = `{
"description": "Time period (e.g. 1h, 24h, 7d)", "description": "Time period (e.g. 1h, 24h, 7d)",
"name": "period", "name": "period",
"in": "query" "in": "query"
},
{
"type": "string",
"description": "Filter by agent ID",
"name": "agent_id",
"in": "query"
} }
], ],
"responses": { "responses": {
@@ -1684,7 +1690,7 @@ const docTemplate = `{
"201": { "201": {
"description": "Created", "description": "Created",
"schema": { "schema": {
"$ref": "#/definitions/internal_handlers.RunScriptOut" "$ref": "#/definitions/internal_handlers.JobResult"
} }
}, },
"400": { "400": {
@@ -2118,7 +2124,7 @@ const docTemplate = `{
"201": { "201": {
"description": "Created", "description": "Created",
"schema": { "schema": {
"$ref": "#/definitions/internal_handlers.RunScriptOut" "$ref": "#/definitions/internal_handlers.AddJobOut"
} }
} }
} }
@@ -2904,32 +2910,6 @@ const docTemplate = `{
} }
} }
}, },
"internal_handlers.RunScriptOut": {
"type": "object",
"properties": {
"command": {
"type": "array",
"items": {
"type": "string"
}
},
"id": {
"type": "integer"
},
"status": {
"type": "integer"
},
"stderr": {
"type": "string"
},
"stdin": {
"type": "string"
},
"stdout": {
"type": "string"
}
}
},
"internal_handlers.RunStoredScriptIn": { "internal_handlers.RunStoredScriptIn": {
"type": "object", "type": "object",
"required": [ "required": [
+8 -28
View File
@@ -1048,6 +1048,12 @@
"description": "Time period (e.g. 1h, 24h, 7d)", "description": "Time period (e.g. 1h, 24h, 7d)",
"name": "period", "name": "period",
"in": "query" "in": "query"
},
{
"type": "string",
"description": "Filter by agent ID",
"name": "agent_id",
"in": "query"
} }
], ],
"responses": { "responses": {
@@ -1673,7 +1679,7 @@
"201": { "201": {
"description": "Created", "description": "Created",
"schema": { "schema": {
"$ref": "#/definitions/internal_handlers.RunScriptOut" "$ref": "#/definitions/internal_handlers.JobResult"
} }
}, },
"400": { "400": {
@@ -2107,7 +2113,7 @@
"201": { "201": {
"description": "Created", "description": "Created",
"schema": { "schema": {
"$ref": "#/definitions/internal_handlers.RunScriptOut" "$ref": "#/definitions/internal_handlers.AddJobOut"
} }
} }
} }
@@ -2893,32 +2899,6 @@
} }
} }
}, },
"internal_handlers.RunScriptOut": {
"type": "object",
"properties": {
"command": {
"type": "array",
"items": {
"type": "string"
}
},
"id": {
"type": "integer"
},
"status": {
"type": "integer"
},
"stderr": {
"type": "string"
},
"stdin": {
"type": "string"
},
"stdout": {
"type": "string"
}
}
},
"internal_handlers.RunStoredScriptIn": { "internal_handlers.RunStoredScriptIn": {
"type": "object", "type": "object",
"required": [ "required": [
+6 -19
View File
@@ -504,23 +504,6 @@ definitions:
- interpreter_id - interpreter_id
- script_text - script_text
type: object type: object
internal_handlers.RunScriptOut:
properties:
command:
items:
type: string
type: array
id:
type: integer
status:
type: integer
stderr:
type: string
stdin:
type: string
stdout:
type: string
type: object
internal_handlers.RunStoredScriptIn: internal_handlers.RunStoredScriptIn:
properties: properties:
stdin: stdin:
@@ -1246,6 +1229,10 @@ paths:
in: query in: query
name: period name: period
type: string type: string
- description: Filter by agent ID
in: query
name: agent_id
type: string
produces: produces:
- application/json - application/json
responses: responses:
@@ -1608,7 +1595,7 @@ paths:
"201": "201":
description: Created description: Created
schema: schema:
$ref: '#/definitions/internal_handlers.RunScriptOut' $ref: '#/definitions/internal_handlers.JobResult'
"400": "400":
description: Bad Request description: Bad Request
schema: schema:
@@ -1882,7 +1869,7 @@ paths:
"201": "201":
description: Created description: Created
schema: schema:
$ref: '#/definitions/internal_handlers.RunScriptOut' $ref: '#/definitions/internal_handlers.AddJobOut'
security: security:
- Bearer: [] - Bearer: []
summary: Run a script on an agent summary: Run a script on an agent
+32 -16
View File
@@ -72,35 +72,49 @@ func (h *JobsHandlers) AddJob(c *gin.Context) {
return return
} }
agent, ok := h.tracker.GetAgent(in.AgentID) result, err := h.runCommand(c, in.AgentID, in.InterpreterID, in.Command, in.Stdin)
if !ok {
c.Status(http.StatusNotFound)
c.Error(fmt.Errorf("agent not found"))
return
}
command, err := resolveCommand(c, h.svc, in.InterpreterID, in.Command)
if err != nil { if err != nil {
c.Error(err) c.Error(err)
return return
} }
c.JSON(http.StatusCreated, result)
}
// runCommand resolves command, submits a job to the agent, and returns AddJobOut.
// Shared between jobs and scripts handlers.
func (h *JobsHandlers) runCommand(
c *gin.Context,
agentID string,
interpID int64,
command string,
stdin *string,
) (*AddJobOut, error) {
agent, ok := h.tracker.GetAgent(agentID)
if !ok {
return nil, fmt.Errorf("agent not found")
}
cmd, err := resolveCommand(c, h.svc, interpID, command)
if err != nil {
return nil, err
}
jid, err := agent.AddJob(models.JobForInsert{ jid, err := agent.AddJob(models.JobForInsert{
Command: command, Command: cmd,
Stdin: in.Stdin, Stdin: stdin,
}) })
if err != nil { if err != nil {
c.Error(err) return nil, err
return
} }
waitURL := fmt.Sprintf("%s/api/v1/jobs/%d/wait", h.whereami, jid) waitURL := fmt.Sprintf("%s/api/v1/jobs/%d/wait", h.whereami, jid)
c.JSON(http.StatusCreated, AddJobOut{ return &AddJobOut{
ID: jid, ID: jid,
Command: command, Command: cmd,
WaitURL: waitURL, WaitURL: waitURL,
}) }, nil
} }
// WaitJob waits for a submitted job to complete (long-poll). // WaitJob waits for a submitted job to complete (long-poll).
@@ -216,6 +230,7 @@ type JobMetricsOut struct {
// @Tags jobs // @Tags jobs
// @Produce json // @Produce json
// @Param period query string false "Time period (e.g. 1h, 24h, 7d)" default(24h) // @Param period query string false "Time period (e.g. 1h, 24h, 7d)" default(24h)
// @Param agent_id query string false "Filter by agent ID"
// @Success 200 {object} JobMetricsOut // @Success 200 {object} JobMetricsOut
// @Failure 400 {object} map[string]string // @Failure 400 {object} map[string]string
// @Security Bearer // @Security Bearer
@@ -228,8 +243,9 @@ func (h *JobsHandlers) GetJobMetrics(c *gin.Context) {
return return
} }
agentID := c.Query("agent_id")
since := time.Now().Add(-period) since := time.Now().Add(-period)
metrics, err := h.jobRepo.GetJobMetrics(c.Request.Context(), since) metrics, err := h.jobRepo.GetJobMetrics(c.Request.Context(), since, agentID)
if err != nil { if err != nil {
c.Error(err) c.Error(err)
return return
+37 -57
View File
@@ -13,12 +13,13 @@ import (
) )
type ScriptHandlers struct { type ScriptHandlers struct {
svc *service.ScriptService svc *service.ScriptService
tracker *commander.ConnTracker tracker *commander.ConnTracker
whereami string
} }
func NewScriptHandlers(svc *service.ScriptService, tracker *commander.ConnTracker) ScriptHandlers { func NewScriptHandlers(svc *service.ScriptService, tracker *commander.ConnTracker, whereami string) ScriptHandlers {
return ScriptHandlers{svc: svc, tracker: tracker} return ScriptHandlers{svc: svc, tracker: tracker, whereami: whereami}
} }
type RunScriptIn struct { type RunScriptIn struct {
@@ -28,73 +29,52 @@ type RunScriptIn struct {
Stdin *string `json:"stdin"` Stdin *string `json:"stdin"`
} }
type RunScriptOut struct { // RunScript submits a script as a job and returns a wait_url for the result.
ID int64 `json:"id"`
Command []string `json:"command"`
Stdin *string `json:"stdin"`
Stdout string `json:"stdout"`
Stderr string `json:"stderr"`
Status int32 `json:"status"`
}
// RunScript executes a script on a target agent.
// @Summary Run a script on an agent // @Summary Run a script on an agent
// @Description Resolves interpreter argv[] and sends the full command to the agent // @Description Resolves interpreter argv[] and sends the full command to the agent
// @Tags scripts // @Tags scripts
// @Accept json // @Accept json
// @Produce json // @Produce json
// @Param body body RunScriptIn true "Script request" // @Param body body RunScriptIn true "Script request"
// @Success 201 {object} RunScriptOut // @Success 201 {object} AddJobOut
// @Security Bearer // @Security Bearer
// @Router /scripts/run [post] // @Router /scripts/run [post]
func (h *ScriptHandlers) RunScript(c *gin.Context) { func (h *ScriptHandlers) RunScript(c *gin.Context) {
err := func() error { var in RunScriptIn
var in RunScriptIn if err := c.Bind(&in); err != nil {
if err := c.Bind(&in); err != nil { c.Error(err)
return err return
} }
command, err := h.svc.ResolveCommand( agent, ok := h.tracker.GetAgent(in.AgentID)
c.Request.Context(), if !ok {
in.InterpreterID, c.Status(http.StatusNotFound)
in.ScriptText, c.Error(fmt.Errorf("agent not found"))
) return
if err != nil { }
return err
}
agent, ok := h.tracker.GetAgent(in.AgentID) command, err := h.svc.ResolveCommand(c.Request.Context(), in.InterpreterID, in.ScriptText)
if !ok {
c.Status(http.StatusNotFound)
return fmt.Errorf("agent not found")
}
jid, err := agent.AddJob(models.JobForInsert{
Command: command,
Stdin: in.Stdin,
})
if err != nil {
return err
}
job, err := agent.WaitJob(jid)
if err != nil {
return err
}
c.JSON(http.StatusCreated, RunScriptOut{
ID: job.ID,
Command: job.Command,
Stdin: job.Stdin,
Stdout: job.Stdout,
Stderr: job.Stderr,
Status: job.Status,
})
return nil
}()
if err != nil { if err != nil {
c.Error(err) c.Error(err)
return
} }
jid, err := agent.AddJob(models.JobForInsert{
Command: command,
Stdin: in.Stdin,
})
if err != nil {
c.Error(err)
return
}
waitURL := fmt.Sprintf("%s/api/v1/jobs/%d/wait", h.whereami, jid)
c.JSON(http.StatusCreated, AddJobOut{
ID: jid,
Command: command,
WaitURL: waitURL,
})
} }
// ListInterpreters returns all registered script interpreters. // ListInterpreters returns all registered script interpreters.
+2 -2
View File
@@ -198,7 +198,7 @@ func (sh *ScriptHandlersGroup) DeleteScript(c *gin.Context) {
// @Produce json // @Produce json
// @Param id path int true "Script ID" // @Param id path int true "Script ID"
// @Param body body RunStoredScriptIn true "Agent token and optional stdin" // @Param body body RunStoredScriptIn true "Agent token and optional stdin"
// @Success 201 {object} RunScriptOut // @Success 201 {object} JobResult
// @Failure 400 {object} map[string]string // @Failure 400 {object} map[string]string
// @Failure 404 {object} map[string]string // @Failure 404 {object} map[string]string
// @Failure 500 {object} map[string]string // @Failure 500 {object} map[string]string
@@ -254,7 +254,7 @@ func (sh *ScriptHandlersGroup) RunScriptByID(c *gin.Context) {
return return
} }
c.JSON(http.StatusCreated, RunScriptOut{ c.JSON(http.StatusCreated, JobResult{
ID: job.ID, ID: job.ID,
Command: job.Command, Command: job.Command,
Stdin: job.Stdin, Stdin: job.Stdin,
+15 -11
View File
@@ -113,18 +113,22 @@ type JobMetrics struct {
} }
// GetJobMetrics returns job success metrics for jobs updated since the given time. // GetJobMetrics returns job success metrics for jobs updated since the given time.
// A successful job has status == 0, failed has status != 0, pending has status == 0 with empty stdout/stderr. // If agentID is non-empty, results are filtered to that agent only.
func (r *JobRepository) GetJobMetrics(ctx context.Context, since time.Time) (JobMetrics, error) { func (r *JobRepository) GetJobMetrics(ctx context.Context, since time.Time, agentID string) (JobMetrics, error) {
var m JobMetrics var m JobMetrics
err := r.DB.QueryRowContext(ctx, query := `SELECT
`SELECT COUNT(*),
COUNT(*), SUM(CASE WHEN status = 0 AND (stdout != '' OR stderr != '') THEN 1 ELSE 0 END),
SUM(CASE WHEN status = 0 AND (stdout != '' OR stderr != '') THEN 1 ELSE 0 END), SUM(CASE WHEN status != 0 THEN 1 ELSE 0 END),
SUM(CASE WHEN status != 0 THEN 1 ELSE 0 END), SUM(CASE WHEN status = 0 AND stdout = '' AND stderr = '' THEN 1 ELSE 0 END)
SUM(CASE WHEN status = 0 AND stdout = '' AND stderr = '' THEN 1 ELSE 0 END) FROM jobs WHERE updated_at >= ?`
FROM jobs WHERE updated_at >= ?`, args := []any{since}
since, if agentID != "" {
).Scan(&m.Total, &m.Success, &m.Failed, &m.Pending) query += " AND agent_id = ?"
args = append(args, agentID)
}
err := r.DB.QueryRowContext(ctx, query, args...).Scan(&m.Total, &m.Success, &m.Failed, &m.Pending)
if err != nil { if err != nil {
return JobMetrics{}, err return JobMetrics{}, err
} }