package repository import ( "context" "database/sql" "encoding/json" "fmt" "time" "gitea.d3m0k1d.ru/d3m0k1d/HellreigN/backend/internal/models" "gitea.d3m0k1d.ru/d3m0k1d/HellreigN/backend/internal/storage" ) type JobRepository struct { DB *sql.DB } func NewJobRepository(db *sql.DB) *JobRepository { return &JobRepository{DB: db} } func (r *JobRepository) Init(ctx context.Context) error { _, err := r.DB.ExecContext(ctx, storage.CreateJobsTable) return err } func (r *JobRepository) InitJob( ctx context.Context, agentID string, job models.JobForInsert, ) (int64, error) { commandJSON, err := json.Marshal(job.Command) if err != nil { return 0, fmt.Errorf("marshal command: %w", err) } var stdinVal *string if job.Stdin != nil { stdinVal = job.Stdin } result, err := r.DB.ExecContext( ctx, `INSERT INTO jobs (agent_id, command, stdin, stdout, stderr, status) VALUES (?, ?, ?, '', '', 0)`, agentID, string(commandJSON), stdinVal, ) if err != nil { return 0, err } return result.LastInsertId() } func (r *JobRepository) UpdateJobInDB( ctx context.Context, jid int64, msg models.JobForUpdate, ) (models.Job, error) { result, err := r.DB.ExecContext( ctx, `UPDATE jobs SET stdout = ?, stderr = ?, status = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ?`, msg.Stdout, msg.Stderr, msg.Status, jid, ) if err != nil { return models.Job{}, err } affected, err := result.RowsAffected() if err != nil { return models.Job{}, err } if affected == 0 { return models.Job{}, ErrNotFound } return r.GetJobByID(ctx, jid) } func (r *JobRepository) GetJobByID(ctx context.Context, jid int64) (models.Job, error) { var job models.Job var commandJSON string var stdinVal *string err := r.DB.QueryRowContext(ctx, `SELECT id, agent_id, command, stdin, stdout, stderr, status FROM jobs WHERE id = ?`, jid, ).Scan(&job.ID, &job.AgentID, &commandJSON, &stdinVal, &job.Stdout, &job.Stderr, &job.Status) if err != nil { if err == sql.ErrNoRows { return models.Job{}, ErrNotFound } return models.Job{}, err } if err := json.Unmarshal([]byte(commandJSON), &job.Command); err != nil { return models.Job{}, fmt.Errorf("unmarshal command: %w", err) } job.Stdin = stdinVal return job, nil } type JobMetrics struct { Total int Success int Failed int Pending int } // GetJobMetrics returns job success metrics for jobs updated since the given time. // If agentID is non-empty, results are filtered to that agent only. func (r *JobRepository) GetJobMetrics(ctx context.Context, since time.Time, agentID string) (JobMetrics, error) { var m JobMetrics query := `SELECT COUNT(*), SUM(CASE WHEN status = 0 AND (stdout != '' OR stderr != '') THEN 1 ELSE 0 END), SUM(CASE WHEN status != 0 THEN 1 ELSE 0 END), SUM(CASE WHEN status = 0 AND stdout = '' AND stderr = '' THEN 1 ELSE 0 END) FROM jobs WHERE updated_at >= ?` args := []any{since} if agentID != "" { query += " AND agent_id = ?" args = append(args, agentID) } err := r.DB.QueryRowContext(ctx, query, args...).Scan(&m.Total, &m.Success, &m.Failed, &m.Pending) if err != nil { return JobMetrics{}, err } return m, nil }