@@ -82,7 +82,10 @@ func (c *Collector) Stream(stream proto.Collector_StreamServer) error {
|
||||
|
||||
// If no ClickHouse, just consume the stream without storing
|
||||
if !c.logRepo.IsConnected() {
|
||||
log.Printf("Warning: ClickHouse not connected yet, consuming logs without storing for agent %s", agentName)
|
||||
log.Printf(
|
||||
"Warning: ClickHouse not connected yet, consuming logs without storing for agent %s",
|
||||
agentName,
|
||||
)
|
||||
for {
|
||||
_, err := stream.Recv()
|
||||
if err == io.EOF {
|
||||
@@ -120,7 +123,12 @@ func (c *Collector) Stream(stream proto.Collector_StreamServer) error {
|
||||
return nil
|
||||
}
|
||||
if err := c.logRepo.InsertBatch(stream.Context(), batch); err != nil {
|
||||
log.Printf("Failed to insert batch for agent %s, service %s: %v", agentName, service, err)
|
||||
log.Printf(
|
||||
"Failed to insert batch for agent %s, service %s: %v",
|
||||
agentName,
|
||||
service,
|
||||
err,
|
||||
)
|
||||
return err
|
||||
}
|
||||
log.Printf("Flushed %d logs for agent %s, service %s", len(batch), agentName, service)
|
||||
|
||||
@@ -95,7 +95,9 @@ func (self *Agent) WaitJob(jid int64) (*models.Job, error) {
|
||||
return &result.fc, result.err
|
||||
}
|
||||
|
||||
func (self *Commander) Stream(bidi grpc.BidiStreamingServer[proto.FinishedCommand, proto.Command]) error {
|
||||
func (self *Commander) Stream(
|
||||
bidi grpc.BidiStreamingServer[proto.FinishedCommand, proto.Command],
|
||||
) error {
|
||||
md, ok := metadata.FromIncomingContext(bidi.Context())
|
||||
if !ok {
|
||||
return fmt.Errorf("no metadata in context")
|
||||
@@ -164,7 +166,12 @@ func (self *Agent) send() error {
|
||||
// self.jobs[]
|
||||
}
|
||||
|
||||
func newAgent(bidi grpc.BidiStreamingServer[proto.FinishedCommand, proto.Command], jobber Jobber, aid string, label string) Agent {
|
||||
func newAgent(
|
||||
bidi grpc.BidiStreamingServer[proto.FinishedCommand, proto.Command],
|
||||
jobber Jobber,
|
||||
aid string,
|
||||
label string,
|
||||
) Agent {
|
||||
return Agent{
|
||||
bidi: bidi,
|
||||
in: make(chan *proto.Command),
|
||||
|
||||
Reference in New Issue
Block a user