package collector

import (
	"context"
	"fmt"
	"io"
	"log"
	"time"

	"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/backend/internal/repository"
	"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/backend/internal/storage"
	"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/proto/proto"
	"google.golang.org/grpc/metadata"
)

// finalFlushTimeout bounds the last, best-effort insert performed when a
// stream terminates. That insert runs on a context detached from the stream,
// so it needs its own deadline.
const finalFlushTimeout = 10 * time.Second

// Collector handles log streaming from connected agents.
type Collector struct {
	proto.UnimplementedCollectorServer

	logRepo *repository.LogRepository
	tracker *ConnTracker

	// batchSize is the number of buffered entries that triggers an immediate flush.
	batchSize int
	// flushInterval is the period of the ticker that flushes a partially filled batch.
	flushInterval time.Duration
}

// New returns a Collector that stores logs through logRepo and records
// connected agents in tracker. It flushes after 100 buffered entries or every
// two seconds, whichever comes first.
func New(logRepo *repository.LogRepository, tracker *ConnTracker) *Collector {
	return &Collector{
		logRepo:       logRepo,
		tracker:       tracker,
		batchSize:     100,
		flushInterval: 2 * time.Second,
	}
}

// Stream receives log messages from one agent and stores them in batches.
//
// The incoming metadata must carry a "whoami" value (used as the agent ID and
// label) and a "service" value; otherwise an error is returned before anything
// is read. The agent is registered with the tracker for the lifetime of the
// call and unregistered when Stream returns.
//
// If the log repository reports that it is not connected, messages are read
// and discarded until the client closes the stream. Otherwise messages are
// buffered and inserted when the batch reaches batchSize, when flushInterval
// elapses, and once more when the stream ends.
//
// Stream returns nil when the client closes the stream and the final flush
// succeeds, the stream context's error on cancellation, a wrapped receive
// error when Recv fails, or the insert error when a flush fails.
func (c *Collector) Stream(stream proto.Collector_StreamServer) error {
	ctx := stream.Context()

	md, ok := metadata.FromIncomingContext(ctx)
	if !ok {
		return fmt.Errorf("no metadata in context")
	}
	agentName, err := firstMetadataValue(md, "whoami")
	if err != nil {
		return err
	}
	service, err := firstMetadataValue(md, "service")
	if err != nil {
		return err
	}

	agent := &Agent{
		ID:          agentName,
		Label:       agentName,
		Services:    make([]Service, 0),
		ConnectedAt: time.Now(),
	}
	c.tracker.Register(agent)
	defer c.tracker.Unregister(agent.ID)

	log.Printf("Agent %s connected, streaming logs for service: %s", agentName, service)

	// If no ClickHouse, just consume the stream without storing.
	if !c.logRepo.IsConnected() {
		log.Printf(
			"Warning: ClickHouse not connected yet, consuming logs without storing for agent %s",
			agentName,
		)
		return c.discard(stream)
	}

	return c.collect(ctx, stream, agentName, service)
}

// firstMetadataValue returns the first value stored under key in md, or an
// error of the form "<key> metadata missing" when the key has no values.
func firstMetadataValue(md metadata.MD, key string) (string, error) {
	vals := md[key]
	if len(vals) == 0 {
		return "", fmt.Errorf("%s metadata missing", key)
	}
	return vals[0], nil
}

// discard reads and drops every message until the client closes the stream
// (nil) or Recv fails (wrapped error).
func (c *Collector) discard(stream proto.Collector_StreamServer) error {
	for {
		_, err := stream.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return fmt.Errorf("failed to receive: %w", err)
		}
	}
}

// collect batches incoming messages and inserts them into the log repository
// until the stream ends, the context is cancelled, or an insert fails.
func (c *Collector) collect(
	ctx context.Context,
	stream proto.Collector_StreamServer,
	agentName, service string,
) error {
	// recvCh carries received requests and is closed by the receiver goroutine
	// when it stops. errCh (buffered, so the send never blocks) holds the reason
	// and is always written BEFORE recvCh is closed. The main loop therefore
	// sees every delivered request before it sees the terminal error.
	recvCh := make(chan *proto.CollectorRequest, 1)
	errCh := make(chan error, 1)

	// Goroutine that blocks on Recv so the main loop can also react to the
	// ticker and to cancellation.
	go func() {
		defer close(recvCh)
		for {
			req, err := stream.Recv()
			if err != nil {
				errCh <- err
				return
			}
			select {
			case recvCh <- req:
			case <-ctx.Done():
				// The handler is gone or the RPC was cancelled; stop instead of
				// blocking forever on a channel nobody reads.
				errCh <- ctx.Err()
				return
			}
		}
	}()

	// Buffer for batch inserts.
	var batch []storage.LogEntry

	ticker := time.NewTicker(c.flushInterval)
	defer ticker.Stop()

	// flush inserts the buffered entries using insertCtx and, on success,
	// empties the buffer while keeping its capacity.
	flush := func(insertCtx context.Context) error {
		if len(batch) == 0 {
			return nil
		}
		if err := c.logRepo.InsertBatch(insertCtx, batch); err != nil {
			log.Printf(
				"Failed to insert batch for agent %s, service %s: %v",
				agentName, service, err,
			)
			return err
		}
		log.Printf("Flushed %d logs for agent %s, service %s", len(batch), agentName, service)
		batch = batch[:0]
		return nil
	}

	// finalFlush stores whatever is still buffered when the stream terminates.
	// It uses a context detached from the stream because the stream's context
	// may already be cancelled at that point.
	finalFlush := func() error {
		flushCtx, cancel := context.WithTimeout(context.Background(), finalFlushTimeout)
		defer cancel()
		return flush(flushCtx)
	}

	for {
		select {
		case <-ctx.Done():
			// Best effort: a failure here is already logged by flush, and the
			// cancellation error is the one the caller needs to see.
			_ = finalFlush()
			return ctx.Err()

		case <-ticker.C:
			if err := flush(ctx); err != nil {
				return err
			}

		case req, open := <-recvCh:
			if !open {
				recvErr := <-errCh
				if recvErr == io.EOF {
					return finalFlush()
				}
				// Keep what was already received; the receive error is still
				// the one reported (an insert failure is logged by flush).
				_ = finalFlush()
				return fmt.Errorf("failed to receive: %w", recvErr)
			}

			batch = append(batch, storage.LogEntry{
				Timestamp: time.Now(),
				Level:     "info",
				Service:   service,
				Agent:     agentName,
				Message:   req.Message,
			})
			if len(batch) >= c.batchSize {
				if err := flush(ctx); err != nil {
					return err
				}
			}
		}
	}
}

// GetAgent delegates to the tracker.
func (c *Collector) GetAgent(name string) (*Agent, bool) {
	return c.tracker.GetAgent(name)
}

// Agents delegates to the tracker.
func (c *Collector) Agents() []*Agent {
	return c.tracker.Agents()
}