package collector

import (
	"context"
	"log"
	"sync"
	"time"

	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/stats"
)

// Compile-time check that ConnTracker satisfies the gRPC stats.Handler
// interface.
var _ stats.Handler = (*ConnTracker)(nil)

// ConnTracker tracks connected Collector agents and handles cleanup on
// disconnect. It implements stats.Handler (the interface a server installs
// through the grpc.StatsHandler option) for disconnect detection.
//
// The agents map is guarded by mu. The *Agent values handed out by GetAgent
// and Agents are the tracked pointers themselves, not copies.
type ConnTracker struct {
	mu     sync.RWMutex
	agents map[string]*Agent
}

// NewConnTracker returns a tracker with an empty agent table.
func NewConnTracker() *ConnTracker {
	return &ConnTracker{
		agents: make(map[string]*Agent),
	}
}

// Register adds an agent to the tracker, replacing any agent already stored
// under the same ID. Called by Collector.Stream().
//
// A nil agent is logged and ignored, and the agent table is created on first
// use, so nothing in the critical section can panic and leave mu locked.
func (t *ConnTracker) Register(agent *Agent) {
	if agent == nil {
		log.Printf("[collector] ignoring registration of nil agent")
		return
	}

	t.mu.Lock()
	if t.agents == nil {
		// Allows a zero-value ConnTracker to be used without NewConnTracker.
		t.agents = make(map[string]*Agent)
	}
	t.agents[agent.ID] = agent
	t.mu.Unlock()

	log.Printf("[collector] agent registered: %s", agent.ID)
}

// Unregister removes an agent from the tracker. Removing an ID that is not
// tracked is a no-op apart from the log line.
func (t *ConnTracker) Unregister(id string) {
	t.mu.Lock()
	delete(t.agents, id)
	t.mu.Unlock()

	log.Printf("[collector] agent unregistered: %s", id)
}

// GetAgent returns the agent for the given ID. The boolean reports whether
// such an agent is currently tracked.
func (t *ConnTracker) GetAgent(id string) (*Agent, bool) {
	t.mu.RLock()
	defer t.mu.RUnlock()

	a, ok := t.agents[id]
	return a, ok
}

// Agents returns all connected agents in a freshly allocated slice. The order
// follows map iteration and is therefore unspecified.
func (t *ConnTracker) Agents() []*Agent {
	t.mu.RLock()
	defer t.mu.RUnlock()

	result := make([]*Agent, 0, len(t.agents))
	for _, a := range t.agents {
		result = append(result, a)
	}
	return result
}

// stats.Handler implementation.

// TagRPC returns ctx unchanged; no per-RPC state is attached.
func (t *ConnTracker) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) context.Context {
	return ctx
}

// HandleRPC ignores all RPC-level stats events.
func (t *ConnTracker) HandleRPC(ctx context.Context, _ stats.RPCStats) {}

// TagConn returns ctx unchanged; no per-connection state is attached.
func (t *ConnTracker) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context {
	return ctx
}

// HandleConn reacts to connection stats events. On *stats.ConnEnd it reads
// the first "whoami" value from the incoming metadata in ctx and unregisters
// the agent with that ID. All other events, and contexts without that
// metadata, are ignored.
//
// NOTE(review): ctx here is the connection-level context returned by TagConn,
// while incoming metadata is normally attached per RPC. Confirm that the
// "whoami" lookup can actually succeed on this path.
func (t *ConnTracker) HandleConn(ctx context.Context, s stats.ConnStats) {
	if _, ended := s.(*stats.ConnEnd); !ended {
		return
	}

	md, ok := metadata.FromIncomingContext(ctx)
	if !ok {
		return
	}

	ids := md["whoami"]
	if len(ids) == 0 {
		return
	}
	t.Unregister(ids[0])
}

// UpdateServices updates the services list for the given agent.
func (t *ConnTracker) UpdateServices(id string, services []Service) bool { t.mu.Lock() defer t.mu.Unlock() agent, ok := t.agents[id] if !ok { return false } agent.Services = services return true } // UpdateSystemMetrics updates the system metrics for the given agent. func (t *ConnTracker) UpdateSystemMetrics(id string, metrics SystemMetrics) bool { t.mu.Lock() defer t.mu.Unlock() agent, ok := t.agents[id] if !ok { return false } agent.SystemMetrics = metrics return true } // GetSystemMetrics returns system metrics for all connected agents. func (t *ConnTracker) GetSystemMetrics() map[string]AgentMetricsInfo { t.mu.RLock() defer t.mu.RUnlock() result := make(map[string]AgentMetricsInfo) for id, agent := range t.agents { result[id] = AgentMetricsInfo{ ID: id, Label: agent.Label, ConnectedAt: agent.ConnectedAt, CPUPercent: agent.SystemMetrics.CPUPercent, MemoryPercent: agent.SystemMetrics.MemoryPercent, DiskPercent: agent.SystemMetrics.DiskPercent, NetworkRxBytes: agent.SystemMetrics.NetworkRxBytes, NetworkTxBytes: agent.SystemMetrics.NetworkTxBytes, } } return result } // Service represents a named service with its current status. type Service struct { Name, Status string } // SystemMetrics represents system resource metrics. type SystemMetrics struct { CPUPercent float64 MemoryPercent float64 DiskPercent float64 NetworkRxBytes float64 NetworkTxBytes float64 } // AgentMetricsInfo contains agent info with its system metrics. type AgentMetricsInfo struct { ID string `json:"id"` Label string `json:"label"` ConnectedAt time.Time `json:"connected_at"` CPUPercent float64 `json:"cpu_percent"` MemoryPercent float64 `json:"memory_percent"` DiskPercent float64 `json:"disk_percent"` NetworkRxBytes float64 `json:"network_rx_bytes"` NetworkTxBytes float64 `json:"network_tx_bytes"` } // Agent represents a connected agent streaming logs to the collector. type Agent struct { ID string Label string Services []Service SystemMetrics SystemMetrics ConnectedAt time.Time }