This commit is contained in:
@@ -106,6 +106,12 @@ func main() {
|
||||
scriptManageHandlers := handlers.NewScriptHandlersGroup(scriptSvc, cmdr,
|
||||
os.Getenv("WHEREAMI"))
|
||||
|
||||
graphPath := os.Getenv("GRAPH_YAML_PATH")
|
||||
if graphPath == "" {
|
||||
graphPath = "/etc/hellreign/services.yaml"
|
||||
}
|
||||
graphHandlers := handlers.NewGraphHandlers(graphPath)
|
||||
|
||||
agents := handlers.NewAgentsGroup(h, coll)
|
||||
auth := handlers.AuthGroup{Handlers: h}
|
||||
agentReg := handlers.NewAgentRegistrationGroup(h)
|
||||
@@ -212,6 +218,16 @@ func main() {
|
||||
jobsGroup.GET("/metrics", jobsHandlers.GetJobMetrics)
|
||||
}
|
||||
|
||||
// Service dependency graph (requires admin permission)
|
||||
graphGroup := v1.Group("/graph")
|
||||
graphGroup.Use(auth.AuthMiddleware(), handlers.RequireAdmin())
|
||||
{
|
||||
graphGroup.GET("", graphHandlers.GetYAML)
|
||||
graphGroup.PUT("", graphHandlers.UpdateYAML)
|
||||
graphGroup.GET("/order", graphHandlers.StartupOrder)
|
||||
graphGroup.GET("/cycle", graphHandlers.CycleCheck)
|
||||
}
|
||||
|
||||
// Agent registration
|
||||
agentRegGroup := v1.Group("/agents")
|
||||
{
|
||||
|
||||
@@ -0,0 +1,238 @@
|
||||
package graph
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sort"
|
||||
)
|
||||
|
||||
// DepCondition represents how a service waits for a dependency.
|
||||
type DepCondition string
|
||||
|
||||
const (
|
||||
Started DepCondition = "started"
|
||||
Healthy DepCondition = "healthy"
|
||||
CompletedSuccessfully DepCondition = "completed_successfully"
|
||||
)
|
||||
|
||||
// ServiceRef uniquely identifies a service across nodes.
|
||||
// If NodeID is empty, it refers to a service in the same node.
|
||||
type ServiceRef struct {
|
||||
NodeID string `json:"node_id,omitempty"`
|
||||
Name string `json:"name"`
|
||||
}
|
||||
|
||||
// String returns a human-readable reference like "node:service" or just "service".
|
||||
func (r ServiceRef) String() string {
|
||||
if r.NodeID != "" {
|
||||
return r.NodeID + ":" + r.Name
|
||||
}
|
||||
return r.Name
|
||||
}
|
||||
|
||||
// Dependency declares that a service depends on another service (possibly in a different node).
|
||||
type Dependency struct {
|
||||
Target ServiceRef `json:"target"`
|
||||
Condition DepCondition `json:"condition"`
|
||||
}
|
||||
|
||||
// Service represents a named service within a node with its dependency declarations.
|
||||
type Service struct {
|
||||
Name string `json:"name"`
|
||||
Dependencies []Dependency `json:"dependencies,omitempty"`
|
||||
}
|
||||
|
||||
// Node represents a logical grouping of services (e.g., a server or cluster).
|
||||
type Node struct {
|
||||
ID string `json:"id"`
|
||||
Services []*Service `json:"services"`
|
||||
}
|
||||
|
||||
// Graph holds nodes, services, and computes dependency order.
|
||||
type Graph struct {
|
||||
nodes map[string]*Node
|
||||
// adj[key] = list of services that key depends on
|
||||
// key format: "nodeID:serviceName"
|
||||
adj map[string][]ServiceRef
|
||||
}
|
||||
|
||||
func New() *Graph {
|
||||
return &Graph{
|
||||
nodes: make(map[string]*Node),
|
||||
adj: make(map[string][]ServiceRef),
|
||||
}
|
||||
}
|
||||
|
||||
// AddNode adds a node to the graph.
|
||||
func (g *Graph) AddNode(nodeID string) *Node {
|
||||
if n, ok := g.nodes[nodeID]; ok {
|
||||
return n
|
||||
}
|
||||
n := &Node{ID: nodeID}
|
||||
g.nodes[nodeID] = n
|
||||
return n
|
||||
}
|
||||
|
||||
// AddService adds a service to a node.
|
||||
func (g *Graph) AddService(nodeID string, svc *Service) {
|
||||
node := g.AddNode(nodeID)
|
||||
node.Services = append(node.Services, svc)
|
||||
key := nodeID + ":" + svc.Name
|
||||
g.adj[key] = nil
|
||||
}
|
||||
|
||||
// ResolveRef resolves a ServiceRef to its full "nodeID:serviceName" key.
|
||||
// If ref.NodeID is empty, it's resolved relative to the given sourceNodeID.
|
||||
func (g *Graph) ResolveRef(ref ServiceRef, sourceNodeID string) (string, error) {
|
||||
nodeID := ref.NodeID
|
||||
if nodeID == "" {
|
||||
nodeID = sourceNodeID
|
||||
}
|
||||
key := nodeID + ":" + ref.Name
|
||||
if _, ok := g.adj[key]; !ok {
|
||||
return "", fmt.Errorf("unknown service %q", key)
|
||||
}
|
||||
return key, nil
|
||||
}
|
||||
|
||||
// AddDependency adds a dependency: source service depends on target service.
|
||||
func (g *Graph) AddDependency(sourceNodeID, sourceName string, dep Dependency) error {
|
||||
srcKey := sourceNodeID + ":" + sourceName
|
||||
if _, ok := g.adj[srcKey]; !ok {
|
||||
return fmt.Errorf("unknown source service %q", srcKey)
|
||||
}
|
||||
|
||||
if _, err := g.ResolveRef(dep.Target, sourceNodeID); err != nil {
|
||||
return fmt.Errorf("dependency target invalid: %w", err)
|
||||
}
|
||||
|
||||
g.adj[srcKey] = append(g.adj[srcKey], dep.Target)
|
||||
|
||||
// Also update the Service struct for serialization
|
||||
node, ok := g.nodes[sourceNodeID]
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
for _, svc := range node.Services {
|
||||
if svc.Name == sourceName {
|
||||
svc.Dependencies = append(svc.Dependencies, dep)
|
||||
break
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// HasCycle detects if the dependency graph contains a cycle.
|
||||
func (g *Graph) HasCycle() bool {
|
||||
const (
|
||||
white = 0
|
||||
gray = 1
|
||||
black = 2
|
||||
)
|
||||
color := make(map[string]int)
|
||||
for key := range g.adj {
|
||||
color[key] = white
|
||||
}
|
||||
|
||||
var dfs func(string) bool
|
||||
dfs = func(u string) bool {
|
||||
color[u] = gray
|
||||
for _, depRef := range g.adj[u] {
|
||||
v, _ := g.ResolveRef(depRef, nodeIDFromKey(u))
|
||||
if color[v] == gray {
|
||||
return true
|
||||
}
|
||||
if color[v] == white && dfs(v) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
color[u] = black
|
||||
return false
|
||||
}
|
||||
|
||||
for key := range g.adj {
|
||||
if color[key] == white {
|
||||
if dfs(key) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// TopologicalSort returns services in startup order (dependencies first).
|
||||
// Returns a flat list of "nodeID:serviceName" keys.
|
||||
func (g *Graph) TopologicalSort() ([]string, error) {
|
||||
if g.HasCycle() {
|
||||
return nil, fmt.Errorf("dependency cycle detected")
|
||||
}
|
||||
|
||||
var result []string
|
||||
visited := make(map[string]bool)
|
||||
|
||||
var dfs func(string)
|
||||
dfs = func(u string) {
|
||||
if visited[u] {
|
||||
return
|
||||
}
|
||||
visited[u] = true
|
||||
for _, depRef := range g.adj[u] {
|
||||
v, _ := g.ResolveRef(depRef, nodeIDFromKey(u))
|
||||
dfs(v)
|
||||
}
|
||||
result = append(result, u)
|
||||
}
|
||||
|
||||
keys := make([]string, 0, len(g.adj))
|
||||
for k := range g.adj {
|
||||
keys = append(keys, k)
|
||||
}
|
||||
sort.Strings(keys)
|
||||
|
||||
for _, k := range keys {
|
||||
dfs(k)
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// GetNode returns a node by ID.
|
||||
func (g *Graph) GetNode(id string) (*Node, bool) {
|
||||
n, ok := g.nodes[id]
|
||||
return n, ok
|
||||
}
|
||||
|
||||
// GetService returns a service by node ID and name.
|
||||
func (g *Graph) GetService(nodeID, name string) (*Service, bool) {
|
||||
node, ok := g.nodes[nodeID]
|
||||
if !ok {
|
||||
return nil, false
|
||||
}
|
||||
for _, s := range node.Services {
|
||||
if s.Name == name {
|
||||
return s, true
|
||||
}
|
||||
}
|
||||
return nil, false
|
||||
}
|
||||
|
||||
// Nodes returns all nodes sorted by ID.
|
||||
func (g *Graph) Nodes() []*Node {
|
||||
result := make([]*Node, 0, len(g.nodes))
|
||||
for _, n := range g.nodes {
|
||||
result = append(result, n)
|
||||
}
|
||||
sort.Slice(result, func(i, j int) bool {
|
||||
return result[i].ID < result[j].ID
|
||||
})
|
||||
return result
|
||||
}
|
||||
|
||||
// nodeIDFromKey extracts the node ID from a "nodeID:serviceName" key.
|
||||
func nodeIDFromKey(key string) string {
|
||||
for i := 0; i < len(key); i++ {
|
||||
if key[i] == ':' {
|
||||
return key[:i]
|
||||
}
|
||||
}
|
||||
return ""
|
||||
}
|
||||
@@ -0,0 +1,135 @@
|
||||
package graph
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
"gopkg.in/yaml.v3"
|
||||
)
|
||||
|
||||
// yamlNode is the intermediate YAML representation of a node.
|
||||
type yamlNode struct {
|
||||
Services map[string]yamlService `yaml:"services"`
|
||||
}
|
||||
|
||||
// yamlService is the intermediate YAML representation of a service.
|
||||
type yamlService struct {
|
||||
DependsOn yamlDependsOn `yaml:"depends_on"`
|
||||
}
|
||||
|
||||
// yamlDependsOn supports both short form (list of strings) and long form (map with conditions).
|
||||
type yamlDependsOn struct {
|
||||
simple []string
|
||||
detail map[string]yamlDepCondition
|
||||
}
|
||||
|
||||
type yamlDepCondition struct {
|
||||
Condition DepCondition `yaml:"condition"`
|
||||
}
|
||||
|
||||
func (d *yamlDependsOn) UnmarshalYAML(value *yaml.Node) error {
|
||||
switch value.Kind {
|
||||
case yaml.SequenceNode:
|
||||
var names []string
|
||||
if err := value.Decode(&names); err != nil {
|
||||
return err
|
||||
}
|
||||
d.simple = names
|
||||
return nil
|
||||
case yaml.MappingNode:
|
||||
d.detail = make(map[string]yamlDepCondition)
|
||||
if err := value.Decode(&d.detail); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
default:
|
||||
return fmt.Errorf("depends_on must be a list or mapping, got %v", value.Kind)
|
||||
}
|
||||
}
|
||||
|
||||
// parseServiceRef parses a reference like "redis" or "infra:redis".
|
||||
func parseServiceRef(ref string) ServiceRef {
|
||||
parts := strings.SplitN(ref, ":", 2)
|
||||
if len(parts) == 2 {
|
||||
return ServiceRef{NodeID: parts[0], Name: parts[1]}
|
||||
}
|
||||
return ServiceRef{Name: parts[0]}
|
||||
}
|
||||
|
||||
// ParseYAML parses a node/service dependency graph from YAML bytes.
|
||||
//
|
||||
// Example:
|
||||
//
|
||||
// nodes:
|
||||
// server1:
|
||||
// services:
|
||||
// web:
|
||||
// agent_id: agent-1
|
||||
// depends_on:
|
||||
// - redis
|
||||
// - infra:cache
|
||||
// api:
|
||||
// depends_on:
|
||||
// redis:
|
||||
// condition: healthy
|
||||
// infra:
|
||||
// services:
|
||||
// cache:
|
||||
// db:
|
||||
func ParseYAML(data []byte) (*Graph, error) {
|
||||
var raw struct {
|
||||
Nodes map[string]yamlNode `yaml:"nodes"`
|
||||
}
|
||||
|
||||
if err := yaml.Unmarshal(data, &raw); err != nil {
|
||||
return nil, fmt.Errorf("parse yaml: %w", err)
|
||||
}
|
||||
|
||||
g := New()
|
||||
|
||||
// Phase 1: register all nodes and services
|
||||
for nodeID, yn := range raw.Nodes {
|
||||
g.AddNode(nodeID)
|
||||
for svcName := range yn.Services {
|
||||
g.AddService(nodeID, &Service{Name: svcName})
|
||||
}
|
||||
}
|
||||
|
||||
// Phase 2: wire dependencies
|
||||
for nodeID, yn := range raw.Nodes {
|
||||
for svcName, ys := range yn.Services {
|
||||
// Short form
|
||||
for _, ref := range ys.DependsOn.simple {
|
||||
target := parseServiceRef(ref)
|
||||
if err := g.AddDependency(nodeID, svcName, Dependency{
|
||||
Target: target,
|
||||
Condition: Started,
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
// Long form
|
||||
for ref, cond := range ys.DependsOn.detail {
|
||||
target := parseServiceRef(ref)
|
||||
if err := g.AddDependency(nodeID, svcName, Dependency{
|
||||
Target: target,
|
||||
Condition: cond.Condition,
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return g, nil
|
||||
}
|
||||
|
||||
// ParseYAMLFile reads and parses from a file.
|
||||
func ParseYAMLFile(path string) (*Graph, error) {
|
||||
data, err := os.ReadFile(path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return ParseYAML(data)
|
||||
}
|
||||
@@ -0,0 +1,149 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"io"
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"sync"
|
||||
|
||||
"gitea.d3m0k1d.ru/d3m0k1d/HellreigN/backend/internal/graph"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
// GraphHandlers manages the service dependency graph.
|
||||
type GraphHandlers struct {
|
||||
path string
|
||||
mu sync.RWMutex
|
||||
yamlData []byte
|
||||
loaded *graph.Graph
|
||||
}
|
||||
|
||||
// NewGraphHandlers loads the graph from the given YAML file path.
|
||||
func NewGraphHandlers(yamlPath string) *GraphHandlers {
|
||||
h := &GraphHandlers{path: yamlPath}
|
||||
if err := h.reload(); err != nil {
|
||||
if _, ok := err.(*os.PathError); ok {
|
||||
log.Printf("[graph] no graph file at %q, starting with empty graph", yamlPath)
|
||||
h.loaded = graph.New()
|
||||
h.yamlData = []byte("nodes: {}\n")
|
||||
} else {
|
||||
log.Fatalf("[graph] failed to load graph from %q: %v", yamlPath, err)
|
||||
}
|
||||
}
|
||||
return h
|
||||
}
|
||||
|
||||
func (h *GraphHandlers) reload() error {
|
||||
data, err := os.ReadFile(h.path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
g, err := graph.ParseYAML(data)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
h.mu.Lock()
|
||||
h.yamlData = data
|
||||
h.loaded = g
|
||||
h.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetGraph returns the current parsed graph.
|
||||
func (h *GraphHandlers) GetGraph() *graph.Graph {
|
||||
h.mu.RLock()
|
||||
defer h.mu.RUnlock()
|
||||
return h.loaded
|
||||
}
|
||||
|
||||
// GetYAML returns the raw YAML content.
|
||||
// @Summary Get dependency graph YAML
|
||||
// @Description Returns the service dependency graph as raw YAML text
|
||||
// @Tags graph
|
||||
// @Produce plain
|
||||
// @Success 200 {string} string "YAML content"
|
||||
// @Security Bearer
|
||||
// @Router /graph [get]
|
||||
func (h *GraphHandlers) GetYAML(c *gin.Context) {
|
||||
h.mu.RLock()
|
||||
defer h.mu.RUnlock()
|
||||
c.Data(http.StatusOK, "text/yaml", h.yamlData)
|
||||
}
|
||||
|
||||
// UpdateYAML updates the graph from new YAML text.
|
||||
// @Summary Update dependency graph YAML
|
||||
// @Description Replaces the service dependency graph YAML and reloads it
|
||||
// @Tags graph
|
||||
// @Accept plain
|
||||
// @Produce json
|
||||
// @Param body body string true "New YAML content"
|
||||
// @Success 200 {object} map[string]string
|
||||
// @Failure 400 {object} map[string]string
|
||||
// @Security Bearer
|
||||
// @Router /graph [put]
|
||||
func (h *GraphHandlers) UpdateYAML(c *gin.Context) {
|
||||
body, err := io.ReadAll(c.Request.Body)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "failed to read body"})
|
||||
return
|
||||
}
|
||||
|
||||
g, err := graph.ParseYAML(body)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
|
||||
if err := os.WriteFile(h.path, body, 0o644); err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to write graph file"})
|
||||
return
|
||||
}
|
||||
|
||||
h.mu.Lock()
|
||||
h.yamlData = body
|
||||
h.loaded = g
|
||||
h.mu.Unlock()
|
||||
|
||||
log.Printf("[graph] updated graph from admin, saved to %s", h.path)
|
||||
c.JSON(http.StatusOK, gin.H{"message": "graph updated"})
|
||||
}
|
||||
|
||||
// StartupOrder returns the computed service startup order.
|
||||
// @Summary Get startup order
|
||||
// @Description Returns the topologically sorted service startup order
|
||||
// @Tags graph
|
||||
// @Produce json
|
||||
// @Success 200 {array} string
|
||||
// @Failure 400 {object} map[string]string
|
||||
// @Security Bearer
|
||||
// @Router /graph/order [get]
|
||||
func (h *GraphHandlers) StartupOrder(c *gin.Context) {
|
||||
h.mu.RLock()
|
||||
g := h.loaded
|
||||
h.mu.RUnlock()
|
||||
|
||||
order, err := g.TopologicalSort()
|
||||
if err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, order)
|
||||
}
|
||||
|
||||
// CycleCheck checks if the graph has cycles.
|
||||
// @Summary Check for cycles
|
||||
// @Description Returns whether the dependency graph contains cycles
|
||||
// @Tags graph
|
||||
// @Produce json
|
||||
// @Success 200 {object} map[string]bool
|
||||
// @Security Bearer
|
||||
// @Router /graph/cycle [get]
|
||||
func (h *GraphHandlers) CycleCheck(c *gin.Context) {
|
||||
h.mu.RLock()
|
||||
g := h.loaded
|
||||
h.mu.RUnlock()
|
||||
|
||||
c.JSON(http.StatusOK, gin.H{"has_cycle": g.HasCycle()})
|
||||
}
|
||||
Reference in New Issue
Block a user