feat(backend): implement service monitor proto & connect it to http /agents
ci-agent / build (push) Has been cancelled
ci-agent / build (push) Has been cancelled
This commit is contained in:
@@ -186,3 +186,45 @@ func (c *Collector) Agents() []*Agent {
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// ServicesStream handles the ServicesUpdate client-streaming RPC.
|
||||
// Agents send service status updates which are stored in the collector.
|
||||
// Returns a single response when the agent closes the stream.
|
||||
func (c *Collector) ServicesStream(stream proto.Collector_ServicesStreamServer) error {
|
||||
md, ok := metadata.FromIncomingContext(stream.Context())
|
||||
if !ok {
|
||||
return fmt.Errorf("no metadata in context")
|
||||
}
|
||||
|
||||
whoamiVals := md["whoami"]
|
||||
if len(whoamiVals) == 0 {
|
||||
return fmt.Errorf("whoami metadata missing")
|
||||
}
|
||||
agentName := whoamiVals[0]
|
||||
|
||||
log.Printf("Agent %s started services update stream", agentName)
|
||||
|
||||
for {
|
||||
update, err := stream.Recv()
|
||||
if err == io.EOF {
|
||||
log.Printf("Agent %s finished services update stream", agentName)
|
||||
return stream.SendAndClose(&proto.ServicesUpdateResp{})
|
||||
}
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to receive services update: %w", err)
|
||||
}
|
||||
|
||||
c.mu.Lock()
|
||||
if agent, ok := c.agents[agentName]; ok {
|
||||
services := make([]string, 0, len(update.Services))
|
||||
for _, s := range update.Services {
|
||||
services = append(services, fmt.Sprintf("%s:%s", s.Name, s.Status))
|
||||
}
|
||||
agent.Services = services
|
||||
log.Printf("Updated services for agent %s: %v", agentName, agent.Services)
|
||||
} else {
|
||||
log.Printf("Warning: received services update for unknown agent %s", agentName)
|
||||
}
|
||||
c.mu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user