refactor(internal/storage/RequestWriter/WriteReq): wtf is that error handling
This commit is contained in:
@@ -1,6 +1,9 @@
|
|||||||
package storage
|
package storage
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"database/sql"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -14,29 +17,32 @@ func WriteReq(db *RequestWriter, resultCh <-chan *LogEntry) {
|
|||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
|
|
||||||
flush := func() {
|
flush := func() {
|
||||||
|
defer db.logger.Debug("Flushed batch", "count", len(batch))
|
||||||
|
err := func() (err error) {
|
||||||
if len(batch) == 0 {
|
if len(batch) == 0 {
|
||||||
return
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
tx, err := db.db.Begin()
|
tx, err := db.db.Begin()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
db.logger.Error("Failed to begin transaction", "error", err)
|
return fmt.Errorf("Failed to begin transaction: %w", err)
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
defer func() {
|
||||||
|
if rollbackErr := tx.Rollback(); rollbackErr != nil && !errors.Is(rollbackErr, sql.ErrTxDone) {
|
||||||
|
err = errors.Join(err, fmt.Errorf("Failed to rollback transaction: %w", rollbackErr))
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
stmt, err := tx.Prepare(
|
stmt, err := tx.Prepare(
|
||||||
"INSERT INTO requests (service, ip, path, method, status, created_at) VALUES (?, ?, ?, ?, ?, ?)",
|
"INSERT INTO requests (service, ip, path, method, status, created_at) VALUES (?, ?, ?, ?, ?, ?)",
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
db.logger.Error("Failed to prepare statement", "error", err)
|
err = fmt.Errorf("Failed to prepare statement: %w", err)
|
||||||
if rollbackErr := tx.Rollback(); rollbackErr != nil {
|
return err
|
||||||
db.logger.Error("Failed to rollback transaction", "error", rollbackErr)
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
defer func() {
|
defer func() {
|
||||||
if closeErr := stmt.Close(); closeErr != nil {
|
if closeErr := stmt.Close(); closeErr != nil {
|
||||||
db.logger.Error("Failed to close statement", "error", closeErr)
|
err = errors.Join(err, fmt.Errorf("Failed to close statement: %w", closeErr))
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
@@ -50,17 +56,20 @@ func WriteReq(db *RequestWriter, resultCh <-chan *LogEntry) {
|
|||||||
time.Now().Format(time.RFC3339),
|
time.Now().Format(time.RFC3339),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
db.logger.Error("Failed to insert entry", "error", err)
|
db.logger.Error(fmt.Errorf("Failed to insert entry: %w", err).Error())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := tx.Commit(); err != nil {
|
if err := tx.Commit(); err != nil {
|
||||||
db.logger.Error("Failed to commit transaction", "error", err)
|
return fmt.Errorf("Failed to commit transaction: %w", err)
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
db.logger.Debug("Flushed batch", "count", len(batch))
|
|
||||||
batch = batch[:0]
|
batch = batch[:0]
|
||||||
|
return err
|
||||||
|
}()
|
||||||
|
if err != nil {
|
||||||
|
db.logger.Error(err.Error())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for {
|
for {
|
||||||
|
|||||||
Reference in New Issue
Block a user