189 lines
3.8 KiB
Go
189 lines
3.8 KiB
Go
package collector
|
|
|
|
import (
	"context"
	"fmt"
	"io"
	"log"
	"sync"
	"time"

	"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/backend/internal/repository"
	"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/backend/internal/storage"
	"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/proto/proto"
	"google.golang.org/grpc/metadata"
)
|
|
|
|
type Collector struct {
|
|
proto.UnimplementedCollectorServer
|
|
logRepo *repository.LogRepository
|
|
agents map[string]*Agent
|
|
mu sync.RWMutex
|
|
batchSize int
|
|
flushInterval time.Duration
|
|
}
|
|
|
|
// Agent describes one registered log-shipping agent as recorded by
// Collector.Stream when the agent opens a stream.
type Agent struct {
	// ID and Label are both populated from the stream's "whoami" metadata.
	ID, Label string

	// Services holds the values of the stream's "services" metadata key;
	// it is nil when the key was not supplied.
	Services []string

	// ConnectedAt is the server-side time at which the agent was registered.
	ConnectedAt time.Time
}
|
|
|
|
func New(logRepo *repository.LogRepository) *Collector {
|
|
return &Collector{
|
|
logRepo: logRepo,
|
|
agents: make(map[string]*Agent),
|
|
batchSize: 100,
|
|
flushInterval: 2 * time.Second,
|
|
}
|
|
}
|
|
|
|
func (c *Collector) Stream(stream proto.Collector_StreamServer) error {
|
|
md, ok := metadata.FromIncomingContext(stream.Context())
|
|
if !ok {
|
|
return fmt.Errorf("no metadata in context")
|
|
}
|
|
|
|
whoamiVals := md["whoami"]
|
|
if len(whoamiVals) == 0 {
|
|
return fmt.Errorf("whoami metadata missing")
|
|
}
|
|
agentName := whoamiVals[0]
|
|
|
|
serviceVals := md["service"]
|
|
if len(serviceVals) == 0 {
|
|
return fmt.Errorf("service metadata missing")
|
|
}
|
|
service := serviceVals[0]
|
|
|
|
servicesVals := md["services"]
|
|
var services []string
|
|
if len(servicesVals) > 0 {
|
|
services = servicesVals
|
|
}
|
|
|
|
// Register agent
|
|
c.mu.Lock()
|
|
c.agents[agentName] = &Agent{
|
|
ID: agentName,
|
|
Label: agentName,
|
|
Services: services,
|
|
ConnectedAt: time.Now(),
|
|
}
|
|
c.mu.Unlock()
|
|
|
|
defer func() {
|
|
c.mu.Lock()
|
|
delete(c.agents, agentName)
|
|
c.mu.Unlock()
|
|
}()
|
|
|
|
log.Printf("Agent %s connected, streaming logs for service: %s", agentName, service)
|
|
|
|
// If no ClickHouse, just consume the stream without storing
|
|
if !c.logRepo.IsConnected() {
|
|
log.Printf(
|
|
"Warning: ClickHouse not connected yet, consuming logs without storing for agent %s",
|
|
agentName,
|
|
)
|
|
for {
|
|
_, err := stream.Recv()
|
|
if err == io.EOF {
|
|
return nil
|
|
}
|
|
if err != nil {
|
|
return fmt.Errorf("failed to receive: %w", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Channels for communication with recv goroutine
|
|
recvCh := make(chan *proto.CollectorRequest, 1)
|
|
errCh := make(chan error, 1)
|
|
|
|
// Goroutine that blocks on Recv
|
|
go func() {
|
|
for {
|
|
req, err := stream.Recv()
|
|
if err != nil {
|
|
errCh <- err
|
|
return
|
|
}
|
|
recvCh <- req
|
|
}
|
|
}()
|
|
|
|
// Buffer for batch inserts
|
|
var batch []storage.LogEntry
|
|
ticker := time.NewTicker(c.flushInterval)
|
|
defer ticker.Stop()
|
|
|
|
flush := func() error {
|
|
if len(batch) == 0 {
|
|
return nil
|
|
}
|
|
if err := c.logRepo.InsertBatch(stream.Context(), batch); err != nil {
|
|
log.Printf(
|
|
"Failed to insert batch for agent %s, service %s: %v",
|
|
agentName,
|
|
service,
|
|
err,
|
|
)
|
|
return err
|
|
}
|
|
log.Printf("Flushed %d logs for agent %s, service %s", len(batch), agentName, service)
|
|
batch = batch[:0]
|
|
return nil
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case <-stream.Context().Done():
|
|
// Context cancelled, flush remaining
|
|
_ = flush()
|
|
return stream.Context().Err()
|
|
case <-ticker.C:
|
|
if err := flush(); err != nil {
|
|
return err
|
|
}
|
|
case req := <-recvCh:
|
|
batch = append(batch, storage.LogEntry{
|
|
Timestamp: time.Now(),
|
|
Level: "info",
|
|
Service: service,
|
|
Agent: agentName,
|
|
Message: req.Message,
|
|
})
|
|
|
|
if len(batch) >= c.batchSize {
|
|
if err := flush(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
case err := <-errCh:
|
|
if err == io.EOF {
|
|
// Client closed stream
|
|
return flush()
|
|
}
|
|
return fmt.Errorf("failed to receive: %w", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *Collector) GetAgent(name string) (*Agent, bool) {
|
|
c.mu.RLock()
|
|
defer c.mu.RUnlock()
|
|
a, ok := c.agents[name]
|
|
return a, ok
|
|
}
|
|
|
|
func (c *Collector) Agents() []*Agent {
|
|
c.mu.RLock()
|
|
defer c.mu.RUnlock()
|
|
result := make([]*Agent, 0, len(c.agents))
|
|
for _, a := range c.agents {
|
|
result = append(result, a)
|
|
}
|
|
return result
|
|
}
|