Files
HellreigN/agent/internal/logsource/kubernetes/impl.go
T
2026-04-05 01:43:38 +03:00

96 lines
2.5 KiB
Go

package kubernetes
import (
"bufio"
"fmt"
"io"
"os/exec"
"strings"
"syscall"
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/agent/internal/config"
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/agent/internal/logsource"
)
// Compile-time check that *KubernetesLogSource implements logsource.LogSource.
// A typed nil pointer is used so the check costs no allocation at package init.
var _ logsource.LogSource = (*KubernetesLogSource)(nil)
// KubernetesLogSource reads logs from a Kubernetes pod via `kubectl logs -f`.
//
// It wraps a kubectl child process: ReadLine returns the child's standard
// output one line at a time, and Close signals the child and waits for it.
// Use New to obtain a ready-to-use value.
type KubernetesLogSource struct {
// cmd is the kubectl child process; Close signals it and waits on it.
cmd *exec.Cmd
// stdout is the pipe connected to the child's standard output.
stdout io.ReadCloser
// stdoutscanner reads from stdout and is what ReadLine scans.
stdoutscanner *bufio.Scanner
}
// ReadLine implements logsource.LogSource.
//
// It returns the next line scanned from the kubectl process's standard output.
// When the scanner stops because of a read error, that error is returned
// unchanged. When it stops because the stream ended cleanly, the returned
// error wraps logsource.ErrDead.
func (k *KubernetesLogSource) ReadLine() (string, error) {
	scanner := k.stdoutscanner
	if scanner.Scan() {
		return scanner.Text(), nil
	}

	// Scan reported false: distinguish a real read failure from end of stream.
	if scanErr := scanner.Err(); scanErr != nil {
		return "", scanErr
	}
	return "", fmt.Errorf("%w: %s", logsource.ErrDead, io.EOF)
}
// Close implements logsource.LogSource.
//
// It asks the kubectl child process to stop with SIGTERM and waits for it to
// exit. Termination caused by that SIGTERM is the expected outcome of Close
// and is reported as success; any other failure reported by Wait is returned.
// Calling Close on a source whose process was never started is a no-op.
func (k *KubernetesLogSource) Close() error {
	// Nothing to stop on a zero-value source; avoid a nil dereference.
	if k.cmd == nil || k.cmd.Process == nil {
		return nil
	}

	sigErr := k.cmd.Process.Signal(syscall.SIGTERM)
	if sigErr != nil {
		// SIGTERM could not be delivered (the process is already gone, or the
		// platform does not support it). Fall back to Kill so Wait below
		// cannot block forever on a still-running `kubectl logs -f`. The
		// result is deliberately ignored: Kill failing on a process that has
		// already exited is harmless.
		_ = k.cmd.Process.Kill()
	}

	err := k.cmd.Wait()
	if err == nil || sigErr != nil {
		// Either a clean exit, or the process ended for a reason other than
		// our SIGTERM; report Wait's result as-is.
		return err
	}

	// Wait reports "signal: terminated" when the child died from the SIGTERM
	// sent above. That is a successful close, not an error.
	if exitErr, ok := err.(*exec.ExitError); ok {
		if ws, ok := exitErr.Sys().(syscall.WaitStatus); ok && ws.Signaled() && ws.Signal() == syscall.SIGTERM {
			return nil
		}
	}
	return err
}
// New creates a Kubernetes log source for the given pod and starts following
// its logs with `kubectl logs -f`.
//
// The pod identifier is taken from cfg.Path in the format "namespace/podname".
// If no namespace is specified (just "podname", or "/podname"), the "default"
// namespace is used. If cfg.Path is nil or empty, cfg.Name is used as the pod
// name with the "default" namespace.
//
// An error is returned when the resulting pod name is empty or starts with
// "-" (kubectl would parse it as a flag), when the stdout pipe cannot be
// created, or when kubectl cannot be started.
func New(cfg config.ServiceConfig) (*KubernetesLogSource, error) {
	const (
		// initialLineBuf is the scanner's starting buffer size.
		initialLineBuf = 64 * 1024
		// maxLineBytes bounds a single log line. bufio.Scanner's default
		// limit is 64 KiB; one longer line would stop the scanner for good.
		maxLineBytes = 1024 * 1024
	)

	podName := cfg.Name
	namespace := "default"
	if cfg.Path != nil && *cfg.Path != "" {
		parts := strings.SplitN(*cfg.Path, "/", 2)
		if len(parts) == 2 {
			// An empty namespace part ("/pod") keeps the default namespace.
			if parts[0] != "" {
				namespace = parts[0]
			}
			podName = parts[1]
		} else {
			podName = parts[0]
		}
	}
	if podName == "" || strings.HasPrefix(podName, "-") {
		return nil, fmt.Errorf("invalid pod name %q for kubectl logs", podName)
	}

	// kubectl logs -f <pod> -n <namespace> --tail=0
	// -f       : follow new logs
	// --tail=0 : skip existing logs
	// Note: `kubectl logs` has no --no-color flag; passing one makes kubectl
	// exit immediately with an unknown-flag error.
	cmd := exec.Command("kubectl", "logs", "-f", podName, "-n", namespace, "--tail=0") //nolint:gosec

	// cmd.Stderr is left nil on purpose: os/exec then connects the child's
	// stderr to the null device, so its output is discarded without a pipe
	// that could fill up or a goroutine to drain it.
	stdout, err := cmd.StdoutPipe()
	if err != nil {
		return nil, fmt.Errorf("failed to create stdout pipe for kubectl logs: %w", err)
	}

	if err := cmd.Start(); err != nil {
		return nil, fmt.Errorf("failed to start kubectl logs for pod %q (ns: %q): %w", podName, namespace, err)
	}

	stdoutscanner := bufio.NewScanner(stdout)
	stdoutscanner.Buffer(make([]byte, 0, initialLineBuf), maxLineBytes)

	return &KubernetesLogSource{
		cmd:           cmd,
		stdout:        stdout,
		stdoutscanner: stdoutscanner,
	}, nil
}