Files
shankar0123 21aeed4f4e legal: addlicense headers + normalize legacy variants (Phase 0 RED-4)
Phase 0 closure (Path B2, post-rewrite):

addlicense sweep — adds the canonical certctl LLC copyright + BUSL-1.1
SPDX header to every production Go file. Template:

  // Copyright 2026 certctl LLC. All rights reserved.
  // SPDX-License-Identifier: BUSL-1.1

Coverage: 338 / 338 production Go files (cmd/ + internal/, excluding
*_test.go and **/testdata/**). Pre-sweep coverage was 22 / 338 (6.5%);
post-sweep is 338 / 338 (100%).

Normalized 22 pre-existing legacy headers (`// Copyright (c) certctl`
+ `// SPDX-License-Identifier: BSL-1.1`) and 1 file using a
`Certctl Contributors` attribution. The legacy SPDX ID `BSL-1.1`
is non-standard; the official SPDX identifier for Business Source
License 1.1 is `BUSL-1.1` (capital U). All 338 files now share the
canonical form.

Generated via:
  addlicense -c "certctl LLC" -y 2026 \
    -f cowork/legal/copyright-header.tpl \
    -ignore '**/testdata/**' -ignore '**/*_test.go' \
    cmd/ internal/

Verification:
  find cmd internal -name '*.go' -not -name '*_test.go' \
    -not -path '*/testdata/*' \
    -exec grep -L '^// Copyright 2026 certctl LLC' {} \; | wc -l

  Returns: 0

gofmt clean. Header additions are comments only, no compile impact.

Closes: cowork/certctl-architecture-diligence-audit.html#fix-RED-4
2026-05-13 21:23:35 +00:00

340 lines
12 KiB
Go

// Copyright 2026 certctl LLC. All rights reserved.
// SPDX-License-Identifier: BUSL-1.1
package service
import (
"context"
"fmt"
"log/slog"
"time"
"github.com/certctl-io/certctl/internal/domain"
"github.com/certctl-io/certctl/internal/repository"
)
// DeploymentService manages certificate deployment to targets via agents.
type DeploymentService struct {
jobRepo repository.JobRepository
targetRepo repository.TargetRepository
agentRepo repository.AgentRepository
certRepo repository.CertificateRepository
auditService *AuditService
notificationSvc *NotificationService
}
// NewDeploymentService creates a new deployment service.
func NewDeploymentService(
jobRepo repository.JobRepository,
targetRepo repository.TargetRepository,
agentRepo repository.AgentRepository,
certRepo repository.CertificateRepository,
auditService *AuditService,
notificationSvc *NotificationService,
) *DeploymentService {
return &DeploymentService{
jobRepo: jobRepo,
targetRepo: targetRepo,
agentRepo: agentRepo,
certRepo: certRepo,
auditService: auditService,
notificationSvc: notificationSvc,
}
}
// CreateDeploymentJobs creates a job for each target of a certificate.
func (s *DeploymentService) CreateDeploymentJobs(ctx context.Context, certID string) ([]string, error) {
// Fetch all targets for this certificate
targets, err := s.targetRepo.ListByCertificate(ctx, certID)
if err != nil {
return nil, fmt.Errorf("failed to list targets: %w", err)
}
if len(targets) == 0 {
return nil, fmt.Errorf("no targets found for certificate %s", certID)
}
var jobIDs []string
// Create a deployment job for each target
for _, target := range targets {
job := &domain.Job{
ID: generateID("job"),
CertificateID: certID,
Type: domain.JobTypeDeployment,
Status: domain.JobStatusPending,
ScheduledAt: time.Now(),
CreatedAt: time.Now(),
}
// Store target info in TargetID field
if target.ID != "" {
job.TargetID = &target.ID
}
// Route job to the target's assigned agent
if target.AgentID != "" {
agentID := target.AgentID
job.AgentID = &agentID
}
if err := s.jobRepo.Create(ctx, job); err != nil {
slog.Error("failed to create deployment job for target", "target_id", target.ID, "error", err)
continue
}
jobIDs = append(jobIDs, job.ID)
}
if len(jobIDs) == 0 {
return nil, fmt.Errorf("failed to create any deployment jobs")
}
// Record audit event
if auditErr := s.auditService.RecordEvent(ctx, "system", domain.ActorTypeSystem,
"deployment_jobs_created", "certificate", certID,
map[string]interface{}{"target_count": len(targets), "job_count": len(jobIDs)}); auditErr != nil {
slog.Error("failed to record audit event", "error", auditErr)
}
return jobIDs, nil
}
// ProcessDeploymentJob handles a deployment job by coordinating with an agent.
func (s *DeploymentService) ProcessDeploymentJob(ctx context.Context, job *domain.Job) error {
// Update job status to in-progress
if err := s.jobRepo.UpdateStatus(ctx, job.ID, domain.JobStatusRunning, ""); err != nil {
return fmt.Errorf("failed to update job status: %w", err)
}
// Fetch certificate
cert, err := s.certRepo.Get(ctx, job.CertificateID)
if err != nil {
updateErr := s.jobRepo.UpdateStatus(ctx, job.ID, domain.JobStatusFailed, fmt.Sprintf("certificate fetch failed: %v", err))
if updateErr != nil {
slog.Error("failed to update job status", "job_id", job.ID, "error", updateErr)
}
return fmt.Errorf("failed to fetch certificate: %w", err)
}
// Fetch target
var targetID string
if job.TargetID != nil {
targetID = *job.TargetID
}
if targetID == "" {
updateErr := s.jobRepo.UpdateStatus(ctx, job.ID, domain.JobStatusFailed, "target_id not found in job")
if updateErr != nil {
slog.Error("failed to update job status", "job_id", job.ID, "error", updateErr)
}
return fmt.Errorf("target_id not found in job")
}
target, err := s.targetRepo.Get(ctx, targetID)
if err != nil {
updateErr := s.jobRepo.UpdateStatus(ctx, job.ID, domain.JobStatusFailed, fmt.Sprintf("target fetch failed: %v", err))
if updateErr != nil {
slog.Error("failed to update job status", "job_id", job.ID, "error", updateErr)
}
return fmt.Errorf("failed to fetch target: %w", err)
}
// Verify agent is available
agentID := target.AgentID
agent, err := s.agentRepo.Get(ctx, agentID)
if err != nil {
updateErr := s.jobRepo.UpdateStatus(ctx, job.ID, domain.JobStatusFailed, fmt.Sprintf("agent fetch failed: %v", err))
if updateErr != nil {
slog.Error("failed to update job status", "job_id", job.ID, "error", updateErr)
}
return fmt.Errorf("failed to fetch agent: %w", err)
}
// I-004: AgentRepository.Get surfaces retired rows by design (for the GUI
// banner + 410 Gone heartbeat path). Deployments must never dispatch to a
// retired agent — it will never heartbeat again and the target row should
// itself have been cascade-retired when the agent was force-retired. A job
// slipping through here would otherwise hit the heartbeat-staleness branch
// below with the misleading reason "agent is offline"; we want operators to
// see the real cause. Fail the job with an explicit reason, send a
// deployment notification so the owner is alerted, and record an audit
// event. Falls through the same notify+audit shape as the offline branch.
if agent.IsRetired() {
updateErr := s.jobRepo.UpdateStatus(ctx, job.ID, domain.JobStatusFailed, "assigned agent is retired")
if updateErr != nil {
slog.Error("failed to update job status", "job_id", job.ID, "error", updateErr)
}
if notifErr := s.notificationSvc.SendDeploymentNotification(ctx, cert, target, false, fmt.Errorf("agent retired")); notifErr != nil {
slog.Error("failed to send deployment notification", "error", notifErr)
}
if auditErr := s.auditService.RecordEvent(ctx, "system", domain.ActorTypeSystem,
"deployment_job_failed", "certificate", job.CertificateID,
map[string]interface{}{"job_id": job.ID, "reason": "agent retired", "target_id": targetID, "agent_id": agentID}); auditErr != nil {
slog.Error("failed to record audit event", "error", auditErr)
}
return fmt.Errorf("agent %s is retired", agentID)
}
// Check agent heartbeat (must be within last 5 minutes)
if agent.LastHeartbeatAt != nil && time.Since(*agent.LastHeartbeatAt) > 5*time.Minute {
updateErr := s.jobRepo.UpdateStatus(ctx, job.ID, domain.JobStatusFailed, "agent is offline")
if updateErr != nil {
slog.Error("failed to update job status", "job_id", job.ID, "error", updateErr)
}
if notifErr := s.notificationSvc.SendDeploymentNotification(ctx, cert, target, false, fmt.Errorf("agent offline")); notifErr != nil {
slog.Error("failed to send deployment notification", "error", notifErr)
}
if auditErr := s.auditService.RecordEvent(ctx, "system", domain.ActorTypeSystem,
"deployment_job_failed", "certificate", job.CertificateID,
map[string]interface{}{"job_id": job.ID, "reason": "agent offline", "target_id": targetID}); auditErr != nil {
slog.Error("failed to record audit event", "error", auditErr)
}
return fmt.Errorf("agent %s is offline", agentID)
}
// In a real implementation, the agent would poll GetPendingWork() to fetch this job.
// The control plane would wait for the agent to complete the work asynchronously.
// For now, we mark it as pending and rely on agent polling.
// Record audit event
if auditErr := s.auditService.RecordEvent(ctx, "system", domain.ActorTypeSystem,
"deployment_job_dispatched", "certificate", job.CertificateID,
map[string]interface{}{"job_id": job.ID, "target_id": targetID, "agent_id": agentID}); auditErr != nil {
slog.Error("failed to record audit event", "error", auditErr)
}
return nil
}
// ValidateDeployment checks the deployment status of a certificate on a target.
func (s *DeploymentService) ValidateDeployment(ctx context.Context, certID string, targetID string) (bool, error) {
// List deployment jobs for this certificate and target
jobs, err := s.jobRepo.ListByCertificate(ctx, certID)
if err != nil {
return false, fmt.Errorf("failed to list jobs: %w", err)
}
for _, job := range jobs {
if job.Type != domain.JobTypeDeployment {
continue
}
// Check if this job is for the target
if job.TargetID == nil || *job.TargetID != targetID {
continue
}
// Check if the most recent job for this target succeeded
if job.Status == domain.JobStatusCompleted {
return true, nil
}
if job.Status == domain.JobStatusFailed {
if job.LastError != nil {
return false, fmt.Errorf("deployment failed: %s", *job.LastError)
}
return false, fmt.Errorf("deployment failed")
}
// Still in progress
return false, fmt.Errorf("deployment in progress")
}
// No deployment job found
return false, fmt.Errorf("no deployment job found for target %s", targetID)
}
// MarkDeploymentComplete marks a deployment job as successfully completed.
// This is called by agents after they finish deploying a certificate.
func (s *DeploymentService) MarkDeploymentComplete(ctx context.Context, jobID string) error {
job, err := s.jobRepo.Get(ctx, jobID)
if err != nil {
return fmt.Errorf("failed to fetch job: %w", err)
}
if err := s.jobRepo.UpdateStatus(ctx, jobID, domain.JobStatusCompleted, ""); err != nil {
return fmt.Errorf("failed to update job status: %w", err)
}
// Fetch certificate and target for notification
cert, err := s.certRepo.Get(ctx, job.CertificateID)
if err != nil {
slog.Error("failed to fetch certificate for notification", "error", err)
return nil
}
var targetID string
if job.TargetID != nil {
targetID = *job.TargetID
}
if targetID != "" {
target, err := s.targetRepo.Get(ctx, targetID)
if err != nil {
slog.Error("failed to fetch target for notification", "error", err)
return nil
}
// Send deployment success notification
if err := s.notificationSvc.SendDeploymentNotification(ctx, cert, target, true, nil); err != nil {
slog.Error("failed to send deployment notification", "error", err)
}
}
// Record audit event
if auditErr := s.auditService.RecordEvent(ctx, "system", domain.ActorTypeSystem,
"deployment_job_completed", "certificate", job.CertificateID,
map[string]interface{}{"job_id": jobID, "target_id": targetID}); auditErr != nil {
slog.Error("failed to record audit event", "error", auditErr)
}
return nil
}
// MarkDeploymentFailed marks a deployment job as failed.
// Called by agents when deployment fails.
func (s *DeploymentService) MarkDeploymentFailed(ctx context.Context, jobID string, errMsg string) error {
job, err := s.jobRepo.Get(ctx, jobID)
if err != nil {
return fmt.Errorf("failed to fetch job: %w", err)
}
if err := s.jobRepo.UpdateStatus(ctx, jobID, domain.JobStatusFailed, errMsg); err != nil {
return fmt.Errorf("failed to update job status: %w", err)
}
// Fetch certificate and target for notification
cert, err := s.certRepo.Get(ctx, job.CertificateID)
if err != nil {
slog.Error("failed to fetch certificate for notification", "error", err)
return nil
}
var targetID string
if job.TargetID != nil {
targetID = *job.TargetID
}
if targetID != "" {
target, err := s.targetRepo.Get(ctx, targetID)
if err != nil {
slog.Error("failed to fetch target for notification", "error", err)
return nil
}
// Send deployment failure notification
if err := s.notificationSvc.SendDeploymentNotification(ctx, cert, target, false, fmt.Errorf("%s", errMsg)); err != nil {
slog.Error("failed to send deployment notification", "error", err)
}
}
// Record audit event
if auditErr := s.auditService.RecordEvent(ctx, "system", domain.ActorTypeSystem,
"deployment_job_failed", "certificate", job.CertificateID,
map[string]interface{}{"job_id": jobID, "target_id": targetID, "error": errMsg}); auditErr != nil {
slog.Error("failed to record audit event", "error", auditErr)
}
return nil
}