2 Commits

Author SHA1 Message Date
zero@thinky b88245e7d9 feat(backend/jobs): add agent_id parameter
ci-agent / build (push) Failing after 5m27s
2026-04-05 04:17:55 +03:00
zero@thinky fd01eecfcc feat!(backend): unify script run and ad-hoc job run 2026-04-05 04:17:43 +03:00
8 changed files with 110 additions and 162 deletions
+2 -1
View File
@@ -96,7 +96,8 @@ func main() {
log.Printf("Warning: failed to initialize script interpreters table: %v", err)
}
scriptSvc := service.NewScriptServiceWithInterpreters(h.Repo, scriptRepo)
scriptHandlers := handlers.NewScriptHandlers(scriptSvc, cmdTracker)
scriptHandlers := handlers.NewScriptHandlers(scriptSvc, cmdTracker,
os.Getenv("WHEREAMI"))
jobsHandlers := handlers.NewJobsHandlers(cmdTracker, scriptSvc,
os.Getenv("WHEREAMI"), /* our address for redirects */
jobRepo,
+8 -28
View File
@@ -1059,6 +1059,12 @@ const docTemplate = `{
"description": "Time period (e.g. 1h, 24h, 7d)",
"name": "period",
"in": "query"
},
{
"type": "string",
"description": "Filter by agent ID",
"name": "agent_id",
"in": "query"
}
],
"responses": {
@@ -1684,7 +1690,7 @@ const docTemplate = `{
"201": {
"description": "Created",
"schema": {
"$ref": "#/definitions/internal_handlers.RunScriptOut"
"$ref": "#/definitions/internal_handlers.JobResult"
}
},
"400": {
@@ -2118,7 +2124,7 @@ const docTemplate = `{
"201": {
"description": "Created",
"schema": {
"$ref": "#/definitions/internal_handlers.RunScriptOut"
"$ref": "#/definitions/internal_handlers.AddJobOut"
}
}
}
@@ -2904,32 +2910,6 @@ const docTemplate = `{
}
}
},
"internal_handlers.RunScriptOut": {
"type": "object",
"properties": {
"command": {
"type": "array",
"items": {
"type": "string"
}
},
"id": {
"type": "integer"
},
"status": {
"type": "integer"
},
"stderr": {
"type": "string"
},
"stdin": {
"type": "string"
},
"stdout": {
"type": "string"
}
}
},
"internal_handlers.RunStoredScriptIn": {
"type": "object",
"required": [
+8 -28
View File
@@ -1048,6 +1048,12 @@
"description": "Time period (e.g. 1h, 24h, 7d)",
"name": "period",
"in": "query"
},
{
"type": "string",
"description": "Filter by agent ID",
"name": "agent_id",
"in": "query"
}
],
"responses": {
@@ -1673,7 +1679,7 @@
"201": {
"description": "Created",
"schema": {
"$ref": "#/definitions/internal_handlers.RunScriptOut"
"$ref": "#/definitions/internal_handlers.JobResult"
}
},
"400": {
@@ -2107,7 +2113,7 @@
"201": {
"description": "Created",
"schema": {
"$ref": "#/definitions/internal_handlers.RunScriptOut"
"$ref": "#/definitions/internal_handlers.AddJobOut"
}
}
}
@@ -2893,32 +2899,6 @@
}
}
},
"internal_handlers.RunScriptOut": {
"type": "object",
"properties": {
"command": {
"type": "array",
"items": {
"type": "string"
}
},
"id": {
"type": "integer"
},
"status": {
"type": "integer"
},
"stderr": {
"type": "string"
},
"stdin": {
"type": "string"
},
"stdout": {
"type": "string"
}
}
},
"internal_handlers.RunStoredScriptIn": {
"type": "object",
"required": [
+6 -19
View File
@@ -504,23 +504,6 @@ definitions:
- interpreter_id
- script_text
type: object
internal_handlers.RunScriptOut:
properties:
command:
items:
type: string
type: array
id:
type: integer
status:
type: integer
stderr:
type: string
stdin:
type: string
stdout:
type: string
type: object
internal_handlers.RunStoredScriptIn:
properties:
stdin:
@@ -1246,6 +1229,10 @@ paths:
in: query
name: period
type: string
- description: Filter by agent ID
in: query
name: agent_id
type: string
produces:
- application/json
responses:
@@ -1608,7 +1595,7 @@ paths:
"201":
description: Created
schema:
$ref: '#/definitions/internal_handlers.RunScriptOut'
$ref: '#/definitions/internal_handlers.JobResult'
"400":
description: Bad Request
schema:
@@ -1882,7 +1869,7 @@ paths:
"201":
description: Created
schema:
$ref: '#/definitions/internal_handlers.RunScriptOut'
$ref: '#/definitions/internal_handlers.AddJobOut'
security:
- Bearer: []
summary: Run a script on an agent
+32 -16
View File
@@ -72,35 +72,49 @@ func (h *JobsHandlers) AddJob(c *gin.Context) {
return
}
agent, ok := h.tracker.GetAgent(in.AgentID)
if !ok {
c.Status(http.StatusNotFound)
c.Error(fmt.Errorf("agent not found"))
return
}
command, err := resolveCommand(c, h.svc, in.InterpreterID, in.Command)
result, err := h.runCommand(c, in.AgentID, in.InterpreterID, in.Command, in.Stdin)
if err != nil {
c.Error(err)
return
}
c.JSON(http.StatusCreated, result)
}
// runCommand resolves command, submits a job to the agent, and returns AddJobOut.
// Shared between jobs and scripts handlers.
func (h *JobsHandlers) runCommand(
c *gin.Context,
agentID string,
interpID int64,
command string,
stdin *string,
) (*AddJobOut, error) {
agent, ok := h.tracker.GetAgent(agentID)
if !ok {
return nil, fmt.Errorf("agent not found")
}
cmd, err := resolveCommand(c, h.svc, interpID, command)
if err != nil {
return nil, err
}
jid, err := agent.AddJob(models.JobForInsert{
Command: command,
Stdin: in.Stdin,
Command: cmd,
Stdin: stdin,
})
if err != nil {
c.Error(err)
return
return nil, err
}
waitURL := fmt.Sprintf("%s/api/v1/jobs/%d/wait", h.whereami, jid)
c.JSON(http.StatusCreated, AddJobOut{
return &AddJobOut{
ID: jid,
Command: command,
Command: cmd,
WaitURL: waitURL,
})
}, nil
}
// WaitJob waits for a submitted job to complete (long-poll).
@@ -216,6 +230,7 @@ type JobMetricsOut struct {
// @Tags jobs
// @Produce json
// @Param period query string false "Time period (e.g. 1h, 24h, 7d)" default(24h)
// @Param agent_id query string false "Filter by agent ID"
// @Success 200 {object} JobMetricsOut
// @Failure 400 {object} map[string]string
// @Security Bearer
@@ -228,8 +243,9 @@ func (h *JobsHandlers) GetJobMetrics(c *gin.Context) {
return
}
agentID := c.Query("agent_id")
since := time.Now().Add(-period)
metrics, err := h.jobRepo.GetJobMetrics(c.Request.Context(), since)
metrics, err := h.jobRepo.GetJobMetrics(c.Request.Context(), since, agentID)
if err != nil {
c.Error(err)
return
+24 -44
View File
@@ -15,10 +15,11 @@ import (
type ScriptHandlers struct {
svc *service.ScriptService
tracker *commander.ConnTracker
whereami string
}
func NewScriptHandlers(svc *service.ScriptService, tracker *commander.ConnTracker) ScriptHandlers {
return ScriptHandlers{svc: svc, tracker: tracker}
func NewScriptHandlers(svc *service.ScriptService, tracker *commander.ConnTracker, whereami string) ScriptHandlers {
return ScriptHandlers{svc: svc, tracker: tracker, whereami: whereami}
}
type RunScriptIn struct {
@@ -28,73 +29,52 @@ type RunScriptIn struct {
Stdin *string `json:"stdin"`
}
type RunScriptOut struct {
ID int64 `json:"id"`
Command []string `json:"command"`
Stdin *string `json:"stdin"`
Stdout string `json:"stdout"`
Stderr string `json:"stderr"`
Status int32 `json:"status"`
}
// RunScript executes a script on a target agent.
// RunScript submits a script as a job and returns a wait_url for the result.
// @Summary Run a script on an agent
// @Description Resolves interpreter argv[] and sends the full command to the agent
// @Tags scripts
// @Accept json
// @Produce json
// @Param body body RunScriptIn true "Script request"
// @Success 201 {object} RunScriptOut
// @Success 201 {object} AddJobOut
// @Security Bearer
// @Router /scripts/run [post]
func (h *ScriptHandlers) RunScript(c *gin.Context) {
err := func() error {
var in RunScriptIn
if err := c.Bind(&in); err != nil {
return err
}
command, err := h.svc.ResolveCommand(
c.Request.Context(),
in.InterpreterID,
in.ScriptText,
)
if err != nil {
return err
c.Error(err)
return
}
agent, ok := h.tracker.GetAgent(in.AgentID)
if !ok {
c.Status(http.StatusNotFound)
return fmt.Errorf("agent not found")
c.Error(fmt.Errorf("agent not found"))
return
}
command, err := h.svc.ResolveCommand(c.Request.Context(), in.InterpreterID, in.ScriptText)
if err != nil {
c.Error(err)
return
}
jid, err := agent.AddJob(models.JobForInsert{
Command: command,
Stdin: in.Stdin,
})
if err != nil {
return err
}
job, err := agent.WaitJob(jid)
if err != nil {
return err
}
c.JSON(http.StatusCreated, RunScriptOut{
ID: job.ID,
Command: job.Command,
Stdin: job.Stdin,
Stdout: job.Stdout,
Stderr: job.Stderr,
Status: job.Status,
})
return nil
}()
if err != nil {
c.Error(err)
return
}
waitURL := fmt.Sprintf("%s/api/v1/jobs/%d/wait", h.whereami, jid)
c.JSON(http.StatusCreated, AddJobOut{
ID: jid,
Command: command,
WaitURL: waitURL,
})
}
// ListInterpreters returns all registered script interpreters.
+2 -2
View File
@@ -198,7 +198,7 @@ func (sh *ScriptHandlersGroup) DeleteScript(c *gin.Context) {
// @Produce json
// @Param id path int true "Script ID"
// @Param body body RunStoredScriptIn true "Agent token and optional stdin"
// @Success 201 {object} RunScriptOut
// @Success 201 {object} JobResult
// @Failure 400 {object} map[string]string
// @Failure 404 {object} map[string]string
// @Failure 500 {object} map[string]string
@@ -254,7 +254,7 @@ func (sh *ScriptHandlersGroup) RunScriptByID(c *gin.Context) {
return
}
c.JSON(http.StatusCreated, RunScriptOut{
c.JSON(http.StatusCreated, JobResult{
ID: job.ID,
Command: job.Command,
Stdin: job.Stdin,
+11 -7
View File
@@ -113,18 +113,22 @@ type JobMetrics struct {
}
// GetJobMetrics returns job success metrics for jobs updated since the given time.
// A successful job has status == 0, failed has status != 0, pending has status == 0 with empty stdout/stderr.
func (r *JobRepository) GetJobMetrics(ctx context.Context, since time.Time) (JobMetrics, error) {
// If agentID is non-empty, results are filtered to that agent only.
func (r *JobRepository) GetJobMetrics(ctx context.Context, since time.Time, agentID string) (JobMetrics, error) {
var m JobMetrics
err := r.DB.QueryRowContext(ctx,
`SELECT
query := `SELECT
COUNT(*),
SUM(CASE WHEN status = 0 AND (stdout != '' OR stderr != '') THEN 1 ELSE 0 END),
SUM(CASE WHEN status != 0 THEN 1 ELSE 0 END),
SUM(CASE WHEN status = 0 AND stdout = '' AND stderr = '' THEN 1 ELSE 0 END)
FROM jobs WHERE updated_at >= ?`,
since,
).Scan(&m.Total, &m.Success, &m.Failed, &m.Pending)
FROM jobs WHERE updated_at >= ?`
args := []any{since}
if agentID != "" {
query += " AND agent_id = ?"
args = append(args, agentID)
}
err := r.DB.QueryRowContext(ctx, query, args...).Scan(&m.Total, &m.Success, &m.Failed, &m.Pending)
if err != nil {
return JobMetrics{}, err
}