373 lines
10 KiB
Go
373 lines
10 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log"
|
|
"os"
|
|
"strings"
|
|
"time"
|
|
|
|
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/agent/internal/buffer"
|
|
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/agent/internal/client"
|
|
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/agent/internal/commander"
|
|
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/agent/internal/config"
|
|
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/agent/internal/logger"
|
|
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/agent/internal/logsource"
|
|
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/agent/internal/logsource/docker"
|
|
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/agent/internal/logsource/file"
|
|
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/agent/internal/logsource/journald"
|
|
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/agent/internal/logsource/kubernetes"
|
|
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/agent/internal/mtls"
|
|
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/agent/internal/registration"
|
|
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/proto/proto"
|
|
"github.com/samber/lo"
|
|
"golang.org/x/sync/errgroup"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/credentials"
|
|
"google.golang.org/grpc/metadata"
|
|
)
|
|
|
|
func main() {
|
|
cfgPath := os.Getenv("CONFIG_FILE")
|
|
if cfgPath == "" {
|
|
cfgPath = "/etc/hellreign-agent/config.yml"
|
|
}
|
|
|
|
cfg, err := config.Load(cfgPath)
|
|
if err != nil {
|
|
log.Fatalf("Failed to load config: %v", err)
|
|
}
|
|
|
|
lgr := logger.New(os.Getenv("IS_DEBUG") == "1")
|
|
lgr.Debug("Config parsed", "cfg", cfg)
|
|
|
|
// Check if certificates already exist (agent was previously registered)
|
|
if registration.CertsExist(cfg.CertDir) {
|
|
lgr.Info("Certificates found, skipping registration")
|
|
} else {
|
|
if cfg.RegistrationToken == "" {
|
|
lgr.Error("No registration token provided")
|
|
os.Exit(1)
|
|
}
|
|
|
|
// Generate key and CSR
|
|
k, csrPEM, err := registration.GenerateKeyAndCSR(cfg.Label)
|
|
if err != nil {
|
|
lgr.Error("Failed to generate key and CSR", "err", err)
|
|
os.Exit(1)
|
|
}
|
|
lgr.Info("Generated ECDSA key pair and CSR")
|
|
|
|
// Register with backend
|
|
certs, err := registration.Register(cfg.BackendURL, cfg.RegistrationToken, csrPEM)
|
|
if err != nil {
|
|
lgr.Error("Failed to register", "err", err)
|
|
os.Exit(1)
|
|
}
|
|
lgr.Info("Successfully registered, received certificates")
|
|
|
|
// Save certificates
|
|
if err := registration.SaveCerts(cfg.CertDir, certs, k); err != nil {
|
|
lgr.Error("Failed to save certificates", "err", err)
|
|
os.Exit(1)
|
|
}
|
|
lgr.Info("Certificates saved", "cert_dir", cfg.CertDir)
|
|
}
|
|
|
|
creds, err := mtls.LoadMTLSCredentialsFromFiles(
|
|
cfg.CertDir+"/ca.crt",
|
|
cfg.CertDir+"/client.crt",
|
|
cfg.CertDir+"/client.key",
|
|
)
|
|
if err != nil {
|
|
lgr.Error("Failed to load TLS credentials", "err", err)
|
|
os.Exit(1)
|
|
}
|
|
|
|
// Initialize log buffer for offline storage
|
|
dbPath := getEnvOrDefault("BUFFER_DB", "/var/lib/hellreign-agent/agent_buffer.db")
|
|
logBuf, err := buffer.NewLogBuffer(dbPath)
|
|
if err != nil {
|
|
lgr.Error("Failed to create log buffer", "err", err)
|
|
os.Exit(1)
|
|
}
|
|
defer func() { _ = logBuf.Close() }()
|
|
lgr.Info("Log buffer initialized", "path", dbPath)
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
wg := &errgroup.Group{}
|
|
|
|
grpcAddr := cfg.GRPCURL
|
|
if grpcAddr == "" {
|
|
grpcAddr = cfg.BackendURL
|
|
}
|
|
grpcAddr = strings.TrimPrefix(grpcAddr, "http://")
|
|
grpcAddr = strings.TrimPrefix(grpcAddr, "https://")
|
|
// Start command executor
|
|
wg.Go(func() error {
|
|
cmdexe := new(commander.CommandExecutor)
|
|
ccli := client.New(cmdexe, cfg.Label, cfg.Label)
|
|
return ccli.HandleCommands(ctx, grpcAddr, creds)
|
|
})
|
|
|
|
// Start services update stream
|
|
if len(cfg.Services) > 0 {
|
|
wg.Go(func() error {
|
|
return reportServices(ctx, grpcAddr, creds, cfg.Label, cfg.Services, lgr)
|
|
})
|
|
}
|
|
|
|
// Start log collectors
|
|
if len(cfg.Services) > 0 {
|
|
wg.Go(func() error {
|
|
conn, err := grpc.NewClient(grpcAddr, grpc.WithTransportCredentials(creds))
|
|
if err != nil {
|
|
return fmt.Errorf("failed to connect to gRPC: %w", err)
|
|
}
|
|
defer func() { _ = conn.Close() }()
|
|
|
|
ccli := proto.NewCollectorClient(conn)
|
|
|
|
svcWg := new(errgroup.Group)
|
|
for _, svc := range cfg.Services {
|
|
svc := svc
|
|
var src logsource.LogSource
|
|
switch svc.Type {
|
|
case "journald":
|
|
src, err = journald.New(svc, os.Getenv("JOURNALD_LOGDIR"))
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create journald source %q: %w", svc.Name, err)
|
|
}
|
|
case "file":
|
|
if svc.Path == nil {
|
|
return fmt.Errorf("path is required for file log source %q", svc.Name)
|
|
}
|
|
src, err = file.New(*svc.Path)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create file source %q: %w", svc.Name, err)
|
|
}
|
|
case "docker":
|
|
src, err = docker.New(svc)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create docker source for container %q: %w", svc.Name, err)
|
|
}
|
|
case "kubernetes":
|
|
src, err = kubernetes.New(svc)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create kubernetes source for pod %q: %w", svc.Name, err)
|
|
}
|
|
default:
|
|
return fmt.Errorf("unknown log source type %q for service %q", svc.Type, svc.Name)
|
|
}
|
|
|
|
svcWg.Go(func() error {
|
|
lgr.Info("Starting log stream", "service", svc.Name)
|
|
|
|
// First, flush any buffered logs from offline period
|
|
if err := flushBufferedLogs(ctx, ccli, logBuf, svc.Name, cfg.Label, cfg.RegistrationToken, lgr); err != nil {
|
|
lgr.Error("Failed to flush buffered logs", "service", svc.Name, "err", err)
|
|
}
|
|
|
|
scli, err := ccli.Stream(
|
|
metadata.NewOutgoingContext(ctx, metadata.MD{
|
|
"whoami": []string{cfg.Label},
|
|
"service": []string{svc.Name},
|
|
"token": []string{cfg.RegistrationToken},
|
|
"services": lo.Map(cfg.Services, func(item config.ServiceConfig, _ int) string {
|
|
return item.Name
|
|
}),
|
|
}),
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create stream: %w", err)
|
|
}
|
|
|
|
for {
|
|
line, err := src.ReadLine()
|
|
if err != nil {
|
|
lgr.Error("ReadLine error", "service", svc.Name, "err", err)
|
|
return err
|
|
}
|
|
|
|
if err := scli.Send(&proto.CollectorRequest{
|
|
Message: line,
|
|
}); err != nil {
|
|
// Connection failed, buffer the log
|
|
lgr.Warn("Send failed, buffering log", "service", svc.Name, "err", err)
|
|
if storeErr := logBuf.Store(svc.Name, line); storeErr != nil {
|
|
lgr.Error("Failed to buffer log", "service", svc.Name, "err", storeErr)
|
|
}
|
|
// Try to reconnect
|
|
if reconnectErr := reconnectStream(ctx, &scli, ccli, svc.Name, cfg.Label, cfg.RegistrationToken, logBuf, lgr); reconnectErr != nil {
|
|
return reconnectErr
|
|
}
|
|
continue
|
|
}
|
|
}
|
|
})
|
|
}
|
|
return svcWg.Wait()
|
|
})
|
|
}
|
|
|
|
if err := wg.Wait(); err != nil {
|
|
lgr.Error("Agent dead", "err", err)
|
|
os.Exit(1)
|
|
}
|
|
}
|
|
|
|
// getEnvOrDefault looks up the environment variable key and returns its
// value. When the variable is unset or set to the empty string,
// defaultValue is returned instead.
func getEnvOrDefault(key, defaultValue string) string {
	fromEnv := os.Getenv(key)
	if fromEnv == "" {
		return defaultValue
	}
	return fromEnv
}
|
|
|
|
// flushBufferedLogs sends any buffered logs to the server
|
|
func flushBufferedLogs(
|
|
ctx context.Context,
|
|
ccli proto.CollectorClient,
|
|
logBuf *buffer.LogBuffer,
|
|
service, agentName, token string,
|
|
lgr *logger.Logger,
|
|
) error {
|
|
count, err := logBuf.Count()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if count == 0 {
|
|
return nil
|
|
}
|
|
|
|
lgr.Info("Flushing buffered logs", "service", service, "count", count)
|
|
|
|
scli, err := ccli.Stream(
|
|
metadata.NewOutgoingContext(ctx, metadata.MD{
|
|
"whoami": []string{agentName},
|
|
"service": []string{service},
|
|
"token": []string{token},
|
|
}),
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create stream for flush: %w", err)
|
|
}
|
|
|
|
const batchSize = 100
|
|
var deletedIDs []int64
|
|
|
|
for {
|
|
logs, err := logBuf.GetPending(batchSize)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if len(logs) == 0 {
|
|
break
|
|
}
|
|
|
|
for _, logEntry := range logs {
|
|
if err := scli.Send(&proto.CollectorRequest{Message: logEntry.Message}); err != nil {
|
|
lgr.Error("Failed to send buffered log", "service", service, "err", err)
|
|
return err
|
|
}
|
|
deletedIDs = append(deletedIDs, logEntry.ID)
|
|
}
|
|
|
|
// Delete successfully sent logs
|
|
if err := logBuf.DeleteBatch(deletedIDs); err != nil {
|
|
lgr.Error("Failed to delete sent logs from buffer", "service", service, "err", err)
|
|
}
|
|
deletedIDs = deletedIDs[:0]
|
|
}
|
|
|
|
_, err = scli.CloseAndRecv()
|
|
lgr.Info("Buffer flush complete", "service", service)
|
|
return err
|
|
}
|
|
|
|
// reconnectStream attempts to recreate a gRPC stream connection
|
|
func reconnectStream(
|
|
ctx context.Context,
|
|
scli *grpc.ClientStreamingClient[proto.CollectorRequest, proto.CollectorResponse],
|
|
ccli proto.CollectorClient,
|
|
service, agentName, token string,
|
|
buf *buffer.LogBuffer,
|
|
lgr *logger.Logger,
|
|
) error {
|
|
lgr.Info("Attempting to reconnect stream...", "service", service)
|
|
|
|
// Try up to 5 times with exponential backoff
|
|
for i := 0; i < 5; i++ {
|
|
time.Sleep(time.Duration(i+1) * time.Second)
|
|
|
|
newCli, err := ccli.Stream(
|
|
metadata.NewOutgoingContext(ctx, metadata.MD{
|
|
"whoami": []string{agentName},
|
|
"service": []string{service},
|
|
"token": []string{token},
|
|
}),
|
|
)
|
|
if err != nil {
|
|
lgr.Warn("Reconnect attempt failed", "service", service, "attempt", i+1, "err", err)
|
|
continue
|
|
}
|
|
|
|
*scli = newCli
|
|
lgr.Info("Stream reconnected successfully", "service", service)
|
|
return flushBufferedLogs(ctx, ccli, buf, service, agentName, token, lgr)
|
|
}
|
|
|
|
return fmt.Errorf("failed to reconnect after 5 attempts for service %s", service)
|
|
}
|
|
|
|
// reportServices periodically sends service status updates to the backend via gRPC.
|
|
// For now, all configured services are reported as "up" every 5 seconds.
|
|
func reportServices(
|
|
ctx context.Context,
|
|
grpcAddr string,
|
|
creds credentials.TransportCredentials,
|
|
label string,
|
|
services []config.ServiceConfig,
|
|
lgr *logger.Logger,
|
|
) error {
|
|
conn, err := grpc.NewClient(grpcAddr, grpc.WithTransportCredentials(creds))
|
|
if err != nil {
|
|
return fmt.Errorf("failed to connect for services report: %w", err)
|
|
}
|
|
defer conn.Close()
|
|
|
|
ccli := proto.NewCollectorClient(conn)
|
|
ticker := time.NewTicker(5 * time.Second)
|
|
defer ticker.Stop()
|
|
|
|
// Send immediately on start, then every 5 seconds
|
|
for {
|
|
svcUpdates := make([]*proto.ServicesUpdate_ServiceUpdate, 0, len(services))
|
|
for _, svc := range services {
|
|
svcUpdates = append(svcUpdates, &proto.ServicesUpdate_ServiceUpdate{
|
|
Name: svc.Name,
|
|
Status: "up",
|
|
})
|
|
}
|
|
|
|
md := metadata.New(map[string]string{"whoami": label})
|
|
_, err := ccli.ReportServices(
|
|
metadata.NewOutgoingContext(ctx, md),
|
|
&proto.ServicesUpdate{Services: svcUpdates},
|
|
)
|
|
if err != nil {
|
|
lgr.Warn("Failed to report services", "err", err)
|
|
} else {
|
|
lgr.Debug("Services reported successfully", "count", len(services))
|
|
}
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case <-ticker.C:
|
|
}
|
|
}
|
|
}
|