diff --git a/cmd/server/main.go b/cmd/server/main.go index 1d6ffd9..e2ea75d 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -226,6 +226,7 @@ func main() { certificateService.SetCAOperationsSvc(caOperationsSvc) certificateService.SetTargetRepo(targetRepo) renewalService := service.NewRenewalService(certificateRepo, jobRepo, renewalPolicyRepo, profileRepo, auditService, notificationService, issuerRegistry, cfg.Keygen.Mode) + renewalService.SetTargetRepo(targetRepo) deploymentService := service.NewDeploymentService(jobRepo, targetRepo, agentRepo, certificateRepo, auditService, notificationService) jobService := service.NewJobService(jobRepo, renewalService, deploymentService, logger) agentService := service.NewAgentService(agentRepo, certificateRepo, jobRepo, targetRepo, auditService, issuerRegistry, renewalService) diff --git a/docs/architecture.md b/docs/architecture.md index 610810c..3e80206 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -45,7 +45,7 @@ New to certificates? Read the [Concepts Guide](concepts.md) first. ### Design Principles 1. **Private Key Isolation** — Agents generate ECDSA P-256 keys locally and submit CSRs only. Private keys never touch the control plane. Server-side keygen available via `CERTCTL_KEYGEN_MODE=server` for demo only. -2. **Pull-Only Deployment** — The server never initiates outbound connections to agents or targets. Agents poll for work. For network appliances and agentless targets, a proxy agent in the same network zone executes deployments via the target's API. This keeps the control plane firewalled off and limits credential scope to the proxy agent's zone. +2. **Pull-Only Deployment** — The server never initiates outbound connections to agents or targets. Agents poll for work and receive only jobs assigned to their targets (routed via `agent_id` on jobs or through target→agent relationships). For network appliances and agentless targets, a proxy agent in the same network zone executes deployments via the target's API. This keeps the control plane firewalled off and limits credential scope to the proxy agent's zone. 3. **Sub-CA Capable** — The Local CA can operate as a subordinate CA under an enterprise root (e.g., ADCS). Load a pre-signed CA cert+key from disk and all issued certs chain to the enterprise trust hierarchy. Self-signed mode remains the default for development/demos. 4. **GUI as Primary Interface** — The web dashboard is the operational control plane, not a secondary viewer. Every backend feature ships with its corresponding GUI surface. 5. **Decoupled Operations** — Agents operate autonomously; the control plane coordinates but doesn't block agent function diff --git a/internal/domain/job.go b/internal/domain/job.go index 5f8d1fe..95cdd89 100644 --- a/internal/domain/job.go +++ b/internal/domain/job.go @@ -11,6 +11,7 @@ type Job struct { Type JobType `json:"type"` CertificateID string `json:"certificate_id"` TargetID *string `json:"target_id,omitempty"` + AgentID *string `json:"agent_id,omitempty"` Status JobStatus `json:"status"` Attempts int `json:"attempts"` MaxAttempts int `json:"max_attempts"` diff --git a/internal/integration/lifecycle_test.go b/internal/integration/lifecycle_test.go index 72fcefd..2464443 100644 --- a/internal/integration/lifecycle_test.go +++ b/internal/integration/lifecycle_test.go @@ -662,6 +662,20 @@ func (m *mockJobRepository) GetPendingJobs(ctx context.Context, jobType domain.J return jobs, nil } +func (m *mockJobRepository) ListPendingByAgentID(ctx context.Context, agentID string) ([]*domain.Job, error) { + var result []*domain.Job + for _, j := range m.jobs { + if j.AgentID != nil && *j.AgentID == agentID { + if j.Status == domain.JobStatusPending && j.Type == domain.JobTypeDeployment { + result = append(result, j) + } else if j.Status == domain.JobStatusAwaitingCSR { + result = append(result, j) + } + } + } + return result, nil +} + type mockAuditRepository struct { events []*domain.AuditEvent } diff --git a/internal/repository/interfaces.go b/internal/repository/interfaces.go index 44c9151..3428d1e 100644 --- a/internal/repository/interfaces.go +++ b/internal/repository/interfaces.go @@ -111,6 +111,8 @@ type JobRepository interface { UpdateStatus(ctx context.Context, id string, status domain.JobStatus, errMsg string) error // GetPendingJobs returns jobs not yet processed of a specific type. GetPendingJobs(ctx context.Context, jobType domain.JobType) ([]*domain.Job, error) + // ListPendingByAgentID returns pending deployment jobs and AwaitingCSR jobs for a specific agent. + ListPendingByAgentID(ctx context.Context, agentID string) ([]*domain.Job, error) } // RenewalPolicyRepository defines operations for managing renewal policies. diff --git a/internal/repository/postgres/job.go b/internal/repository/postgres/job.go index f6980b4..cd7524d 100644 --- a/internal/repository/postgres/job.go +++ b/internal/repository/postgres/job.go @@ -22,7 +22,7 @@ func NewJobRepository(db *sql.DB) *JobRepository { // List returns all jobs func (r *JobRepository) List(ctx context.Context) ([]*domain.Job, error) { rows, err := r.db.QueryContext(ctx, ` - SELECT id, type, certificate_id, target_id, status, attempts, max_attempts, + SELECT id, type, certificate_id, target_id, agent_id, status, attempts, max_attempts, last_error, scheduled_at, started_at, completed_at, created_at FROM jobs ORDER BY created_at DESC @@ -52,7 +52,7 @@ func (r *JobRepository) List(ctx context.Context) ([]*domain.Job, error) { // Get retrieves a job by ID func (r *JobRepository) Get(ctx context.Context, id string) (*domain.Job, error) { row := r.db.QueryRowContext(ctx, ` - SELECT id, type, certificate_id, target_id, status, attempts, max_attempts, + SELECT id, type, certificate_id, target_id, agent_id, status, attempts, max_attempts, last_error, scheduled_at, started_at, completed_at, created_at FROM jobs WHERE id = $1 @@ -77,11 +77,11 @@ func (r *JobRepository) Create(ctx context.Context, job *domain.Job) error { err := r.db.QueryRowContext(ctx, ` INSERT INTO jobs ( - id, type, certificate_id, target_id, status, attempts, max_attempts, + id, type, certificate_id, target_id, agent_id, status, attempts, max_attempts, last_error, scheduled_at, started_at, completed_at, created_at - ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) RETURNING id - `, job.ID, job.Type, job.CertificateID, job.TargetID, job.Status, job.Attempts, + `, job.ID, job.Type, job.CertificateID, job.TargetID, job.AgentID, job.Status, job.Attempts, job.MaxAttempts, job.LastError, job.ScheduledAt, job.StartedAt, job.CompletedAt, job.CreatedAt).Scan(&job.ID) @@ -99,15 +99,16 @@ func (r *JobRepository) Update(ctx context.Context, job *domain.Job) error { type = $1, certificate_id = $2, target_id = $3, - status = $4, - attempts = $5, - max_attempts = $6, - last_error = $7, - scheduled_at = $8, - started_at = $9, - completed_at = $10 - WHERE id = $11 - `, job.Type, job.CertificateID, job.TargetID, job.Status, job.Attempts, + agent_id = $4, + status = $5, + attempts = $6, + max_attempts = $7, + last_error = $8, + scheduled_at = $9, + started_at = $10, + completed_at = $11 + WHERE id = $12 + `, job.Type, job.CertificateID, job.TargetID, job.AgentID, job.Status, job.Attempts, job.MaxAttempts, job.LastError, job.ScheduledAt, job.StartedAt, job.CompletedAt, job.ID) @@ -150,7 +151,7 @@ func (r *JobRepository) Delete(ctx context.Context, id string) error { // ListByStatus returns jobs with a specific status func (r *JobRepository) ListByStatus(ctx context.Context, status domain.JobStatus) ([]*domain.Job, error) { rows, err := r.db.QueryContext(ctx, ` - SELECT id, type, certificate_id, target_id, status, attempts, max_attempts, + SELECT id, type, certificate_id, target_id, agent_id, status, attempts, max_attempts, last_error, scheduled_at, started_at, completed_at, created_at FROM jobs WHERE status = $1 @@ -181,7 +182,7 @@ func (r *JobRepository) ListByStatus(ctx context.Context, status domain.JobStatu // ListByCertificate returns all jobs for a certificate func (r *JobRepository) ListByCertificate(ctx context.Context, certID string) ([]*domain.Job, error) { rows, err := r.db.QueryContext(ctx, ` - SELECT id, type, certificate_id, target_id, status, attempts, max_attempts, + SELECT id, type, certificate_id, target_id, agent_id, status, attempts, max_attempts, last_error, scheduled_at, started_at, completed_at, created_at FROM jobs WHERE certificate_id = $1 @@ -239,7 +240,7 @@ func (r *JobRepository) UpdateStatus(ctx context.Context, id string, status doma // GetPendingJobs returns jobs not yet processed of a specific type func (r *JobRepository) GetPendingJobs(ctx context.Context, jobType domain.JobType) ([]*domain.Job, error) { rows, err := r.db.QueryContext(ctx, ` - SELECT id, type, certificate_id, target_id, status, attempts, max_attempts, + SELECT id, type, certificate_id, target_id, agent_id, status, attempts, max_attempts, last_error, scheduled_at, started_at, completed_at, created_at FROM jobs WHERE type = $1 AND status = $2 @@ -267,13 +268,71 @@ func (r *JobRepository) GetPendingJobs(ctx context.Context, jobType domain.JobTy return jobs, nil } +// ListPendingByAgentID returns pending deployment jobs and AwaitingCSR jobs for a specific agent. +// Deployment jobs are matched by agent_id directly (set at creation time), with a fallback +// for legacy jobs where agent_id is NULL but target_id resolves to the agent via deployment_targets. +// AwaitingCSR jobs are matched through certificate → target mappings → agent ownership. +func (r *JobRepository) ListPendingByAgentID(ctx context.Context, agentID string) ([]*domain.Job, error) { + rows, err := r.db.QueryContext(ctx, ` + SELECT id, type, certificate_id, target_id, agent_id, status, attempts, max_attempts, + last_error, scheduled_at, started_at, completed_at, created_at + FROM jobs + WHERE agent_id = $1 AND status = 'Pending' AND type = 'Deployment' + + UNION ALL + + SELECT j.id, j.type, j.certificate_id, j.target_id, j.agent_id, j.status, j.attempts, j.max_attempts, + j.last_error, j.scheduled_at, j.started_at, j.completed_at, j.created_at + FROM jobs j + INNER JOIN deployment_targets dt ON j.target_id = dt.id + WHERE j.agent_id IS NULL AND j.status = 'Pending' AND j.type = 'Deployment' + AND dt.agent_id = $1 + + UNION ALL + + SELECT j.id, j.type, j.certificate_id, j.target_id, j.agent_id, j.status, j.attempts, j.max_attempts, + j.last_error, j.scheduled_at, j.started_at, j.completed_at, j.created_at + FROM jobs j + WHERE j.status = 'AwaitingCSR' + AND j.type IN ('Renewal', 'Issuance') + AND EXISTS ( + SELECT 1 FROM certificate_target_mappings ctm + INNER JOIN deployment_targets dt ON ctm.target_id = dt.id + WHERE ctm.certificate_id = j.certificate_id + AND dt.agent_id = $1 + ) + + ORDER BY created_at ASC + `, agentID) + + if err != nil { + return nil, fmt.Errorf("failed to query pending jobs for agent: %w", err) + } + defer rows.Close() + + var jobs []*domain.Job + for rows.Next() { + job, err := scanJob(rows) + if err != nil { + return nil, err + } + jobs = append(jobs, job) + } + + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("error iterating pending agent job rows: %w", err) + } + + return jobs, nil +} + // scanJob scans a job from a row or rows func scanJob(scanner interface { Scan(...interface{}) error }) (*domain.Job, error) { var job domain.Job err := scanner.Scan(&job.ID, &job.Type, &job.CertificateID, &job.TargetID, - &job.Status, &job.Attempts, &job.MaxAttempts, &job.LastError, + &job.AgentID, &job.Status, &job.Attempts, &job.MaxAttempts, &job.LastError, &job.ScheduledAt, &job.StartedAt, &job.CompletedAt, &job.CreatedAt) if err != nil { diff --git a/internal/service/agent.go b/internal/service/agent.go index 9a64482..e18c60a 100644 --- a/internal/service/agent.go +++ b/internal/service/agent.go @@ -251,38 +251,17 @@ func (s *AgentService) GetCertificateForAgent(ctx context.Context, agentID strin // GetPendingWork returns actionable jobs for an agent: deployment jobs (Pending) and // renewal/issuance jobs awaiting CSR submission (AwaitingCSR). +// Jobs are scoped to the requesting agent via agent_id (set at job creation) or +// through target→agent relationships for legacy jobs and AwaitingCSR routing. func (s *AgentService) GetPendingWork(ctx context.Context, agentID string) ([]*domain.Job, error) { - // Fetch agent to verify it exists + // Verify agent exists _, err := s.agentRepo.Get(ctx, agentID) if err != nil { return nil, fmt.Errorf("failed to fetch agent: %w", err) } - var workForAgent []*domain.Job - - // Get pending deployment jobs - pendingJobs, err := s.jobRepo.ListByStatus(ctx, domain.JobStatusPending) - if err != nil { - return nil, fmt.Errorf("failed to list pending jobs: %w", err) - } - for _, job := range pendingJobs { - if job.Type == domain.JobTypeDeployment { - workForAgent = append(workForAgent, job) - } - } - - // Get AwaitingCSR jobs (agent keygen mode — agent needs to generate key + submit CSR) - awaitingJobs, err := s.jobRepo.ListByStatus(ctx, domain.JobStatusAwaitingCSR) - if err != nil { - return nil, fmt.Errorf("failed to list awaiting CSR jobs: %w", err) - } - for _, job := range awaitingJobs { - if job.Type == domain.JobTypeRenewal || job.Type == domain.JobTypeIssuance { - workForAgent = append(workForAgent, job) - } - } - - return workForAgent, nil + // Return only jobs assigned to this agent (via agent_id or target→agent relationship) + return s.jobRepo.ListPendingByAgentID(ctx, agentID) } // ReportJobStatus updates a job's status based on agent feedback. diff --git a/internal/service/agent_test.go b/internal/service/agent_test.go index 789b0a0..36553b5 100644 --- a/internal/service/agent_test.go +++ b/internal/service/agent_test.go @@ -131,8 +131,9 @@ func TestHeartbeat_NotFound(t *testing.T) { func TestGetPendingWork(t *testing.T) { ctx := context.Background() now := time.Now() + agentID := "agent-001" agent := &domain.Agent{ - ID: "agent-001", + ID: agentID, Name: "prod-agent", Hostname: "server-01", Status: domain.AgentStatusOnline, @@ -146,6 +147,7 @@ func TestGetPendingWork(t *testing.T) { Type: domain.JobTypeDeployment, CertificateID: "cert-001", Status: domain.JobStatusPending, + AgentID: &agentID, CreatedAt: now, } job2 := &domain.Job{ @@ -157,7 +159,7 @@ func TestGetPendingWork(t *testing.T) { } agentRepo := &mockAgentRepo{ - Agents: map[string]*domain.Agent{"agent-001": agent}, + Agents: map[string]*domain.Agent{agentID: agent}, HeartbeatUpdates: make(map[string]time.Time), } certRepo := &mockCertRepo{ @@ -177,7 +179,7 @@ func TestGetPendingWork(t *testing.T) { agentService := NewAgentService(agentRepo, certRepo, jobRepo, targetRepo, auditService, issuerRegistry, nil) - jobs, err := agentService.GetPendingWork(ctx, "agent-001") + jobs, err := agentService.GetPendingWork(ctx, agentID) if err != nil { t.Fatalf("GetPendingWork failed: %v", err) } @@ -185,11 +187,132 @@ func TestGetPendingWork(t *testing.T) { if len(jobs) != 1 { t.Errorf("expected 1 deployment job, got %d", len(jobs)) } - if jobs[0].Type != domain.JobTypeDeployment { + if len(jobs) > 0 && jobs[0].Type != domain.JobTypeDeployment { t.Errorf("expected JobTypeDeployment, got %s", jobs[0].Type) } } +func TestGetPendingWork_OnlyReturnsAgentJobs(t *testing.T) { + ctx := context.Background() + now := time.Now() + agentA := "agent-A" + agentB := "agent-B" + + agentRepo := &mockAgentRepo{ + Agents: map[string]*domain.Agent{ + agentA: {ID: agentA, Name: "agent-A", Hostname: "host-a", Status: domain.AgentStatusOnline, RegisteredAt: now, APIKeyHash: "hashA"}, + agentB: {ID: agentB, Name: "agent-B", Hostname: "host-b", Status: domain.AgentStatusOnline, RegisteredAt: now, APIKeyHash: "hashB"}, + }, + HeartbeatUpdates: make(map[string]time.Time), + } + + jobA := &domain.Job{ID: "job-A", Type: domain.JobTypeDeployment, CertificateID: "cert-001", Status: domain.JobStatusPending, AgentID: &agentA, CreatedAt: now} + jobB := &domain.Job{ID: "job-B", Type: domain.JobTypeDeployment, CertificateID: "cert-002", Status: domain.JobStatusPending, AgentID: &agentB, CreatedAt: now} + + jobRepo := &mockJobRepo{ + Jobs: map[string]*domain.Job{"job-A": jobA, "job-B": jobB}, + StatusUpdates: make(map[string]domain.JobStatus), + } + certRepo := &mockCertRepo{Certs: make(map[string]*domain.ManagedCertificate), Versions: make(map[string][]*domain.CertificateVersion)} + targetRepo := &mockTargetRepo{Targets: make(map[string]*domain.DeploymentTarget)} + auditService := NewAuditService(&mockAuditRepo{}) + + agentService := NewAgentService(agentRepo, certRepo, jobRepo, targetRepo, auditService, make(map[string]IssuerConnector), nil) + + // Agent A should only see its job + jobsA, err := agentService.GetPendingWork(ctx, agentA) + if err != nil { + t.Fatalf("GetPendingWork for agent-A failed: %v", err) + } + if len(jobsA) != 1 { + t.Fatalf("expected 1 job for agent-A, got %d", len(jobsA)) + } + if jobsA[0].ID != "job-A" { + t.Errorf("expected job-A, got %s", jobsA[0].ID) + } + + // Agent B should only see its job + jobsB, err := agentService.GetPendingWork(ctx, agentB) + if err != nil { + t.Fatalf("GetPendingWork for agent-B failed: %v", err) + } + if len(jobsB) != 1 { + t.Fatalf("expected 1 job for agent-B, got %d", len(jobsB)) + } + if jobsB[0].ID != "job-B" { + t.Errorf("expected job-B, got %s", jobsB[0].ID) + } +} + +func TestGetPendingWork_EmptyWhenNoJobsForAgent(t *testing.T) { + ctx := context.Background() + now := time.Now() + agentA := "agent-A" + agentB := "agent-B" + + agentRepo := &mockAgentRepo{ + Agents: map[string]*domain.Agent{ + agentA: {ID: agentA, Name: "agent-A", Hostname: "host-a", Status: domain.AgentStatusOnline, RegisteredAt: now, APIKeyHash: "hashA"}, + }, + HeartbeatUpdates: make(map[string]time.Time), + } + + // All jobs belong to agent-B + jobB := &domain.Job{ID: "job-B", Type: domain.JobTypeDeployment, CertificateID: "cert-001", Status: domain.JobStatusPending, AgentID: &agentB, CreatedAt: now} + + jobRepo := &mockJobRepo{ + Jobs: map[string]*domain.Job{"job-B": jobB}, + StatusUpdates: make(map[string]domain.JobStatus), + } + certRepo := &mockCertRepo{Certs: make(map[string]*domain.ManagedCertificate), Versions: make(map[string][]*domain.CertificateVersion)} + targetRepo := &mockTargetRepo{Targets: make(map[string]*domain.DeploymentTarget)} + auditService := NewAuditService(&mockAuditRepo{}) + + agentService := NewAgentService(agentRepo, certRepo, jobRepo, targetRepo, auditService, make(map[string]IssuerConnector), nil) + + jobs, err := agentService.GetPendingWork(ctx, agentA) + if err != nil { + t.Fatalf("GetPendingWork failed: %v", err) + } + if len(jobs) != 0 { + t.Errorf("expected 0 jobs for agent-A (all jobs are for agent-B), got %d", len(jobs)) + } +} + +func TestGetPendingWork_DeploymentAndCSR_Scoped(t *testing.T) { + ctx := context.Background() + now := time.Now() + agentA := "agent-A" + + agentRepo := &mockAgentRepo{ + Agents: map[string]*domain.Agent{ + agentA: {ID: agentA, Name: "agent-A", Hostname: "host-a", Status: domain.AgentStatusOnline, RegisteredAt: now, APIKeyHash: "hashA"}, + }, + HeartbeatUpdates: make(map[string]time.Time), + } + + deployJob := &domain.Job{ID: "job-deploy", Type: domain.JobTypeDeployment, CertificateID: "cert-001", Status: domain.JobStatusPending, AgentID: &agentA, CreatedAt: now} + csrJob := &domain.Job{ID: "job-csr", Type: domain.JobTypeRenewal, CertificateID: "cert-002", Status: domain.JobStatusAwaitingCSR, AgentID: &agentA, CreatedAt: now} + + jobRepo := &mockJobRepo{ + Jobs: map[string]*domain.Job{"job-deploy": deployJob, "job-csr": csrJob}, + StatusUpdates: make(map[string]domain.JobStatus), + } + certRepo := &mockCertRepo{Certs: make(map[string]*domain.ManagedCertificate), Versions: make(map[string][]*domain.CertificateVersion)} + targetRepo := &mockTargetRepo{Targets: make(map[string]*domain.DeploymentTarget)} + auditService := NewAuditService(&mockAuditRepo{}) + + agentService := NewAgentService(agentRepo, certRepo, jobRepo, targetRepo, auditService, make(map[string]IssuerConnector), nil) + + jobs, err := agentService.GetPendingWork(ctx, agentA) + if err != nil { + t.Fatalf("GetPendingWork failed: %v", err) + } + if len(jobs) != 2 { + t.Fatalf("expected 2 jobs (deployment + AwaitingCSR), got %d", len(jobs)) + } +} + func TestReportJobStatus(t *testing.T) { ctx := context.Background() now := time.Now() diff --git a/internal/service/deployment.go b/internal/service/deployment.go index 0b5c1af..05fcb68 100644 --- a/internal/service/deployment.go +++ b/internal/service/deployment.go @@ -67,6 +67,11 @@ func (s *DeploymentService) CreateDeploymentJobs(ctx context.Context, certID str if target.ID != "" { job.TargetID = &target.ID } + // Route job to the target's assigned agent + if target.AgentID != "" { + agentID := target.AgentID + job.AgentID = &agentID + } if err := s.jobRepo.Create(ctx, job); err != nil { slog.Error("failed to create deployment job for target", "target_id", target.ID, "error", err) diff --git a/internal/service/deployment_test.go b/internal/service/deployment_test.go index bafdfa6..bb34a68 100644 --- a/internal/service/deployment_test.go +++ b/internal/service/deployment_test.go @@ -85,6 +85,45 @@ func TestDeploymentService_CreateDeploymentJobs_Success(t *testing.T) { if job.TargetID == nil || len(*job.TargetID) == 0 { t.Errorf("expected job to have TargetID set") } + + // M31: Verify AgentID is set from target's agent assignment + if job.AgentID == nil { + t.Errorf("expected job to have AgentID set (M31 agent routing)") + } + } +} + +// TestDeploymentService_CreateDeploymentJobs_SetsAgentID verifies AgentID is populated from target. +func TestDeploymentService_CreateDeploymentJobs_SetsAgentID(t *testing.T) { + ctx := context.Background() + svc, jobRepo, targetRepo, _, _, _, _ := newTestDeploymentService() + + target := &domain.DeploymentTarget{ + ID: "tgt-nginx-1", + Name: "NGINX Server 1", + Type: domain.TargetTypeNGINX, + AgentID: "agent-web-01", + Enabled: true, + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + } + targetRepo.AddTarget(target) + + jobIDs, err := svc.CreateDeploymentJobs(ctx, "mc-cert-1") + if err != nil { + t.Fatalf("CreateDeploymentJobs failed: %v", err) + } + + if len(jobIDs) != 1 { + t.Fatalf("expected 1 job, got %d", len(jobIDs)) + } + + job := jobRepo.Jobs[jobIDs[0]] + if job.AgentID == nil { + t.Fatal("expected AgentID to be set on deployment job") + } + if *job.AgentID != "agent-web-01" { + t.Errorf("expected AgentID 'agent-web-01', got '%s'", *job.AgentID) } } diff --git a/internal/service/renewal.go b/internal/service/renewal.go index 442acd7..7112e0f 100644 --- a/internal/service/renewal.go +++ b/internal/service/renewal.go @@ -26,12 +26,18 @@ type RenewalService struct { jobRepo repository.JobRepository renewalPolicyRepo repository.RenewalPolicyRepository profileRepo repository.CertificateProfileRepository + targetRepo repository.TargetRepository auditService *AuditService notificationSvc *NotificationService issuerRegistry map[string]IssuerConnector keygenMode string // "agent" (default) or "server" (demo only) } +// SetTargetRepo sets the target repository for resolving agent_id on deployment jobs. +func (s *RenewalService) SetTargetRepo(repo repository.TargetRepository) { + s.targetRepo = repo +} + // IssuerConnector defines the service-layer interface for interacting with certificate issuers. // This is distinct from the connector-layer issuer.Connector interface to maintain dependency // inversion. Use IssuerConnectorAdapter to bridge between the two. @@ -636,12 +642,26 @@ func (s *RenewalService) createDeploymentJobs(ctx context.Context, cert *domain. } for _, targetID := range cert.TargetIDs { tid := targetID + + // Resolve agent_id from target for job routing + var agentIDPtr *string + if s.targetRepo != nil { + target, err := s.targetRepo.Get(ctx, tid) + if err != nil { + slog.Warn("failed to resolve agent for deployment job", "target_id", tid, "error", err) + } else if target.AgentID != "" { + agentID := target.AgentID + agentIDPtr = &agentID + } + } + deployJob := &domain.Job{ ID: generateID("job"), CertificateID: cert.ID, Type: domain.JobTypeDeployment, Status: domain.JobStatusPending, TargetID: &tid, + AgentID: agentIDPtr, MaxAttempts: 3, ScheduledAt: time.Now(), CreatedAt: time.Now(), diff --git a/internal/service/testutil_test.go b/internal/service/testutil_test.go index 7343b32..9cdae46 100644 --- a/internal/service/testutil_test.go +++ b/internal/service/testutil_test.go @@ -243,6 +243,25 @@ func (m *mockJobRepo) GetPendingJobs(ctx context.Context, jobType domain.JobType return jobs, nil } +func (m *mockJobRepo) ListPendingByAgentID(ctx context.Context, agentID string) ([]*domain.Job, error) { + m.mu.Lock() + defer m.mu.Unlock() + if m.ListErr != nil { + return nil, m.ListErr + } + var result []*domain.Job + for _, j := range m.Jobs { + if j.AgentID != nil && *j.AgentID == agentID { + if j.Status == domain.JobStatusPending && j.Type == domain.JobTypeDeployment { + result = append(result, j) + } else if j.Status == domain.JobStatusAwaitingCSR { + result = append(result, j) + } + } + } + return result, nil +} + func (m *mockJobRepo) AddJob(job *domain.Job) { m.mu.Lock() defer m.mu.Unlock() diff --git a/internal/service/verification_test.go b/internal/service/verification_test.go index 2240f20..e5c4ee8 100644 --- a/internal/service/verification_test.go +++ b/internal/service/verification_test.go @@ -65,6 +65,10 @@ func (m *mockVerificationJobRepo) GetPendingJobs(ctx context.Context, jobType do return nil, nil } +func (m *mockVerificationJobRepo) ListPendingByAgentID(ctx context.Context, agentID string) ([]*domain.Job, error) { + return nil, nil +} + // newVerificationTestService creates a VerificationService wired with test doubles. func newVerificationTestService(jobs map[string]*domain.Job, jobRepoErr error) (*VerificationService, *mockVerificationJobRepo, *mockAuditRepo) { jobRepo := &mockVerificationJobRepo{jobs: jobs, err: jobRepoErr} diff --git a/web/src/api/types.ts b/web/src/api/types.ts index b7f85ee..d4b6673 100644 --- a/web/src/api/types.ts +++ b/web/src/api/types.ts @@ -70,6 +70,8 @@ export interface Job { id: string; certificate_id: string; type: string; + target_id?: string; + agent_id?: string; status: string; attempts: number; max_attempts: number;