mirror of
https://github.com/shankar0123/certctl.git
synced 2026-06-13 02:28:51 +00:00
feat(M31): agent work routing — scope jobs to assigned agents
Deployment jobs now set agent_id from target→agent relationship at creation time. GetPendingWork() uses ListPendingByAgentID() with a 3-way UNION query (direct match, legacy NULL fallback via target JOIN, AwaitingCSR via cert→target→agent chain) so each agent only receives its own jobs. - Added AgentID *string to Job domain struct - Added agent_id to all job SQL queries (5 SELECTs, INSERT, UPDATE, scanJob) - New ListPendingByAgentID() repository method - Rewrote GetPendingWork() from ~25 lines to single scoped query - 4 new Go tests (3 agent routing + 1 deployment agent_id) - Frontend: agent_id/target_id on Job type Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -226,6 +226,7 @@ func main() {
|
|||||||
certificateService.SetCAOperationsSvc(caOperationsSvc)
|
certificateService.SetCAOperationsSvc(caOperationsSvc)
|
||||||
certificateService.SetTargetRepo(targetRepo)
|
certificateService.SetTargetRepo(targetRepo)
|
||||||
renewalService := service.NewRenewalService(certificateRepo, jobRepo, renewalPolicyRepo, profileRepo, auditService, notificationService, issuerRegistry, cfg.Keygen.Mode)
|
renewalService := service.NewRenewalService(certificateRepo, jobRepo, renewalPolicyRepo, profileRepo, auditService, notificationService, issuerRegistry, cfg.Keygen.Mode)
|
||||||
|
renewalService.SetTargetRepo(targetRepo)
|
||||||
deploymentService := service.NewDeploymentService(jobRepo, targetRepo, agentRepo, certificateRepo, auditService, notificationService)
|
deploymentService := service.NewDeploymentService(jobRepo, targetRepo, agentRepo, certificateRepo, auditService, notificationService)
|
||||||
jobService := service.NewJobService(jobRepo, renewalService, deploymentService, logger)
|
jobService := service.NewJobService(jobRepo, renewalService, deploymentService, logger)
|
||||||
agentService := service.NewAgentService(agentRepo, certificateRepo, jobRepo, targetRepo, auditService, issuerRegistry, renewalService)
|
agentService := service.NewAgentService(agentRepo, certificateRepo, jobRepo, targetRepo, auditService, issuerRegistry, renewalService)
|
||||||
|
|||||||
@@ -45,7 +45,7 @@ New to certificates? Read the [Concepts Guide](concepts.md) first.
|
|||||||
### Design Principles
|
### Design Principles
|
||||||
|
|
||||||
1. **Private Key Isolation** — Agents generate ECDSA P-256 keys locally and submit CSRs only. Private keys never touch the control plane. Server-side keygen available via `CERTCTL_KEYGEN_MODE=server` for demo only.
|
1. **Private Key Isolation** — Agents generate ECDSA P-256 keys locally and submit CSRs only. Private keys never touch the control plane. Server-side keygen available via `CERTCTL_KEYGEN_MODE=server` for demo only.
|
||||||
2. **Pull-Only Deployment** — The server never initiates outbound connections to agents or targets. Agents poll for work. For network appliances and agentless targets, a proxy agent in the same network zone executes deployments via the target's API. This keeps the control plane firewalled off and limits credential scope to the proxy agent's zone.
|
2. **Pull-Only Deployment** — The server never initiates outbound connections to agents or targets. Agents poll for work and receive only jobs assigned to their targets (routed via `agent_id` on jobs or through target→agent relationships). For network appliances and agentless targets, a proxy agent in the same network zone executes deployments via the target's API. This keeps the control plane firewalled off and limits credential scope to the proxy agent's zone.
|
||||||
3. **Sub-CA Capable** — The Local CA can operate as a subordinate CA under an enterprise root (e.g., ADCS). Load a pre-signed CA cert+key from disk and all issued certs chain to the enterprise trust hierarchy. Self-signed mode remains the default for development/demos.
|
3. **Sub-CA Capable** — The Local CA can operate as a subordinate CA under an enterprise root (e.g., ADCS). Load a pre-signed CA cert+key from disk and all issued certs chain to the enterprise trust hierarchy. Self-signed mode remains the default for development/demos.
|
||||||
4. **GUI as Primary Interface** — The web dashboard is the operational control plane, not a secondary viewer. Every backend feature ships with its corresponding GUI surface.
|
4. **GUI as Primary Interface** — The web dashboard is the operational control plane, not a secondary viewer. Every backend feature ships with its corresponding GUI surface.
|
||||||
5. **Decoupled Operations** — Agents operate autonomously; the control plane coordinates but doesn't block agent function
|
5. **Decoupled Operations** — Agents operate autonomously; the control plane coordinates but doesn't block agent function
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ type Job struct {
|
|||||||
Type JobType `json:"type"`
|
Type JobType `json:"type"`
|
||||||
CertificateID string `json:"certificate_id"`
|
CertificateID string `json:"certificate_id"`
|
||||||
TargetID *string `json:"target_id,omitempty"`
|
TargetID *string `json:"target_id,omitempty"`
|
||||||
|
AgentID *string `json:"agent_id,omitempty"`
|
||||||
Status JobStatus `json:"status"`
|
Status JobStatus `json:"status"`
|
||||||
Attempts int `json:"attempts"`
|
Attempts int `json:"attempts"`
|
||||||
MaxAttempts int `json:"max_attempts"`
|
MaxAttempts int `json:"max_attempts"`
|
||||||
|
|||||||
@@ -662,6 +662,20 @@ func (m *mockJobRepository) GetPendingJobs(ctx context.Context, jobType domain.J
|
|||||||
return jobs, nil
|
return jobs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *mockJobRepository) ListPendingByAgentID(ctx context.Context, agentID string) ([]*domain.Job, error) {
|
||||||
|
var result []*domain.Job
|
||||||
|
for _, j := range m.jobs {
|
||||||
|
if j.AgentID != nil && *j.AgentID == agentID {
|
||||||
|
if j.Status == domain.JobStatusPending && j.Type == domain.JobTypeDeployment {
|
||||||
|
result = append(result, j)
|
||||||
|
} else if j.Status == domain.JobStatusAwaitingCSR {
|
||||||
|
result = append(result, j)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
type mockAuditRepository struct {
|
type mockAuditRepository struct {
|
||||||
events []*domain.AuditEvent
|
events []*domain.AuditEvent
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -111,6 +111,8 @@ type JobRepository interface {
|
|||||||
UpdateStatus(ctx context.Context, id string, status domain.JobStatus, errMsg string) error
|
UpdateStatus(ctx context.Context, id string, status domain.JobStatus, errMsg string) error
|
||||||
// GetPendingJobs returns jobs not yet processed of a specific type.
|
// GetPendingJobs returns jobs not yet processed of a specific type.
|
||||||
GetPendingJobs(ctx context.Context, jobType domain.JobType) ([]*domain.Job, error)
|
GetPendingJobs(ctx context.Context, jobType domain.JobType) ([]*domain.Job, error)
|
||||||
|
// ListPendingByAgentID returns pending deployment jobs and AwaitingCSR jobs for a specific agent.
|
||||||
|
ListPendingByAgentID(ctx context.Context, agentID string) ([]*domain.Job, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// RenewalPolicyRepository defines operations for managing renewal policies.
|
// RenewalPolicyRepository defines operations for managing renewal policies.
|
||||||
|
|||||||
@@ -22,7 +22,7 @@ func NewJobRepository(db *sql.DB) *JobRepository {
|
|||||||
// List returns all jobs
|
// List returns all jobs
|
||||||
func (r *JobRepository) List(ctx context.Context) ([]*domain.Job, error) {
|
func (r *JobRepository) List(ctx context.Context) ([]*domain.Job, error) {
|
||||||
rows, err := r.db.QueryContext(ctx, `
|
rows, err := r.db.QueryContext(ctx, `
|
||||||
SELECT id, type, certificate_id, target_id, status, attempts, max_attempts,
|
SELECT id, type, certificate_id, target_id, agent_id, status, attempts, max_attempts,
|
||||||
last_error, scheduled_at, started_at, completed_at, created_at
|
last_error, scheduled_at, started_at, completed_at, created_at
|
||||||
FROM jobs
|
FROM jobs
|
||||||
ORDER BY created_at DESC
|
ORDER BY created_at DESC
|
||||||
@@ -52,7 +52,7 @@ func (r *JobRepository) List(ctx context.Context) ([]*domain.Job, error) {
|
|||||||
// Get retrieves a job by ID
|
// Get retrieves a job by ID
|
||||||
func (r *JobRepository) Get(ctx context.Context, id string) (*domain.Job, error) {
|
func (r *JobRepository) Get(ctx context.Context, id string) (*domain.Job, error) {
|
||||||
row := r.db.QueryRowContext(ctx, `
|
row := r.db.QueryRowContext(ctx, `
|
||||||
SELECT id, type, certificate_id, target_id, status, attempts, max_attempts,
|
SELECT id, type, certificate_id, target_id, agent_id, status, attempts, max_attempts,
|
||||||
last_error, scheduled_at, started_at, completed_at, created_at
|
last_error, scheduled_at, started_at, completed_at, created_at
|
||||||
FROM jobs
|
FROM jobs
|
||||||
WHERE id = $1
|
WHERE id = $1
|
||||||
@@ -77,11 +77,11 @@ func (r *JobRepository) Create(ctx context.Context, job *domain.Job) error {
|
|||||||
|
|
||||||
err := r.db.QueryRowContext(ctx, `
|
err := r.db.QueryRowContext(ctx, `
|
||||||
INSERT INTO jobs (
|
INSERT INTO jobs (
|
||||||
id, type, certificate_id, target_id, status, attempts, max_attempts,
|
id, type, certificate_id, target_id, agent_id, status, attempts, max_attempts,
|
||||||
last_error, scheduled_at, started_at, completed_at, created_at
|
last_error, scheduled_at, started_at, completed_at, created_at
|
||||||
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
|
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
|
||||||
RETURNING id
|
RETURNING id
|
||||||
`, job.ID, job.Type, job.CertificateID, job.TargetID, job.Status, job.Attempts,
|
`, job.ID, job.Type, job.CertificateID, job.TargetID, job.AgentID, job.Status, job.Attempts,
|
||||||
job.MaxAttempts, job.LastError, job.ScheduledAt, job.StartedAt, job.CompletedAt,
|
job.MaxAttempts, job.LastError, job.ScheduledAt, job.StartedAt, job.CompletedAt,
|
||||||
job.CreatedAt).Scan(&job.ID)
|
job.CreatedAt).Scan(&job.ID)
|
||||||
|
|
||||||
@@ -99,15 +99,16 @@ func (r *JobRepository) Update(ctx context.Context, job *domain.Job) error {
|
|||||||
type = $1,
|
type = $1,
|
||||||
certificate_id = $2,
|
certificate_id = $2,
|
||||||
target_id = $3,
|
target_id = $3,
|
||||||
status = $4,
|
agent_id = $4,
|
||||||
attempts = $5,
|
status = $5,
|
||||||
max_attempts = $6,
|
attempts = $6,
|
||||||
last_error = $7,
|
max_attempts = $7,
|
||||||
scheduled_at = $8,
|
last_error = $8,
|
||||||
started_at = $9,
|
scheduled_at = $9,
|
||||||
completed_at = $10
|
started_at = $10,
|
||||||
WHERE id = $11
|
completed_at = $11
|
||||||
`, job.Type, job.CertificateID, job.TargetID, job.Status, job.Attempts,
|
WHERE id = $12
|
||||||
|
`, job.Type, job.CertificateID, job.TargetID, job.AgentID, job.Status, job.Attempts,
|
||||||
job.MaxAttempts, job.LastError, job.ScheduledAt, job.StartedAt,
|
job.MaxAttempts, job.LastError, job.ScheduledAt, job.StartedAt,
|
||||||
job.CompletedAt, job.ID)
|
job.CompletedAt, job.ID)
|
||||||
|
|
||||||
@@ -150,7 +151,7 @@ func (r *JobRepository) Delete(ctx context.Context, id string) error {
|
|||||||
// ListByStatus returns jobs with a specific status
|
// ListByStatus returns jobs with a specific status
|
||||||
func (r *JobRepository) ListByStatus(ctx context.Context, status domain.JobStatus) ([]*domain.Job, error) {
|
func (r *JobRepository) ListByStatus(ctx context.Context, status domain.JobStatus) ([]*domain.Job, error) {
|
||||||
rows, err := r.db.QueryContext(ctx, `
|
rows, err := r.db.QueryContext(ctx, `
|
||||||
SELECT id, type, certificate_id, target_id, status, attempts, max_attempts,
|
SELECT id, type, certificate_id, target_id, agent_id, status, attempts, max_attempts,
|
||||||
last_error, scheduled_at, started_at, completed_at, created_at
|
last_error, scheduled_at, started_at, completed_at, created_at
|
||||||
FROM jobs
|
FROM jobs
|
||||||
WHERE status = $1
|
WHERE status = $1
|
||||||
@@ -181,7 +182,7 @@ func (r *JobRepository) ListByStatus(ctx context.Context, status domain.JobStatu
|
|||||||
// ListByCertificate returns all jobs for a certificate
|
// ListByCertificate returns all jobs for a certificate
|
||||||
func (r *JobRepository) ListByCertificate(ctx context.Context, certID string) ([]*domain.Job, error) {
|
func (r *JobRepository) ListByCertificate(ctx context.Context, certID string) ([]*domain.Job, error) {
|
||||||
rows, err := r.db.QueryContext(ctx, `
|
rows, err := r.db.QueryContext(ctx, `
|
||||||
SELECT id, type, certificate_id, target_id, status, attempts, max_attempts,
|
SELECT id, type, certificate_id, target_id, agent_id, status, attempts, max_attempts,
|
||||||
last_error, scheduled_at, started_at, completed_at, created_at
|
last_error, scheduled_at, started_at, completed_at, created_at
|
||||||
FROM jobs
|
FROM jobs
|
||||||
WHERE certificate_id = $1
|
WHERE certificate_id = $1
|
||||||
@@ -239,7 +240,7 @@ func (r *JobRepository) UpdateStatus(ctx context.Context, id string, status doma
|
|||||||
// GetPendingJobs returns jobs not yet processed of a specific type
|
// GetPendingJobs returns jobs not yet processed of a specific type
|
||||||
func (r *JobRepository) GetPendingJobs(ctx context.Context, jobType domain.JobType) ([]*domain.Job, error) {
|
func (r *JobRepository) GetPendingJobs(ctx context.Context, jobType domain.JobType) ([]*domain.Job, error) {
|
||||||
rows, err := r.db.QueryContext(ctx, `
|
rows, err := r.db.QueryContext(ctx, `
|
||||||
SELECT id, type, certificate_id, target_id, status, attempts, max_attempts,
|
SELECT id, type, certificate_id, target_id, agent_id, status, attempts, max_attempts,
|
||||||
last_error, scheduled_at, started_at, completed_at, created_at
|
last_error, scheduled_at, started_at, completed_at, created_at
|
||||||
FROM jobs
|
FROM jobs
|
||||||
WHERE type = $1 AND status = $2
|
WHERE type = $1 AND status = $2
|
||||||
@@ -267,13 +268,71 @@ func (r *JobRepository) GetPendingJobs(ctx context.Context, jobType domain.JobTy
|
|||||||
return jobs, nil
|
return jobs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ListPendingByAgentID returns pending deployment jobs and AwaitingCSR jobs for a specific agent.
|
||||||
|
// Deployment jobs are matched by agent_id directly (set at creation time), with a fallback
|
||||||
|
// for legacy jobs where agent_id is NULL but target_id resolves to the agent via deployment_targets.
|
||||||
|
// AwaitingCSR jobs are matched through certificate → target mappings → agent ownership.
|
||||||
|
func (r *JobRepository) ListPendingByAgentID(ctx context.Context, agentID string) ([]*domain.Job, error) {
|
||||||
|
rows, err := r.db.QueryContext(ctx, `
|
||||||
|
SELECT id, type, certificate_id, target_id, agent_id, status, attempts, max_attempts,
|
||||||
|
last_error, scheduled_at, started_at, completed_at, created_at
|
||||||
|
FROM jobs
|
||||||
|
WHERE agent_id = $1 AND status = 'Pending' AND type = 'Deployment'
|
||||||
|
|
||||||
|
UNION ALL
|
||||||
|
|
||||||
|
SELECT j.id, j.type, j.certificate_id, j.target_id, j.agent_id, j.status, j.attempts, j.max_attempts,
|
||||||
|
j.last_error, j.scheduled_at, j.started_at, j.completed_at, j.created_at
|
||||||
|
FROM jobs j
|
||||||
|
INNER JOIN deployment_targets dt ON j.target_id = dt.id
|
||||||
|
WHERE j.agent_id IS NULL AND j.status = 'Pending' AND j.type = 'Deployment'
|
||||||
|
AND dt.agent_id = $1
|
||||||
|
|
||||||
|
UNION ALL
|
||||||
|
|
||||||
|
SELECT j.id, j.type, j.certificate_id, j.target_id, j.agent_id, j.status, j.attempts, j.max_attempts,
|
||||||
|
j.last_error, j.scheduled_at, j.started_at, j.completed_at, j.created_at
|
||||||
|
FROM jobs j
|
||||||
|
WHERE j.status = 'AwaitingCSR'
|
||||||
|
AND j.type IN ('Renewal', 'Issuance')
|
||||||
|
AND EXISTS (
|
||||||
|
SELECT 1 FROM certificate_target_mappings ctm
|
||||||
|
INNER JOIN deployment_targets dt ON ctm.target_id = dt.id
|
||||||
|
WHERE ctm.certificate_id = j.certificate_id
|
||||||
|
AND dt.agent_id = $1
|
||||||
|
)
|
||||||
|
|
||||||
|
ORDER BY created_at ASC
|
||||||
|
`, agentID)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to query pending jobs for agent: %w", err)
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
var jobs []*domain.Job
|
||||||
|
for rows.Next() {
|
||||||
|
job, err := scanJob(rows)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
jobs = append(jobs, job)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := rows.Err(); err != nil {
|
||||||
|
return nil, fmt.Errorf("error iterating pending agent job rows: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return jobs, nil
|
||||||
|
}
|
||||||
|
|
||||||
// scanJob scans a job from a row or rows
|
// scanJob scans a job from a row or rows
|
||||||
func scanJob(scanner interface {
|
func scanJob(scanner interface {
|
||||||
Scan(...interface{}) error
|
Scan(...interface{}) error
|
||||||
}) (*domain.Job, error) {
|
}) (*domain.Job, error) {
|
||||||
var job domain.Job
|
var job domain.Job
|
||||||
err := scanner.Scan(&job.ID, &job.Type, &job.CertificateID, &job.TargetID,
|
err := scanner.Scan(&job.ID, &job.Type, &job.CertificateID, &job.TargetID,
|
||||||
&job.Status, &job.Attempts, &job.MaxAttempts, &job.LastError,
|
&job.AgentID, &job.Status, &job.Attempts, &job.MaxAttempts, &job.LastError,
|
||||||
&job.ScheduledAt, &job.StartedAt, &job.CompletedAt, &job.CreatedAt)
|
&job.ScheduledAt, &job.StartedAt, &job.CompletedAt, &job.CreatedAt)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@@ -251,38 +251,17 @@ func (s *AgentService) GetCertificateForAgent(ctx context.Context, agentID strin
|
|||||||
|
|
||||||
// GetPendingWork returns actionable jobs for an agent: deployment jobs (Pending) and
|
// GetPendingWork returns actionable jobs for an agent: deployment jobs (Pending) and
|
||||||
// renewal/issuance jobs awaiting CSR submission (AwaitingCSR).
|
// renewal/issuance jobs awaiting CSR submission (AwaitingCSR).
|
||||||
|
// Jobs are scoped to the requesting agent via agent_id (set at job creation) or
|
||||||
|
// through target→agent relationships for legacy jobs and AwaitingCSR routing.
|
||||||
func (s *AgentService) GetPendingWork(ctx context.Context, agentID string) ([]*domain.Job, error) {
|
func (s *AgentService) GetPendingWork(ctx context.Context, agentID string) ([]*domain.Job, error) {
|
||||||
// Fetch agent to verify it exists
|
// Verify agent exists
|
||||||
_, err := s.agentRepo.Get(ctx, agentID)
|
_, err := s.agentRepo.Get(ctx, agentID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to fetch agent: %w", err)
|
return nil, fmt.Errorf("failed to fetch agent: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
var workForAgent []*domain.Job
|
// Return only jobs assigned to this agent (via agent_id or target→agent relationship)
|
||||||
|
return s.jobRepo.ListPendingByAgentID(ctx, agentID)
|
||||||
// Get pending deployment jobs
|
|
||||||
pendingJobs, err := s.jobRepo.ListByStatus(ctx, domain.JobStatusPending)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to list pending jobs: %w", err)
|
|
||||||
}
|
|
||||||
for _, job := range pendingJobs {
|
|
||||||
if job.Type == domain.JobTypeDeployment {
|
|
||||||
workForAgent = append(workForAgent, job)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get AwaitingCSR jobs (agent keygen mode — agent needs to generate key + submit CSR)
|
|
||||||
awaitingJobs, err := s.jobRepo.ListByStatus(ctx, domain.JobStatusAwaitingCSR)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to list awaiting CSR jobs: %w", err)
|
|
||||||
}
|
|
||||||
for _, job := range awaitingJobs {
|
|
||||||
if job.Type == domain.JobTypeRenewal || job.Type == domain.JobTypeIssuance {
|
|
||||||
workForAgent = append(workForAgent, job)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return workForAgent, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ReportJobStatus updates a job's status based on agent feedback.
|
// ReportJobStatus updates a job's status based on agent feedback.
|
||||||
|
|||||||
@@ -131,8 +131,9 @@ func TestHeartbeat_NotFound(t *testing.T) {
|
|||||||
func TestGetPendingWork(t *testing.T) {
|
func TestGetPendingWork(t *testing.T) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
|
agentID := "agent-001"
|
||||||
agent := &domain.Agent{
|
agent := &domain.Agent{
|
||||||
ID: "agent-001",
|
ID: agentID,
|
||||||
Name: "prod-agent",
|
Name: "prod-agent",
|
||||||
Hostname: "server-01",
|
Hostname: "server-01",
|
||||||
Status: domain.AgentStatusOnline,
|
Status: domain.AgentStatusOnline,
|
||||||
@@ -146,6 +147,7 @@ func TestGetPendingWork(t *testing.T) {
|
|||||||
Type: domain.JobTypeDeployment,
|
Type: domain.JobTypeDeployment,
|
||||||
CertificateID: "cert-001",
|
CertificateID: "cert-001",
|
||||||
Status: domain.JobStatusPending,
|
Status: domain.JobStatusPending,
|
||||||
|
AgentID: &agentID,
|
||||||
CreatedAt: now,
|
CreatedAt: now,
|
||||||
}
|
}
|
||||||
job2 := &domain.Job{
|
job2 := &domain.Job{
|
||||||
@@ -157,7 +159,7 @@ func TestGetPendingWork(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
agentRepo := &mockAgentRepo{
|
agentRepo := &mockAgentRepo{
|
||||||
Agents: map[string]*domain.Agent{"agent-001": agent},
|
Agents: map[string]*domain.Agent{agentID: agent},
|
||||||
HeartbeatUpdates: make(map[string]time.Time),
|
HeartbeatUpdates: make(map[string]time.Time),
|
||||||
}
|
}
|
||||||
certRepo := &mockCertRepo{
|
certRepo := &mockCertRepo{
|
||||||
@@ -177,7 +179,7 @@ func TestGetPendingWork(t *testing.T) {
|
|||||||
|
|
||||||
agentService := NewAgentService(agentRepo, certRepo, jobRepo, targetRepo, auditService, issuerRegistry, nil)
|
agentService := NewAgentService(agentRepo, certRepo, jobRepo, targetRepo, auditService, issuerRegistry, nil)
|
||||||
|
|
||||||
jobs, err := agentService.GetPendingWork(ctx, "agent-001")
|
jobs, err := agentService.GetPendingWork(ctx, agentID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("GetPendingWork failed: %v", err)
|
t.Fatalf("GetPendingWork failed: %v", err)
|
||||||
}
|
}
|
||||||
@@ -185,11 +187,132 @@ func TestGetPendingWork(t *testing.T) {
|
|||||||
if len(jobs) != 1 {
|
if len(jobs) != 1 {
|
||||||
t.Errorf("expected 1 deployment job, got %d", len(jobs))
|
t.Errorf("expected 1 deployment job, got %d", len(jobs))
|
||||||
}
|
}
|
||||||
if jobs[0].Type != domain.JobTypeDeployment {
|
if len(jobs) > 0 && jobs[0].Type != domain.JobTypeDeployment {
|
||||||
t.Errorf("expected JobTypeDeployment, got %s", jobs[0].Type)
|
t.Errorf("expected JobTypeDeployment, got %s", jobs[0].Type)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestGetPendingWork_OnlyReturnsAgentJobs(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
now := time.Now()
|
||||||
|
agentA := "agent-A"
|
||||||
|
agentB := "agent-B"
|
||||||
|
|
||||||
|
agentRepo := &mockAgentRepo{
|
||||||
|
Agents: map[string]*domain.Agent{
|
||||||
|
agentA: {ID: agentA, Name: "agent-A", Hostname: "host-a", Status: domain.AgentStatusOnline, RegisteredAt: now, APIKeyHash: "hashA"},
|
||||||
|
agentB: {ID: agentB, Name: "agent-B", Hostname: "host-b", Status: domain.AgentStatusOnline, RegisteredAt: now, APIKeyHash: "hashB"},
|
||||||
|
},
|
||||||
|
HeartbeatUpdates: make(map[string]time.Time),
|
||||||
|
}
|
||||||
|
|
||||||
|
jobA := &domain.Job{ID: "job-A", Type: domain.JobTypeDeployment, CertificateID: "cert-001", Status: domain.JobStatusPending, AgentID: &agentA, CreatedAt: now}
|
||||||
|
jobB := &domain.Job{ID: "job-B", Type: domain.JobTypeDeployment, CertificateID: "cert-002", Status: domain.JobStatusPending, AgentID: &agentB, CreatedAt: now}
|
||||||
|
|
||||||
|
jobRepo := &mockJobRepo{
|
||||||
|
Jobs: map[string]*domain.Job{"job-A": jobA, "job-B": jobB},
|
||||||
|
StatusUpdates: make(map[string]domain.JobStatus),
|
||||||
|
}
|
||||||
|
certRepo := &mockCertRepo{Certs: make(map[string]*domain.ManagedCertificate), Versions: make(map[string][]*domain.CertificateVersion)}
|
||||||
|
targetRepo := &mockTargetRepo{Targets: make(map[string]*domain.DeploymentTarget)}
|
||||||
|
auditService := NewAuditService(&mockAuditRepo{})
|
||||||
|
|
||||||
|
agentService := NewAgentService(agentRepo, certRepo, jobRepo, targetRepo, auditService, make(map[string]IssuerConnector), nil)
|
||||||
|
|
||||||
|
// Agent A should only see its job
|
||||||
|
jobsA, err := agentService.GetPendingWork(ctx, agentA)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("GetPendingWork for agent-A failed: %v", err)
|
||||||
|
}
|
||||||
|
if len(jobsA) != 1 {
|
||||||
|
t.Fatalf("expected 1 job for agent-A, got %d", len(jobsA))
|
||||||
|
}
|
||||||
|
if jobsA[0].ID != "job-A" {
|
||||||
|
t.Errorf("expected job-A, got %s", jobsA[0].ID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Agent B should only see its job
|
||||||
|
jobsB, err := agentService.GetPendingWork(ctx, agentB)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("GetPendingWork for agent-B failed: %v", err)
|
||||||
|
}
|
||||||
|
if len(jobsB) != 1 {
|
||||||
|
t.Fatalf("expected 1 job for agent-B, got %d", len(jobsB))
|
||||||
|
}
|
||||||
|
if jobsB[0].ID != "job-B" {
|
||||||
|
t.Errorf("expected job-B, got %s", jobsB[0].ID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGetPendingWork_EmptyWhenNoJobsForAgent(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
now := time.Now()
|
||||||
|
agentA := "agent-A"
|
||||||
|
agentB := "agent-B"
|
||||||
|
|
||||||
|
agentRepo := &mockAgentRepo{
|
||||||
|
Agents: map[string]*domain.Agent{
|
||||||
|
agentA: {ID: agentA, Name: "agent-A", Hostname: "host-a", Status: domain.AgentStatusOnline, RegisteredAt: now, APIKeyHash: "hashA"},
|
||||||
|
},
|
||||||
|
HeartbeatUpdates: make(map[string]time.Time),
|
||||||
|
}
|
||||||
|
|
||||||
|
// All jobs belong to agent-B
|
||||||
|
jobB := &domain.Job{ID: "job-B", Type: domain.JobTypeDeployment, CertificateID: "cert-001", Status: domain.JobStatusPending, AgentID: &agentB, CreatedAt: now}
|
||||||
|
|
||||||
|
jobRepo := &mockJobRepo{
|
||||||
|
Jobs: map[string]*domain.Job{"job-B": jobB},
|
||||||
|
StatusUpdates: make(map[string]domain.JobStatus),
|
||||||
|
}
|
||||||
|
certRepo := &mockCertRepo{Certs: make(map[string]*domain.ManagedCertificate), Versions: make(map[string][]*domain.CertificateVersion)}
|
||||||
|
targetRepo := &mockTargetRepo{Targets: make(map[string]*domain.DeploymentTarget)}
|
||||||
|
auditService := NewAuditService(&mockAuditRepo{})
|
||||||
|
|
||||||
|
agentService := NewAgentService(agentRepo, certRepo, jobRepo, targetRepo, auditService, make(map[string]IssuerConnector), nil)
|
||||||
|
|
||||||
|
jobs, err := agentService.GetPendingWork(ctx, agentA)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("GetPendingWork failed: %v", err)
|
||||||
|
}
|
||||||
|
if len(jobs) != 0 {
|
||||||
|
t.Errorf("expected 0 jobs for agent-A (all jobs are for agent-B), got %d", len(jobs))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGetPendingWork_DeploymentAndCSR_Scoped(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
now := time.Now()
|
||||||
|
agentA := "agent-A"
|
||||||
|
|
||||||
|
agentRepo := &mockAgentRepo{
|
||||||
|
Agents: map[string]*domain.Agent{
|
||||||
|
agentA: {ID: agentA, Name: "agent-A", Hostname: "host-a", Status: domain.AgentStatusOnline, RegisteredAt: now, APIKeyHash: "hashA"},
|
||||||
|
},
|
||||||
|
HeartbeatUpdates: make(map[string]time.Time),
|
||||||
|
}
|
||||||
|
|
||||||
|
deployJob := &domain.Job{ID: "job-deploy", Type: domain.JobTypeDeployment, CertificateID: "cert-001", Status: domain.JobStatusPending, AgentID: &agentA, CreatedAt: now}
|
||||||
|
csrJob := &domain.Job{ID: "job-csr", Type: domain.JobTypeRenewal, CertificateID: "cert-002", Status: domain.JobStatusAwaitingCSR, AgentID: &agentA, CreatedAt: now}
|
||||||
|
|
||||||
|
jobRepo := &mockJobRepo{
|
||||||
|
Jobs: map[string]*domain.Job{"job-deploy": deployJob, "job-csr": csrJob},
|
||||||
|
StatusUpdates: make(map[string]domain.JobStatus),
|
||||||
|
}
|
||||||
|
certRepo := &mockCertRepo{Certs: make(map[string]*domain.ManagedCertificate), Versions: make(map[string][]*domain.CertificateVersion)}
|
||||||
|
targetRepo := &mockTargetRepo{Targets: make(map[string]*domain.DeploymentTarget)}
|
||||||
|
auditService := NewAuditService(&mockAuditRepo{})
|
||||||
|
|
||||||
|
agentService := NewAgentService(agentRepo, certRepo, jobRepo, targetRepo, auditService, make(map[string]IssuerConnector), nil)
|
||||||
|
|
||||||
|
jobs, err := agentService.GetPendingWork(ctx, agentA)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("GetPendingWork failed: %v", err)
|
||||||
|
}
|
||||||
|
if len(jobs) != 2 {
|
||||||
|
t.Fatalf("expected 2 jobs (deployment + AwaitingCSR), got %d", len(jobs))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestReportJobStatus(t *testing.T) {
|
func TestReportJobStatus(t *testing.T) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
|
|||||||
@@ -67,6 +67,11 @@ func (s *DeploymentService) CreateDeploymentJobs(ctx context.Context, certID str
|
|||||||
if target.ID != "" {
|
if target.ID != "" {
|
||||||
job.TargetID = &target.ID
|
job.TargetID = &target.ID
|
||||||
}
|
}
|
||||||
|
// Route job to the target's assigned agent
|
||||||
|
if target.AgentID != "" {
|
||||||
|
agentID := target.AgentID
|
||||||
|
job.AgentID = &agentID
|
||||||
|
}
|
||||||
|
|
||||||
if err := s.jobRepo.Create(ctx, job); err != nil {
|
if err := s.jobRepo.Create(ctx, job); err != nil {
|
||||||
slog.Error("failed to create deployment job for target", "target_id", target.ID, "error", err)
|
slog.Error("failed to create deployment job for target", "target_id", target.ID, "error", err)
|
||||||
|
|||||||
@@ -85,6 +85,45 @@ func TestDeploymentService_CreateDeploymentJobs_Success(t *testing.T) {
|
|||||||
if job.TargetID == nil || len(*job.TargetID) == 0 {
|
if job.TargetID == nil || len(*job.TargetID) == 0 {
|
||||||
t.Errorf("expected job to have TargetID set")
|
t.Errorf("expected job to have TargetID set")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// M31: Verify AgentID is set from target's agent assignment
|
||||||
|
if job.AgentID == nil {
|
||||||
|
t.Errorf("expected job to have AgentID set (M31 agent routing)")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestDeploymentService_CreateDeploymentJobs_SetsAgentID verifies AgentID is populated from target.
|
||||||
|
func TestDeploymentService_CreateDeploymentJobs_SetsAgentID(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
svc, jobRepo, targetRepo, _, _, _, _ := newTestDeploymentService()
|
||||||
|
|
||||||
|
target := &domain.DeploymentTarget{
|
||||||
|
ID: "tgt-nginx-1",
|
||||||
|
Name: "NGINX Server 1",
|
||||||
|
Type: domain.TargetTypeNGINX,
|
||||||
|
AgentID: "agent-web-01",
|
||||||
|
Enabled: true,
|
||||||
|
CreatedAt: time.Now(),
|
||||||
|
UpdatedAt: time.Now(),
|
||||||
|
}
|
||||||
|
targetRepo.AddTarget(target)
|
||||||
|
|
||||||
|
jobIDs, err := svc.CreateDeploymentJobs(ctx, "mc-cert-1")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("CreateDeploymentJobs failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(jobIDs) != 1 {
|
||||||
|
t.Fatalf("expected 1 job, got %d", len(jobIDs))
|
||||||
|
}
|
||||||
|
|
||||||
|
job := jobRepo.Jobs[jobIDs[0]]
|
||||||
|
if job.AgentID == nil {
|
||||||
|
t.Fatal("expected AgentID to be set on deployment job")
|
||||||
|
}
|
||||||
|
if *job.AgentID != "agent-web-01" {
|
||||||
|
t.Errorf("expected AgentID 'agent-web-01', got '%s'", *job.AgentID)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -26,12 +26,18 @@ type RenewalService struct {
|
|||||||
jobRepo repository.JobRepository
|
jobRepo repository.JobRepository
|
||||||
renewalPolicyRepo repository.RenewalPolicyRepository
|
renewalPolicyRepo repository.RenewalPolicyRepository
|
||||||
profileRepo repository.CertificateProfileRepository
|
profileRepo repository.CertificateProfileRepository
|
||||||
|
targetRepo repository.TargetRepository
|
||||||
auditService *AuditService
|
auditService *AuditService
|
||||||
notificationSvc *NotificationService
|
notificationSvc *NotificationService
|
||||||
issuerRegistry map[string]IssuerConnector
|
issuerRegistry map[string]IssuerConnector
|
||||||
keygenMode string // "agent" (default) or "server" (demo only)
|
keygenMode string // "agent" (default) or "server" (demo only)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetTargetRepo sets the target repository for resolving agent_id on deployment jobs.
|
||||||
|
func (s *RenewalService) SetTargetRepo(repo repository.TargetRepository) {
|
||||||
|
s.targetRepo = repo
|
||||||
|
}
|
||||||
|
|
||||||
// IssuerConnector defines the service-layer interface for interacting with certificate issuers.
|
// IssuerConnector defines the service-layer interface for interacting with certificate issuers.
|
||||||
// This is distinct from the connector-layer issuer.Connector interface to maintain dependency
|
// This is distinct from the connector-layer issuer.Connector interface to maintain dependency
|
||||||
// inversion. Use IssuerConnectorAdapter to bridge between the two.
|
// inversion. Use IssuerConnectorAdapter to bridge between the two.
|
||||||
@@ -636,12 +642,26 @@ func (s *RenewalService) createDeploymentJobs(ctx context.Context, cert *domain.
|
|||||||
}
|
}
|
||||||
for _, targetID := range cert.TargetIDs {
|
for _, targetID := range cert.TargetIDs {
|
||||||
tid := targetID
|
tid := targetID
|
||||||
|
|
||||||
|
// Resolve agent_id from target for job routing
|
||||||
|
var agentIDPtr *string
|
||||||
|
if s.targetRepo != nil {
|
||||||
|
target, err := s.targetRepo.Get(ctx, tid)
|
||||||
|
if err != nil {
|
||||||
|
slog.Warn("failed to resolve agent for deployment job", "target_id", tid, "error", err)
|
||||||
|
} else if target.AgentID != "" {
|
||||||
|
agentID := target.AgentID
|
||||||
|
agentIDPtr = &agentID
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
deployJob := &domain.Job{
|
deployJob := &domain.Job{
|
||||||
ID: generateID("job"),
|
ID: generateID("job"),
|
||||||
CertificateID: cert.ID,
|
CertificateID: cert.ID,
|
||||||
Type: domain.JobTypeDeployment,
|
Type: domain.JobTypeDeployment,
|
||||||
Status: domain.JobStatusPending,
|
Status: domain.JobStatusPending,
|
||||||
TargetID: &tid,
|
TargetID: &tid,
|
||||||
|
AgentID: agentIDPtr,
|
||||||
MaxAttempts: 3,
|
MaxAttempts: 3,
|
||||||
ScheduledAt: time.Now(),
|
ScheduledAt: time.Now(),
|
||||||
CreatedAt: time.Now(),
|
CreatedAt: time.Now(),
|
||||||
|
|||||||
@@ -243,6 +243,25 @@ func (m *mockJobRepo) GetPendingJobs(ctx context.Context, jobType domain.JobType
|
|||||||
return jobs, nil
|
return jobs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *mockJobRepo) ListPendingByAgentID(ctx context.Context, agentID string) ([]*domain.Job, error) {
|
||||||
|
m.mu.Lock()
|
||||||
|
defer m.mu.Unlock()
|
||||||
|
if m.ListErr != nil {
|
||||||
|
return nil, m.ListErr
|
||||||
|
}
|
||||||
|
var result []*domain.Job
|
||||||
|
for _, j := range m.Jobs {
|
||||||
|
if j.AgentID != nil && *j.AgentID == agentID {
|
||||||
|
if j.Status == domain.JobStatusPending && j.Type == domain.JobTypeDeployment {
|
||||||
|
result = append(result, j)
|
||||||
|
} else if j.Status == domain.JobStatusAwaitingCSR {
|
||||||
|
result = append(result, j)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (m *mockJobRepo) AddJob(job *domain.Job) {
|
func (m *mockJobRepo) AddJob(job *domain.Job) {
|
||||||
m.mu.Lock()
|
m.mu.Lock()
|
||||||
defer m.mu.Unlock()
|
defer m.mu.Unlock()
|
||||||
|
|||||||
@@ -65,6 +65,10 @@ func (m *mockVerificationJobRepo) GetPendingJobs(ctx context.Context, jobType do
|
|||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *mockVerificationJobRepo) ListPendingByAgentID(ctx context.Context, agentID string) ([]*domain.Job, error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
// newVerificationTestService creates a VerificationService wired with test doubles.
|
// newVerificationTestService creates a VerificationService wired with test doubles.
|
||||||
func newVerificationTestService(jobs map[string]*domain.Job, jobRepoErr error) (*VerificationService, *mockVerificationJobRepo, *mockAuditRepo) {
|
func newVerificationTestService(jobs map[string]*domain.Job, jobRepoErr error) (*VerificationService, *mockVerificationJobRepo, *mockAuditRepo) {
|
||||||
jobRepo := &mockVerificationJobRepo{jobs: jobs, err: jobRepoErr}
|
jobRepo := &mockVerificationJobRepo{jobs: jobs, err: jobRepoErr}
|
||||||
|
|||||||
@@ -70,6 +70,8 @@ export interface Job {
|
|||||||
id: string;
|
id: string;
|
||||||
certificate_id: string;
|
certificate_id: string;
|
||||||
type: string;
|
type: string;
|
||||||
|
target_id?: string;
|
||||||
|
agent_id?: string;
|
||||||
status: string;
|
status: string;
|
||||||
attempts: number;
|
attempts: number;
|
||||||
max_attempts: number;
|
max_attempts: number;
|
||||||
|
|||||||
Reference in New Issue
Block a user