Files
d3m0k1d f26fa3da69
ci-agent / build (push) Has been cancelled
feat: add logif for checl alive func
2026-04-05 08:25:08 +03:00

481 lines
13 KiB
Go

package main
import (
"context"
"fmt"
"log"
"os"
"os/exec"
"strings"
"time"
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/agent/internal/buffer"
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/agent/internal/client"
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/agent/internal/commander"
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/agent/internal/config"
agentmetrics "gitea.d3m0k1d.ru/d3m0k1d/HellreigN/agent/internal/metrics"
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/agent/internal/logger"
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/agent/internal/logsource"
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/agent/internal/logsource/docker"
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/agent/internal/logsource/file"
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/agent/internal/logsource/journald"
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/agent/internal/logsource/kubernetes"
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/agent/internal/mtls"
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/agent/internal/registration"
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/proto/proto"
"github.com/samber/lo"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/metadata"
)
// main is the agent entry point. It loads the configuration, makes sure the
// mTLS certificates exist (registering with the backend when they do not),
// opens the on-disk log buffer and then runs the long-lived workers: command
// handling, service status reporting, system metrics reporting and one log
// stream per configured service.
//
// Environment variables read here: CONFIG_FILE (config path, defaults to
// /etc/hellreign-agent/config.yml), IS_DEBUG (compared against "1" and passed
// to logger.New), BUFFER_DB (buffer database path, see getEnvOrDefault) and
// JOURNALD_LOGDIR (handed to the journald log source).
//
// Every fatal setup error is reported and ends the process with exit code 1.
// Note that os.Exit does not run deferred functions, so the deferred
// logBuf.Close and cancel only run when main returns normally.
//
// NOTE(review): both worker groups are plain errgroup.Group values that are not
// bound to ctx, so an error returned by one worker does not cancel the others
// and wg.Wait returns only after every worker has returned. reportSystemMetrics
// loops until ctx is done, and ctx is only cancelled by the deferred cancel, so
// the "Agent dead" branch looks hard to reach and worker errors may never be
// logged — confirm whether errgroup.WithContext was intended.
func main() {
// Resolve the config file location, falling back to the system-wide default.
cfgPath := os.Getenv("CONFIG_FILE")
if cfgPath == "" {
cfgPath = "/etc/hellreign-agent/config.yml"
}
cfg, err := config.Load(cfgPath)
if err != nil {
// The structured logger does not exist yet, so the standard logger is used.
log.Fatalf("Failed to load config: %v", err)
}
// logger.New receives true only when IS_DEBUG is exactly "1".
lgr := logger.New(os.Getenv("IS_DEBUG") == "1")
// NOTE(review): cfg carries RegistrationToken (used below); confirm the logger
// does not print it in clear text here.
lgr.Debug("Config parsed", "cfg", cfg)
// Check if certificates already exist (agent was previously registered)
if registration.CertsExist(cfg.CertDir) {
lgr.Info("Certificates found, skipping registration")
} else {
// First start: a registration token is mandatory to obtain certificates.
if cfg.RegistrationToken == "" {
lgr.Error("No registration token provided")
os.Exit(1)
}
// Generate key and CSR
k, csrPEM, err := registration.GenerateKeyAndCSR(cfg.Label)
if err != nil {
lgr.Error("Failed to generate key and CSR", "err", err)
os.Exit(1)
}
lgr.Info("Generated ECDSA key pair and CSR")
// Register with backend
certs, err := registration.Register(cfg.BackendURL, cfg.RegistrationToken, csrPEM)
if err != nil {
lgr.Error("Failed to register", "err", err)
os.Exit(1)
}
lgr.Info("Successfully registered, received certificates")
// Save certificates
if err := registration.SaveCerts(cfg.CertDir, certs, k); err != nil {
lgr.Error("Failed to save certificates", "err", err)
os.Exit(1)
}
lgr.Info("Certificates saved", "cert_dir", cfg.CertDir)
}
// Build the transport credentials from the three files expected in
// cfg.CertDir: ca.crt, client.crt and client.key.
creds, err := mtls.LoadMTLSCredentialsFromFiles(
cfg.CertDir+"/ca.crt",
cfg.CertDir+"/client.crt",
cfg.CertDir+"/client.key",
)
if err != nil {
lgr.Error("Failed to load TLS credentials", "err", err)
os.Exit(1)
}
// Initialize log buffer for offline storage
dbPath := getEnvOrDefault("BUFFER_DB", "/var/lib/hellreign-agent/agent_buffer.db")
logBuf, err := buffer.NewLogBuffer(dbPath)
if err != nil {
lgr.Error("Failed to create log buffer", "err", err)
os.Exit(1)
}
// The error from Close is deliberately discarded.
defer func() { _ = logBuf.Close() }()
lgr.Info("Log buffer initialized", "path", dbPath)
// ctx is shared by every worker below; it is cancelled only when main returns.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
wg := &errgroup.Group{}
// gRPC target: prefer the dedicated GRPCURL setting and fall back to
// BackendURL. A leading http:// or https:// scheme is stripped from it.
grpcAddr := cfg.GRPCURL
if grpcAddr == "" {
grpcAddr = cfg.BackendURL
}
grpcAddr = strings.TrimPrefix(grpcAddr, "http://")
grpcAddr = strings.TrimPrefix(grpcAddr, "https://")
// Start command executor
wg.Go(func() error {
cmdexe := new(commander.CommandExecutor)
// cfg.Label is passed as both the second and the third argument of client.New.
ccli := client.New(cmdexe, cfg.Label, cfg.Label)
return ccli.HandleCommands(ctx, grpcAddr, creds)
})
// Start services update stream
if len(cfg.Services) > 0 {
wg.Go(func() error {
return reportServices(ctx, grpcAddr, creds, cfg.Label, cfg.Services, lgr)
})
}
// Start system metrics reporting
wg.Go(func() error {
return reportSystemMetrics(ctx, grpcAddr, creds, cfg.Label, lgr)
})
// Start log collectors
if len(cfg.Services) > 0 {
wg.Go(func() error {
// One client connection is shared by the streams of all services.
conn, err := grpc.NewClient(grpcAddr, grpc.WithTransportCredentials(creds))
if err != nil {
return fmt.Errorf("failed to connect to gRPC: %w", err)
}
defer func() { _ = conn.Close() }()
ccli := proto.NewCollectorClient(conn)
// svcWg tracks the per-service streaming goroutines started in the loop.
svcWg := new(errgroup.Group)
for _, svc := range cfg.Services {
// Per-iteration copy so the goroutine below captures its own svc
// (required with pre-Go 1.22 loop variable semantics).
svc := svc
var src logsource.LogSource
// Pick the log source implementation from the configured type. Any setup
// failure returns from this worker right away, without waiting for the
// streams that were already started; the deferred conn.Close then runs.
switch svc.Type {
case "journald":
src, err = journald.New(svc, os.Getenv("JOURNALD_LOGDIR"))
if err != nil {
return fmt.Errorf("failed to create journald source %q: %w", svc.Name, err)
}
case "file":
// svc.Path is a pointer and must be set for file sources.
if svc.Path == nil {
return fmt.Errorf("path is required for file log source %q", svc.Name)
}
src, err = file.New(*svc.Path)
if err != nil {
return fmt.Errorf("failed to create file source %q: %w", svc.Name, err)
}
case "docker":
src, err = docker.New(svc)
if err != nil {
return fmt.Errorf("failed to create docker source for container %q: %w", svc.Name, err)
}
case "kubernetes":
src, err = kubernetes.New(svc)
if err != nil {
return fmt.Errorf("failed to create kubernetes source for pod %q: %w", svc.Name, err)
}
default:
return fmt.Errorf("unknown log source type %q for service %q", svc.Type, svc.Name)
}
// One goroutine per service: read lines from src and forward them.
svcWg.Go(func() error {
lgr.Info("Starting log stream", "service", svc.Name)
// First, flush any buffered logs from offline period
// A failed flush is only logged; live streaming still starts.
if err := flushBufferedLogs(ctx, ccli, logBuf, svc.Name, cfg.Label, cfg.RegistrationToken, lgr); err != nil {
lgr.Error("Failed to flush buffered logs", "service", svc.Name, "err", err)
}
// Open the live stream. The outgoing metadata identifies the agent
// ("whoami"), the service, the registration token and the names of all
// configured services.
scli, err := ccli.Stream(
metadata.NewOutgoingContext(ctx, metadata.MD{
"whoami": []string{cfg.Label},
"service": []string{svc.Name},
"token": []string{cfg.RegistrationToken},
"services": lo.Map(cfg.Services, func(item config.ServiceConfig, _ int) string {
return item.Name
}),
}),
)
if err != nil {
return fmt.Errorf("failed to create stream: %w", err)
}
// Forward lines until the source fails or reconnecting gives up.
for {
line, err := src.ReadLine()
if err != nil {
lgr.Error("ReadLine error", "service", svc.Name, "err", err)
return err
}
if err := scli.Send(&proto.CollectorRequest{
Message: line,
}); err != nil {
// Connection failed, buffer the log
lgr.Warn("Send failed, buffering log", "service", svc.Name, "err", err)
if storeErr := logBuf.Store(svc.Name, line); storeErr != nil {
lgr.Error("Failed to buffer log", "service", svc.Name, "err", storeErr)
}
// Try to reconnect
// reconnectStream replaces scli in place through the pointer; if it
// returns an error this service's goroutine ends.
if reconnectErr := reconnectStream(ctx, &scli, ccli, svc.Name, cfg.Label, cfg.RegistrationToken, logBuf, lgr); reconnectErr != nil {
return reconnectErr
}
continue
}
}
})
}
return svcWg.Wait()
})
}
// Block until every worker has returned; the first non-nil error is fatal.
if err := wg.Wait(); err != nil {
lgr.Error("Agent dead", "err", err)
os.Exit(1)
}
}
// getEnvOrDefault looks up the environment variable named key and returns its
// value. A variable that is unset or set to the empty string yields
// defaultValue instead.
func getEnvOrDefault(key, defaultValue string) string {
	value := os.Getenv(key)
	if value == "" {
		return defaultValue
	}
	return value
}
// flushBufferedLogs replays the log lines held in logBuf to the backend and
// removes them from the buffer once they have been handed to the stream.
//
// When the buffer is empty the function returns nil without opening a stream.
// Otherwise it opens a dedicated Collector stream whose outgoing metadata
// carries agentName ("whoami"), service and token, reads pending entries in
// batches of batchSize, sends every entry and deletes the batch from the buffer
// afterwards. The stream is closed with CloseAndRecv at the end.
//
// Errors from the buffer (Count, GetPending, DeleteBatch), from opening the
// stream, from Send and from CloseAndRecv are returned to the caller. If Send
// fails in the middle of a batch, the entries of that batch stay in the buffer
// and will be sent again by a later flush.
//
// NOTE(review): Count and GetPending are called without a service argument, so
// the buffer appears to be shared by all services while the stream is tagged
// with a single service name — confirm that this is intended.
func flushBufferedLogs(
	ctx context.Context,
	ccli proto.CollectorClient,
	logBuf *buffer.LogBuffer,
	service, agentName, token string,
	lgr *logger.Logger,
) error {
	count, err := logBuf.Count()
	if err != nil {
		return err
	}
	if count == 0 {
		return nil
	}
	lgr.Info("Flushing buffered logs", "service", service, "count", count)

	// Scope the stream to this call. Cancelling its context on return releases
	// the stream on every early-return path instead of keeping it alive for as
	// long as the caller's context lives.
	streamCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	scli, err := ccli.Stream(
		metadata.NewOutgoingContext(streamCtx, metadata.MD{
			"whoami":  []string{agentName},
			"service": []string{service},
			"token":   []string{token},
		}),
	)
	if err != nil {
		return fmt.Errorf("failed to create stream for flush: %w", err)
	}

	const batchSize = 100
	sentIDs := make([]int64, 0, batchSize)
	for {
		logs, err := logBuf.GetPending(batchSize)
		if err != nil {
			return err
		}
		if len(logs) == 0 {
			break
		}

		sentIDs = sentIDs[:0]
		for _, logEntry := range logs {
			if err := scli.Send(&proto.CollectorRequest{Message: logEntry.Message}); err != nil {
				lgr.Error("Failed to send buffered log", "service", service, "err", err)
				return err
			}
			sentIDs = append(sentIDs, logEntry.ID)
		}

		// Delete successfully sent logs. A failure here must stop the flush:
		// the next GetPending would return the very same entries and they
		// would be sent again and again.
		if err := logBuf.DeleteBatch(sentIDs); err != nil {
			lgr.Error("Failed to delete sent logs from buffer", "service", service, "err", err)
			return fmt.Errorf("failed to delete sent logs from buffer: %w", err)
		}
	}

	if _, err := scli.CloseAndRecv(); err != nil {
		return err
	}
	// Only report completion once the backend has accepted the stream.
	lgr.Info("Buffer flush complete", "service", service)
	return nil
}
// reconnectStream tries to open a replacement Collector stream after the
// current one failed and, on success, stores it in *scli so the caller keeps
// using the same variable.
//
// Up to five attempts are made with a linear backoff: the n-th attempt is
// preceded by a wait of n seconds. The wait is interrupted as soon as ctx is
// done, in which case ctx.Err() is returned. After a successful reconnect the
// buffered logs are flushed and the result of that flush is returned. If all
// attempts fail an error naming the service is returned.
//
// NOTE(review): the replacement stream carries only the "whoami", "service"
// and "token" metadata keys; the stream opened in main additionally sends
// "services" — confirm the backend does not rely on it after a reconnect.
func reconnectStream(
	ctx context.Context,
	scli *grpc.ClientStreamingClient[proto.CollectorRequest, proto.CollectorResponse],
	ccli proto.CollectorClient,
	service, agentName, token string,
	buf *buffer.LogBuffer,
	lgr *logger.Logger,
) error {
	lgr.Info("Attempting to reconnect stream...", "service", service)

	const maxAttempts = 5
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		// Linear backoff that still reacts to cancellation, so shutdown is not
		// held up by retries that can no longer succeed.
		wait := time.NewTimer(time.Duration(attempt) * time.Second)
		select {
		case <-ctx.Done():
			wait.Stop()
			return ctx.Err()
		case <-wait.C:
		}

		newCli, err := ccli.Stream(
			metadata.NewOutgoingContext(ctx, metadata.MD{
				"whoami":  []string{agentName},
				"service": []string{service},
				"token":   []string{token},
			}),
		)
		if err != nil {
			lgr.Warn("Reconnect attempt failed", "service", service, "attempt", attempt, "err", err)
			continue
		}

		// Hand the new stream back to the caller through the pointer.
		*scli = newCli
		lgr.Info("Stream reconnected successfully", "service", service)
		return flushBufferedLogs(ctx, ccli, buf, service, agentName, token, lgr)
	}
	return fmt.Errorf("failed to reconnect after %d attempts for service %s", maxAttempts, service)
}
// reportServices pushes the status of every configured service to the backend:
// once right after start and then again on each tick of a 5-second ticker,
// until ctx is done.
//
// It creates its own gRPC client for grpcAddr using creds and closes it on
// return. Each report carries label as the "whoami" outgoing metadata value. A
// failed report is logged as a warning and the loop carries on with the next
// tick.
//
// The function returns an error when the client cannot be created, and
// ctx.Err() once ctx is done.
func reportServices(
	ctx context.Context,
	grpcAddr string,
	creds credentials.TransportCredentials,
	label string,
	services []config.ServiceConfig,
	lgr *logger.Logger,
) error {
	conn, err := grpc.NewClient(grpcAddr, grpc.WithTransportCredentials(creds))
	if err != nil {
		return fmt.Errorf("failed to connect for services report: %w", err)
	}
	defer conn.Close()

	backend := proto.NewCollectorClient(conn)

	// snapshot probes every service and packs the results into one update.
	snapshot := func() *proto.ServicesUpdate {
		entries := make([]*proto.ServicesUpdate_ServiceUpdate, len(services))
		for i, svc := range services {
			state := checkServiceStatus(svc, lgr)
			entries[i] = &proto.ServicesUpdate_ServiceUpdate{
				Name:   svc.Name,
				Status: state,
			}
		}
		return &proto.ServicesUpdate{Services: entries}
	}

	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()

	// The report runs before the first wait, so the backend gets a status
	// immediately on start.
	for {
		update := snapshot()
		outCtx := metadata.NewOutgoingContext(ctx, metadata.New(map[string]string{"whoami": label}))
		if _, err := backend.ReportServices(outCtx, update); err != nil {
			lgr.Warn("Failed to report services", "err", err)
		} else {
			lgr.Debug("Services reported successfully", "count", len(services))
		}

		select {
		case <-ticker.C:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}
// checkServiceStatus returns "up" or "down" for svc.
//
// Two probes may run, in this order:
//   - when svc.SystemdUnit is set and non-empty, the systemd unit is checked;
//   - when svc.Type is "docker", the container named svc.Name is checked.
//
// The first probe that does not report "up" decides the result; its outcome is
// logged at debug level and returned. If no probe applies, or all applicable
// probes report "up", the result is "up".
func checkServiceStatus(svc config.ServiceConfig, lgr *logger.Logger) string {
	if unit := svc.SystemdUnit; unit != nil && *unit != "" {
		if state := checkSystemdService(*unit); state != "up" {
			lgr.Debug("Systemd service check", "unit", *unit, "status", state)
			return state
		}
	}

	// Only docker services have a second, container-level probe.
	if svc.Type != "docker" {
		return "up"
	}
	if state := checkDockerContainer(svc.Name); state != "up" {
		lgr.Debug("Docker container check", "container", svc.Name, "status", state)
		return state
	}
	return "up"
}
// checkSystemdService reports whether the systemd unit is active.
//
// It runs `systemctl is-active --quiet <unit>` and returns "up" when the
// command exits successfully. Every failure — a non-zero exit status, a
// systemctl binary that cannot be started, or the probe exceeding its
// timeout — yields "down".
func checkSystemdService(unit string) string {
	// Bound the probe: without a deadline a hung systemctl would block the
	// caller, and with it the periodic status report, indefinitely.
	const probeTimeout = 5 * time.Second
	ctx, cancel := context.WithTimeout(context.Background(), probeTimeout)
	defer cancel()

	cmd := exec.CommandContext(ctx, "systemctl", "is-active", "--quiet", unit)
	if err := cmd.Run(); err != nil {
		return "down"
	}
	return "up"
}
// checkDockerContainer reports whether the Docker container called name is
// running.
//
// It runs `docker inspect -f {{.State.Running}} <name>` and returns "up" only
// when the command succeeds and its trimmed output is exactly "true". Every
// other outcome — command failure, a docker binary that cannot be started, the
// probe exceeding its timeout, or any other output — yields "down".
func checkDockerContainer(name string) string {
	// Bound the probe: without a deadline a hung docker command would block
	// the caller, and with it the periodic status report, indefinitely.
	const probeTimeout = 5 * time.Second
	ctx, cancel := context.WithTimeout(context.Background(), probeTimeout)
	defer cancel()

	cmd := exec.CommandContext(ctx, "docker", "inspect", "-f", "{{.State.Running}}", name)
	out, err := cmd.Output()
	if err != nil {
		return "down"
	}
	if strings.TrimSpace(string(out)) == "true" {
		return "up"
	}
	return "down"
}
// reportSystemMetrics samples system metrics and sends them to the backend:
// once right after start and then again on each tick of a 5-second ticker,
// until ctx is done.
//
// It creates its own gRPC client for grpcAddr using creds and closes it on
// return. Each report carries label as the "whoami" outgoing metadata value
// and contains the CPU, memory and disk percentages plus the network RX/TX
// byte counters taken from the collector. Collection and reporting failures
// are logged as warnings and do not stop the loop.
//
// The function returns an error when the client cannot be created, and
// ctx.Err() once ctx is done.
func reportSystemMetrics(
	ctx context.Context,
	grpcAddr string,
	creds credentials.TransportCredentials,
	label string,
	lgr *logger.Logger,
) error {
	conn, err := grpc.NewClient(grpcAddr, grpc.WithTransportCredentials(creds))
	if err != nil {
		return fmt.Errorf("failed to connect for metrics report: %w", err)
	}
	defer conn.Close()

	backend := proto.NewCollectorClient(conn)
	sampler := agentmetrics.NewCollector()

	// pushSample takes one sample and sends it; a failed collection skips the
	// send for this round.
	pushSample := func() {
		sample, err := sampler.Collect()
		if err != nil {
			lgr.Warn("Failed to collect system metrics", "err", err)
			return
		}

		outCtx := metadata.NewOutgoingContext(ctx, metadata.New(map[string]string{"whoami": label}))
		payload := &proto.SystemMetrics{
			CpuPercent:     sample.CPUPercent,
			MemoryPercent:  sample.MemoryPercent,
			DiskPercent:    sample.DiskPercent,
			NetworkRxBytes: sample.NetworkRxBytes,
			NetworkTxBytes: sample.NetworkTxBytes,
		}
		if _, err := backend.ReportSystemMetrics(outCtx, payload); err != nil {
			lgr.Warn("Failed to report system metrics", "err", err)
			return
		}
		lgr.Debug("System metrics reported",
			"cpu", sample.CPUPercent,
			"mem", sample.MemoryPercent,
			"disk", sample.DiskPercent,
		)
	}

	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()

	lgr.Info("System metrics collector started")
	for {
		pushSample()

		select {
		case <-ticker.C:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}