162 lines
3.7 KiB
Go
162 lines
3.7 KiB
Go
package buffer
|
|
|
|
import (
|
|
"database/sql"
|
|
"encoding/json"
|
|
"fmt"
|
|
"time"
|
|
|
|
_ "modernc.org/sqlite"
|
|
)
|
|
|
|
// BufferedLog represents a log entry stored for later delivery
|
|
type BufferedLog struct {
|
|
ID int64
|
|
Service string
|
|
Message string
|
|
CreatedAt time.Time
|
|
}
|
|
|
|
// LogBuffer provides SQLite-backed log buffering
|
|
type LogBuffer struct {
|
|
db *sql.DB
|
|
}
|
|
|
|
// NewLogBuffer creates a new log buffer with the given database path
|
|
func NewLogBuffer(dbPath string) (*LogBuffer, error) {
|
|
db, err := sql.Open("sqlite", dbPath+"?_journal_mode=WAL&_busy_timeout=5000")
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to open database: %w", err)
|
|
}
|
|
|
|
// Create table if not exists
|
|
_, err = db.Exec(`
|
|
CREATE TABLE IF NOT EXISTS buffered_logs (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
service TEXT NOT NULL,
|
|
message TEXT NOT NULL,
|
|
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
|
|
)
|
|
`)
|
|
if err != nil {
|
|
_ = db.Close()
|
|
return nil, fmt.Errorf("failed to create table: %w", err)
|
|
}
|
|
|
|
// Create index for efficient ordering
|
|
_, _ = db.Exec(`CREATE INDEX IF NOT EXISTS idx_created_at ON buffered_logs(created_at ASC)`)
|
|
|
|
return &LogBuffer{db: db}, nil
|
|
}
|
|
|
|
// Close closes the database connection
|
|
func (b *LogBuffer) Close() error {
|
|
return b.db.Close()
|
|
}
|
|
|
|
// Store stores a log entry in the buffer
|
|
func (b *LogBuffer) Store(service, message string) error {
|
|
_, err := b.db.Exec(
|
|
"INSERT INTO buffered_logs (service, message) VALUES (?, ?)",
|
|
service, message,
|
|
)
|
|
return err
|
|
}
|
|
|
|
// StoreBatch stores multiple log entries in a single transaction
|
|
func (b *LogBuffer) StoreBatch(entries []BufferedLog) error {
|
|
tx, err := b.db.Begin()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer tx.Rollback()
|
|
|
|
stmt, err := tx.Prepare("INSERT INTO buffered_logs (service, message) VALUES (?, ?)")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer stmt.Close()
|
|
|
|
for _, entry := range entries {
|
|
if _, err := stmt.Exec(entry.Service, entry.Message); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return tx.Commit()
|
|
}
|
|
|
|
// GetPending retrieves pending logs in order of arrival, limited to batchSize
|
|
func (b *LogBuffer) GetPending(batchSize int) ([]BufferedLog, error) {
|
|
rows, err := b.db.Query(
|
|
"SELECT id, service, message, created_at FROM buffered_logs ORDER BY created_at ASC LIMIT ?",
|
|
batchSize,
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var logs []BufferedLog
|
|
for rows.Next() {
|
|
var log BufferedLog
|
|
var createdAt string
|
|
if err := rows.Scan(&log.ID, &log.Service, &log.Message, &createdAt); err != nil {
|
|
return nil, err
|
|
}
|
|
log.CreatedAt, _ = time.Parse("2006-01-02 15:04:05", createdAt)
|
|
logs = append(logs, log)
|
|
}
|
|
|
|
return logs, rows.Err()
|
|
}
|
|
|
|
// Delete removes a log entry from the buffer after successful delivery
|
|
func (b *LogBuffer) Delete(id int64) error {
|
|
_, err := b.db.Exec("DELETE FROM buffered_logs WHERE id = ?", id)
|
|
return err
|
|
}
|
|
|
|
// DeleteBatch removes multiple log entries after successful delivery
|
|
func (b *LogBuffer) DeleteBatch(ids []int64) error {
|
|
if len(ids) == 0 {
|
|
return nil
|
|
}
|
|
|
|
tx, err := b.db.Begin()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer tx.Rollback()
|
|
|
|
for _, id := range ids {
|
|
if _, err := tx.Exec("DELETE FROM buffered_logs WHERE id = ?", id); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return tx.Commit()
|
|
}
|
|
|
|
// Count returns the number of buffered logs
|
|
func (b *LogBuffer) Count() (int, error) {
|
|
var count int
|
|
err := b.db.QueryRow("SELECT COUNT(*) FROM buffered_logs").Scan(&count)
|
|
return count, err
|
|
}
|
|
|
|
// Clear removes all buffered logs
|
|
func (b *LogBuffer) Clear() error {
|
|
_, err := b.db.Exec("DELETE FROM buffered_logs")
|
|
return err
|
|
}
|
|
|
|
// FlushToJSON exports buffered logs to JSON format for debugging
|
|
func (b *LogBuffer) FlushToJSON() ([]byte, error) {
|
|
logs, err := b.GetPending(1000)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return json.MarshalIndent(logs, "", " ")
|
|
}
|