package storage import ( "context" "github.com/jackc/pgx/v5/pgxpool" ) type RepositoryInterface interface { GetBales(ctx context.Context) ([]Bale, error) GetBaleByID(ctx context.Context, id string) (*Bale, error) CreateBale(ctx context.Context, input Bale) (*Bale, error) UpdateBale(ctx context.Context, id string, input Bale) (*Bale, error) DeleteBale(ctx context.Context, id string) error GetBaleTypeByType(ctx context.Context, typeName string) (*BaleType, error) GetBaleTypes(ctx context.Context) ([]BaleType, error) GetBaleTypeByID(ctx context.Context, id int) (*BaleType, error) CreateBaleType(ctx context.Context, input BaleType) (*BaleType, error) UpdateBaleType(ctx context.Context, id int, input BaleType) (*BaleType, error) DeleteBaleType(ctx context.Context, id int) error GetLineStages(ctx context.Context) ([]LineStage, error) GetLineStageByID(ctx context.Context, id int) (*LineStage, error) CreateLineStage(ctx context.Context, input LineStage) (*LineStage, error) UpdateLineStage(ctx context.Context, id int, input LineStage) (*LineStage, error) DeleteLineStage(ctx context.Context, id int) error CreateSensorReading(ctx context.Context, input SensorReading) (*SensorReading, error) GetSensorReadingsByStage(ctx context.Context, stageID int) ([]SensorReading, error) CreateProductionEvent(ctx context.Context, input ProductionEvent) (*ProductionEvent, error) GetProductionEventsByStage(ctx context.Context, stageID int) ([]ProductionEvent, error) GetLineStats(ctx context.Context) (*LineStats, error) } type Repository struct { pool *pgxpool.Pool } var _ RepositoryInterface = (*Repository)(nil) func NewRepository(pool *pgxpool.Pool) *Repository { return &Repository{ pool: pool, } } func (r *Repository) GetBales(ctx context.Context) ([]Bale, error) { rows, err := r.pool.Query(ctx, ` SELECT b.id, b.type_id, COALESCE(bt.type, ''), b.timestamp FROM bales b LEFT JOIN bale_types bt ON b.type_id = bt.id ORDER BY b.id`) if err != nil { return nil, err } defer rows.Close() var bales []Bale for rows.Next() { var b Bale if err := rows.Scan(&b.ID, &b.TypeID, &b.Type, &b.Timestamp); err != nil { return nil, err } bales = append(bales, b) } return bales, nil } func (r *Repository) GetBaleByID(ctx context.Context, id string) (*Bale, error) { var b Bale err := r.pool.QueryRow(ctx, ` SELECT b.id, b.type_id, COALESCE(bt.type, ''), b.timestamp FROM bales b LEFT JOIN bale_types bt ON b.type_id = bt.id WHERE b.id = $1`, id).Scan(&b.ID, &b.TypeID, &b.Type, &b.Timestamp) if err != nil { return nil, err } return &b, nil } func (r *Repository) CreateBale(ctx context.Context, input Bale) (*Bale, error) { var b Bale err := r.pool.QueryRow(ctx, ` INSERT INTO bales (type_id) VALUES ($1) RETURNING id, type_id, timestamp`, input.TypeID).Scan(&b.ID, &b.TypeID, &b.Timestamp) if err != nil { return nil, err } if input.TypeID > 0 { var bt BaleType r.pool.QueryRow(ctx, "SELECT id, type FROM bale_types WHERE id = $1", b.TypeID).Scan(&bt.ID, &bt.Type) b.Type = bt.Type } return &b, nil } func (r *Repository) UpdateBale(ctx context.Context, id string, input Bale) (*Bale, error) { var b Bale err := r.pool.QueryRow(ctx, ` UPDATE bales SET type_id = $1 WHERE id = $2 RETURNING id, type_id, timestamp`, input.TypeID, id).Scan(&b.ID, &b.TypeID, &b.Timestamp) if err != nil { return nil, err } if input.TypeID > 0 { var bt BaleType r.pool.QueryRow(ctx, "SELECT id, type FROM bale_types WHERE id = $1", b.TypeID).Scan(&bt.ID, &bt.Type) b.Type = bt.Type } return &b, nil } func (r *Repository) DeleteBale(ctx context.Context, id string) error { _, err := r.pool.Exec(ctx, "DELETE FROM bales WHERE id = $1", id) return err } func (r *Repository) GetBaleTypes(ctx context.Context) ([]BaleType, error) { rows, err := r.pool.Query(ctx, "SELECT id, type, weight, height, width, length FROM bale_types ORDER BY id") if err != nil { return nil, err } defer rows.Close() var types []BaleType for rows.Next() { var bt BaleType if err := rows.Scan(&bt.ID, &bt.Type, &bt.Weight, &bt.Height, &bt.Width, &bt.Length); err != nil { return nil, err } types = append(types, bt) } return types, nil } func (r *Repository) GetBaleTypeByID(ctx context.Context, id int) (*BaleType, error) { var bt BaleType err := r.pool.QueryRow(ctx, "SELECT id, type, weight, height, width, length FROM bale_types WHERE id = $1", id).Scan(&bt.ID, &bt.Type, &bt.Weight, &bt.Height, &bt.Width, &bt.Length) if err != nil { return nil, err } return &bt, nil } func (r *Repository) GetBaleTypeByType(ctx context.Context, typeName string) (*BaleType, error) { var bt BaleType err := r.pool.QueryRow(ctx, "SELECT id, type, weight, height, width, length FROM bale_types WHERE type = $1", typeName).Scan(&bt.ID, &bt.Type, &bt.Weight, &bt.Height, &bt.Width, &bt.Length) if err != nil { return nil, err } return &bt, nil } func (r *Repository) CreateBaleType(ctx context.Context, input BaleType) (*BaleType, error) { var bt BaleType err := r.pool.QueryRow(ctx, ` INSERT INTO bale_types (type, weight, height, width, length) VALUES ($1, $2, $3, $4, $5) RETURNING id, type, weight, height, width, length`, input.Type, input.Weight, input.Height, input.Width, input.Length).Scan(&bt.ID, &bt.Type, &bt.Weight, &bt.Height, &bt.Width, &bt.Length) if err != nil { return nil, err } return &bt, nil } func (r *Repository) UpdateBaleType(ctx context.Context, id int, input BaleType) (*BaleType, error) { var bt BaleType err := r.pool.QueryRow(ctx, ` UPDATE bale_types SET type = $1, weight = $2, height = $3, width = $4, length = $5 WHERE id = $6 RETURNING id, type, weight, height, width, length`, input.Type, input.Weight, input.Height, input.Width, input.Length, id).Scan(&bt.ID, &bt.Type, &bt.Weight, &bt.Height, &bt.Width, &bt.Length) if err != nil { return nil, err } return &bt, nil } func (r *Repository) DeleteBaleType(ctx context.Context, id int) error { _, err := r.pool.Exec(ctx, "DELETE FROM bale_types WHERE id = $1", id) return err } func (r *Repository) GetLineStages(ctx context.Context) ([]LineStage, error) { rows, err := r.pool.Query(ctx, "SELECT id, name, description, stage_order, equipment, mqtt_topic FROM line_stages ORDER BY stage_order") if err != nil { return nil, err } defer rows.Close() var stages []LineStage for rows.Next() { var s LineStage if err := rows.Scan(&s.ID, &s.Name, &s.Description, &s.Order, &s.Equipment, &s.MQTTTopic); err != nil { return nil, err } stages = append(stages, s) } return stages, nil } func (r *Repository) GetLineStageByID(ctx context.Context, id int) (*LineStage, error) { var s LineStage err := r.pool.QueryRow(ctx, "SELECT id, name, description, stage_order, equipment, mqtt_topic FROM line_stages WHERE id = $1", id).Scan(&s.ID, &s.Name, &s.Description, &s.Order, &s.Equipment, &s.MQTTTopic) if err != nil { return nil, err } return &s, nil } func (r *Repository) CreateLineStage(ctx context.Context, input LineStage) (*LineStage, error) { var s LineStage err := r.pool.QueryRow(ctx, ` INSERT INTO line_stages (name, description, stage_order, equipment, mqtt_topic) VALUES ($1, $2, $3, $4, $5) RETURNING id, name, description, stage_order, equipment, mqtt_topic`, input.Name, input.Description, input.Order, input.Equipment, input.MQTTTopic).Scan(&s.ID, &s.Name, &s.Description, &s.Order, &s.Equipment, &s.MQTTTopic) if err != nil { return nil, err } return &s, nil } func (r *Repository) UpdateLineStage(ctx context.Context, id int, input LineStage) (*LineStage, error) { var s LineStage err := r.pool.QueryRow(ctx, ` UPDATE line_stages SET name = $1, description = $2, stage_order = $3, equipment = $4, mqtt_topic = $5 WHERE id = $6 RETURNING id, name, description, stage_order, equipment, mqtt_topic`, input.Name, input.Description, input.Order, input.Equipment, input.MQTTTopic, id).Scan(&s.ID, &s.Name, &s.Description, &s.Order, &s.Equipment, &s.MQTTTopic) if err != nil { return nil, err } return &s, nil } func (r *Repository) DeleteLineStage(ctx context.Context, id int) error { _, err := r.pool.Exec(ctx, "DELETE FROM line_stages WHERE id = $1", id) return err } func (r *Repository) CreateSensorReading(ctx context.Context, input SensorReading) (*SensorReading, error) { var s SensorReading err := r.pool.QueryRow(ctx, ` INSERT INTO sensor_readings (stage_id, sensor, value, unit, timestamp) VALUES ($1, $2, $3, $4, $5) RETURNING id, stage_id, sensor, value, unit, timestamp`, input.StageID, input.Sensor, input.Value, input.Unit, input.Timestamp).Scan(&s.ID, &s.StageID, &s.Sensor, &s.Value, &s.Unit, &s.Timestamp) if err != nil { return nil, err } return &s, nil } func (r *Repository) GetSensorReadingsByStage(ctx context.Context, stageID int) ([]SensorReading, error) { rows, err := r.pool.Query(ctx, "SELECT id, stage_id, sensor, value, unit, timestamp FROM sensor_readings WHERE stage_id = $1 ORDER BY timestamp DESC LIMIT 100", stageID) if err != nil { return nil, err } defer rows.Close() var readings []SensorReading for rows.Next() { var r SensorReading if err := rows.Scan(&r.ID, &r.StageID, &r.Sensor, &r.Value, &r.Unit, &r.Timestamp); err != nil { return nil, err } readings = append(readings, r) } return readings, nil } func (r *Repository) CreateProductionEvent(ctx context.Context, input ProductionEvent) (*ProductionEvent, error) { var e ProductionEvent err := r.pool.QueryRow(ctx, ` INSERT INTO production_events (stage_id, event_type, data, timestamp) VALUES ($1, $2, $3, $4) RETURNING id, stage_id, event_type, data, timestamp`, input.StageID, input.EventType, input.Data, input.Timestamp).Scan(&e.ID, &e.StageID, &e.EventType, &e.Data, &e.Timestamp) if err != nil { return nil, err } return &e, nil } func (r *Repository) GetProductionEventsByStage(ctx context.Context, stageID int) ([]ProductionEvent, error) { rows, err := r.pool.Query(ctx, "SELECT id, stage_id, event_type, data, timestamp FROM production_events WHERE stage_id = $1 ORDER BY timestamp DESC LIMIT 100", stageID) if err != nil { return nil, err } defer rows.Close() var events []ProductionEvent for rows.Next() { var e ProductionEvent if err := rows.Scan(&e.ID, &e.StageID, &e.EventType, &e.Data, &e.Timestamp); err != nil { return nil, err } events = append(events, e) } return events, nil } func (r *Repository) GetLineStats(ctx context.Context) (*LineStats, error) { var stats LineStats err := r.pool.QueryRow(ctx, ` SELECT COUNT(CASE WHEN pe.event_type = 'item_processed' THEN 1 END) as total_items, COUNT(CASE WHEN pe.event_type = 'item_rejected' THEN 1 END) as rejected_items, COALESCE(AVG(sr.value) FILTER (WHERE sr.sensor = 'conveyor_speed'), 0) as avg_speed, COALESCE((SELECT ls.name FROM line_stages ls ORDER BY ls.stage_order LIMIT 1), '') as current_stage, 'running' as status, NOW() as last_update_time FROM production_events pe LEFT JOIN sensor_readings sr ON pe.stage_id = sr.stage_id `).Scan(&stats.TotalItems, &stats.RejectedItems, &stats.AvgSpeed, &stats.CurrentStage, &stats.Status, &stats.LastUpdateTime) if err != nil { return nil, err } return &stats, nil }