This commit is contained in:
@@ -81,8 +81,8 @@ func (c *Collector) Stream(stream proto.Collector_StreamServer) error {
|
||||
log.Printf("Agent %s connected, streaming logs for service: %s", agentName, service)
|
||||
|
||||
// If no ClickHouse, just consume the stream without storing
|
||||
if c.logRepo == nil {
|
||||
log.Printf("Warning: logRepo is nil, consuming logs without storing for agent %s", agentName)
|
||||
if !c.logRepo.IsConnected() {
|
||||
log.Printf("Warning: ClickHouse not connected yet, consuming logs without storing for agent %s", agentName)
|
||||
for {
|
||||
_, err := stream.Recv()
|
||||
if err == io.EOF {
|
||||
|
||||
@@ -25,6 +25,7 @@ type AgentInfo struct {
|
||||
// @Summary Get connected agents
|
||||
// @Description Returns a list of all agents currently connected via Collector (log streaming)
|
||||
// @Tags agents
|
||||
// @Security Bearer
|
||||
// @Produce json
|
||||
// @Success 200 {array} AgentInfo
|
||||
// @Router /agents [get]
|
||||
|
||||
@@ -33,6 +33,7 @@ type InsertLogRequest struct {
|
||||
// @Produce json
|
||||
// @Param body body InsertLogRequest true "Log entry"
|
||||
// @Success 201 {object} map[string]string
|
||||
// @Security Bearer
|
||||
// @Router /logs [post]
|
||||
func (lh *LogHandlers) Insert(c *gin.Context) {
|
||||
var req InsertLogRequest
|
||||
@@ -72,6 +73,7 @@ type InsertLogsRequest struct {
|
||||
// @Produce json
|
||||
// @Param body body InsertLogsRequest true "Log entries"
|
||||
// @Success 201 {object} map[string]string
|
||||
// @Security Bearer
|
||||
// @Router /logs/batch [post]
|
||||
func (lh *LogHandlers) InsertBatch(c *gin.Context) {
|
||||
var req InsertLogsRequest
|
||||
@@ -124,6 +126,7 @@ type SearchLogsRequest struct {
|
||||
// @Param limit query int false "Limit results" default(100)
|
||||
// @Param offset query int false "Offset results" default(0)
|
||||
// @Success 200 {array} storage.LogEntry
|
||||
// @Security Bearer
|
||||
// @Router /logs [get]
|
||||
func (lh *LogHandlers) Search(c *gin.Context) {
|
||||
var req SearchLogsRequest
|
||||
@@ -170,6 +173,7 @@ func (lh *LogHandlers) Search(c *gin.Context) {
|
||||
// @Tags logs
|
||||
// @Produce json
|
||||
// @Success 200 {array} string
|
||||
// @Security Bearer
|
||||
// @Router /logs/services [get]
|
||||
func (lh *LogHandlers) GetServices(c *gin.Context) {
|
||||
services, err := lh.LogRepo.GetDistinctServices(c.Request.Context())
|
||||
@@ -190,6 +194,7 @@ func (lh *LogHandlers) GetServices(c *gin.Context) {
|
||||
// @Tags logs
|
||||
// @Produce json
|
||||
// @Success 200 {array} string
|
||||
// @Security Bearer
|
||||
// @Router /logs/agents [get]
|
||||
func (lh *LogHandlers) GetAgents(c *gin.Context) {
|
||||
agents, err := lh.LogRepo.GetDistinctAgents(c.Request.Context())
|
||||
@@ -210,6 +215,7 @@ func (lh *LogHandlers) GetAgents(c *gin.Context) {
|
||||
// @Tags logs
|
||||
// @Produce json
|
||||
// @Success 200 {array} string
|
||||
// @Security Bearer
|
||||
// @Router /logs/levels [get]
|
||||
func (lh *LogHandlers) GetLevels(c *gin.Context) {
|
||||
levels, err := lh.LogRepo.GetDistinctLevels(c.Request.Context())
|
||||
|
||||
@@ -2,44 +2,85 @@ package repository
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/backend/internal/storage"
|
||||
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
|
||||
)
|
||||
|
||||
type LogRepository struct {
|
||||
Conn driver.Conn
|
||||
mu sync.RWMutex
|
||||
DB *sql.DB
|
||||
}
|
||||
|
||||
func NewLogRepository(conn driver.Conn) *LogRepository {
|
||||
return &LogRepository{Conn: conn}
|
||||
func NewLogRepository() *LogRepository {
|
||||
return &LogRepository{}
|
||||
}
|
||||
|
||||
func (r *LogRepository) SetDB(db *sql.DB) {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
r.DB = db
|
||||
}
|
||||
|
||||
func (r *LogRepository) IsConnected() bool {
|
||||
r.mu.RLock()
|
||||
defer r.mu.RUnlock()
|
||||
return r.DB != nil
|
||||
}
|
||||
|
||||
func (r *LogRepository) getDB() *sql.DB {
|
||||
r.mu.RLock()
|
||||
defer r.mu.RUnlock()
|
||||
return r.DB
|
||||
}
|
||||
|
||||
func (r *LogRepository) Init(ctx context.Context) error {
|
||||
return r.Conn.Exec(ctx, storage.CreateLogsTable)
|
||||
db := r.getDB()
|
||||
if db == nil {
|
||||
return nil
|
||||
}
|
||||
_, err := db.ExecContext(ctx, storage.CreateLogsTable)
|
||||
return err
|
||||
}
|
||||
|
||||
func (r *LogRepository) Insert(ctx context.Context, log storage.LogEntry) error {
|
||||
return r.Conn.Exec(ctx, `
|
||||
db := r.getDB()
|
||||
if db == nil {
|
||||
return nil
|
||||
}
|
||||
_, err := db.ExecContext(ctx, `
|
||||
INSERT INTO logs (timestamp, level, service, agent, message)
|
||||
VALUES ($1, $2, $3, $4, $5)
|
||||
`, log.Timestamp, log.Level, log.Service, log.Agent, log.Message)
|
||||
return err
|
||||
}
|
||||
|
||||
func (r *LogRepository) InsertBatch(ctx context.Context, logs []storage.LogEntry) error {
|
||||
batch, err := r.Conn.PrepareBatch(ctx, "INSERT INTO logs (timestamp, level, service, agent, message)")
|
||||
if err != nil {
|
||||
return err
|
||||
db := r.getDB()
|
||||
if db == nil {
|
||||
return nil
|
||||
}
|
||||
if len(logs) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
for _, log := range logs {
|
||||
if err := batch.Append(log.Timestamp, log.Level, log.Service, log.Agent, log.Message); err != nil {
|
||||
return err
|
||||
// Build multi-row INSERT statement
|
||||
query := "INSERT INTO logs (timestamp, level, service, agent, message) VALUES "
|
||||
args := make([]interface{}, 0, len(logs)*5)
|
||||
for i, log := range logs {
|
||||
if i > 0 {
|
||||
query += ", "
|
||||
}
|
||||
query += fmt.Sprintf("($%d, $%d, $%d, $%d, $%d)",
|
||||
i*5+1, i*5+2, i*5+3, i*5+4, i*5+5)
|
||||
args = append(args, log.Timestamp, log.Level, log.Service, log.Agent, log.Message)
|
||||
}
|
||||
|
||||
return batch.Send()
|
||||
_, err := db.ExecContext(ctx, query, args...)
|
||||
return err
|
||||
}
|
||||
|
||||
type LogFilter struct {
|
||||
@@ -53,6 +94,11 @@ type LogFilter struct {
|
||||
}
|
||||
|
||||
func (r *LogRepository) Search(ctx context.Context, filter LogFilter) ([]storage.LogEntry, error) {
|
||||
db := r.getDB()
|
||||
if db == nil {
|
||||
return []storage.LogEntry{}, nil
|
||||
}
|
||||
|
||||
query := "SELECT timestamp, level, service, agent, message FROM logs WHERE 1=1"
|
||||
args := make([]interface{}, 0)
|
||||
argIdx := 1
|
||||
@@ -102,13 +148,13 @@ func (r *LogRepository) Search(ctx context.Context, filter LogFilter) ([]storage
|
||||
args = append(args, filter.Offset)
|
||||
}
|
||||
|
||||
rows, err := r.Conn.Query(ctx, query, args...)
|
||||
rows, err := db.QueryContext(ctx, query, args...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var logs []storage.LogEntry
|
||||
logs := make([]storage.LogEntry, 0)
|
||||
for rows.Next() {
|
||||
var log storage.LogEntry
|
||||
if err := rows.Scan(&log.Timestamp, &log.Level, &log.Service, &log.Agent, &log.Message); err != nil {
|
||||
@@ -121,13 +167,17 @@ func (r *LogRepository) Search(ctx context.Context, filter LogFilter) ([]storage
|
||||
}
|
||||
|
||||
func (r *LogRepository) GetDistinctServices(ctx context.Context) ([]string, error) {
|
||||
rows, err := r.Conn.Query(ctx, "SELECT DISTINCT service FROM logs ORDER BY service")
|
||||
db := r.getDB()
|
||||
if db == nil {
|
||||
return []string{}, nil
|
||||
}
|
||||
rows, err := db.QueryContext(ctx, "SELECT DISTINCT service FROM logs ORDER BY service")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var services []string
|
||||
services := make([]string, 0)
|
||||
for rows.Next() {
|
||||
var service string
|
||||
if err := rows.Scan(&service); err != nil {
|
||||
@@ -140,13 +190,17 @@ func (r *LogRepository) GetDistinctServices(ctx context.Context) ([]string, erro
|
||||
}
|
||||
|
||||
func (r *LogRepository) GetDistinctAgents(ctx context.Context) ([]string, error) {
|
||||
rows, err := r.Conn.Query(ctx, "SELECT DISTINCT agent FROM logs ORDER BY agent")
|
||||
db := r.getDB()
|
||||
if db == nil {
|
||||
return []string{}, nil
|
||||
}
|
||||
rows, err := db.QueryContext(ctx, "SELECT DISTINCT agent FROM logs ORDER BY agent")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var agents []string
|
||||
agents := make([]string, 0)
|
||||
for rows.Next() {
|
||||
var agent string
|
||||
if err := rows.Scan(&agent); err != nil {
|
||||
@@ -159,13 +213,17 @@ func (r *LogRepository) GetDistinctAgents(ctx context.Context) ([]string, error)
|
||||
}
|
||||
|
||||
func (r *LogRepository) GetDistinctLevels(ctx context.Context) ([]string, error) {
|
||||
rows, err := r.Conn.Query(ctx, "SELECT DISTINCT level FROM logs ORDER BY level")
|
||||
db := r.getDB()
|
||||
if db == nil {
|
||||
return []string{}, nil
|
||||
}
|
||||
rows, err := db.QueryContext(ctx, "SELECT DISTINCT level FROM logs ORDER BY level")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var levels []string
|
||||
levels := make([]string, 0)
|
||||
for rows.Next() {
|
||||
var level string
|
||||
if err := rows.Scan(&level); err != nil {
|
||||
|
||||
@@ -2,10 +2,12 @@ package storage
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"github.com/ClickHouse/clickhouse-go/v2"
|
||||
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
|
||||
_ "github.com/ClickHouse/clickhouse-go/v2"
|
||||
)
|
||||
|
||||
type ClickHouseConfig struct {
|
||||
@@ -15,33 +17,46 @@ type ClickHouseConfig struct {
|
||||
Database string
|
||||
}
|
||||
|
||||
func OpenClickHouse(cfg ClickHouseConfig) (driver.Conn, error) {
|
||||
conn, err := clickhouse.Open(&clickhouse.Options{
|
||||
Addr: []string{cfg.Host},
|
||||
Auth: clickhouse.Auth{
|
||||
Database: cfg.Database,
|
||||
Username: cfg.User,
|
||||
Password: cfg.Password,
|
||||
},
|
||||
Settings: clickhouse.Settings{
|
||||
"max_execution_time": 60,
|
||||
},
|
||||
Compression: &clickhouse.Compression{
|
||||
Method: clickhouse.CompressionLZ4,
|
||||
},
|
||||
DialTimeout: 30,
|
||||
MaxOpenConns: 10,
|
||||
MaxIdleConns: 5,
|
||||
ConnMaxLifetime: 3600,
|
||||
ConnOpenStrategy: clickhouse.ConnOpenInOrder,
|
||||
})
|
||||
func OpenClickHouse(cfg ClickHouseConfig) (*sql.DB, error) {
|
||||
dsn := fmt.Sprintf("clickhouse://%s:%s@%s/%s",
|
||||
cfg.User, cfg.Password, cfg.Host, cfg.Database)
|
||||
|
||||
db, err := sql.Open("clickhouse", dsn)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("clickhouse connect: %w", err)
|
||||
return nil, fmt.Errorf("clickhouse open: %w", err)
|
||||
}
|
||||
|
||||
if err := conn.Ping(context.Background()); err != nil {
|
||||
db.SetMaxOpenConns(5)
|
||||
db.SetMaxIdleConns(2)
|
||||
db.SetConnMaxLifetime(10 * time.Minute)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
if err := db.PingContext(ctx); err != nil {
|
||||
db.Close()
|
||||
return nil, fmt.Errorf("clickhouse ping: %w", err)
|
||||
}
|
||||
|
||||
return conn, nil
|
||||
log.Printf("ClickHouse connected via database/sql: %s", cfg.Host)
|
||||
return db, nil
|
||||
}
|
||||
|
||||
// OpenClickHouseWithRetry attempts to connect to ClickHouse with retries and backoff.
|
||||
func OpenClickHouseWithRetry(cfg ClickHouseConfig, maxRetries int, initialDelay time.Duration) (*sql.DB, error) {
|
||||
var lastErr error
|
||||
delay := initialDelay
|
||||
|
||||
for i := 0; i < maxRetries; i++ {
|
||||
db, err := OpenClickHouse(cfg)
|
||||
if err == nil {
|
||||
return db, nil
|
||||
}
|
||||
lastErr = err
|
||||
log.Printf("ClickHouse connection attempt %d/%d failed: %v, retrying in %v...", i+1, maxRetries, err, delay)
|
||||
time.Sleep(delay)
|
||||
delay *= 2
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("clickhouse connection failed after %d attempts: %w", maxRetries, lastErr)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user