chore: add logparser logic for agent and add parsed log to clickhouse
ci-agent / build (push) Failing after 3m30s
ci-agent / build (push) Failing after 3m30s
This commit is contained in:
+251
-25
@@ -2,15 +2,27 @@ package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/agent/internal/buffer"
|
||||
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/agent/internal/client"
|
||||
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/agent/internal/commander"
|
||||
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/agent/internal/config"
|
||||
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/agent/internal/logger"
|
||||
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/agent/internal/logsource"
|
||||
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/agent/internal/logsource/file"
|
||||
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/agent/internal/logsource/journald"
|
||||
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/agent/internal/mtls"
|
||||
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/agent/internal/registration"
|
||||
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/proto/proto"
|
||||
"github.com/samber/lo"
|
||||
"golang.org/x/sync/errgroup"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/metadata"
|
||||
)
|
||||
|
||||
func main() {
|
||||
@@ -24,57 +36,271 @@ func main() {
|
||||
log.Fatalf("Failed to load config: %v", err)
|
||||
}
|
||||
|
||||
log.Printf("Agent label: %s", cfg.Label)
|
||||
lgr := logger.New(os.Getenv("IS_DEBUG") == "1")
|
||||
lgr.Debug("Config parsed", "cfg", cfg)
|
||||
|
||||
if cfg.RegistrationToken == "" {
|
||||
log.Fatal("No registration token provided")
|
||||
lgr.Error("No registration token provided")
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
// Generate key and CSR
|
||||
key, csrPEM, err := registration.GenerateKeyAndCSR(cfg.Label)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to generate key and CSR: %v", err)
|
||||
lgr.Error("Failed to generate key and CSR", "err", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
log.Println("Generated ECDSA key pair and CSR")
|
||||
lgr.Info("Generated ECDSA key pair and CSR")
|
||||
|
||||
// Register with backend
|
||||
certs, err := registration.Register(cfg.BackendURL, cfg.RegistrationToken, csrPEM)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to register: %v", err)
|
||||
lgr.Error("Failed to register", "err", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
log.Println("Successfully registered, received certificates")
|
||||
lgr.Info("Successfully registered, received certificates")
|
||||
|
||||
// Save certificates
|
||||
if err := registration.SaveCerts(cfg.CertDir, certs, key); err != nil {
|
||||
log.Fatalf("Failed to save certificates: %v", err)
|
||||
lgr.Error("Failed to save certificates", "err", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
log.Printf("Certificates saved to %s", cfg.CertDir)
|
||||
lgr.Info("Certificates saved", "cert_dir", cfg.CertDir)
|
||||
|
||||
log.Println("Agent registration complete")
|
||||
err = func() error {
|
||||
creds, err := mtls.LoadMTLSCredentialsFromFiles(
|
||||
cfg.CertDir+"/ca.crt",
|
||||
cfg.CertDir+"/client.crt",
|
||||
cfg.CertDir+"/client.key",
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
creds, err := mtls.LoadMTLSCredentialsFromFiles(
|
||||
cfg.CertDir+"/ca.crt",
|
||||
cfg.CertDir+"/client.crt",
|
||||
cfg.CertDir+"/client.key",
|
||||
)
|
||||
if err != nil {
|
||||
lgr.Error("Failed to load TLS credentials", "err", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
// Initialize log buffer for offline storage
|
||||
dbPath := getEnvOrDefault("BUFFER_DB", "/var/lib/hellreign-agent/agent_buffer.db")
|
||||
logBuf, err := buffer.NewLogBuffer(dbPath)
|
||||
if err != nil {
|
||||
lgr.Error("Failed to create log buffer", "err", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
defer func() { _ = logBuf.Close() }()
|
||||
lgr.Info("Log buffer initialized", "path", dbPath)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
wg := &errgroup.Group{}
|
||||
|
||||
// Start command executor
|
||||
wg.Go(func() error {
|
||||
cmdexe := new(commander.CommandExecutor)
|
||||
ccli := client.New(cmdexe, cfg.Label, cfg.Label)
|
||||
// Use grpc_url for gRPC connection, strip scheme if present
|
||||
grpcAddr := cfg.GRPCURL
|
||||
if grpcAddr == "" {
|
||||
// Fallback: derive from backend_url by stripping scheme
|
||||
grpcAddr = cfg.BackendURL
|
||||
}
|
||||
grpcAddr = strings.TrimPrefix(grpcAddr, "http://")
|
||||
grpcAddr = strings.TrimPrefix(grpcAddr, "https://")
|
||||
return ccli.HandleCommands(ctx, grpcAddr, creds)
|
||||
}()
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to generate key and CSR: %v", err)
|
||||
})
|
||||
|
||||
// Start log collectors
|
||||
if len(cfg.Services) > 0 {
|
||||
grpcAddr := cfg.GRPCURL
|
||||
if grpcAddr == "" {
|
||||
grpcAddr = cfg.BackendURL
|
||||
}
|
||||
grpcAddr = strings.TrimPrefix(grpcAddr, "http://")
|
||||
grpcAddr = strings.TrimPrefix(grpcAddr, "https://")
|
||||
|
||||
conn, err := grpc.NewClient(grpcAddr, grpc.WithTransportCredentials(creds))
|
||||
if err != nil {
|
||||
lgr.Error("Failed to connect to gRPC", "err", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
defer func() { _ = conn.Close() }()
|
||||
|
||||
ccli := proto.NewCollectorClient(conn)
|
||||
|
||||
for _, svc := range cfg.Services {
|
||||
svc := svc
|
||||
var src logsource.LogSource
|
||||
switch svc.Type {
|
||||
case "journald":
|
||||
src, err = journald.New(svc, os.Getenv("JOURNALD_LOGDIR"))
|
||||
if err != nil {
|
||||
lgr.Error("Failed to create journald source", "service", svc.Name, "err", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
case "file":
|
||||
if svc.Path == nil {
|
||||
lgr.Error("Path is required for file log source", "service", svc.Name)
|
||||
os.Exit(1)
|
||||
}
|
||||
src, err = file.New(*svc.Path)
|
||||
if err != nil {
|
||||
lgr.Error("Failed to create file source", "service", svc.Name, "err", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
default:
|
||||
lgr.Error("Unknown log source type", "type", svc.Type, "service", svc.Name)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
wg.Go(func() error {
|
||||
lgr.Info("Starting log stream", "service", svc.Name)
|
||||
|
||||
// First, flush any buffered logs from offline period
|
||||
if err := flushBufferedLogs(ctx, ccli, logBuf, svc.Name, cfg.Label, cfg.RegistrationToken, lgr); err != nil {
|
||||
lgr.Error("Failed to flush buffered logs", "service", svc.Name, "err", err)
|
||||
}
|
||||
|
||||
scli, err := ccli.Stream(
|
||||
metadata.NewOutgoingContext(ctx, metadata.MD{
|
||||
"whoami": []string{cfg.Label},
|
||||
"service": []string{svc.Name},
|
||||
"token": []string{cfg.RegistrationToken},
|
||||
"services": lo.Map(cfg.Services, func(item config.ServiceConfig, _ int) string {
|
||||
return item.Name
|
||||
}),
|
||||
}),
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create stream: %w", err)
|
||||
}
|
||||
|
||||
for {
|
||||
line, err := src.ReadLine()
|
||||
if err != nil {
|
||||
lgr.Error("ReadLine error", "service", svc.Name, "err", err)
|
||||
return err
|
||||
}
|
||||
|
||||
if err := scli.Send(&proto.CollectorRequest{
|
||||
Message: line,
|
||||
}); err != nil {
|
||||
// Connection failed, buffer the log
|
||||
lgr.Warn("Send failed, buffering log", "service", svc.Name, "err", err)
|
||||
if storeErr := logBuf.Store(svc.Name, line); storeErr != nil {
|
||||
lgr.Error("Failed to buffer log", "service", svc.Name, "err", storeErr)
|
||||
}
|
||||
// Try to reconnect
|
||||
if reconnectErr := reconnectStream(ctx, &scli, ccli, svc.Name, cfg.Label, cfg.RegistrationToken, logBuf, lgr); reconnectErr != nil {
|
||||
return reconnectErr
|
||||
}
|
||||
continue
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
if err := wg.Wait(); err != nil {
|
||||
lgr.Error("Agent dead", "err", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
// getEnvOrDefault looks up the environment variable key and returns its
// value. When the variable is unset or set to the empty string, defaultValue
// is returned instead.
func getEnvOrDefault(key, defaultValue string) string {
	fromEnv := os.Getenv(key)
	if fromEnv == "" {
		return defaultValue
	}
	return fromEnv
}
|
||||
|
||||
// flushBufferedLogs sends any buffered logs to the server
|
||||
func flushBufferedLogs(
|
||||
ctx context.Context,
|
||||
ccli proto.CollectorClient,
|
||||
logBuf *buffer.LogBuffer,
|
||||
service, agentName, token string,
|
||||
lgr *logger.Logger,
|
||||
) error {
|
||||
count, err := logBuf.Count()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if count == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
lgr.Info("Flushing buffered logs", "service", service, "count", count)
|
||||
|
||||
scli, err := ccli.Stream(
|
||||
metadata.NewOutgoingContext(ctx, metadata.MD{
|
||||
"whoami": []string{agentName},
|
||||
"service": []string{service},
|
||||
"token": []string{token},
|
||||
}),
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create stream for flush: %w", err)
|
||||
}
|
||||
|
||||
const batchSize = 100
|
||||
var deletedIDs []int64
|
||||
|
||||
for {
|
||||
logs, err := logBuf.GetPending(batchSize)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(logs) == 0 {
|
||||
break
|
||||
}
|
||||
|
||||
for _, logEntry := range logs {
|
||||
if err := scli.Send(&proto.CollectorRequest{Message: logEntry.Message}); err != nil {
|
||||
lgr.Error("Failed to send buffered log", "service", service, "err", err)
|
||||
return err
|
||||
}
|
||||
deletedIDs = append(deletedIDs, logEntry.ID)
|
||||
}
|
||||
|
||||
// Delete successfully sent logs
|
||||
if err := logBuf.DeleteBatch(deletedIDs); err != nil {
|
||||
lgr.Error("Failed to delete sent logs from buffer", "service", service, "err", err)
|
||||
}
|
||||
deletedIDs = deletedIDs[:0]
|
||||
}
|
||||
|
||||
_, err = scli.CloseAndRecv()
|
||||
lgr.Info("Buffer flush complete", "service", service)
|
||||
return err
|
||||
}
|
||||
|
||||
// reconnectStream attempts to recreate a gRPC stream connection
|
||||
func reconnectStream(
|
||||
ctx context.Context,
|
||||
scli *grpc.ClientStreamingClient[proto.CollectorRequest, proto.CollectorResponse],
|
||||
ccli proto.CollectorClient,
|
||||
service, agentName, token string,
|
||||
buf *buffer.LogBuffer,
|
||||
lgr *logger.Logger,
|
||||
) error {
|
||||
lgr.Info("Attempting to reconnect stream...", "service", service)
|
||||
|
||||
// Try up to 5 times with exponential backoff
|
||||
for i := 0; i < 5; i++ {
|
||||
time.Sleep(time.Duration(i+1) * time.Second)
|
||||
|
||||
newCli, err := ccli.Stream(
|
||||
metadata.NewOutgoingContext(ctx, metadata.MD{
|
||||
"whoami": []string{agentName},
|
||||
"service": []string{service},
|
||||
"token": []string{token},
|
||||
}),
|
||||
)
|
||||
if err != nil {
|
||||
lgr.Warn("Reconnect attempt failed", "service", service, "attempt", i+1, "err", err)
|
||||
continue
|
||||
}
|
||||
|
||||
*scli = newCli
|
||||
lgr.Info("Stream reconnected successfully", "service", service)
|
||||
return flushBufferedLogs(ctx, ccli, buf, service, agentName, token, lgr)
|
||||
}
|
||||
|
||||
return fmt.Errorf("failed to reconnect after 5 attempts for service %s", service)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user