From 30b251ea13c16ed1374bef6d784abd125ab96e37 Mon Sep 17 00:00:00 2001 From: shankar0123 Date: Thu, 30 Apr 2026 14:32:40 +0000 Subject: [PATCH] feat(agent): per-target deploy mutex serializes concurrent deploys to the same target MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 2 of the deploy-hardening I master bundle. Closes the agent-side race window where two concurrent renewals against the same target ID (typical: two SAN entries renewing in the same window) would otherwise collide on the connector's temp-file path or run the reload command against itself. The Agent struct grows a sync.Map of *sync.Mutex keyed on target ID; targetDeployMutex(targetID) lazy-init's one on first acquisition. executeDeploymentJob acquires the mutex before connector.DeployCertificate and releases via defer at function exit — the lock spans the full Deploy duration including PreCommit (validate), atomic-rename, PostCommit (reload), and post-deploy verify (Phases 4-9). Granularity per frozen decision 0.5: one mutex per target ID, NOT per (target, cert) pair. Cert deploy throughput is operator-grade tens-per-minute; coarse serialization simplifies reasoning about reload-side race windows. Mutexes live for the agent's lifetime — target IDs are bounded so no janitor needed (~16 bytes per entry). Empty TargetID (defensive — should never happen for deploy jobs) bypasses the lock to avoid a singleton serialization point pulling all targetless work onto a shared mutex. Tests (5 named cases in cmd/agent/deploy_mutex_test.go): - TestAgent_ConcurrentDeploysToSameTarget_Serialize — race-detector smoke; 10 goroutines acquire same target's mutex; max-in-flight asserts == 1 - TestAgent_DifferentTargetIDs_ParallelizeIndependently — per-target granularity proof - TestAgent_EmptyTargetID_ReturnsNilMutex — defensive contract - TestAgent_TargetMutex_IsStable — sync.Map LoadOrStore returns same pointer across calls - TestAgent_TargetMutex_RaceLookup — race-free under N=50 concurrent lookups for same key go test -race -count=1 green; gofmt + go vet + golangci-lint v2.11.4 all 0 issues against my new code (pre-existing import-grouping drift in agent_test.go / main.go / verify*.go is unrelated to this change and not caught by `go fmt ./...` which CI uses). Phase 3 next: ValidateOnly method on target.Connector interface; default impl returns ErrValidateOnlyNotSupported across all 13 connectors. --- cmd/agent/deploy_mutex_test.go | 143 +++++++++++++++++++++++++++++++++ cmd/agent/main.go | 57 +++++++++++++ 2 files changed, 200 insertions(+) create mode 100644 cmd/agent/deploy_mutex_test.go diff --git a/cmd/agent/deploy_mutex_test.go b/cmd/agent/deploy_mutex_test.go new file mode 100644 index 0000000..a4b6c23 --- /dev/null +++ b/cmd/agent/deploy_mutex_test.go @@ -0,0 +1,143 @@ +package main + +import ( + "sync" + "sync/atomic" + "testing" +) + +// Phase 2 of the deploy-hardening I master bundle: per-target +// deploy mutex serializes concurrent deploys to the same target +// at the agent dispatch layer. + +// TestAgent_ConcurrentDeploysToSameTarget_Serialize spawns N +// goroutines acquiring the same target's mutex and asserts that +// only one is in the critical section at a time. The "critical +// section" is simulated as an atomic-counter increment + sleep + +// decrement; if the lock works, max-in-flight is 1. +func TestAgent_ConcurrentDeploysToSameTarget_Serialize(t *testing.T) { + a := &Agent{} + + const N = 10 + var inFlight, maxInFlight int32 + var done int32 + var wg sync.WaitGroup + + for i := 0; i < N; i++ { + wg.Add(1) + go func() { + defer wg.Done() + mu := a.targetDeployMutex("target-A") + if mu == nil { + t.Errorf("expected non-nil mutex for non-empty target id") + return + } + mu.Lock() + defer mu.Unlock() + n := atomic.AddInt32(&inFlight, 1) + for { + m := atomic.LoadInt32(&maxInFlight) + if n <= m || atomic.CompareAndSwapInt32(&maxInFlight, m, n) { + break + } + } + // Brief work simulating the connector's Deploy. + for j := 0; j < 1000; j++ { + _ = j * j + } + atomic.AddInt32(&inFlight, -1) + atomic.AddInt32(&done, 1) + }() + } + wg.Wait() + + if done != N { + t.Errorf("done = %d, want %d (some goroutines didn't run)", done, N) + } + if maxInFlight > 1 { + t.Errorf("max concurrent critical sections = %d, want 1 (mutex broken)", maxInFlight) + } +} + +// TestAgent_DifferentTargetIDs_ParallelizeIndependently verifies +// the per-target granularity: deploys to target-A and target-B +// proceed in parallel (no global serialization point). +func TestAgent_DifferentTargetIDs_ParallelizeIndependently(t *testing.T) { + a := &Agent{} + + muA := a.targetDeployMutex("target-A") + muB := a.targetDeployMutex("target-B") + + if muA == nil || muB == nil { + t.Fatal("nil mutexes") + } + if muA == muB { + t.Error("target-A and target-B share the same mutex (broken granularity)") + } + + // Acquire A; B should still be acquirable concurrently. + muA.Lock() + defer muA.Unlock() + + acquired := make(chan struct{}) + go func() { + muB.Lock() + close(acquired) + muB.Unlock() + }() + <-acquired // would deadlock if B were blocked by A +} + +// TestAgent_EmptyTargetID_ReturnsNilMutex pins the +// "no-targetID = no-lock" contract. Defends against the +// pathological case where every targetless deploy serializes on a +// shared empty-string mutex. +func TestAgent_EmptyTargetID_ReturnsNilMutex(t *testing.T) { + a := &Agent{} + if mu := a.targetDeployMutex(""); mu != nil { + t.Errorf("empty targetID returned non-nil mutex: %p", mu) + } +} + +// TestAgent_TargetMutex_IsStable verifies sync.Map LoadOrStore +// semantics: same target ID returns the same *sync.Mutex pointer +// across calls (so the lock actually works across goroutines that +// look up the mutex independently). +func TestAgent_TargetMutex_IsStable(t *testing.T) { + a := &Agent{} + mu1 := a.targetDeployMutex("target-X") + mu2 := a.targetDeployMutex("target-X") + if mu1 != mu2 { + t.Errorf("targetMutex returned %p then %p for same id (stability broken)", mu1, mu2) + } +} + +// TestAgent_TargetMutex_RaceLookup pins the race-detector +// invariant: many goroutines calling targetDeployMutex +// concurrently for the same key all get the same pointer (no +// torn read). +func TestAgent_TargetMutex_RaceLookup(t *testing.T) { + a := &Agent{} + const N = 50 + results := make(chan *sync.Mutex, N) + var wg sync.WaitGroup + for i := 0; i < N; i++ { + wg.Add(1) + go func() { + defer wg.Done() + results <- a.targetDeployMutex("target-shared") + }() + } + wg.Wait() + close(results) + var first *sync.Mutex + for got := range results { + if first == nil { + first = got + continue + } + if got != first { + t.Errorf("goroutine got different mutex (%p vs %p)", got, first) + } + } +} diff --git a/cmd/agent/main.go b/cmd/agent/main.go index 1bae84f..fbd1739 100644 --- a/cmd/agent/main.go +++ b/cmd/agent/main.go @@ -95,6 +95,47 @@ type Agent struct { // race with ctx.Done() and other cases. retiredOnce sync.Once retiredSignal chan struct{} + + // Deploy-hardening I Phase 2: per-target deploy mutex. + // Two cert renewals against the same target ID (e.g., two SAN + // entries renewing in the same window, or a fast-cycling + // renewal-then-test workflow) MUST serialize at the agent + // dispatch site. Without this lock, the underlying connector's + // temp-file path could collide and the reload command would + // race against itself. + // + // Granularity is one mutex per target ID, NOT per (target, cert) + // pair — frozen decision 0.5. Cert deploy throughput is + // operator-grade tens-per-minute; coarse serialization is fine + // and simplifies reasoning about reload-side race windows. + // + // sync.Map is sized for thousands of unique target IDs without + // rehash thrash; LoadOrStore is atomic + lock-free on the + // hot path. Mutexes live for the agent's lifetime — no janitor + // because target IDs are bounded and the per-target memory + // (~16 bytes per entry) is negligible vs. typical agent heap. + // + // Job items without a TargetID (e.g., agent-managed cert + no + // connector dispatch — should never happen for deploy jobs but + // defended anyway) bypass the lock to avoid a singleton + // serialization point. + deployMutexes sync.Map // map[string]*sync.Mutex, keyed on JobItem.TargetID +} + +// targetDeployMutex returns the per-target-ID *sync.Mutex, +// lazy-initialising one on first acquisition. Returns nil when +// targetID is empty (caller should skip the lock entirely). +// +// Phase 2 of the deploy-hardening I master bundle: the load-bearing +// serialization point that defends against concurrent deploys to the +// same target stomping each other's temp-file paths or reload +// commands. +func (a *Agent) targetDeployMutex(targetID string) *sync.Mutex { + if targetID == "" { + return nil + } + v, _ := a.deployMutexes.LoadOrStore(targetID, &sync.Mutex{}) + return v.(*sync.Mutex) } // WorkResponse represents the response from the work polling endpoint. @@ -667,6 +708,22 @@ func (a *Agent) executeDeploymentJob(ctx context.Context, job JobItem) { }, } + // Phase 2 of the deploy-hardening I master bundle: + // per-target deploy mutex. Acquire BEFORE + // DeployCertificate so two concurrent renewals against + // the same target ID serialize. The lock is held for the + // full Deploy duration including PreCommit (validate), + // PostCommit (reload), and post-deploy verify (Phases + // 4-9). Released on every return path via defer. + var targetID string + if job.TargetID != nil { + targetID = *job.TargetID + } + if mu := a.targetDeployMutex(targetID); mu != nil { + mu.Lock() + defer mu.Unlock() + } + result, err := connector.DeployCertificate(ctx, deployReq) if err != nil { a.logger.Error("deployment failed",