mirror of
https://github.com/shankar0123/certctl.git
synced 2026-06-12 00:28:58 +00:00
feat(M31): agent work routing — scope jobs to assigned agents
Deployment jobs now set agent_id from target→agent relationship at creation time. GetPendingWork() uses ListPendingByAgentID() with a 3-way UNION query (direct match, legacy NULL fallback via target JOIN, AwaitingCSR via cert→target→agent chain) so each agent only receives its own jobs. - Added AgentID *string to Job domain struct - Added agent_id to all job SQL queries (5 SELECTs, INSERT, UPDATE, scanJob) - New ListPendingByAgentID() repository method - Rewrote GetPendingWork() from ~25 lines to single scoped query - 4 new Go tests (3 agent routing + 1 deployment agent_id) - Frontend: agent_id/target_id on Job type Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -251,38 +251,17 @@ func (s *AgentService) GetCertificateForAgent(ctx context.Context, agentID strin
|
||||
|
||||
// GetPendingWork returns actionable jobs for an agent: deployment jobs (Pending) and
|
||||
// renewal/issuance jobs awaiting CSR submission (AwaitingCSR).
|
||||
// Jobs are scoped to the requesting agent via agent_id (set at job creation) or
|
||||
// through target→agent relationships for legacy jobs and AwaitingCSR routing.
|
||||
func (s *AgentService) GetPendingWork(ctx context.Context, agentID string) ([]*domain.Job, error) {
|
||||
// Fetch agent to verify it exists
|
||||
// Verify agent exists
|
||||
_, err := s.agentRepo.Get(ctx, agentID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to fetch agent: %w", err)
|
||||
}
|
||||
|
||||
var workForAgent []*domain.Job
|
||||
|
||||
// Get pending deployment jobs
|
||||
pendingJobs, err := s.jobRepo.ListByStatus(ctx, domain.JobStatusPending)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to list pending jobs: %w", err)
|
||||
}
|
||||
for _, job := range pendingJobs {
|
||||
if job.Type == domain.JobTypeDeployment {
|
||||
workForAgent = append(workForAgent, job)
|
||||
}
|
||||
}
|
||||
|
||||
// Get AwaitingCSR jobs (agent keygen mode — agent needs to generate key + submit CSR)
|
||||
awaitingJobs, err := s.jobRepo.ListByStatus(ctx, domain.JobStatusAwaitingCSR)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to list awaiting CSR jobs: %w", err)
|
||||
}
|
||||
for _, job := range awaitingJobs {
|
||||
if job.Type == domain.JobTypeRenewal || job.Type == domain.JobTypeIssuance {
|
||||
workForAgent = append(workForAgent, job)
|
||||
}
|
||||
}
|
||||
|
||||
return workForAgent, nil
|
||||
// Return only jobs assigned to this agent (via agent_id or target→agent relationship)
|
||||
return s.jobRepo.ListPendingByAgentID(ctx, agentID)
|
||||
}
|
||||
|
||||
// ReportJobStatus updates a job's status based on agent feedback.
|
||||
|
||||
@@ -131,8 +131,9 @@ func TestHeartbeat_NotFound(t *testing.T) {
|
||||
func TestGetPendingWork(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
now := time.Now()
|
||||
agentID := "agent-001"
|
||||
agent := &domain.Agent{
|
||||
ID: "agent-001",
|
||||
ID: agentID,
|
||||
Name: "prod-agent",
|
||||
Hostname: "server-01",
|
||||
Status: domain.AgentStatusOnline,
|
||||
@@ -146,6 +147,7 @@ func TestGetPendingWork(t *testing.T) {
|
||||
Type: domain.JobTypeDeployment,
|
||||
CertificateID: "cert-001",
|
||||
Status: domain.JobStatusPending,
|
||||
AgentID: &agentID,
|
||||
CreatedAt: now,
|
||||
}
|
||||
job2 := &domain.Job{
|
||||
@@ -157,7 +159,7 @@ func TestGetPendingWork(t *testing.T) {
|
||||
}
|
||||
|
||||
agentRepo := &mockAgentRepo{
|
||||
Agents: map[string]*domain.Agent{"agent-001": agent},
|
||||
Agents: map[string]*domain.Agent{agentID: agent},
|
||||
HeartbeatUpdates: make(map[string]time.Time),
|
||||
}
|
||||
certRepo := &mockCertRepo{
|
||||
@@ -177,7 +179,7 @@ func TestGetPendingWork(t *testing.T) {
|
||||
|
||||
agentService := NewAgentService(agentRepo, certRepo, jobRepo, targetRepo, auditService, issuerRegistry, nil)
|
||||
|
||||
jobs, err := agentService.GetPendingWork(ctx, "agent-001")
|
||||
jobs, err := agentService.GetPendingWork(ctx, agentID)
|
||||
if err != nil {
|
||||
t.Fatalf("GetPendingWork failed: %v", err)
|
||||
}
|
||||
@@ -185,11 +187,132 @@ func TestGetPendingWork(t *testing.T) {
|
||||
if len(jobs) != 1 {
|
||||
t.Errorf("expected 1 deployment job, got %d", len(jobs))
|
||||
}
|
||||
if jobs[0].Type != domain.JobTypeDeployment {
|
||||
if len(jobs) > 0 && jobs[0].Type != domain.JobTypeDeployment {
|
||||
t.Errorf("expected JobTypeDeployment, got %s", jobs[0].Type)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetPendingWork_OnlyReturnsAgentJobs(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
now := time.Now()
|
||||
agentA := "agent-A"
|
||||
agentB := "agent-B"
|
||||
|
||||
agentRepo := &mockAgentRepo{
|
||||
Agents: map[string]*domain.Agent{
|
||||
agentA: {ID: agentA, Name: "agent-A", Hostname: "host-a", Status: domain.AgentStatusOnline, RegisteredAt: now, APIKeyHash: "hashA"},
|
||||
agentB: {ID: agentB, Name: "agent-B", Hostname: "host-b", Status: domain.AgentStatusOnline, RegisteredAt: now, APIKeyHash: "hashB"},
|
||||
},
|
||||
HeartbeatUpdates: make(map[string]time.Time),
|
||||
}
|
||||
|
||||
jobA := &domain.Job{ID: "job-A", Type: domain.JobTypeDeployment, CertificateID: "cert-001", Status: domain.JobStatusPending, AgentID: &agentA, CreatedAt: now}
|
||||
jobB := &domain.Job{ID: "job-B", Type: domain.JobTypeDeployment, CertificateID: "cert-002", Status: domain.JobStatusPending, AgentID: &agentB, CreatedAt: now}
|
||||
|
||||
jobRepo := &mockJobRepo{
|
||||
Jobs: map[string]*domain.Job{"job-A": jobA, "job-B": jobB},
|
||||
StatusUpdates: make(map[string]domain.JobStatus),
|
||||
}
|
||||
certRepo := &mockCertRepo{Certs: make(map[string]*domain.ManagedCertificate), Versions: make(map[string][]*domain.CertificateVersion)}
|
||||
targetRepo := &mockTargetRepo{Targets: make(map[string]*domain.DeploymentTarget)}
|
||||
auditService := NewAuditService(&mockAuditRepo{})
|
||||
|
||||
agentService := NewAgentService(agentRepo, certRepo, jobRepo, targetRepo, auditService, make(map[string]IssuerConnector), nil)
|
||||
|
||||
// Agent A should only see its job
|
||||
jobsA, err := agentService.GetPendingWork(ctx, agentA)
|
||||
if err != nil {
|
||||
t.Fatalf("GetPendingWork for agent-A failed: %v", err)
|
||||
}
|
||||
if len(jobsA) != 1 {
|
||||
t.Fatalf("expected 1 job for agent-A, got %d", len(jobsA))
|
||||
}
|
||||
if jobsA[0].ID != "job-A" {
|
||||
t.Errorf("expected job-A, got %s", jobsA[0].ID)
|
||||
}
|
||||
|
||||
// Agent B should only see its job
|
||||
jobsB, err := agentService.GetPendingWork(ctx, agentB)
|
||||
if err != nil {
|
||||
t.Fatalf("GetPendingWork for agent-B failed: %v", err)
|
||||
}
|
||||
if len(jobsB) != 1 {
|
||||
t.Fatalf("expected 1 job for agent-B, got %d", len(jobsB))
|
||||
}
|
||||
if jobsB[0].ID != "job-B" {
|
||||
t.Errorf("expected job-B, got %s", jobsB[0].ID)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetPendingWork_EmptyWhenNoJobsForAgent(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
now := time.Now()
|
||||
agentA := "agent-A"
|
||||
agentB := "agent-B"
|
||||
|
||||
agentRepo := &mockAgentRepo{
|
||||
Agents: map[string]*domain.Agent{
|
||||
agentA: {ID: agentA, Name: "agent-A", Hostname: "host-a", Status: domain.AgentStatusOnline, RegisteredAt: now, APIKeyHash: "hashA"},
|
||||
},
|
||||
HeartbeatUpdates: make(map[string]time.Time),
|
||||
}
|
||||
|
||||
// All jobs belong to agent-B
|
||||
jobB := &domain.Job{ID: "job-B", Type: domain.JobTypeDeployment, CertificateID: "cert-001", Status: domain.JobStatusPending, AgentID: &agentB, CreatedAt: now}
|
||||
|
||||
jobRepo := &mockJobRepo{
|
||||
Jobs: map[string]*domain.Job{"job-B": jobB},
|
||||
StatusUpdates: make(map[string]domain.JobStatus),
|
||||
}
|
||||
certRepo := &mockCertRepo{Certs: make(map[string]*domain.ManagedCertificate), Versions: make(map[string][]*domain.CertificateVersion)}
|
||||
targetRepo := &mockTargetRepo{Targets: make(map[string]*domain.DeploymentTarget)}
|
||||
auditService := NewAuditService(&mockAuditRepo{})
|
||||
|
||||
agentService := NewAgentService(agentRepo, certRepo, jobRepo, targetRepo, auditService, make(map[string]IssuerConnector), nil)
|
||||
|
||||
jobs, err := agentService.GetPendingWork(ctx, agentA)
|
||||
if err != nil {
|
||||
t.Fatalf("GetPendingWork failed: %v", err)
|
||||
}
|
||||
if len(jobs) != 0 {
|
||||
t.Errorf("expected 0 jobs for agent-A (all jobs are for agent-B), got %d", len(jobs))
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetPendingWork_DeploymentAndCSR_Scoped(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
now := time.Now()
|
||||
agentA := "agent-A"
|
||||
|
||||
agentRepo := &mockAgentRepo{
|
||||
Agents: map[string]*domain.Agent{
|
||||
agentA: {ID: agentA, Name: "agent-A", Hostname: "host-a", Status: domain.AgentStatusOnline, RegisteredAt: now, APIKeyHash: "hashA"},
|
||||
},
|
||||
HeartbeatUpdates: make(map[string]time.Time),
|
||||
}
|
||||
|
||||
deployJob := &domain.Job{ID: "job-deploy", Type: domain.JobTypeDeployment, CertificateID: "cert-001", Status: domain.JobStatusPending, AgentID: &agentA, CreatedAt: now}
|
||||
csrJob := &domain.Job{ID: "job-csr", Type: domain.JobTypeRenewal, CertificateID: "cert-002", Status: domain.JobStatusAwaitingCSR, AgentID: &agentA, CreatedAt: now}
|
||||
|
||||
jobRepo := &mockJobRepo{
|
||||
Jobs: map[string]*domain.Job{"job-deploy": deployJob, "job-csr": csrJob},
|
||||
StatusUpdates: make(map[string]domain.JobStatus),
|
||||
}
|
||||
certRepo := &mockCertRepo{Certs: make(map[string]*domain.ManagedCertificate), Versions: make(map[string][]*domain.CertificateVersion)}
|
||||
targetRepo := &mockTargetRepo{Targets: make(map[string]*domain.DeploymentTarget)}
|
||||
auditService := NewAuditService(&mockAuditRepo{})
|
||||
|
||||
agentService := NewAgentService(agentRepo, certRepo, jobRepo, targetRepo, auditService, make(map[string]IssuerConnector), nil)
|
||||
|
||||
jobs, err := agentService.GetPendingWork(ctx, agentA)
|
||||
if err != nil {
|
||||
t.Fatalf("GetPendingWork failed: %v", err)
|
||||
}
|
||||
if len(jobs) != 2 {
|
||||
t.Fatalf("expected 2 jobs (deployment + AwaitingCSR), got %d", len(jobs))
|
||||
}
|
||||
}
|
||||
|
||||
func TestReportJobStatus(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
now := time.Now()
|
||||
|
||||
@@ -67,6 +67,11 @@ func (s *DeploymentService) CreateDeploymentJobs(ctx context.Context, certID str
|
||||
if target.ID != "" {
|
||||
job.TargetID = &target.ID
|
||||
}
|
||||
// Route job to the target's assigned agent
|
||||
if target.AgentID != "" {
|
||||
agentID := target.AgentID
|
||||
job.AgentID = &agentID
|
||||
}
|
||||
|
||||
if err := s.jobRepo.Create(ctx, job); err != nil {
|
||||
slog.Error("failed to create deployment job for target", "target_id", target.ID, "error", err)
|
||||
|
||||
@@ -85,6 +85,45 @@ func TestDeploymentService_CreateDeploymentJobs_Success(t *testing.T) {
|
||||
if job.TargetID == nil || len(*job.TargetID) == 0 {
|
||||
t.Errorf("expected job to have TargetID set")
|
||||
}
|
||||
|
||||
// M31: Verify AgentID is set from target's agent assignment
|
||||
if job.AgentID == nil {
|
||||
t.Errorf("expected job to have AgentID set (M31 agent routing)")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestDeploymentService_CreateDeploymentJobs_SetsAgentID verifies AgentID is populated from target.
|
||||
func TestDeploymentService_CreateDeploymentJobs_SetsAgentID(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
svc, jobRepo, targetRepo, _, _, _, _ := newTestDeploymentService()
|
||||
|
||||
target := &domain.DeploymentTarget{
|
||||
ID: "tgt-nginx-1",
|
||||
Name: "NGINX Server 1",
|
||||
Type: domain.TargetTypeNGINX,
|
||||
AgentID: "agent-web-01",
|
||||
Enabled: true,
|
||||
CreatedAt: time.Now(),
|
||||
UpdatedAt: time.Now(),
|
||||
}
|
||||
targetRepo.AddTarget(target)
|
||||
|
||||
jobIDs, err := svc.CreateDeploymentJobs(ctx, "mc-cert-1")
|
||||
if err != nil {
|
||||
t.Fatalf("CreateDeploymentJobs failed: %v", err)
|
||||
}
|
||||
|
||||
if len(jobIDs) != 1 {
|
||||
t.Fatalf("expected 1 job, got %d", len(jobIDs))
|
||||
}
|
||||
|
||||
job := jobRepo.Jobs[jobIDs[0]]
|
||||
if job.AgentID == nil {
|
||||
t.Fatal("expected AgentID to be set on deployment job")
|
||||
}
|
||||
if *job.AgentID != "agent-web-01" {
|
||||
t.Errorf("expected AgentID 'agent-web-01', got '%s'", *job.AgentID)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -26,12 +26,18 @@ type RenewalService struct {
|
||||
jobRepo repository.JobRepository
|
||||
renewalPolicyRepo repository.RenewalPolicyRepository
|
||||
profileRepo repository.CertificateProfileRepository
|
||||
targetRepo repository.TargetRepository
|
||||
auditService *AuditService
|
||||
notificationSvc *NotificationService
|
||||
issuerRegistry map[string]IssuerConnector
|
||||
keygenMode string // "agent" (default) or "server" (demo only)
|
||||
}
|
||||
|
||||
// SetTargetRepo sets the target repository for resolving agent_id on deployment jobs.
|
||||
func (s *RenewalService) SetTargetRepo(repo repository.TargetRepository) {
|
||||
s.targetRepo = repo
|
||||
}
|
||||
|
||||
// IssuerConnector defines the service-layer interface for interacting with certificate issuers.
|
||||
// This is distinct from the connector-layer issuer.Connector interface to maintain dependency
|
||||
// inversion. Use IssuerConnectorAdapter to bridge between the two.
|
||||
@@ -636,12 +642,26 @@ func (s *RenewalService) createDeploymentJobs(ctx context.Context, cert *domain.
|
||||
}
|
||||
for _, targetID := range cert.TargetIDs {
|
||||
tid := targetID
|
||||
|
||||
// Resolve agent_id from target for job routing
|
||||
var agentIDPtr *string
|
||||
if s.targetRepo != nil {
|
||||
target, err := s.targetRepo.Get(ctx, tid)
|
||||
if err != nil {
|
||||
slog.Warn("failed to resolve agent for deployment job", "target_id", tid, "error", err)
|
||||
} else if target.AgentID != "" {
|
||||
agentID := target.AgentID
|
||||
agentIDPtr = &agentID
|
||||
}
|
||||
}
|
||||
|
||||
deployJob := &domain.Job{
|
||||
ID: generateID("job"),
|
||||
CertificateID: cert.ID,
|
||||
Type: domain.JobTypeDeployment,
|
||||
Status: domain.JobStatusPending,
|
||||
TargetID: &tid,
|
||||
AgentID: agentIDPtr,
|
||||
MaxAttempts: 3,
|
||||
ScheduledAt: time.Now(),
|
||||
CreatedAt: time.Now(),
|
||||
|
||||
@@ -243,6 +243,25 @@ func (m *mockJobRepo) GetPendingJobs(ctx context.Context, jobType domain.JobType
|
||||
return jobs, nil
|
||||
}
|
||||
|
||||
func (m *mockJobRepo) ListPendingByAgentID(ctx context.Context, agentID string) ([]*domain.Job, error) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
if m.ListErr != nil {
|
||||
return nil, m.ListErr
|
||||
}
|
||||
var result []*domain.Job
|
||||
for _, j := range m.Jobs {
|
||||
if j.AgentID != nil && *j.AgentID == agentID {
|
||||
if j.Status == domain.JobStatusPending && j.Type == domain.JobTypeDeployment {
|
||||
result = append(result, j)
|
||||
} else if j.Status == domain.JobStatusAwaitingCSR {
|
||||
result = append(result, j)
|
||||
}
|
||||
}
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (m *mockJobRepo) AddJob(job *domain.Job) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
@@ -65,6 +65,10 @@ func (m *mockVerificationJobRepo) GetPendingJobs(ctx context.Context, jobType do
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (m *mockVerificationJobRepo) ListPendingByAgentID(ctx context.Context, agentID string) ([]*domain.Job, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// newVerificationTestService creates a VerificationService wired with test doubles.
|
||||
func newVerificationTestService(jobs map[string]*domain.Job, jobRepoErr error) (*VerificationService, *mockVerificationJobRepo, *mockAuditRepo) {
|
||||
jobRepo := &mockVerificationJobRepo{jobs: jobs, err: jobRepoErr}
|
||||
|
||||
Reference in New Issue
Block a user