Files
BanForge/internal/storage/writer.go
d3m0k1d 341f49c4b4
Some checks failed
build / build (push) Failing after 2m48s
feat: improve db logic and logger(untested)
2026-01-21 20:44:28 +03:00

81 lines
1.5 KiB
Go

package storage
import (
"time"
)
func Write(db *DB, resultCh <-chan *LogEntry) {
const batchSize = 100
const flushInterval = 1 * time.Second
batch := make([]*LogEntry, 0, batchSize)
ticker := time.NewTicker(flushInterval)
defer ticker.Stop()
flush := func() {
if len(batch) == 0 {
return
}
tx, err := db.db.Begin()
if err != nil {
db.logger.Error("Failed to begin transaction", "error", err)
return
}
stmt, err := tx.Prepare(
"INSERT INTO requests (service, ip, path, method, status, created_at) VALUES (?, ?, ?, ?, ?, ?)")
if err != nil {
tx.Rollback()
db.logger.Error("Failed to prepare statement", "error", err)
return
}
defer func() {
err := stmt.Close()
if err != nil {
db.logger.Error("Failed to close statement", "error", err)
}
}()
for _, entry := range batch {
_, err := stmt.Exec(
entry.Service,
entry.IP,
entry.Path,
entry.Method,
entry.Status,
time.Now().Format(time.RFC3339),
)
if err != nil {
db.logger.Error("Failed to insert entry", "error", err)
}
}
if err := tx.Commit(); err != nil {
db.logger.Error("Failed to commit transaction", "error", err)
} else {
db.logger.Debug("Flushed batch", "count", len(batch))
}
batch = batch[:0]
}
for {
select {
case result, ok := <-resultCh:
if !ok {
flush()
return
}
batch = append(batch, result)
if len(batch) >= batchSize {
flush()
}
case <-ticker.C:
flush()
}
}
}