Files
HellreigN/backend/internal/grpcsrv/collector/tracker.go
T
2026-04-05 05:38:03 +03:00

166 lines
4.1 KiB
Go

package collector
import (
"context"
"log"
"sync"
"time"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/stats"
)
// ConnTracker keeps the set of currently registered Collector agents,
// indexed by agent ID.
//
// Its TagRPC, HandleRPC, TagConn and HandleConn methods match the gRPC
// stats.Handler interface, so an instance can be installed on a server
// (via the grpc.StatsHandler server option) to observe connection events.
type ConnTracker struct {
	// mu guards agents, and is also held while the Update* methods write
	// to fields of the stored Agent values.
	mu sync.RWMutex

	// agents maps an agent ID to its registered Agent.
	agents map[string]*Agent
}
// NewConnTracker returns a ConnTracker with an empty, ready-to-use agent map.
func NewConnTracker() *ConnTracker {
	tracker := new(ConnTracker)
	tracker.agents = map[string]*Agent{}
	return tracker
}
// Register stores agent under its ID, replacing any agent previously stored
// under the same ID, and logs the registration.
// Called by Collector.Stream().
func (t *ConnTracker) Register(agent *Agent) {
	id := agent.ID

	t.mu.Lock()
	t.agents[id] = agent
	t.mu.Unlock()

	// Logging happens outside the critical section.
	log.Printf("[collector] agent registered: %s", id)
}
// Unregister drops the agent stored under id, if any, and logs the removal.
// The log line is written whether or not an agent was present.
func (t *ConnTracker) Unregister(id string) {
	func() {
		t.mu.Lock()
		defer t.mu.Unlock()
		delete(t.agents, id)
	}()

	// Logging happens outside the critical section.
	log.Printf("[collector] agent unregistered: %s", id)
}
// GetAgent looks up the agent registered under id.
// The boolean result reports whether such an agent exists; when it is false
// the returned pointer is nil.
func (t *ConnTracker) GetAgent(id string) (*Agent, bool) {
	t.mu.RLock()
	agent, found := t.agents[id]
	t.mu.RUnlock()
	return agent, found
}
// Agents returns a freshly allocated slice holding every registered agent.
// The order follows map iteration and is therefore unspecified. The slice
// itself is a copy, but its elements are the same *Agent pointers held by
// the tracker.
func (t *ConnTracker) Agents() []*Agent {
	t.mu.RLock()
	defer t.mu.RUnlock()

	all := make([]*Agent, len(t.agents))
	next := 0
	for _, agent := range t.agents {
		all[next] = agent
		next++
	}
	return all
}
// stats.Handler implementation (installed on a server with grpc.StatsHandler).
//
// Per the stats.Handler contract, the context returned by TagConn is the one
// later passed to HandleConn, and the contexts of all RPCs on that connection
// are derived from it. The connection context itself never carries per-RPC
// incoming metadata, so the agent ID cannot be read from it directly in
// HandleConn. Instead, TagConn attaches a per-connection record, TagRPC
// fills it from each RPC's "whoami" metadata, and HandleConn consumes it
// when the connection ends.

// connAgentsKey is the context key under which TagConn stores the
// per-connection *connAgents record.
type connAgentsKey struct{}

// connAgents collects the "whoami" IDs announced by RPCs on one connection.
type connAgents struct {
	mu  sync.Mutex
	ids map[string]struct{}
}

// add records id as having been announced on this connection.
func (c *connAgents) add(id string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.ids == nil {
		c.ids = make(map[string]struct{})
	}
	c.ids[id] = struct{}{}
}

// snapshot returns a copy of the IDs recorded so far, in unspecified order.
func (c *connAgents) snapshot() []string {
	c.mu.Lock()
	defer c.mu.Unlock()
	ids := make([]string, 0, len(c.ids))
	for id := range c.ids {
		ids = append(ids, id)
	}
	return ids
}

// TagRPC records the first "whoami" incoming-metadata value of the RPC, if
// present, in the record attached to the RPC's connection by TagConn.
// The context is returned unchanged.
func (t *ConnTracker) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) context.Context {
	rec, ok := ctx.Value(connAgentsKey{}).(*connAgents)
	if !ok {
		return ctx
	}
	md, ok := metadata.FromIncomingContext(ctx)
	if !ok {
		return ctx
	}
	if whoamiVals := md["whoami"]; len(whoamiVals) > 0 {
		rec.add(whoamiVals[0])
	}
	return ctx
}

// HandleRPC is a no-op; per-RPC stats are not used by the tracker.
func (t *ConnTracker) HandleRPC(ctx context.Context, _ stats.RPCStats) {}

// TagConn attaches an empty per-connection record to ctx so that TagRPC can
// note which agent IDs used the connection.
func (t *ConnTracker) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context {
	return context.WithValue(ctx, connAgentsKey{}, &connAgents{})
}

// HandleConn unregisters, on stats.ConnEnd, every agent ID that was recorded
// for the connection. All other connection events are ignored.
//
// NOTE(review): removal is by ID only. If an agent re-registers under the
// same ID on a new connection before the old connection's ConnEnd is
// delivered, the new registration is removed as well — confirm that callers
// of Register tolerate this.
func (t *ConnTracker) HandleConn(ctx context.Context, s stats.ConnStats) {
	if _, ended := s.(*stats.ConnEnd); !ended {
		return
	}
	rec, ok := ctx.Value(connAgentsKey{}).(*connAgents)
	if !ok {
		return
	}
	for _, id := range rec.snapshot() {
		t.Unregister(id)
	}
}
// UpdateServices replaces the Services field of the agent registered under
// id with services (the slice is stored as given, not copied).
// It returns false when no agent is registered under id, true otherwise.
func (t *ConnTracker) UpdateServices(id string, services []Service) bool {
	t.mu.Lock()
	defer t.mu.Unlock()

	if target, found := t.agents[id]; found {
		target.Services = services
		return true
	}
	return false
}
// UpdateSystemMetrics overwrites the SystemMetrics field of the agent
// registered under id with metrics.
// It returns false when no agent is registered under id, true otherwise.
func (t *ConnTracker) UpdateSystemMetrics(id string, metrics SystemMetrics) bool {
	t.mu.Lock()
	defer t.mu.Unlock()

	if target, found := t.agents[id]; found {
		target.SystemMetrics = metrics
		return true
	}
	return false
}
// GetSystemMetrics builds a new map, keyed by agent ID, with one
// AgentMetricsInfo per registered agent. Each entry copies the agent's
// label, connection time and current SystemMetrics values, so the result
// does not change when the tracker is updated afterwards.
func (t *ConnTracker) GetSystemMetrics() map[string]AgentMetricsInfo {
	t.mu.RLock()
	defer t.mu.RUnlock()

	snapshot := make(map[string]AgentMetricsInfo, len(t.agents))
	for agentID, a := range t.agents {
		sm := a.SystemMetrics
		snapshot[agentID] = AgentMetricsInfo{
			ID:             agentID,
			Label:          a.Label,
			ConnectedAt:    a.ConnectedAt,
			CPUPercent:     sm.CPUPercent,
			MemoryPercent:  sm.MemoryPercent,
			DiskPercent:    sm.DiskPercent,
			NetworkRxBytes: sm.NetworkRxBytes,
			NetworkTxBytes: sm.NetworkTxBytes,
		}
	}
	return snapshot
}
// Service pairs a service name with its current status string.
type Service struct {
	// Name identifies the service.
	Name string
	// Status is the service's current status.
	Status string
}
// SystemMetrics holds one sample of an agent's system resource usage.
type SystemMetrics struct {
	// Utilisation figures; presumably percentages — confirm range with the
	// reporting agent.
	CPUPercent, MemoryPercent, DiskPercent float64

	// Network receive/transmit byte counts, stored as float64.
	NetworkRxBytes, NetworkTxBytes float64
}
// AgentMetricsInfo is a flat, JSON-serialisable record combining an agent's
// identity (ID, label, connection time) with its system metrics. It is the
// value type of the map returned by ConnTracker.GetSystemMetrics.
type AgentMetricsInfo struct {
	// Agent identity.
	ID          string    `json:"id"`
	Label       string    `json:"label"`
	ConnectedAt time.Time `json:"connected_at"`

	// Resource usage, copied from the agent's SystemMetrics.
	CPUPercent     float64 `json:"cpu_percent"`
	MemoryPercent  float64 `json:"memory_percent"`
	DiskPercent    float64 `json:"disk_percent"`
	NetworkRxBytes float64 `json:"network_rx_bytes"`
	NetworkTxBytes float64 `json:"network_tx_bytes"`
}
// Agent describes one connected agent that streams logs to the collector.
type Agent struct {
	// ID is the key under which the agent is stored in ConnTracker;
	// Label is carried through to AgentMetricsInfo unchanged.
	ID, Label string

	// Services is the most recent service list set via
	// ConnTracker.UpdateServices.
	Services []Service

	// SystemMetrics is the most recent sample set via
	// ConnTracker.UpdateSystemMetrics.
	SystemMetrics SystemMetrics

	// ConnectedAt is the agent's connection timestamp.
	ConnectedAt time.Time
}