refactor(agent): error handling
This commit is contained in:
+78
-86
@@ -97,108 +97,100 @@ func main() {
|
|||||||
|
|
||||||
wg := &errgroup.Group{}
|
wg := &errgroup.Group{}
|
||||||
|
|
||||||
|
grpcAddr := cfg.GRPCURL
|
||||||
|
if grpcAddr == "" {
|
||||||
|
grpcAddr = cfg.BackendURL
|
||||||
|
}
|
||||||
|
grpcAddr = strings.TrimPrefix(grpcAddr, "http://")
|
||||||
|
grpcAddr = strings.TrimPrefix(grpcAddr, "https://")
|
||||||
// Start command executor
|
// Start command executor
|
||||||
wg.Go(func() error {
|
wg.Go(func() error {
|
||||||
cmdexe := new(commander.CommandExecutor)
|
cmdexe := new(commander.CommandExecutor)
|
||||||
ccli := client.New(cmdexe, cfg.Label, cfg.Label)
|
ccli := client.New(cmdexe, cfg.Label, cfg.Label)
|
||||||
grpcAddr := cfg.GRPCURL
|
|
||||||
if grpcAddr == "" {
|
|
||||||
grpcAddr = cfg.BackendURL
|
|
||||||
}
|
|
||||||
grpcAddr = strings.TrimPrefix(grpcAddr, "http://")
|
|
||||||
grpcAddr = strings.TrimPrefix(grpcAddr, "https://")
|
|
||||||
return ccli.HandleCommands(ctx, grpcAddr, creds)
|
return ccli.HandleCommands(ctx, grpcAddr, creds)
|
||||||
})
|
})
|
||||||
|
|
||||||
// Start log collectors
|
// Start log collectors
|
||||||
if len(cfg.Services) > 0 {
|
if len(cfg.Services) > 0 {
|
||||||
grpcAddr := cfg.GRPCURL
|
wg.Go(func() error {
|
||||||
if grpcAddr == "" {
|
conn, err := grpc.NewClient(grpcAddr, grpc.WithTransportCredentials(creds))
|
||||||
grpcAddr = cfg.BackendURL
|
if err != nil {
|
||||||
}
|
return fmt.Errorf("failed to connect to gRPC: %w", err)
|
||||||
grpcAddr = strings.TrimPrefix(grpcAddr, "http://")
|
|
||||||
grpcAddr = strings.TrimPrefix(grpcAddr, "https://")
|
|
||||||
|
|
||||||
conn, err := grpc.NewClient(grpcAddr, grpc.WithTransportCredentials(creds))
|
|
||||||
if err != nil {
|
|
||||||
lgr.Error("Failed to connect to gRPC", "err", err)
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
defer func() { _ = conn.Close() }()
|
|
||||||
|
|
||||||
ccli := proto.NewCollectorClient(conn)
|
|
||||||
|
|
||||||
for _, svc := range cfg.Services {
|
|
||||||
svc := svc
|
|
||||||
var src logsource.LogSource
|
|
||||||
switch svc.Type {
|
|
||||||
case "journald":
|
|
||||||
src, err = journald.New(svc, os.Getenv("JOURNALD_LOGDIR"))
|
|
||||||
if err != nil {
|
|
||||||
lgr.Error("Failed to create journald source", "service", svc.Name, "err", err)
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
case "file":
|
|
||||||
if svc.Path == nil {
|
|
||||||
lgr.Error("Path is required for file log source", "service", svc.Name)
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
src, err = file.New(*svc.Path)
|
|
||||||
if err != nil {
|
|
||||||
lgr.Error("Failed to create file source", "service", svc.Name, "err", err)
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
default:
|
|
||||||
lgr.Error("Unknown log source type", "type", svc.Type, "service", svc.Name)
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
|
defer func() { _ = conn.Close() }()
|
||||||
|
|
||||||
wg.Go(func() error {
|
ccli := proto.NewCollectorClient(conn)
|
||||||
lgr.Info("Starting log stream", "service", svc.Name)
|
|
||||||
|
|
||||||
// First, flush any buffered logs from offline period
|
svcWg := new(errgroup.Group)
|
||||||
if err := flushBufferedLogs(ctx, ccli, logBuf, svc.Name, cfg.Label, cfg.RegistrationToken, lgr); err != nil {
|
for _, svc := range cfg.Services {
|
||||||
lgr.Error("Failed to flush buffered logs", "service", svc.Name, "err", err)
|
svc := svc
|
||||||
}
|
var src logsource.LogSource
|
||||||
|
switch svc.Type {
|
||||||
scli, err := ccli.Stream(
|
case "journald":
|
||||||
metadata.NewOutgoingContext(ctx, metadata.MD{
|
src, err = journald.New(svc, os.Getenv("JOURNALD_LOGDIR"))
|
||||||
"whoami": []string{cfg.Label},
|
|
||||||
"service": []string{svc.Name},
|
|
||||||
"token": []string{cfg.RegistrationToken},
|
|
||||||
"services": lo.Map(cfg.Services, func(item config.ServiceConfig, _ int) string {
|
|
||||||
return item.Name
|
|
||||||
}),
|
|
||||||
}),
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to create stream: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
for {
|
|
||||||
line, err := src.ReadLine()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
lgr.Error("ReadLine error", "service", svc.Name, "err", err)
|
return fmt.Errorf("failed to create journald source %q: %w", svc.Name, err)
|
||||||
return err
|
}
|
||||||
|
case "file":
|
||||||
|
if svc.Path == nil {
|
||||||
|
return fmt.Errorf("path is required for file log source %q", svc.Name)
|
||||||
|
}
|
||||||
|
src, err = file.New(*svc.Path)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to create file source %q: %w", svc.Name, err)
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("unknown log source type %q for service %q", svc.Type, svc.Name)
|
||||||
|
}
|
||||||
|
|
||||||
|
svcWg.Go(func() error {
|
||||||
|
lgr.Info("Starting log stream", "service", svc.Name)
|
||||||
|
|
||||||
|
// First, flush any buffered logs from offline period
|
||||||
|
if err := flushBufferedLogs(ctx, ccli, logBuf, svc.Name, cfg.Label, cfg.RegistrationToken, lgr); err != nil {
|
||||||
|
lgr.Error("Failed to flush buffered logs", "service", svc.Name, "err", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := scli.Send(&proto.CollectorRequest{
|
scli, err := ccli.Stream(
|
||||||
Message: line,
|
metadata.NewOutgoingContext(ctx, metadata.MD{
|
||||||
}); err != nil {
|
"whoami": []string{cfg.Label},
|
||||||
// Connection failed, buffer the log
|
"service": []string{svc.Name},
|
||||||
lgr.Warn("Send failed, buffering log", "service", svc.Name, "err", err)
|
"token": []string{cfg.RegistrationToken},
|
||||||
if storeErr := logBuf.Store(svc.Name, line); storeErr != nil {
|
"services": lo.Map(cfg.Services, func(item config.ServiceConfig, _ int) string {
|
||||||
lgr.Error("Failed to buffer log", "service", svc.Name, "err", storeErr)
|
return item.Name
|
||||||
}
|
}),
|
||||||
// Try to reconnect
|
}),
|
||||||
if reconnectErr := reconnectStream(ctx, &scli, ccli, svc.Name, cfg.Label, cfg.RegistrationToken, logBuf, lgr); reconnectErr != nil {
|
)
|
||||||
return reconnectErr
|
if err != nil {
|
||||||
}
|
return fmt.Errorf("failed to create stream: %w", err)
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
})
|
for {
|
||||||
}
|
line, err := src.ReadLine()
|
||||||
|
if err != nil {
|
||||||
|
lgr.Error("ReadLine error", "service", svc.Name, "err", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := scli.Send(&proto.CollectorRequest{
|
||||||
|
Message: line,
|
||||||
|
}); err != nil {
|
||||||
|
// Connection failed, buffer the log
|
||||||
|
lgr.Warn("Send failed, buffering log", "service", svc.Name, "err", err)
|
||||||
|
if storeErr := logBuf.Store(svc.Name, line); storeErr != nil {
|
||||||
|
lgr.Error("Failed to buffer log", "service", svc.Name, "err", storeErr)
|
||||||
|
}
|
||||||
|
// Try to reconnect
|
||||||
|
if reconnectErr := reconnectStream(ctx, &scli, ccli, svc.Name, cfg.Label, cfg.RegistrationToken, logBuf, lgr); reconnectErr != nil {
|
||||||
|
return reconnectErr
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return svcWg.Wait()
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := wg.Wait(); err != nil {
|
if err := wg.Wait(); err != nil {
|
||||||
|
|||||||
Reference in New Issue
Block a user