package buffer import ( "database/sql" "encoding/json" "fmt" "time" _ "modernc.org/sqlite" ) // BufferedLog represents a log entry stored for later delivery type BufferedLog struct { ID int64 Service string Message string CreatedAt time.Time } // LogBuffer provides SQLite-backed log buffering type LogBuffer struct { db *sql.DB } // NewLogBuffer creates a new log buffer with the given database path func NewLogBuffer(dbPath string) (*LogBuffer, error) { db, err := sql.Open("sqlite", dbPath+"?_journal_mode=WAL&_busy_timeout=5000") if err != nil { return nil, fmt.Errorf("failed to open database: %w", err) } // Create table if not exists _, err = db.Exec(` CREATE TABLE IF NOT EXISTS buffered_logs ( id INTEGER PRIMARY KEY AUTOINCREMENT, service TEXT NOT NULL, message TEXT NOT NULL, created_at DATETIME DEFAULT CURRENT_TIMESTAMP ) `) if err != nil { _ = db.Close() return nil, fmt.Errorf("failed to create table: %w", err) } // Create index for efficient ordering _, _ = db.Exec(`CREATE INDEX IF NOT EXISTS idx_created_at ON buffered_logs(created_at ASC)`) return &LogBuffer{db: db}, nil } // Close closes the database connection func (b *LogBuffer) Close() error { return b.db.Close() } // Store stores a log entry in the buffer func (b *LogBuffer) Store(service, message string) error { _, err := b.db.Exec( "INSERT INTO buffered_logs (service, message) VALUES (?, ?)", service, message, ) return err } // StoreBatch stores multiple log entries in a single transaction func (b *LogBuffer) StoreBatch(entries []BufferedLog) error { tx, err := b.db.Begin() if err != nil { return err } defer tx.Rollback() stmt, err := tx.Prepare("INSERT INTO buffered_logs (service, message) VALUES (?, ?)") if err != nil { return err } defer stmt.Close() for _, entry := range entries { if _, err := stmt.Exec(entry.Service, entry.Message); err != nil { return err } } return tx.Commit() } // GetPending retrieves pending logs in order of arrival, limited to batchSize func (b *LogBuffer) GetPending(batchSize int) ([]BufferedLog, error) { rows, err := b.db.Query( "SELECT id, service, message, created_at FROM buffered_logs ORDER BY created_at ASC LIMIT ?", batchSize, ) if err != nil { return nil, err } defer rows.Close() var logs []BufferedLog for rows.Next() { var log BufferedLog var createdAt string if err := rows.Scan(&log.ID, &log.Service, &log.Message, &createdAt); err != nil { return nil, err } log.CreatedAt, _ = time.Parse("2006-01-02 15:04:05", createdAt) logs = append(logs, log) } return logs, rows.Err() } // Delete removes a log entry from the buffer after successful delivery func (b *LogBuffer) Delete(id int64) error { _, err := b.db.Exec("DELETE FROM buffered_logs WHERE id = ?", id) return err } // DeleteBatch removes multiple log entries after successful delivery func (b *LogBuffer) DeleteBatch(ids []int64) error { if len(ids) == 0 { return nil } tx, err := b.db.Begin() if err != nil { return err } defer tx.Rollback() for _, id := range ids { if _, err := tx.Exec("DELETE FROM buffered_logs WHERE id = ?", id); err != nil { return err } } return tx.Commit() } // Count returns the number of buffered logs func (b *LogBuffer) Count() (int, error) { var count int err := b.db.QueryRow("SELECT COUNT(*) FROM buffered_logs").Scan(&count) return count, err } // Clear removes all buffered logs func (b *LogBuffer) Clear() error { _, err := b.db.Exec("DELETE FROM buffered_logs") return err } // FlushToJSON exports buffered logs to JSON format for debugging func (b *LogBuffer) FlushToJSON() ([]byte, error) { logs, err := b.GetPending(1000) if err != nil { return nil, err } return json.MarshalIndent(logs, "", " ") }