mirror of
https://github.com/shankar0123/certctl.git
synced 2026-06-07 18:01:37 +00:00
0725713e19
Operator decision answered as full soft-delete with optional forced
cascade — hard-delete is not reachable from any public surface. Prior
to this commit, DELETE /agents/{id} ran a plain `DELETE FROM agents`
whose schema-level `ON DELETE CASCADE` on deployment_targets.agent_id
silently wiped every target, orphaning certs and aborting in-flight
jobs. The finding closure reshapes the agent-removal contract around
soft retirement with explicit preflight counts, an opt-in cascade
gated by a mandatory reason, and unconditional protection for the
four reserved sentinel agents used by discovery sources.
Schema — migration 000015:
migrations/000015_agent_retire.up.sql flips
deployment_targets_agent_id_fkey from ON DELETE CASCADE to ON DELETE
RESTRICT, so a stray `DELETE FROM agents` now errors at the DB
boundary instead of quietly destroying targets. Both `agents` and
`deployment_targets` grow a retired_at TIMESTAMPTZ + retired_reason
TEXT pair (TEXT not VARCHAR so operator comments are never
truncated), indexed via partial indexes WHERE retired_at IS NOT
NULL. The migration is self-healing (ADD COLUMN IF NOT EXISTS, DROP
CONSTRAINT IF EXISTS then ADD CONSTRAINT, CREATE INDEX IF NOT
EXISTS) so repeated runs against partially-migrated databases
converge. migrations/000015_agent_retire.down.sql restores CASCADE
and drops the new columns for clean rollback. A dedicated
repository-layer testcontainers test
(internal/repository/postgres/migration_000015_test.go) asserts the
before/after FK action, column presence, index presence, and
round-trip idempotency under up→down→up.
Domain — sentinel guard + dependency counts:
internal/domain/connector.go gains IsRetired() on Agent, the
exported SentinelAgentIDs slice listing server-scanner,
cloud-aws-sm, cloud-azure-kv, cloud-gcp-sm verbatim (matching the
four reserved IDs documented in CLAUDE.md and created at startup in
cmd/server/main.go), IsSentinelAgent(id string) predicate,
AgentDependencyCounts{ActiveTargets, ActiveCertificates,
PendingJobs} with a HasDependencies() method, and ActorTypeAgent /
ActorTypeSystem enum values used by audit emission downstream.
Coverage locked down by internal/domain/connector_test.go.
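A minimal sketch of the domain helpers listed above, assuming plain int count fields and a linear scan over the slice. The four IDs come from this message; the field types, receivers, and the main function are illustrative, and the real definitions in internal/domain/connector.go may differ.

```go
package main

import "fmt"

// SentinelAgentIDs mirrors the four reserved IDs named in the commit message.
var SentinelAgentIDs = []string{
	"server-scanner",
	"cloud-aws-sm",
	"cloud-azure-kv",
	"cloud-gcp-sm",
}

// IsSentinelAgent reports whether id is one of the reserved sentinel agents.
func IsSentinelAgent(id string) bool {
	for _, s := range SentinelAgentIDs {
		if s == id {
			return true
		}
	}
	return false
}

// AgentDependencyCounts holds the three preflight buckets.
// The int field type is an assumption made for this sketch.
type AgentDependencyCounts struct {
	ActiveTargets      int
	ActiveCertificates int
	PendingJobs        int
}

// HasDependencies is true when any bucket is non-zero.
func (c AgentDependencyCounts) HasDependencies() bool {
	return c.ActiveTargets > 0 || c.ActiveCertificates > 0 || c.PendingJobs > 0
}

func main() {
	fmt.Println(IsSentinelAgent("cloud-aws-sm"))
	fmt.Println(IsSentinelAgent("agent-123"))
	fmt.Println(AgentDependencyCounts{PendingJobs: 1}.HasDependencies())
}
```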
Service — 8-step ordered contract:
internal/service/agent_retire.go:RetireAgent(ctx, id, actor,
opts{Force, Reason}) enforces a fixed execution order:
(1) sentinel guard — IsSentinelAgent(id) returns ErrAgentIsSentinel
unconditionally; force=true does NOT bypass it.
(2) fetch — ErrAgentNotFound on miss.
(3) idempotency — if IsRetired() already, return
AgentRetirementResult{AlreadyRetired: true} with no new audit
event and no state change (safe to replay from flaky clients).
(4) preflight counts — collectAgentDependencyCounts runs
ActiveTargets, ActiveCertificates, PendingJobs sequentially
(not in parallel; keeps the per-query timeout predictable and
matches the repo's existing call-chain shape).
(5) force-reason guard — opts.Force=true with empty Reason returns
ErrForceReasonRequired (wired into the 400 status surface).
(6) dependency guard — HasDependencies() with opts.Force=false
returns BlockedByDependenciesError{Counts} (wired into the 409
body with per-bucket counts).
(7) mutation — single pinned retiredAt := time.Now(); agent
retirement first, then cascade target retirement if opts.Force,
all under the repo's single transaction so the two retired_at
stamps match to the second.
(8) best-effort audit: agent_retired is always emitted;
agent_retirement_cascaded is emitted additionally on the force
path. Actor is whatever the handler resolves from the request;
actor type is mapped by resolveActorType ("system"→System,
agent-prefix→Agent, anything else→User). Audit emission failures
are logged via slog.Error but do not abort the retirement
(matches the house convention used by every other
scheduler-emitted event).
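The guard ordering in steps (1) through (6) can be sketched as a pure function. This is an illustration under stated assumptions, not the code in agent_retire.go: the in-memory map, the agentRow type, the single summed Dependencies field, and the error messages are hypothetical, and step (6) returns the bare sentinel error here rather than BlockedByDependenciesError{Counts}.

```go
package main

import (
	"errors"
	"fmt"
)

var (
	ErrAgentIsSentinel       = errors.New("agent is a reserved sentinel")
	ErrAgentNotFound         = errors.New("agent not found")
	ErrForceReasonRequired   = errors.New("force requires a reason")
	ErrBlockedByDependencies = errors.New("blocked by dependencies")
)

// agentRow is a hypothetical stand-in for a stored agent plus its preflight counts.
type agentRow struct {
	Retired      bool
	Dependencies int // targets + certificates + jobs, summed for brevity
}

// RetireOptions mirrors opts{Force, Reason} from the commit message.
type RetireOptions struct {
	Force  bool
	Reason string
}

var sentinels = map[string]bool{
	"server-scanner": true, "cloud-aws-sm": true,
	"cloud-azure-kv": true, "cloud-gcp-sm": true,
}

// preflight applies guards (1) through (6) in the documented order.
// alreadyRetired=true signals an idempotent replay; a nil error with
// alreadyRetired=false means the mutation step may proceed.
func preflight(store map[string]agentRow, id string, opts RetireOptions) (alreadyRetired bool, err error) {
	if sentinels[id] { // (1) sentinel guard, not bypassed by Force
		return false, ErrAgentIsSentinel
	}
	row, ok := store[id] // (2) fetch
	if !ok {
		return false, ErrAgentNotFound
	}
	if row.Retired { // (3) idempotency
		return true, nil
	}
	deps := row.Dependencies             // (4) preflight counts
	if opts.Force && opts.Reason == "" { // (5) force-reason guard
		return false, ErrForceReasonRequired
	}
	if deps > 0 && !opts.Force { // (6) dependency guard
		return false, ErrBlockedByDependencies
	}
	return false, nil
}

// demoStore returns sample rows for the usage example.
func demoStore() map[string]agentRow {
	return map[string]agentRow{
		"agent-old":  {Retired: true},
		"agent-busy": {Dependencies: 3},
		"agent-idle": {},
	}
}

func main() {
	store := demoStore()
	_, err := preflight(store, "server-scanner", RetireOptions{Force: true, Reason: "cleanup"})
	fmt.Println(err)
	_, err = preflight(store, "agent-busy", RetireOptions{})
	fmt.Println(err)
}
```

Because the idempotency check runs before the force-reason guard, replaying a forced retirement without a reason against an already retired agent reports AlreadyRetired rather than an error.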
BlockedByDependenciesError implements Error() as
"active_targets=%d, active_certificates=%d, pending_jobs=%d" and
Unwrap() → ErrBlockedByDependencies. The single struct satisfies
errors.Is via Unwrap (used by scheduler-level tests) and errors.As
via the concrete type (used by the handler to fish out Counts for
the 409 body). ListRetiredAgents(page, perPage) adds a separate
paginated accessor with page<1→1 and perPage<1→50 normalization so
retired rows are queryable without polluting the default agent
listing.
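A short sketch of how one struct can serve both errors.Is and errors.As, as described above. The Error() format string and the Unwrap target are taken from this message; the pointer receiver, the int fields, and the helper names isBlocked, countsOf, and sampleError are assumptions for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

var ErrBlockedByDependencies = errors.New("blocked by dependencies")

// AgentDependencyCounts uses int fields as an assumption for this sketch.
type AgentDependencyCounts struct {
	ActiveTargets      int
	ActiveCertificates int
	PendingJobs        int
}

// BlockedByDependenciesError carries the per-bucket counts.
type BlockedByDependenciesError struct {
	Counts AgentDependencyCounts
}

func (e *BlockedByDependenciesError) Error() string {
	return fmt.Sprintf("active_targets=%d, active_certificates=%d, pending_jobs=%d",
		e.Counts.ActiveTargets, e.Counts.ActiveCertificates, e.Counts.PendingJobs)
}

// Unwrap lets errors.Is match the sentinel through the concrete type.
func (e *BlockedByDependenciesError) Unwrap() error { return ErrBlockedByDependencies }

// isBlocked is the errors.Is path: callers only need a yes/no answer.
func isBlocked(err error) bool {
	return errors.Is(err, ErrBlockedByDependencies)
}

// countsOf is the errors.As path: callers need the counts themselves.
func countsOf(err error) (AgentDependencyCounts, bool) {
	var blocked *BlockedByDependenciesError
	if errors.As(err, &blocked) {
		return blocked.Counts, true
	}
	return AgentDependencyCounts{}, false
}

// sampleError wraps the typed error the way a caller might add context.
func sampleError() error {
	return fmt.Errorf("retire agent: %w", &BlockedByDependenciesError{
		Counts: AgentDependencyCounts{ActiveTargets: 2, ActiveCertificates: 5, PendingJobs: 1},
	})
}

func main() {
	err := sampleError()
	fmt.Println(isBlocked(err))
	counts, ok := countsOf(err)
	fmt.Println(counts.ActiveTargets, counts.ActiveCertificates, counts.PendingJobs, ok)
}
```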
The sentinel guard is deliberately asymmetric with the dependency
guard: all four reserved IDs are protected, and force=true, which
overrides the dependency guard, cannot override it. Regression tests
in internal/service/agent_retire_test.go assert each of the eight
steps in order, plus sentinel bypass attempts and idempotency
replay.
Handler + router — status-code surface:
internal/api/handler/agents.go:RetireAgent exposes seven status
codes on DELETE /agents/{id}:
200 on a fresh retirement (body echoes AgentRetirementResult).
204 on idempotent replay (AlreadyRetired=true; no new audit).
400 on ErrForceReasonRequired.
403 on ErrAgentIsSentinel.
404 on ErrAgentNotFound.
409 on BlockedByDependenciesError, with a custom body shape
{error, counts{active_targets, active_certificates,
pending_jobs}} that bypasses the default ErrorWithRequestID
envelope so callers get the per-bucket numbers directly.
500 on any other error.
The heartbeat handler (HandleHeartbeat) returns 410 Gone when the
agent is retired (ErrAgentRetired), signalling the agent to shut down.
Query params `force=true` and `reason=<text>` drive the cascade
path; both are forwarded as url.Values through the new MCP
transport.
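The status surface above can be sketched as a single mapping function plus a writer for the 409 body. This is an illustration only: the lowercase type names, the error messages, the value of the "error" field, and the renderBlocked helper are assumptions; only the status codes and the JSON key names come from this message.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"net/http"
	"net/http/httptest"
)

var (
	ErrForceReasonRequired = errors.New("force requires a reason")
	ErrAgentIsSentinel     = errors.New("agent is a reserved sentinel")
	ErrAgentNotFound       = errors.New("agent not found")
	errUnexpected          = errors.New("database unavailable") // stand-in for any other error
)

type counts struct {
	ActiveTargets      int `json:"active_targets"`
	ActiveCertificates int `json:"active_certificates"`
	PendingJobs        int `json:"pending_jobs"`
}

// blockedError is a simplified stand-in for BlockedByDependenciesError.
type blockedError struct{ Counts counts }

func (e *blockedError) Error() string { return "blocked by dependencies" }

// statusFor maps a service outcome onto the seven documented status codes.
func statusFor(alreadyRetired bool, err error) int {
	var blocked *blockedError
	switch {
	case err == nil && alreadyRetired:
		return http.StatusNoContent
	case err == nil:
		return http.StatusOK
	case errors.Is(err, ErrForceReasonRequired):
		return http.StatusBadRequest
	case errors.Is(err, ErrAgentIsSentinel):
		return http.StatusForbidden
	case errors.Is(err, ErrAgentNotFound):
		return http.StatusNotFound
	case errors.As(err, &blocked):
		return http.StatusConflict
	default:
		return http.StatusInternalServerError
	}
}

// writeBlocked emits the custom 409 body with per-bucket counts.
func writeBlocked(w http.ResponseWriter, e *blockedError) {
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusConflict)
	_ = json.NewEncoder(w).Encode(struct {
		Error  string `json:"error"`
		Counts counts `json:"counts"`
	}{Error: e.Error(), Counts: e.Counts})
}

// renderBlocked records the response for a sample blocked retirement.
func renderBlocked() (int, string) {
	rec := httptest.NewRecorder()
	writeBlocked(rec, &blockedError{Counts: counts{ActiveTargets: 2, ActiveCertificates: 5, PendingJobs: 1}})
	return rec.Code, rec.Body.String()
}

func main() {
	code, body := renderBlocked()
	fmt.Print(code, " ", body)
}
```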
internal/api/router/router.go registers GET /api/v1/agents/retired
literal-path BEFORE /api/v1/agents/{id} — Go 1.22 ServeMux's
literal-beats-pattern-var precedence routes "retired" to the
paginated retired-agents listing instead of fetching a hypothetical
agent named "retired".
Agent binary — clean shutdown on 410:
cmd/agent/main.go gains the ErrAgentRetired sentinel, a
retiredOnce sync.Once, and a retiredSignal chan struct{}. A
markRetired(source, statusCode, body) helper closes the channel
exactly once; the Run() select loop observes the close and returns
ErrAgentRetired; main() matches via errors.Is(err, ErrAgentRetired)
and exits cleanly instead of spinning in the heartbeat retry loop.
The 410 Gone surface is therefore terminal for the agent process.
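The close-once pattern above can be sketched as follows. This is a simplified illustration: the Agent type, NewAgent, and runDemo are hypothetical, markRetired drops the body parameter, and Run omits the heartbeat and work-poll tickers that a real loop would select on.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

var ErrAgentRetired = errors.New("agent retired")

// Agent holds only the fields relevant to retirement signalling.
type Agent struct {
	retiredOnce   sync.Once
	retiredSignal chan struct{}
}

func NewAgent() *Agent {
	return &Agent{retiredSignal: make(chan struct{})}
}

// markRetired closes retiredSignal exactly once; later calls are no-ops,
// so concurrent 410 responses cannot trigger a double-close panic.
func (a *Agent) markRetired(source string, statusCode int) {
	a.retiredOnce.Do(func() { close(a.retiredSignal) })
}

// Run blocks until the agent is retired or the context ends.
func (a *Agent) Run(ctx context.Context) error {
	select {
	case <-a.retiredSignal:
		return ErrAgentRetired
	case <-ctx.Done():
		return ctx.Err()
	}
}

// runDemo marks the agent retired twice, then runs the loop.
func runDemo() error {
	a := NewAgent()
	a.markRetired("heartbeat", 410)
	a.markRetired("work-poll", 410) // second call must not panic
	return a.Run(context.Background())
}

func main() {
	if err := runDemo(); errors.Is(err, ErrAgentRetired) {
		fmt.Println("agent retired, exiting cleanly")
	}
}
```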
MCP transport:
internal/mcp/client.go adds Client.DeleteWithQuery(path, query),
a new additive transport method. Client.Delete is path-only; without
this method the retire tool would silently drop `force` and `reason`,
turning every cascade retire into a default soft-retire. The new
method shares do()'s 204 normalization and 4xx/5xx error
propagation so tool authors get one contract.
internal/mcp/tools.go + internal/mcp/types.go expose the
retire_agent tool with Force+Reason inputs wired through
DeleteWithQuery.
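To illustrate why a path-only DELETE would lose the cascade parameters, here is a sketch of attaching url.Values to a DELETE request. It is not the Client.DeleteWithQuery implementation: newDeleteRequest, retireURL, and the base URL are hypothetical, and only the force and reason parameter names and the /api/v1/agents/{id} path come from this message.

```go
package main

import (
	"fmt"
	"net/http"
	"net/url"
)

// newDeleteRequest builds a DELETE request whose query string carries query.
func newDeleteRequest(baseURL, path string, query url.Values) (*http.Request, error) {
	target := baseURL + path
	if encoded := query.Encode(); encoded != "" {
		target += "?" + encoded
	}
	return http.NewRequest(http.MethodDelete, target, nil)
}

// retireURL returns the final URL for a forced retirement of agentID.
func retireURL(agentID, reason string) (string, error) {
	q := url.Values{}
	q.Set("force", "true")
	q.Set("reason", reason)
	// The base URL is a placeholder for this sketch.
	req, err := newDeleteRequest("http://localhost:8443", "/api/v1/agents/"+url.PathEscape(agentID), q)
	if err != nil {
		return "", err
	}
	return req.URL.String(), nil
}

func main() {
	u, err := retireURL("agent-123", "host decommissioned")
	if err != nil {
		panic(err)
	}
	fmt.Println(u)
}
```

url.Values.Encode sorts keys and escapes the reason text, so free-form operator comments survive the round trip.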
CLI:
cmd/cli/main.go + internal/cli/client.go add two CLI surfaces:
`agents list --retired` (client-side strip of --retired then
delegation to ListRetiredAgents, sharing --page/--per-page parsing
with the default listing) and `agents retire <id> [--force --reason
"…"]` (mirrors ErrForceReasonRequired — force without reason is
rejected client-side before the request is sent). JSON + table
output modes both honor the new columns.
Frontend:
web/src/pages/AgentsPage.tsx surfaces retired/retire affordances.
web/src/api/client.ts + web/src/api/types.ts expose the retire
endpoint and the retired-listing. 4 new Vitest regression cases.
OpenAPI:
api/openapi.yaml documents DELETE /agents/{id} with all seven
status codes, 410 on heartbeat, and the 409 per-bucket body shape.
Regression coverage (six new test files, all green):
internal/service/agent_retire_test.go — 8-step contract + sentinel guards
internal/api/handler/agent_retire_handler_test.go — 7-status-code surface + 410 heartbeat
internal/mcp/retire_agent_test.go — DeleteWithQuery wire-through
internal/cli/agent_retire_test.go — --retired listing + --force/--reason pairing
internal/repository/postgres/migration_000015_test.go — FK flip + columns + indexes + up↔down
internal/domain/connector_test.go — IsRetired, IsSentinelAgent, SentinelAgentIDs, HasDependencies
Files:
api/openapi.yaml — DELETE + 410 + 409 body shape
cmd/agent/main.go — ErrAgentRetired, markRetired, retiredSignal
cmd/cli/main.go — handleAgents list/get/retire dispatch
docs/architecture.md, docs/concepts.md,
docs/testing-guide.md — retirement contract narrative
internal/api/handler/agents.go — RetireAgent, status surface, 410 on heartbeat
internal/api/handler/agent_handler_test.go — extended coverage
internal/api/handler/agent_retire_handler_test.go — new
internal/api/router/router.go — /agents/retired before /agents/{id}
internal/cli/agent_retire_test.go — new
internal/cli/client.go — ListRetiredAgents + RetireAgent
internal/domain/connector.go — IsRetired, SentinelAgentIDs,
IsSentinelAgent, AgentDependencyCounts,
ActorTypeAgent/System
internal/domain/connector_test.go — new
internal/integration/lifecycle_test.go — retirement fixture
internal/mcp/client.go — DeleteWithQuery additive transport
internal/mcp/retire_agent_test.go — new
internal/mcp/tools.go, internal/mcp/types.go — retire_agent tool + Force/Reason inputs
internal/repository/interfaces.go — AgentRepository retirement methods
internal/repository/postgres/agent.go — retire + cascade target retire + counts
internal/repository/postgres/migration_000015_test.go — new
internal/service/agent.go — wire into AgentService surface
internal/service/agent_retire.go — new 8-step contract
internal/service/agent_retire_test.go — new
internal/service/deployment.go — skip retired agents
internal/service/target.go — skip retired agents
internal/service/testutil_test.go — shared mocks extended
migrations/000015_agent_retire.up.sql — new
migrations/000015_agent_retire.down.sql — new
web/src/api/client.ts, types.ts + tests — retire endpoint wiring
web/src/pages/AgentsPage.tsx — retire UI
529 lines
17 KiB
Go
package service

import (
	"context"
	"crypto/rand"
	"crypto/sha256"
	"encoding/base64"
	"encoding/hex"
	"fmt"
	"log/slog"
	"time"

	"github.com/shankar0123/certctl/internal/domain"
	"github.com/shankar0123/certctl/internal/repository"
)

// AgentService provides business logic for managing and coordinating with agents.
type AgentService struct {
	agentRepo      repository.AgentRepository
	certRepo       repository.CertificateRepository
	jobRepo        repository.JobRepository
	targetRepo     repository.TargetRepository
	profileRepo    repository.CertificateProfileRepository
	auditService   *AuditService
	issuerRegistry *IssuerRegistry
	renewalService *RenewalService
}

// NewAgentService creates a new agent service.
func NewAgentService(
	agentRepo repository.AgentRepository,
	certRepo repository.CertificateRepository,
	jobRepo repository.JobRepository,
	targetRepo repository.TargetRepository,
	auditService *AuditService,
	issuerRegistry *IssuerRegistry,
	renewalService *RenewalService,
) *AgentService {
	return &AgentService{
		agentRepo:      agentRepo,
		certRepo:       certRepo,
		jobRepo:        jobRepo,
		targetRepo:     targetRepo,
		auditService:   auditService,
		issuerRegistry: issuerRegistry,
		renewalService: renewalService,
	}
}

// SetProfileRepo sets the profile repository for EKU resolution during CSR signing.
func (s *AgentService) SetProfileRepo(repo repository.CertificateProfileRepository) {
	s.profileRepo = repo
}

// Register creates a new agent and returns its API key (only once).
func (s *AgentService) Register(ctx context.Context, name string, hostname string) (*domain.Agent, string, error) {
	if name == "" || hostname == "" {
		return nil, "", fmt.Errorf("agent name and hostname are required")
	}

	// Generate API key. crypto/rand failure is non-recoverable; propagate immediately.
	apiKey, err := generateAPIKey()
	if err != nil {
		return nil, "", fmt.Errorf("failed to generate agent api key: %w", err)
	}
	apiKeyHash := hashAPIKey(apiKey)

	now := time.Now()
	agent := &domain.Agent{
		ID:              generateID("agent"),
		Name:            name,
		Hostname:        hostname,
		APIKeyHash:      apiKeyHash,
		Status:          domain.AgentStatusOnline,
		RegisteredAt:    now,
		LastHeartbeatAt: &now,
	}

	if err := s.agentRepo.Create(ctx, agent); err != nil {
		return nil, "", fmt.Errorf("failed to create agent: %w", err)
	}

	// Record audit event
	if err := s.auditService.RecordEvent(ctx, "system", domain.ActorTypeSystem,
		"agent_registered", "agent", agent.ID,
		map[string]interface{}{"name": name, "hostname": hostname}); err != nil {
		slog.Error("failed to record audit event", "error", err)
	}

	// Return the API key only once; the agent must save it securely
	return agent, apiKey, nil
}

// Heartbeat updates an agent's last seen time, status, and metadata.
//
// I-004: retired agents must be rejected up-front. A retired agent that is
// still polling is a zombie: its row exists only for audit history and must
// not be allowed to bump LastHeartbeatAt (which would resurrect it in stats
// dashboards and stale-offline sweeps). The sentinel ErrAgentRetired is
// returned unwrapped so the HTTP handler can map it to 410 Gone via
// errors.Is; the agent process detects the 410 and shuts down cleanly
// instead of continuing to heartbeat indefinitely.
func (s *AgentService) Heartbeat(ctx context.Context, agentID string, metadata *domain.AgentMetadata) error {
	agent, err := s.agentRepo.Get(ctx, agentID)
	if err != nil {
		return fmt.Errorf("failed to fetch agent: %w", err)
	}

	// I-004 guard: retired agents are frozen. Do not call UpdateHeartbeat;
	// bumping the timestamp would defeat the retired-row filter that protects
	// stats, scheduler sweeps, and handler listings.
	if agent.IsRetired() {
		return ErrAgentRetired
	}

	// Update heartbeat and metadata
	if err := s.agentRepo.UpdateHeartbeat(ctx, agentID, metadata); err != nil {
		return fmt.Errorf("failed to update heartbeat: %w", err)
	}

	// Update status if previously offline
	if agent.Status != domain.AgentStatusOnline {
		agent.Status = domain.AgentStatusOnline
		if err := s.agentRepo.Update(ctx, agent); err != nil {
			slog.Error("failed to update agent status", "error", err)
		}
	}

	return nil
}

// SubmitCSR validates and processes a Certificate Signing Request from an agent.
// In agent keygen mode, this completes an AwaitingCSR renewal job by signing the CSR
// and storing the cert version. The private key stays on the agent; only the CSR
// (public key) reaches the server.
func (s *AgentService) SubmitCSR(ctx context.Context, agentID string, certID string, csrPEM []byte) error {
	// Fetch agent
	agent, err := s.agentRepo.Get(ctx, agentID)
	if err != nil {
		return fmt.Errorf("failed to fetch agent: %w", err)
	}

	// Validate CSR format
	if len(csrPEM) == 0 {
		return fmt.Errorf("invalid CSR: empty")
	}

	if certID != "" {
		cert, err := s.certRepo.Get(ctx, certID)
		if err != nil {
			return fmt.Errorf("failed to fetch certificate: %w", err)
		}

		// Check for AwaitingCSR jobs first (agent keygen mode)
		if s.renewalService != nil {
			awaitingJobs, err := s.renewalService.GetAwaitingCSRJobs(ctx, certID)
			if err == nil && len(awaitingJobs) > 0 {
				// Complete the renewal via the renewal service (signs CSR, stores version, creates deploy jobs)
				if err := s.renewalService.CompleteAgentCSRRenewal(ctx, awaitingJobs[0], cert, string(csrPEM)); err != nil {
					return fmt.Errorf("failed to complete agent CSR renewal: %w", err)
				}

				// Record audit event
				if auditErr := s.auditService.RecordEvent(ctx, agent.ID, domain.ActorTypeAgent,
					"csr_submitted", "certificate", certID,
					map[string]interface{}{
						"agent_hostname": agent.Hostname,
						"keygen_mode":    "agent",
						"job_id":         awaitingJobs[0].ID,
					}); auditErr != nil {
					slog.Error("failed to record audit event", "error", auditErr)
				}

				return nil
			}
		}

		// Fallback: direct issuer signing (no AwaitingCSR job; ad-hoc CSR submission)
		connector, ok := s.issuerRegistry.Get(cert.IssuerID)
		if ok {
			// Resolve profile for EKU resolution and crypto policy enforcement
			var ekus []string
			var profile *domain.CertificateProfile
			if cert.CertificateProfileID != "" && s.profileRepo != nil {
				if p, profileErr := s.profileRepo.Get(ctx, cert.CertificateProfileID); profileErr == nil && p != nil {
					profile = p
					ekus = profile.AllowedEKUs
				}
			}

			// Validate CSR key algorithm/size against profile (crypto policy enforcement)
			csrInfo, csrErr := ValidateCSRAgainstProfile(string(csrPEM), profile)
			if csrErr != nil {
				return fmt.Errorf("CSR validation failed: %w", csrErr)
			}

			// Resolve MaxTTL from profile
			var maxTTLSeconds int
			if profile != nil {
				maxTTLSeconds = profile.MaxTTLSeconds
			}

			result, err := connector.IssueCertificate(ctx, cert.CommonName, cert.SANs, string(csrPEM), ekus, maxTTLSeconds)
			if err != nil {
				return fmt.Errorf("issuer signing failed: %w", err)
			}

			version := &domain.CertificateVersion{
				ID:                generateID("certver"),
				CertificateID:     certID,
				SerialNumber:      result.Serial,
				NotBefore:         result.NotBefore,
				NotAfter:          result.NotAfter,
				FingerprintSHA256: computeCertFingerprint(result.CertPEM),
				PEMChain:          result.CertPEM + "\n" + result.ChainPEM,
				CSRPEM:            string(csrPEM),
				CreatedAt:         time.Now(),
			}
			if csrInfo != nil {
				version.KeyAlgorithm = csrInfo.KeyAlgorithm
				version.KeySize = csrInfo.KeySize
			}

			if err := s.certRepo.CreateVersion(ctx, version); err != nil {
				return fmt.Errorf("failed to store certificate version: %w", err)
			}

			cert.Status = domain.CertificateStatusActive
			cert.ExpiresAt = result.NotAfter
			now := time.Now()
			cert.LastRenewalAt = &now
			cert.UpdatedAt = now
			if err := s.certRepo.Update(ctx, cert); err != nil {
				slog.Error("failed to update certificate", "error", err)
			}
		}
	}

	// Record audit event
	if auditErr := s.auditService.RecordEvent(ctx, agent.ID, domain.ActorTypeAgent,
		"csr_submitted", "certificate", certID,
		map[string]interface{}{"agent_hostname": agent.Hostname}); auditErr != nil {
		slog.Error("failed to record audit event", "error", auditErr)
	}

	return nil
}

// GetCertificateForAgent returns the latest public certificate material for an agent.
func (s *AgentService) GetCertificateForAgent(ctx context.Context, agentID string, certID string) ([]byte, error) {
	// Fetch agent
	_, err := s.agentRepo.Get(ctx, agentID)
	if err != nil {
		return nil, fmt.Errorf("failed to fetch agent: %w", err)
	}

	// Get latest version
	versions, err := s.certRepo.ListVersions(ctx, certID)
	if err != nil {
		return nil, fmt.Errorf("failed to fetch certificate versions: %w", err)
	}

	if len(versions) == 0 {
		return nil, fmt.Errorf("no certificate versions found")
	}

	// Return the most recent version (latest CreatedAt timestamp)
	latestVersion := versions[0]
	for _, v := range versions {
		if v.CreatedAt.After(latestVersion.CreatedAt) {
			latestVersion = v
		}
	}

	// Record audit event
	if err := s.auditService.RecordEvent(ctx, agentID, domain.ActorTypeAgent,
		"certificate_retrieved", "certificate", certID,
		map[string]interface{}{"version": latestVersion.SerialNumber}); err != nil {
		slog.Error("failed to record audit event", "error", err)
	}

	return []byte(latestVersion.PEMChain), nil
}

// GetPendingWork returns actionable jobs for an agent: deployment jobs (Pending) and
// renewal/issuance jobs awaiting CSR submission (AwaitingCSR).
// Jobs are scoped to the requesting agent via agent_id (set at job creation) or
// through target→agent relationships for legacy jobs and AwaitingCSR routing.
func (s *AgentService) GetPendingWork(ctx context.Context, agentID string) ([]*domain.Job, error) {
	// Verify agent exists
	_, err := s.agentRepo.Get(ctx, agentID)
	if err != nil {
		return nil, fmt.Errorf("failed to fetch agent: %w", err)
	}

	// Atomically claim jobs assigned to this agent. H-6 (CWE-362) remediation:
	// ClaimPendingByAgentID uses SELECT ... FOR UPDATE SKIP LOCKED so concurrent poll
	// requests (duplicate agents, retry storms, or a lagging long-poll) never observe
	// the same Pending deployment row. Pending deployments are flipped to Running inside
	// the claim transaction; AwaitingCSR jobs keep their state since CSR submission is
	// the state-machine trigger for their next transition.
	return s.jobRepo.ClaimPendingByAgentID(ctx, agentID)
}

// ReportJobStatus updates a job's status based on agent feedback.
func (s *AgentService) ReportJobStatus(ctx context.Context, agentID string, jobID string, status domain.JobStatus, errMsg string) error {
	// Fetch job to verify it exists
	_, err := s.jobRepo.Get(ctx, jobID)
	if err != nil {
		return fmt.Errorf("failed to fetch job: %w", err)
	}

	// Update job status
	if err := s.jobRepo.UpdateStatus(ctx, jobID, status, errMsg); err != nil {
		return fmt.Errorf("failed to update job status: %w", err)
	}

	// Record audit event
	if err := s.auditService.RecordEvent(ctx, agentID, domain.ActorTypeAgent,
		"job_status_reported", "job", jobID,
		map[string]interface{}{"status": status, "error": errMsg}); err != nil {
		slog.Error("failed to record audit event", "error", err)
	}

	return nil
}

// MarkStaleAgentsOffline marks agents as offline if they haven't sent a heartbeat
// within the given threshold duration.
func (s *AgentService) MarkStaleAgentsOffline(ctx context.Context, threshold time.Duration) error {
	agents, err := s.agentRepo.List(ctx)
	if err != nil {
		return fmt.Errorf("failed to list agents: %w", err)
	}

	cutoff := time.Now().Add(-threshold)
	for _, agent := range agents {
		if agent.Status == domain.AgentStatusOnline && agent.LastHeartbeatAt != nil && agent.LastHeartbeatAt.Before(cutoff) {
			agent.Status = domain.AgentStatusOffline
			if err := s.agentRepo.Update(ctx, agent); err != nil {
				slog.Error("failed to mark agent offline", "agent_id", agent.ID, "error", err)
				continue
			}
		}
	}
	return nil
}

// GetAgentByAPIKey retrieves an agent by hashed API key.
func (s *AgentService) GetAgentByAPIKey(ctx context.Context, apiKey string) (*domain.Agent, error) {
	apiKeyHash := hashAPIKey(apiKey)
	agent, err := s.agentRepo.GetByAPIKey(ctx, apiKeyHash)
	if err != nil {
		return nil, fmt.Errorf("invalid API key: %w", err)
	}
	return agent, nil
}

// ListAgents returns paginated agents (handler interface method).
func (s *AgentService) ListAgents(ctx context.Context, page, perPage int) ([]domain.Agent, int64, error) {
	if page < 1 {
		page = 1
	}
	if perPage < 1 {
		perPage = 50
	}

	agents, err := s.agentRepo.List(ctx)
	if err != nil {
		return nil, 0, fmt.Errorf("failed to list agents: %w", err)
	}

	total := int64(len(agents))
	start := (page - 1) * perPage
	if start >= int(total) {
		return nil, total, nil
	}
	end := start + perPage
	if end > int(total) {
		end = int(total)
	}

	var result []domain.Agent
	for _, a := range agents[start:end] {
		if a != nil {
			result = append(result, *a)
		}
	}

	return result, total, nil
}

// GetAgent returns a single agent (handler interface method).
func (s *AgentService) GetAgent(ctx context.Context, id string) (*domain.Agent, error) {
	return s.agentRepo.Get(ctx, id)
}

// RegisterAgent creates and registers a new agent (handler interface method).
func (s *AgentService) RegisterAgent(ctx context.Context, agent domain.Agent) (*domain.Agent, error) {
	agent.ID = generateID("agent")
	apiKey, err := generateAPIKey()
	if err != nil {
		return nil, fmt.Errorf("failed to generate agent api key: %w", err)
	}
	agent.APIKeyHash = hashAPIKey(apiKey)
	agent.Status = domain.AgentStatusOnline
	now := time.Now()
	agent.RegisteredAt = now
	agent.LastHeartbeatAt = &now

	if err := s.agentRepo.Create(ctx, &agent); err != nil {
		return nil, fmt.Errorf("failed to register agent: %w", err)
	}
	return &agent, nil
}

// CSRSubmit processes a CSR submission from an agent (handler interface method).
// The csrPEM parameter contains "certID:csrPEM" or just the CSR PEM.
func (s *AgentService) CSRSubmit(ctx context.Context, agentID string, csrPEM string) (string, error) {
	err := s.SubmitCSR(ctx, agentID, "", []byte(csrPEM))
	if err != nil {
		return "", err
	}
	return "csr_accepted", nil
}

// CSRSubmitForCert processes a CSR submission for a specific certificate (handler interface method).
func (s *AgentService) CSRSubmitForCert(ctx context.Context, agentID string, certID string, csrPEM string) (string, error) {
	err := s.SubmitCSR(ctx, agentID, certID, []byte(csrPEM))
	if err != nil {
		return "", err
	}
	return "csr_signed", nil
}

// GetWork returns pending deployment jobs for an agent (handler interface method).
func (s *AgentService) GetWork(ctx context.Context, agentID string) ([]domain.Job, error) {
	jobs, err := s.GetPendingWork(ctx, agentID)
	if err != nil {
		return nil, err
	}
	var result []domain.Job
	for _, j := range jobs {
		if j != nil {
			result = append(result, *j)
		}
	}
	return result, nil
}

// GetWorkWithTargets returns actionable jobs enriched with target/certificate details.
// Deployment jobs include target type + config. AwaitingCSR jobs include common name + SANs
// so the agent knows what CSR to generate.
func (s *AgentService) GetWorkWithTargets(ctx context.Context, agentID string) ([]domain.WorkItem, error) {
	jobs, err := s.GetPendingWork(ctx, agentID)
	if err != nil {
		return nil, err
	}

	var items []domain.WorkItem
	for _, j := range jobs {
		if j == nil {
			continue
		}
		item := domain.WorkItem{
			ID:            j.ID,
			Type:          j.Type,
			CertificateID: j.CertificateID,
			TargetID:      j.TargetID,
			Status:        j.Status,
		}

		// Enrich with target details for deployment jobs
		if j.TargetID != nil && *j.TargetID != "" {
			target, err := s.targetRepo.Get(ctx, *j.TargetID)
			if err == nil && target != nil {
				item.TargetType = string(target.Type)
				item.TargetConfig = target.Config
			}
		}

		// Enrich with certificate details for AwaitingCSR jobs (agent needs CN + SANs for CSR)
		if j.Status == domain.JobStatusAwaitingCSR {
			cert, err := s.certRepo.Get(ctx, j.CertificateID)
			if err == nil && cert != nil {
				item.CommonName = cert.CommonName
				item.SANs = cert.SANs
			}
		}

		items = append(items, item)
	}

	return items, nil
}

// UpdateJobStatus reports a job's status from an agent (handler interface method).
func (s *AgentService) UpdateJobStatus(ctx context.Context, agentID string, jobID string, status string, errMsg string) error {
	return s.ReportJobStatus(ctx, agentID, jobID, domain.JobStatus(status), errMsg)
}

// CertificatePickup retrieves a certificate for an agent (handler interface method).
func (s *AgentService) CertificatePickup(ctx context.Context, agentID, certID string) (string, error) {
	certPEM, err := s.GetCertificateForAgent(ctx, agentID, certID)
	if err != nil {
		return "", err
	}
	return string(certPEM), nil
}

// generateAPIKey creates a cryptographically secure random API key for an agent.
// It fills a 32-byte buffer from crypto/rand (256 bits of entropy) and encodes it with
// base64.RawURLEncoding, yielding a 43-character URL-safe, unpadded ASCII string.
// The plaintext key is shown to the caller exactly once; only its SHA-256 hash is stored.
// Fixes C-1 (CWE-338: previously used math/rand, which is not cryptographically secure).
func generateAPIKey() (string, error) {
	b := make([]byte, 32)
	if _, err := rand.Read(b); err != nil {
		return "", fmt.Errorf("generate agent api key: %w", err)
	}
	return base64.RawURLEncoding.EncodeToString(b), nil
}

// hashAPIKey hashes an API key using SHA256.
func hashAPIKey(apiKey string) string {
	hash := sha256.Sum256([]byte(apiKey))
	return hex.EncodeToString(hash[:])
}