package collector

import (
	"context"
	"errors"
	"fmt"
	"io"
	"log"
	"sync"
	"time"

	"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/backend/internal/repository"
	"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/backend/internal/storage"
	"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/proto/proto"
	"google.golang.org/grpc/metadata"
)

// Collector implements the gRPC Collector service. It keeps a registry of
// agents that currently have a log stream open and writes the log lines it
// receives to the log repository in batches.
type Collector struct {
	proto.UnimplementedCollectorServer

	logRepo *repository.LogRepository

	// agents is keyed by the agent's "whoami" metadata value; guarded by mu.
	agents map[string]*Agent
	mu     sync.RWMutex

	// batchSize is the number of buffered entries that forces a flush.
	batchSize int
	// flushInterval is the period of the time-based flush.
	flushInterval time.Duration
}

// Agent describes an agent that currently has a log stream open.
type Agent struct {
	// ID and Label are both initialised from the "whoami" metadata value.
	ID    string
	Label string
	// Services holds the values of the "services" metadata key supplied when
	// the log stream was opened, until ServicesStream replaces it with
	// "name:status" strings.
	Services []string
	// ConnectedAt is the time the agent was registered by Stream.
	ConnectedAt time.Time
}

// New returns a Collector that stores logs through logRepo, flushing every
// 100 entries or every 2 seconds, whichever comes first.
func New(logRepo *repository.LogRepository) *Collector {
	return &Collector{
		logRepo:       logRepo,
		agents:        make(map[string]*Agent),
		batchSize:     100,
		flushInterval: 2 * time.Second,
	}
}

// Stream handles the client-streaming log RPC.
//
// The caller must supply the "whoami" (agent name) and "service" metadata
// keys; "services" is optional. The agent is registered for the lifetime of
// the call. Received messages are buffered and written with InsertBatch when
// the buffer reaches batchSize, when the flush ticker fires, and when the
// stream ends. If the repository reports that it is not connected, messages
// are read and discarded instead.
//
// It returns nil when the client closes the stream cleanly (and the last
// flush succeeds), the context error when the stream context is cancelled,
// and a wrapped error for any other receive failure. A failed periodic or
// size-triggered flush terminates the stream with that error.
func (c *Collector) Stream(stream proto.Collector_StreamServer) error {
	// Upper bound for the best-effort flush performed once the stream is
	// already broken or cancelled.
	const finalFlushTimeout = 5 * time.Second

	ctx := stream.Context()

	md, ok := metadata.FromIncomingContext(ctx)
	if !ok {
		return fmt.Errorf("no metadata in context")
	}

	whoamiVals := md["whoami"]
	if len(whoamiVals) == 0 {
		return fmt.Errorf("whoami metadata missing")
	}
	agentName := whoamiVals[0]

	serviceVals := md["service"]
	if len(serviceVals) == 0 {
		return fmt.Errorf("service metadata missing")
	}
	service := serviceVals[0]

	var services []string
	if servicesVals := md["services"]; len(servicesVals) > 0 {
		services = servicesVals
	}

	// Register the agent. The entry is removed on return only if it is still
	// the one registered here, so a stale stream that finishes late does not
	// wipe out the registration made by a newer stream of the same agent.
	self := &Agent{
		ID:          agentName,
		Label:       agentName,
		Services:    services,
		ConnectedAt: time.Now(),
	}
	c.mu.Lock()
	c.agents[agentName] = self
	c.mu.Unlock()
	defer func() {
		c.mu.Lock()
		if c.agents[agentName] == self {
			delete(c.agents, agentName)
		}
		c.mu.Unlock()
	}()

	log.Printf("Agent %s connected, streaming logs for service: %s", agentName, service)

	// Without a storage connection, just consume the stream without storing.
	if !c.logRepo.IsConnected() {
		log.Printf(
			"Warning: ClickHouse not connected yet, consuming logs without storing for agent %s",
			agentName,
		)
		for {
			if _, err := stream.Recv(); err != nil {
				if errors.Is(err, io.EOF) {
					return nil
				}
				return fmt.Errorf("failed to receive: %w", err)
			}
		}
	}

	// A dedicated goroutine blocks on Recv so the main loop can also react to
	// the ticker and to cancellation. Messages and the terminal error travel
	// over the same channel: recvErr is written before recvCh is closed, so
	// the main loop always consumes every delivered message before it sees
	// the end of the stream, and reading recvErr after the close is
	// race-free.
	recvCh := make(chan *proto.CollectorRequest, 1)
	var recvErr error
	go func() {
		defer close(recvCh)
		for {
			req, err := stream.Recv()
			if err != nil {
				recvErr = err
				return
			}
			select {
			case recvCh <- req:
			case <-ctx.Done():
				// The handler is gone or going; do not block forever on a
				// channel nobody reads any more.
				recvErr = ctx.Err()
				return
			}
		}
	}()

	// Buffer for batch inserts; its backing array is reused across flushes.
	var batch []storage.LogEntry

	flush := func(fctx context.Context) error {
		if len(batch) == 0 {
			return nil
		}
		if err := c.logRepo.InsertBatch(fctx, batch); err != nil {
			log.Printf(
				"Failed to insert batch for agent %s, service %s: %v",
				agentName,
				service,
				err,
			)
			return err
		}
		log.Printf("Flushed %d logs for agent %s, service %s", len(batch), agentName, service)
		batch = batch[:0]
		return nil
	}

	// flushDetached persists whatever is buffered when the stream context can
	// no longer be used: an insert bound to a cancelled context would fail
	// immediately. The error is deliberately dropped because flush has
	// already logged it and the caller is returning a more relevant error.
	flushDetached := func() {
		fctx, cancel := context.WithTimeout(context.Background(), finalFlushTimeout)
		defer cancel()
		_ = flush(fctx)
	}

	ticker := time.NewTicker(c.flushInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			flushDetached()
			return ctx.Err()

		case <-ticker.C:
			if err := flush(ctx); err != nil {
				return err
			}

		case req, open := <-recvCh:
			if !open {
				if errors.Is(recvErr, io.EOF) {
					// Client closed the stream cleanly.
					return flush(ctx)
				}
				flushDetached()
				return fmt.Errorf("failed to receive: %w", recvErr)
			}
			batch = append(batch, storage.LogEntry{
				Timestamp: time.Now(),
				Level:     "info",
				Service:   service,
				Agent:     agentName,
				Message:   req.Message,
			})
			if len(batch) >= c.batchSize {
				if err := flush(ctx); err != nil {
					return err
				}
			}
		}
	}
}

// GetAgent returns the registered agent with the given name and whether it
// was found. The returned pointer is the live registry entry, not a copy;
// its Services field is replaced by ServicesStream while holding the
// collector lock.
func (c *Collector) GetAgent(name string) (*Agent, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	a, ok := c.agents[name]
	return a, ok
}

// Agents returns the currently registered agents in unspecified order. The
// slice is freshly allocated, but its elements are the live registry entries.
func (c *Collector) Agents() []*Agent {
	c.mu.RLock()
	defer c.mu.RUnlock()
	result := make([]*Agent, 0, len(c.agents))
	for _, a := range c.agents {
		result = append(result, a)
	}
	return result
}

// ServicesStream handles the ServicesUpdate client-streaming RPC.
// Agents send service status updates which are stored in the collector.
// Updates for an agent that is not currently registered are logged and dropped.
// Returns a single response when the agent closes the stream.
func (c *Collector) ServicesStream(stream proto.Collector_ServicesStreamServer) error { md, ok := metadata.FromIncomingContext(stream.Context()) if !ok { return fmt.Errorf("no metadata in context") } whoamiVals := md["whoami"] if len(whoamiVals) == 0 { return fmt.Errorf("whoami metadata missing") } agentName := whoamiVals[0] log.Printf("Agent %s started services update stream", agentName) for { update, err := stream.Recv() if err == io.EOF { log.Printf("Agent %s finished services update stream", agentName) return stream.SendAndClose(&proto.ServicesUpdateResp{}) } if err != nil { return fmt.Errorf("failed to receive services update: %w", err) } c.mu.Lock() if agent, ok := c.agents[agentName]; ok { services := make([]string, 0, len(update.Services)) for _, s := range update.Services { services = append(services, fmt.Sprintf("%s:%s", s.Name, s.Status)) } agent.Services = services log.Printf("Updated services for agent %s: %v", agentName, agent.Services) } else { log.Printf("Warning: received services update for unknown agent %s", agentName) } c.mu.Unlock() } }