chore: add k8s and docker as service to agent and update logic for ansible deploy
ci-agent / build (push) Failing after 2m35s

This commit is contained in:
d3m0k1d
2026-04-05 01:43:38 +03:00
parent 428140ff15
commit 3e5e4815d9
9 changed files with 385 additions and 48 deletions
+89
View File
@@ -0,0 +1,89 @@
package docker
import (
"bufio"
"fmt"
"io"
"os/exec"
"syscall"
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/agent/internal/config"
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/agent/internal/logsource"
)
var _ logsource.LogSource = new(DockerLogSource)
// DockerLogSource reads logs from a Docker container via `docker logs -f`.
type DockerLogSource struct {
cmd *exec.Cmd
stdout io.ReadCloser
stdoutscanner *bufio.Scanner
}
// ReadLine implements logsource.LogSource.
func (d *DockerLogSource) ReadLine() (string, error) {
if d.stdoutscanner.Scan() {
return d.stdoutscanner.Text(), nil
} else {
if d.stdoutscanner.Err() == nil {
return "", fmt.Errorf("%w: %s", logsource.ErrDead, io.EOF)
}
return "", d.stdoutscanner.Err()
}
}
// Close implements logsource.LogSource.
func (d *DockerLogSource) Close() error {
_ = d.cmd.Process.Signal(syscall.SIGTERM)
return d.cmd.Wait()
}
// New creates a Docker log source for the given container.
// The container name is taken from cfg.Path (if set) or cfg.Name.
func New(cfg config.ServiceConfig) (*DockerLogSource, error) {
containerName := cfg.Name
if cfg.Path != nil && *cfg.Path != "" {
containerName = *cfg.Path
}
// docker logs -f --tail=0 --no-color <container_name>
// -f : follow new logs
// --tail=0 : skip existing logs
// --no-color: strip color codes for clean output
cmd := exec.Command("docker", "logs", "-f", "--tail=0", "--no-color", containerName) //nolint:gosec
stdout, err := cmd.StdoutPipe()
if err != nil {
return nil, fmt.Errorf("failed to create stdout pipe for docker logs: %w", err)
}
// Also capture stderr since docker logs merges stdout and stderr from the container
stderr, err := cmd.StderrPipe()
if err != nil {
return nil, fmt.Errorf("failed to create stderr pipe for docker logs: %w", err)
}
err = cmd.Start()
if err != nil {
return nil, fmt.Errorf("failed to start docker logs for container %q: %w", containerName, err)
}
// Use MultiReader to merge stdout and stderr
// Docker logs outputs container stdout+stderr to its own stdout, but we also
// capture the docker CLI's stderr separately in case of errors (e.g. container not found)
stdoutscanner := bufio.NewScanner(stdout)
// Start a goroutine to consume stderr (we don't send docker CLI stderr as logs,
// but we need to prevent the pipe from filling up)
go func() {
buf := make([]byte, 4096)
for {
_, err := stderr.Read(buf)
if err != nil {
return
}
}
}()
return &DockerLogSource{cmd, stdout, stdoutscanner}, nil
}
@@ -0,0 +1,95 @@
package kubernetes
import (
"bufio"
"fmt"
"io"
"os/exec"
"strings"
"syscall"
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/agent/internal/config"
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/agent/internal/logsource"
)
var _ logsource.LogSource = new(KubernetesLogSource)
// KubernetesLogSource reads logs from a Kubernetes pod via `kubectl logs -f`.
type KubernetesLogSource struct {
cmd *exec.Cmd
stdout io.ReadCloser
stdoutscanner *bufio.Scanner
}
// ReadLine implements logsource.LogSource.
func (k *KubernetesLogSource) ReadLine() (string, error) {
if k.stdoutscanner.Scan() {
return k.stdoutscanner.Text(), nil
} else {
if k.stdoutscanner.Err() == nil {
return "", fmt.Errorf("%w: %s", logsource.ErrDead, io.EOF)
}
return "", k.stdoutscanner.Err()
}
}
// Close implements logsource.LogSource.
func (k *KubernetesLogSource) Close() error {
_ = k.cmd.Process.Signal(syscall.SIGTERM)
return k.cmd.Wait()
}
// New creates a Kubernetes log source for the given pod.
// The pod identifier is taken from cfg.Path in the format "namespace/podname".
// If no namespace is specified (just "podname"), "default" namespace is used.
// If cfg.Path is nil or empty, cfg.Name is used as the pod name with "default" namespace.
func New(cfg config.ServiceConfig) (*KubernetesLogSource, error) {
podName := cfg.Name
namespace := "default"
if cfg.Path != nil && *cfg.Path != "" {
parts := strings.SplitN(*cfg.Path, "/", 2)
if len(parts) == 2 {
namespace = parts[0]
podName = parts[1]
} else {
podName = parts[0]
}
}
// kubectl logs -f <pod> -n <namespace> --tail=0 --no-color
// -f : follow new logs
// --tail=0 : skip existing logs
// --no-color: strip color codes for clean output
cmd := exec.Command("kubectl", "logs", "-f", podName, "-n", namespace, "--tail=0", "--no-color") //nolint:gosec
stdout, err := cmd.StdoutPipe()
if err != nil {
return nil, fmt.Errorf("failed to create stdout pipe for kubectl logs: %w", err)
}
stderr, err := cmd.StderrPipe()
if err != nil {
return nil, fmt.Errorf("failed to create stderr pipe for kubectl logs: %w", err)
}
err = cmd.Start()
if err != nil {
return nil, fmt.Errorf("failed to start kubectl logs for pod %q (ns: %q): %w", podName, namespace, err)
}
stdoutscanner := bufio.NewScanner(stdout)
// Consume stderr to prevent pipe from filling up
go func() {
buf := make([]byte, 4096)
for {
_, err := stderr.Read(buf)
if err != nil {
return
}
}
}()
return &KubernetesLogSource{cmd, stdout, stdoutscanner}, nil
}
+12
View File
@@ -14,8 +14,10 @@ import (
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/agent/internal/config"
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/agent/internal/logger"
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/agent/internal/logsource"
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/agent/internal/logsource/docker"
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/agent/internal/logsource/file"
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/agent/internal/logsource/journald"
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/agent/internal/logsource/kubernetes"
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/agent/internal/mtls"
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/agent/internal/registration"
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/proto/proto"
@@ -147,6 +149,16 @@ func main() {
if err != nil {
return fmt.Errorf("failed to create file source %q: %w", svc.Name, err)
}
case "docker":
src, err = docker.New(svc)
if err != nil {
return fmt.Errorf("failed to create docker source for container %q: %w", svc.Name, err)
}
case "kubernetes":
src, err = kubernetes.New(svc)
if err != nil {
return fmt.Errorf("failed to create kubernetes source for pod %q: %w", svc.Name, err)
}
default:
return fmt.Errorf("unknown log source type %q for service %q", svc.Type, svc.Name)
}
+53 -12
View File
@@ -7,32 +7,39 @@ import (
"os"
"os/exec"
"path/filepath"
"strings"
"sync"
)
// ErrUnknownDeployType is returned when an unsupported deployment type is specified
var ErrUnknownDeployType = fmt.Errorf("unknown deploy type, expected 'docker' or 'binary'")
// Executor handles running Ansible playbooks
type Executor struct {
workDir string
grpcServerHost string
grpcServerPort string
backendURL string
workDir string
grpcServerHost string
grpcServerPort string
backendURL string
giteaReleasesURL string
}
// ExecutorConfig holds configuration for the Executor
type ExecutorConfig struct {
WorkDir string
GRPCServerHost string
GRPCServerPort string
BackendURL string
WorkDir string
GRPCServerHost string
GRPCServerPort string
BackendURL string
GiteaReleasesURL string
}
// NewExecutor creates a new Ansible executor
func NewExecutor(cfg ExecutorConfig) *Executor {
return &Executor{
workDir: cfg.WorkDir,
grpcServerHost: cfg.GRPCServerHost,
grpcServerPort: cfg.GRPCServerPort,
backendURL: cfg.BackendURL,
workDir: cfg.WorkDir,
grpcServerHost: cfg.GRPCServerHost,
grpcServerPort: cfg.GRPCServerPort,
backendURL: cfg.BackendURL,
giteaReleasesURL: cfg.GiteaReleasesURL,
}
}
@@ -55,12 +62,42 @@ func (e *Executor) GRPCURL() string {
return e.grpcServerHost + ":" + e.grpcServerPort
}
// CheckDockerCollection verifies that the community.docker Ansible collection is installed.
// Returns an error if the collection is not found.
func (e *Executor) CheckDockerCollection() error {
cmd := exec.Command("ansible-galaxy", "collection", "list", "community.docker")
var stdout, stderr bytes.Buffer
cmd.Stdout = &stdout
cmd.Stderr = &stderr
if err := cmd.Run(); err != nil {
return fmt.Errorf("community.docker collection not found: %s", stderr.String())
}
// ansible-galaxy collection list returns output like:
// # /usr/share/ansible/collections/ansible_collections
// Collection Version
// ---------------- -------
// community.docker 3.10.0
//
// If the collection is not installed, it won't appear in the output.
if !strings.Contains(stdout.String(), "community.docker") {
return fmt.Errorf("community.docker collection is not installed. Run: ansible-galaxy collection install community.docker")
}
return nil
}
// Deploy runs Ansible playbook for the given inventory
func (e *Executor) Deploy(
ctx context.Context,
inventoryPath string,
deployType string,
) ([]DeployResult, error) {
if deployType != "docker" && deployType != "binary" {
return nil, fmt.Errorf("invalid deploy type %q: %w", deployType, ErrUnknownDeployType)
}
playbookName := "binary_deploy.yml"
if deployType == "docker" {
playbookName = "docker_deploy.yml"
@@ -72,6 +109,7 @@ func (e *Executor) Deploy(
"-i", inventoryPath,
"-e", fmt.Sprintf("backend_url=%s", e.backendURL),
"-e", fmt.Sprintf("grpc_url=%s", e.grpcServerHost+":"+e.grpcServerPort),
"-e", fmt.Sprintf("gitea_releases_url=%s", e.giteaReleasesURL),
playbookPath,
)
@@ -100,6 +138,7 @@ func (e *Executor) DeployParallel(
deployType string,
) (map[string][]DeployResult, error) {
var wg sync.WaitGroup
var mu sync.Mutex
results := make(map[string][]DeployResult)
errCh := make(chan error, len(inventoryPaths))
@@ -111,7 +150,9 @@ func (e *Executor) DeployParallel(
if err != nil {
errCh <- err
}
mu.Lock()
results[p] = res
mu.Unlock()
}(path)
}
+7 -10
View File
@@ -26,16 +26,13 @@ type Inventory struct {
Hosts []InventoryHost
}
const inventoryTemplateText = `{{ range .Hosts }}
{{ .Name }} ansible_host={{ .IP }} ansible_port={{ .Port }} ansible_user={{ .User }} ansible_connection=ssh
{{ if eq .AuthMethod "key" }}ansible_ssh_private_key_file={{ .SSHKey }}{{ end }}
{{ if eq .AuthMethod "password" }}ansible_ssh_pass={{ .Password }}{{ end }}
deploy_type={{ .DeployType }}
agent_token={{ .Token }}
agent_label={{ .Name }}
grpc_url={{ .GRPCURL }}
{{ end }}`
const inventoryTemplateText = `{{- range $i, $host := .Hosts }}
{{ $host.Name }} ansible_host={{ $host.IP }} ansible_port={{ $host.Port }} ansible_user={{ $host.User }} ansible_connection=ssh{{ if eq $host.AuthMethod "key" }} ansible_ssh_private_key_file={{ $host.SSHKey }}{{ end }}{{ if eq $host.AuthMethod "password" }} ansible_ssh_pass={{ $host.Password }}{{ end }}
deploy_type={{ $host.DeployType }}
agent_token={{ $host.Token }}
agent_label={{ $host.Name }}
grpc_url={{ $host.GRPCURL }}
{{ end -}}`
// GenerateInventory generates an Ansible inventory file from the given hosts
func GenerateInventory(hosts []InventoryHost, outputPath string) error {
+55 -18
View File
@@ -1,8 +1,7 @@
package ansible
// BinaryDeployPlaybook returns the Ansible playbook for binary deployment.
// Downloads the agent binary, writes config, and starts it directly (no systemd).
// systemd unit is managed separately (e.g. via goreleaser .deb/.rpm packages).
// Downloads the agent binary, writes config, and installs a systemd unit for automatic restart.
const BinaryDeployPlaybook = `---
- name: Deploy HellreigN Agent (Binary)
hosts: all
@@ -14,6 +13,7 @@ const BinaryDeployPlaybook = `---
install_dir: /opt/hellreign
bin_name: hellreign-agent
cert_dir: "{{ install_dir }}/certs"
gitea_releases_url: "{{ gitea_releases_url | default('https://gitea.d3m0k1d.ru/d3m0k1d/HellreigN/releases/latest/download') }}"
tasks:
- name: Create installation directory
@@ -30,7 +30,7 @@ const BinaryDeployPlaybook = `---
- name: Download HellreigN Agent binary
get_url:
url: "https://gitea.d3m0k1d.ru/d3m0k1d/HellreigN/releases/latest/download/{{ bin_name }}"
url: "{{ gitea_releases_url }}/{{ bin_name }}"
dest: "{{ install_dir }}/{{ bin_name }}"
mode: '0755'
@@ -48,16 +48,43 @@ const BinaryDeployPlaybook = `---
dest: "{{ install_dir }}/config.yml"
mode: '0644'
- name: Start HellreigN Agent
shell: |
nohup {{ install_dir }}/{{ bin_name }} > /dev/null 2>&1 &
echo $!
args:
executable: /bin/bash
environment:
CONFIG_FILE: "{{ install_dir }}/config.yml"
register: agent_pid
changed_when: true
- name: Create systemd unit file
copy:
content: |
[Unit]
Description=HellreigN Agent
After=network-online.target
Wants=network-online.target
[Service]
Type=simple
ExecStart={{ install_dir }}/{{ bin_name }}
Restart=always
RestartSec=5
Environment=CONFIG_FILE={{ install_dir }}/config.yml
[Install]
WantedBy=multi-user.target
dest: /etc/systemd/system/hellreign-agent.service
mode: '0644'
- name: Reload systemd daemon
systemd:
daemon_reload: yes
- name: Enable and start HellreigN Agent service
systemd:
name: hellreign-agent
enabled: yes
state: started
- name: Wait for agent to start
pause:
seconds: 3
- name: Verify HellreigN Agent is running
command: systemctl is-active --quiet hellreign-agent
changed_when: false
`
// DockerDeployPlaybook returns the Ansible playbook for Docker deployment.
@@ -72,7 +99,9 @@ const DockerDeployPlaybook = `---
grpc_url: "{{ grpc_url | default('localhost:9001') }}"
container_name: hellreign-agent-{{ agent_label }}
image: "gitea.d3m0k1d.ru/d3m0k1d/hellreign-agent:latest"
install_dir: /opt/hellreign
cert_dir: /etc/hellreign-agent/certs
config_dir: /etc/hellreign-agent
tasks:
- name: Install Docker (if not present)
@@ -94,6 +123,12 @@ const DockerDeployPlaybook = `---
state: directory
mode: '0755'
- name: Create configuration directory
file:
path: "{{ config_dir }}"
state: directory
mode: '0755'
- name: Pull HellreigN Agent image
community.docker.docker_image:
name: "{{ image }}"
@@ -103,14 +138,15 @@ const DockerDeployPlaybook = `---
copy:
content: |
backend_url: "{{ backend_url }}"
grpc_url: "{{ grpc_url }}"
grpc_url: "{{ grpc_url | default('localhost:9001') }}"
label: "{{ agent_label }}"
registration_token: "{{ agent_token }}"
cert_dir: "{{ cert_dir }}"
services:
- name: system
type: journald
dest: "{{ cert_dir }}/config.yml"
- name: "{{ agent_label }}"
type: docker
path: "{{ container_name }}"
dest: "{{ config_dir }}/config.yml"
mode: '0644'
- name: Create and run HellreigN Agent container
@@ -121,6 +157,7 @@ const DockerDeployPlaybook = `---
restart_policy: always
volumes:
- "{{ cert_dir }}:/etc/hellreign-agent/certs"
- "{{ config_dir }}/config.yml:/etc/hellreign-agent/config.yml:ro"
env:
CONFIG_FILE: /etc/hellreign-agent/certs/config.yml
CONFIG_FILE: /etc/hellreign-agent/config.yml
`
+2 -3
View File
@@ -1,5 +1,4 @@
package ansible
const BaseInvTemplate = `
`
// This package contains embedded Ansible templates for playbooks and inventory generation.
// All templates are defined in playbooks.go and inventory.go.
+66 -5
View File
@@ -29,22 +29,33 @@ func NewAgentDeployGroup(h *Handlers) *AgentDeployGroup {
grpcPort = "9001"
}
grpcHost := os.Getenv("GRPC_SERVER_HOST")
if grpcHost == "" {
grpcHost = "0.0.0.0"
}
backendURL := os.Getenv("BACKEND_URL")
if backendURL == "" {
backendURL = "http://localhost:8080"
}
giteaURL := os.Getenv("GITEA_RELEASES_URL")
if giteaURL == "" {
giteaURL = "https://gitea.d3m0k1d.ru/d3m0k1d/HellreigN/releases/latest/download"
}
exec := ansible.NewExecutor(ansible.ExecutorConfig{
WorkDir: workDir,
GRPCServerHost: "0.0.0.0", // TODO: make configurable
GRPCServerHost: grpcHost,
GRPCServerPort: grpcPort,
BackendURL: backendURL,
GiteaReleasesURL: giteaURL,
})
// Write playbooks on init
if err := exec.WriteAllPlaybooks(); err != nil {
// Log but don't fail - playbooks can be written later
_ = err
// Log the error - deployment will fail later if playbooks can't be written
fmt.Fprintf(os.Stderr, "WARNING: failed to write Ansible playbooks: %v\n", err)
}
return &AgentDeployGroup{
@@ -72,6 +83,48 @@ func (adg *AgentDeployGroup) DeployAgents(c *gin.Context) {
return
}
// Validate auth credentials for each server
for i, server := range req.Servers {
switch server.AuthMethod {
case repository.AuthMethodKey:
if server.SSHKey == "" {
c.JSON(http.StatusBadRequest, gin.H{
"error": fmt.Sprintf("server %d (%s): sshKey is required when authMethod is 'key'", i, server.IP),
})
return
}
case repository.AuthMethodPassword:
if server.Password == "" {
c.JSON(http.StatusBadRequest, gin.H{
"error": fmt.Sprintf("server %d (%s): password is required when authMethod is 'password'", i, server.IP),
})
return
}
default:
c.JSON(http.StatusBadRequest, gin.H{
"error": fmt.Sprintf("server %d (%s): invalid authMethod %q, expected 'key' or 'password'", i, server.IP, server.AuthMethod),
})
return
}
}
// Pre-flight check: verify community.docker collection is available for docker deployments
needsDockerCollection := false
for _, server := range req.Servers {
if server.DeployType == repository.DeployTypeDocker {
needsDockerCollection = true
break
}
}
if needsDockerCollection {
if err := adg.executor.CheckDockerCollection(); err != nil {
c.JSON(http.StatusBadRequest, gin.H{
"error": fmt.Sprintf("Docker deployment requires 'community.docker' Ansible collection: %v", err),
})
return
}
}
// Create work directory
workDir := adg.executor.WorkDir()
if err := os.MkdirAll(workDir, 0755); err != nil {
@@ -123,6 +176,8 @@ func (adg *AgentDeployGroup) DeployAgents(c *gin.Context) {
inventoryPath := filepath.Join(workDir, fmt.Sprintf("inventory_%d_%d", timestamp, i))
if err := ansible.GenerateInventory(inventoryHosts, inventoryPath); err != nil {
// Rollback: delete the token we just created
_ = adg.Repo.DeleteRegistrationToken(token)
results = append(results, repository.DeployResult{
IP: server.IP,
AgentLabel: server.AgentLabel,
@@ -136,10 +191,14 @@ func (adg *AgentDeployGroup) DeployAgents(c *gin.Context) {
// Run Ansible playbook for this server
deployResults, err := adg.executor.Deploy(ctx, inventoryPath, string(server.DeployType))
// Clean up inventory file
os.Remove(inventoryPath)
// Clean up inventory file (log error but don't fail deployment)
if cleanupErr := os.Remove(inventoryPath); cleanupErr != nil {
fmt.Fprintf(os.Stderr, "WARNING: failed to remove inventory file %s: %v\n", inventoryPath, cleanupErr)
}
if err != nil {
// Rollback: delete the token since deployment failed
_ = adg.Repo.DeleteRegistrationToken(token)
results = append(results, repository.DeployResult{
IP: server.IP,
AgentLabel: server.AgentLabel,
@@ -155,6 +214,8 @@ func (adg *AgentDeployGroup) DeployAgents(c *gin.Context) {
if len(deployResults) > 0 && !deployResults[0].Success {
success = false
errMsg = deployResults[0].Stderr
// Rollback: delete the token since ansible playbook reported failure
_ = adg.Repo.DeleteRegistrationToken(token)
}
results = append(results, repository.DeployResult{
@@ -299,6 +299,12 @@ func (r *Repository) MarkRegistrationTokenUsed(token string) error {
return nil
}
// DeleteRegistrationToken deletes a registration token (used for rollback on deployment failure).
func (r *Repository) DeleteRegistrationToken(token string) error {
_, err := r.DB.Exec(`DELETE FROM registration_tokens WHERE token = ?`, token)
return err
}
// ActivateToken activates a user by token value.
func (r *Repository) ActivateToken(token string) error {
result, err := r.DB.Exec(