314 lines
11 KiB
Go
314 lines
11 KiB
Go
package storage
|
|
|
|
import (
|
|
"context"
|
|
|
|
"github.com/jackc/pgx/v5/pgxpool"
|
|
)
|
|
|
|
type RepositoryInterface interface {
|
|
GetBales(ctx context.Context) ([]Bale, error)
|
|
GetBaleByID(ctx context.Context, id string) (*Bale, error)
|
|
CreateBale(ctx context.Context, input Bale) (*Bale, error)
|
|
UpdateBale(ctx context.Context, id string, input Bale) (*Bale, error)
|
|
DeleteBale(ctx context.Context, id string) error
|
|
GetBaleTypeByType(ctx context.Context, typeName string) (*BaleType, error)
|
|
GetBaleTypes(ctx context.Context) ([]BaleType, error)
|
|
GetBaleTypeByID(ctx context.Context, id int) (*BaleType, error)
|
|
CreateBaleType(ctx context.Context, input BaleType) (*BaleType, error)
|
|
UpdateBaleType(ctx context.Context, id int, input BaleType) (*BaleType, error)
|
|
DeleteBaleType(ctx context.Context, id int) error
|
|
GetLineStages(ctx context.Context) ([]LineStage, error)
|
|
GetLineStageByID(ctx context.Context, id int) (*LineStage, error)
|
|
CreateLineStage(ctx context.Context, input LineStage) (*LineStage, error)
|
|
UpdateLineStage(ctx context.Context, id int, input LineStage) (*LineStage, error)
|
|
DeleteLineStage(ctx context.Context, id int) error
|
|
CreateSensorReading(ctx context.Context, input SensorReading) (*SensorReading, error)
|
|
GetSensorReadingsByStage(ctx context.Context, stageID int) ([]SensorReading, error)
|
|
CreateProductionEvent(ctx context.Context, input ProductionEvent) (*ProductionEvent, error)
|
|
GetProductionEventsByStage(ctx context.Context, stageID int) ([]ProductionEvent, error)
|
|
GetLineStats(ctx context.Context) (*LineStats, error)
|
|
}
|
|
|
|
type Repository struct {
|
|
pool *pgxpool.Pool
|
|
}
|
|
|
|
// Compile-time check that *Repository satisfies RepositoryInterface.
var _ RepositoryInterface = (*Repository)(nil)
|
|
|
|
func NewRepository(pool *pgxpool.Pool) *Repository {
|
|
return &Repository{
|
|
pool: pool,
|
|
}
|
|
}
|
|
|
|
func (r *Repository) GetBales(ctx context.Context) ([]Bale, error) {
|
|
rows, err := r.pool.Query(ctx, `
|
|
SELECT b.id, b.type_id, COALESCE(bt.type, ''), b.timestamp
|
|
FROM bales b
|
|
LEFT JOIN bale_types bt ON b.type_id = bt.id
|
|
ORDER BY b.id`)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var bales []Bale
|
|
for rows.Next() {
|
|
var b Bale
|
|
if err := rows.Scan(&b.ID, &b.TypeID, &b.Type, &b.Timestamp); err != nil {
|
|
return nil, err
|
|
}
|
|
bales = append(bales, b)
|
|
}
|
|
return bales, nil
|
|
}
|
|
|
|
func (r *Repository) GetBaleByID(ctx context.Context, id string) (*Bale, error) {
|
|
var b Bale
|
|
err := r.pool.QueryRow(ctx, `
|
|
SELECT b.id, b.type_id, COALESCE(bt.type, ''), b.timestamp
|
|
FROM bales b
|
|
LEFT JOIN bale_types bt ON b.type_id = bt.id
|
|
WHERE b.id = $1`, id).Scan(&b.ID, &b.TypeID, &b.Type, &b.Timestamp)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &b, nil
|
|
}
|
|
|
|
func (r *Repository) CreateBale(ctx context.Context, input Bale) (*Bale, error) {
|
|
var b Bale
|
|
err := r.pool.QueryRow(ctx, `
|
|
INSERT INTO bales (type_id) VALUES ($1)
|
|
RETURNING id, type_id, timestamp`, input.TypeID).Scan(&b.ID, &b.TypeID, &b.Timestamp)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if input.TypeID > 0 {
|
|
var bt BaleType
|
|
r.pool.QueryRow(ctx, "SELECT id, type FROM bale_types WHERE id = $1", b.TypeID).Scan(&bt.ID, &bt.Type)
|
|
b.Type = bt.Type
|
|
}
|
|
return &b, nil
|
|
}
|
|
|
|
func (r *Repository) UpdateBale(ctx context.Context, id string, input Bale) (*Bale, error) {
|
|
var b Bale
|
|
err := r.pool.QueryRow(ctx, `
|
|
UPDATE bales SET type_id = $1 WHERE id = $2
|
|
RETURNING id, type_id, timestamp`, input.TypeID, id).Scan(&b.ID, &b.TypeID, &b.Timestamp)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if input.TypeID > 0 {
|
|
var bt BaleType
|
|
r.pool.QueryRow(ctx, "SELECT id, type FROM bale_types WHERE id = $1", b.TypeID).Scan(&bt.ID, &bt.Type)
|
|
b.Type = bt.Type
|
|
}
|
|
return &b, nil
|
|
}
|
|
|
|
func (r *Repository) DeleteBale(ctx context.Context, id string) error {
|
|
_, err := r.pool.Exec(ctx, "DELETE FROM bales WHERE id = $1", id)
|
|
return err
|
|
}
|
|
|
|
func (r *Repository) GetBaleTypes(ctx context.Context) ([]BaleType, error) {
|
|
rows, err := r.pool.Query(ctx, "SELECT id, type, weight, height, width, length FROM bale_types ORDER BY id")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var types []BaleType
|
|
for rows.Next() {
|
|
var bt BaleType
|
|
if err := rows.Scan(&bt.ID, &bt.Type, &bt.Weight, &bt.Height, &bt.Width, &bt.Length); err != nil {
|
|
return nil, err
|
|
}
|
|
types = append(types, bt)
|
|
}
|
|
return types, nil
|
|
}
|
|
|
|
func (r *Repository) GetBaleTypeByID(ctx context.Context, id int) (*BaleType, error) {
|
|
var bt BaleType
|
|
err := r.pool.QueryRow(ctx, "SELECT id, type, weight, height, width, length FROM bale_types WHERE id = $1", id).Scan(&bt.ID, &bt.Type, &bt.Weight, &bt.Height, &bt.Width, &bt.Length)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &bt, nil
|
|
}
|
|
|
|
func (r *Repository) GetBaleTypeByType(ctx context.Context, typeName string) (*BaleType, error) {
|
|
var bt BaleType
|
|
err := r.pool.QueryRow(ctx, "SELECT id, type, weight, height, width, length FROM bale_types WHERE type = $1", typeName).Scan(&bt.ID, &bt.Type, &bt.Weight, &bt.Height, &bt.Width, &bt.Length)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &bt, nil
|
|
}
|
|
|
|
func (r *Repository) CreateBaleType(ctx context.Context, input BaleType) (*BaleType, error) {
|
|
var bt BaleType
|
|
err := r.pool.QueryRow(ctx, `
|
|
INSERT INTO bale_types (type, weight, height, width, length)
|
|
VALUES ($1, $2, $3, $4, $5)
|
|
RETURNING id, type, weight, height, width, length`, input.Type, input.Weight, input.Height, input.Width, input.Length).Scan(&bt.ID, &bt.Type, &bt.Weight, &bt.Height, &bt.Width, &bt.Length)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &bt, nil
|
|
}
|
|
|
|
func (r *Repository) UpdateBaleType(ctx context.Context, id int, input BaleType) (*BaleType, error) {
|
|
var bt BaleType
|
|
err := r.pool.QueryRow(ctx, `
|
|
UPDATE bale_types SET type = $1, weight = $2, height = $3, width = $4, length = $5 WHERE id = $6
|
|
RETURNING id, type, weight, height, width, length`, input.Type, input.Weight, input.Height, input.Width, input.Length, id).Scan(&bt.ID, &bt.Type, &bt.Weight, &bt.Height, &bt.Width, &bt.Length)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &bt, nil
|
|
}
|
|
|
|
func (r *Repository) DeleteBaleType(ctx context.Context, id int) error {
|
|
_, err := r.pool.Exec(ctx, "DELETE FROM bale_types WHERE id = $1", id)
|
|
return err
|
|
}
|
|
|
|
func (r *Repository) GetLineStages(ctx context.Context) ([]LineStage, error) {
|
|
rows, err := r.pool.Query(ctx, "SELECT id, name, description, stage_order, equipment, mqtt_topic FROM line_stages ORDER BY stage_order")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var stages []LineStage
|
|
for rows.Next() {
|
|
var s LineStage
|
|
if err := rows.Scan(&s.ID, &s.Name, &s.Description, &s.Order, &s.Equipment, &s.MQTTTopic); err != nil {
|
|
return nil, err
|
|
}
|
|
stages = append(stages, s)
|
|
}
|
|
return stages, nil
|
|
}
|
|
|
|
func (r *Repository) GetLineStageByID(ctx context.Context, id int) (*LineStage, error) {
|
|
var s LineStage
|
|
err := r.pool.QueryRow(ctx, "SELECT id, name, description, stage_order, equipment, mqtt_topic FROM line_stages WHERE id = $1", id).Scan(&s.ID, &s.Name, &s.Description, &s.Order, &s.Equipment, &s.MQTTTopic)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &s, nil
|
|
}
|
|
|
|
func (r *Repository) CreateLineStage(ctx context.Context, input LineStage) (*LineStage, error) {
|
|
var s LineStage
|
|
err := r.pool.QueryRow(ctx, `
|
|
INSERT INTO line_stages (name, description, stage_order, equipment, mqtt_topic)
|
|
VALUES ($1, $2, $3, $4, $5)
|
|
RETURNING id, name, description, stage_order, equipment, mqtt_topic`, input.Name, input.Description, input.Order, input.Equipment, input.MQTTTopic).Scan(&s.ID, &s.Name, &s.Description, &s.Order, &s.Equipment, &s.MQTTTopic)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &s, nil
|
|
}
|
|
|
|
func (r *Repository) UpdateLineStage(ctx context.Context, id int, input LineStage) (*LineStage, error) {
|
|
var s LineStage
|
|
err := r.pool.QueryRow(ctx, `
|
|
UPDATE line_stages SET name = $1, description = $2, stage_order = $3, equipment = $4, mqtt_topic = $5 WHERE id = $6
|
|
RETURNING id, name, description, stage_order, equipment, mqtt_topic`, input.Name, input.Description, input.Order, input.Equipment, input.MQTTTopic, id).Scan(&s.ID, &s.Name, &s.Description, &s.Order, &s.Equipment, &s.MQTTTopic)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &s, nil
|
|
}
|
|
|
|
func (r *Repository) DeleteLineStage(ctx context.Context, id int) error {
|
|
_, err := r.pool.Exec(ctx, "DELETE FROM line_stages WHERE id = $1", id)
|
|
return err
|
|
}
|
|
|
|
func (r *Repository) CreateSensorReading(ctx context.Context, input SensorReading) (*SensorReading, error) {
|
|
var s SensorReading
|
|
err := r.pool.QueryRow(ctx, `
|
|
INSERT INTO sensor_readings (stage_id, sensor, value, unit, timestamp)
|
|
VALUES ($1, $2, $3, $4, $5)
|
|
RETURNING id, stage_id, sensor, value, unit, timestamp`, input.StageID, input.Sensor, input.Value, input.Unit, input.Timestamp).Scan(&s.ID, &s.StageID, &s.Sensor, &s.Value, &s.Unit, &s.Timestamp)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &s, nil
|
|
}
|
|
|
|
func (r *Repository) GetSensorReadingsByStage(ctx context.Context, stageID int) ([]SensorReading, error) {
|
|
rows, err := r.pool.Query(ctx, "SELECT id, stage_id, sensor, value, unit, timestamp FROM sensor_readings WHERE stage_id = $1 ORDER BY timestamp DESC LIMIT 100", stageID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var readings []SensorReading
|
|
for rows.Next() {
|
|
var r SensorReading
|
|
if err := rows.Scan(&r.ID, &r.StageID, &r.Sensor, &r.Value, &r.Unit, &r.Timestamp); err != nil {
|
|
return nil, err
|
|
}
|
|
readings = append(readings, r)
|
|
}
|
|
return readings, nil
|
|
}
|
|
|
|
func (r *Repository) CreateProductionEvent(ctx context.Context, input ProductionEvent) (*ProductionEvent, error) {
|
|
var e ProductionEvent
|
|
err := r.pool.QueryRow(ctx, `
|
|
INSERT INTO production_events (stage_id, event_type, data, timestamp)
|
|
VALUES ($1, $2, $3, $4)
|
|
RETURNING id, stage_id, event_type, data, timestamp`, input.StageID, input.EventType, input.Data, input.Timestamp).Scan(&e.ID, &e.StageID, &e.EventType, &e.Data, &e.Timestamp)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &e, nil
|
|
}
|
|
|
|
func (r *Repository) GetProductionEventsByStage(ctx context.Context, stageID int) ([]ProductionEvent, error) {
|
|
rows, err := r.pool.Query(ctx, "SELECT id, stage_id, event_type, data, timestamp FROM production_events WHERE stage_id = $1 ORDER BY timestamp DESC LIMIT 100", stageID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var events []ProductionEvent
|
|
for rows.Next() {
|
|
var e ProductionEvent
|
|
if err := rows.Scan(&e.ID, &e.StageID, &e.EventType, &e.Data, &e.Timestamp); err != nil {
|
|
return nil, err
|
|
}
|
|
events = append(events, e)
|
|
}
|
|
return events, nil
|
|
}
|
|
|
|
func (r *Repository) GetLineStats(ctx context.Context) (*LineStats, error) {
|
|
var stats LineStats
|
|
err := r.pool.QueryRow(ctx, `
|
|
SELECT
|
|
COUNT(CASE WHEN pe.event_type = 'item_processed' THEN 1 END) as total_items,
|
|
COUNT(CASE WHEN pe.event_type = 'item_rejected' THEN 1 END) as rejected_items,
|
|
COALESCE(AVG(sr.value) FILTER (WHERE sr.sensor = 'conveyor_speed'), 0) as avg_speed,
|
|
COALESCE((SELECT ls.name FROM line_stages ls ORDER BY ls.stage_order LIMIT 1), '') as current_stage,
|
|
'running' as status,
|
|
NOW() as last_update_time
|
|
FROM production_events pe
|
|
LEFT JOIN sensor_readings sr ON pe.stage_id = sr.stage_id
|
|
`).Scan(&stats.TotalItems, &stats.RejectedItems, &stats.AvgSpeed, &stats.CurrentStage, &stats.Status, &stats.LastUpdateTime)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &stats, nil
|
|
}
|