From e721cff3f81f25c8af7907f9a21bf33744d32aab Mon Sep 17 00:00:00 2001 From: "zero@thinky" Date: Sat, 4 Apr 2026 15:15:33 +0300 Subject: [PATCH] refactor(agent): error handling --- agent/main.go | 164 ++++++++++++++++++++++++-------------------------- 1 file changed, 78 insertions(+), 86 deletions(-) diff --git a/agent/main.go b/agent/main.go index ae506c1..18f72ff 100644 --- a/agent/main.go +++ b/agent/main.go @@ -97,108 +97,100 @@ func main() { wg := &errgroup.Group{} + grpcAddr := cfg.GRPCURL + if grpcAddr == "" { + grpcAddr = cfg.BackendURL + } + grpcAddr = strings.TrimPrefix(grpcAddr, "http://") + grpcAddr = strings.TrimPrefix(grpcAddr, "https://") // Start command executor wg.Go(func() error { cmdexe := new(commander.CommandExecutor) ccli := client.New(cmdexe, cfg.Label, cfg.Label) - grpcAddr := cfg.GRPCURL - if grpcAddr == "" { - grpcAddr = cfg.BackendURL - } - grpcAddr = strings.TrimPrefix(grpcAddr, "http://") - grpcAddr = strings.TrimPrefix(grpcAddr, "https://") return ccli.HandleCommands(ctx, grpcAddr, creds) }) // Start log collectors if len(cfg.Services) > 0 { - grpcAddr := cfg.GRPCURL - if grpcAddr == "" { - grpcAddr = cfg.BackendURL - } - grpcAddr = strings.TrimPrefix(grpcAddr, "http://") - grpcAddr = strings.TrimPrefix(grpcAddr, "https://") - - conn, err := grpc.NewClient(grpcAddr, grpc.WithTransportCredentials(creds)) - if err != nil { - lgr.Error("Failed to connect to gRPC", "err", err) - os.Exit(1) - } - defer func() { _ = conn.Close() }() - - ccli := proto.NewCollectorClient(conn) - - for _, svc := range cfg.Services { - svc := svc - var src logsource.LogSource - switch svc.Type { - case "journald": - src, err = journald.New(svc, os.Getenv("JOURNALD_LOGDIR")) - if err != nil { - lgr.Error("Failed to create journald source", "service", svc.Name, "err", err) - os.Exit(1) - } - case "file": - if svc.Path == nil { - lgr.Error("Path is required for file log source", "service", svc.Name) - os.Exit(1) - } - src, err = file.New(*svc.Path) - if err != nil { - lgr.Error("Failed to create file source", "service", svc.Name, "err", err) - os.Exit(1) - } - default: - lgr.Error("Unknown log source type", "type", svc.Type, "service", svc.Name) - os.Exit(1) + wg.Go(func() error { + conn, err := grpc.NewClient(grpcAddr, grpc.WithTransportCredentials(creds)) + if err != nil { + return fmt.Errorf("failed to connect to gRPC: %w", err) } + defer func() { _ = conn.Close() }() - wg.Go(func() error { - lgr.Info("Starting log stream", "service", svc.Name) + ccli := proto.NewCollectorClient(conn) - // First, flush any buffered logs from offline period - if err := flushBufferedLogs(ctx, ccli, logBuf, svc.Name, cfg.Label, cfg.RegistrationToken, lgr); err != nil { - lgr.Error("Failed to flush buffered logs", "service", svc.Name, "err", err) - } - - scli, err := ccli.Stream( - metadata.NewOutgoingContext(ctx, metadata.MD{ - "whoami": []string{cfg.Label}, - "service": []string{svc.Name}, - "token": []string{cfg.RegistrationToken}, - "services": lo.Map(cfg.Services, func(item config.ServiceConfig, _ int) string { - return item.Name - }), - }), - ) - if err != nil { - return fmt.Errorf("failed to create stream: %w", err) - } - - for { - line, err := src.ReadLine() + svcWg := new(errgroup.Group) + for _, svc := range cfg.Services { + svc := svc + var src logsource.LogSource + switch svc.Type { + case "journald": + src, err = journald.New(svc, os.Getenv("JOURNALD_LOGDIR")) if err != nil { - lgr.Error("ReadLine error", "service", svc.Name, "err", err) - return err + return fmt.Errorf("failed to create journald source %q: %w", svc.Name, err) + } + case "file": + if svc.Path == nil { + return fmt.Errorf("path is required for file log source %q", svc.Name) + } + src, err = file.New(*svc.Path) + if err != nil { + return fmt.Errorf("failed to create file source %q: %w", svc.Name, err) + } + default: + return fmt.Errorf("unknown log source type %q for service %q", svc.Type, svc.Name) + } + + svcWg.Go(func() error { + lgr.Info("Starting log stream", "service", svc.Name) + + // First, flush any buffered logs from offline period + if err := flushBufferedLogs(ctx, ccli, logBuf, svc.Name, cfg.Label, cfg.RegistrationToken, lgr); err != nil { + lgr.Error("Failed to flush buffered logs", "service", svc.Name, "err", err) } - if err := scli.Send(&proto.CollectorRequest{ - Message: line, - }); err != nil { - // Connection failed, buffer the log - lgr.Warn("Send failed, buffering log", "service", svc.Name, "err", err) - if storeErr := logBuf.Store(svc.Name, line); storeErr != nil { - lgr.Error("Failed to buffer log", "service", svc.Name, "err", storeErr) - } - // Try to reconnect - if reconnectErr := reconnectStream(ctx, &scli, ccli, svc.Name, cfg.Label, cfg.RegistrationToken, logBuf, lgr); reconnectErr != nil { - return reconnectErr - } - continue + scli, err := ccli.Stream( + metadata.NewOutgoingContext(ctx, metadata.MD{ + "whoami": []string{cfg.Label}, + "service": []string{svc.Name}, + "token": []string{cfg.RegistrationToken}, + "services": lo.Map(cfg.Services, func(item config.ServiceConfig, _ int) string { + return item.Name + }), + }), + ) + if err != nil { + return fmt.Errorf("failed to create stream: %w", err) } - } - }) - } + + for { + line, err := src.ReadLine() + if err != nil { + lgr.Error("ReadLine error", "service", svc.Name, "err", err) + return err + } + + if err := scli.Send(&proto.CollectorRequest{ + Message: line, + }); err != nil { + // Connection failed, buffer the log + lgr.Warn("Send failed, buffering log", "service", svc.Name, "err", err) + if storeErr := logBuf.Store(svc.Name, line); storeErr != nil { + lgr.Error("Failed to buffer log", "service", svc.Name, "err", storeErr) + } + // Try to reconnect + if reconnectErr := reconnectStream(ctx, &scli, ccli, svc.Name, cfg.Label, cfg.RegistrationToken, logBuf, lgr); reconnectErr != nil { + return reconnectErr + } + continue + } + } + }) + } + return svcWg.Wait() + }) } if err := wg.Wait(); err != nil {