fix(audit): drain in-flight recording goroutines on shutdown (M-1)

Audit events spawned from the HTTP middleware ran in detached goroutines
using context.Background(). On SIGTERM the DB pool was closed before
those goroutines finished writing, silently dropping audit events
(CWE-662 Improper Synchronization / CWE-400 Uncontrolled Resource
Consumption).

NewAuditLog now returns an *AuditMiddleware struct that tracks every
spawned goroutine with sync.WaitGroup. Callers wire the middleware via
its Middleware method value (preserves the existing
func(http.Handler) http.Handler shape) and drain the WaitGroup with
Flush(ctx), which blocks until in-flight recordings complete or the
provided context is cancelled — mirroring scheduler.WaitForCompletion.

Flush is invoked in cmd/server/main.go between http.Server.Shutdown
(no new requests accepted) and db.Close (pool torn down), with a
timeout returning ErrAuditFlushTimeout wrapping ctx.Err().

Request-derived inputs (method, path, status) are snapshotted before
the goroutine spawn so the worker does not race with http.Server
reusing r after the handler returns.

Tests:
  TestAuditLog_FlushDrainsInFlightGoroutines
  TestAuditLog_FlushTimeoutReturnsErrAuditFlushTimeout

Verification:
  go build ./...                            : 0
  go vet ./...                              : 0
  go test -race -short ./...                : 0 (all packages)
  go test -cover ./internal/api/middleware  : 81.4%
  golangci-lint run                         : 0 issues
  govulncheck ./...                         : 0 vulns in called code
This commit is contained in:
Shankar
2026-04-17 17:29:48 +00:00
parent 96615ae0da
commit 5c69cdf33b
3 changed files with 291 additions and 70 deletions
+128 -10
View File
@@ -2,6 +2,7 @@ package middleware
import (
"context"
"errors"
"fmt"
"io"
"net/http"
@@ -16,7 +17,8 @@ import (
type mockAuditRecorder struct {
mu sync.Mutex
calls []auditCall
err error // if non-nil, RecordAPICall returns this
err error // if non-nil, RecordAPICall returns this
block chan struct{} // if non-nil, RecordAPICall blocks on receive before returning
}
type auditCall struct {
@@ -29,6 +31,13 @@ type auditCall struct {
}
func (m *mockAuditRecorder) RecordAPICall(ctx context.Context, method, path, actor, bodyHash string, status int, latencyMs int64) error {
// Optional: block the recorder until a signal is received so tests can
// exercise the shutdown-drain path deterministically. The block happens
// before any state mutation so Flush-timeout tests see the call
// "in-flight" (wg counter > 0) with no recorded entries yet.
if m.block != nil {
<-m.block
}
m.mu.Lock()
defer m.mu.Unlock()
m.calls = append(m.calls, auditCall{
@@ -90,7 +99,7 @@ func (w *waitableAuditRecorder) Wait(timeout time.Duration) bool {
func TestAuditLog_RecordsAPICall(t *testing.T) {
recorder := newWaitableAuditRecorder()
mw := NewAuditLog(recorder, AuditConfig{})
mw := NewAuditLog(recorder, AuditConfig{}).Middleware
handler := mw(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
@@ -130,7 +139,7 @@ func TestAuditLog_RecordsAPICall(t *testing.T) {
func TestAuditLog_CapturesStatusCode(t *testing.T) {
recorder := newWaitableAuditRecorder()
mw := NewAuditLog(recorder, AuditConfig{})
mw := NewAuditLog(recorder, AuditConfig{}).Middleware
handler := mw(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNotFound)
@@ -157,7 +166,7 @@ func TestAuditLog_ExcludesHealth(t *testing.T) {
recorder := newWaitableAuditRecorder()
mw := NewAuditLog(recorder, AuditConfig{
ExcludePaths: []string{"/health", "/ready"},
})
}).Middleware
handler := mw(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
@@ -193,7 +202,7 @@ func TestAuditLog_ExcludesHealth(t *testing.T) {
func TestAuditLog_HashesRequestBody(t *testing.T) {
recorder := newWaitableAuditRecorder()
mw := NewAuditLog(recorder, AuditConfig{})
mw := NewAuditLog(recorder, AuditConfig{}).Middleware
// Handler verifies body was restored
handler := mw(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
@@ -228,7 +237,7 @@ func TestAuditLog_HashesRequestBody(t *testing.T) {
func TestAuditLog_EmptyBodyNoHash(t *testing.T) {
recorder := newWaitableAuditRecorder()
mw := NewAuditLog(recorder, AuditConfig{})
mw := NewAuditLog(recorder, AuditConfig{}).Middleware
handler := mw(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
@@ -253,7 +262,7 @@ func TestAuditLog_EmptyBodyNoHash(t *testing.T) {
func TestAuditLog_ExtractsAuthenticatedActor(t *testing.T) {
recorder := newWaitableAuditRecorder()
mw := NewAuditLog(recorder, AuditConfig{})
mw := NewAuditLog(recorder, AuditConfig{}).Middleware
handler := mw(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
@@ -285,7 +294,7 @@ func TestAuditLog_ExtractsAuthenticatedActor(t *testing.T) {
func TestAuditLog_RecorderErrorDoesNotBreakResponse(t *testing.T) {
recorder := &mockAuditRecorder{err: fmt.Errorf("db connection lost")}
mw := NewAuditLog(recorder, AuditConfig{})
mw := NewAuditLog(recorder, AuditConfig{}).Middleware
handler := mw(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
@@ -304,7 +313,7 @@ func TestAuditLog_RecorderErrorDoesNotBreakResponse(t *testing.T) {
func TestAuditLog_CapturesLatency(t *testing.T) {
recorder := newWaitableAuditRecorder()
mw := NewAuditLog(recorder, AuditConfig{})
mw := NewAuditLog(recorder, AuditConfig{}).Middleware
handler := mw(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
time.Sleep(10 * time.Millisecond)
@@ -330,7 +339,7 @@ func TestAuditLog_CapturesLatency(t *testing.T) {
func TestAuditLog_ExcludesQueryParamsFromPath(t *testing.T) {
recorder := newWaitableAuditRecorder()
mw := NewAuditLog(recorder, AuditConfig{})
mw := NewAuditLog(recorder, AuditConfig{}).Middleware
handler := mw(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
@@ -429,3 +438,112 @@ func TestAuditServiceAdapter_PropagatesError(t *testing.T) {
t.Errorf("expected database error, got %v", err)
}
}
// TestAuditLog_FlushDrainsInFlightGoroutines verifies the M-1 shutdown-drain
// contract: Flush blocks until every audit-recording goroutine spawned by the
// middleware completes, then returns nil. Without the drain (pre-M-1 code),
// the DB pool would be closed while in-flight goroutines were still calling
// RecordAPICall, silently dropping audit events (CWE-662 / CWE-400).
func TestAuditLog_FlushDrainsInFlightGoroutines(t *testing.T) {
// Recorder blocks on `unblock` until the test releases it. This simulates
// a slow DB write still in flight when shutdown begins.
unblock := make(chan struct{})
recorder := &mockAuditRecorder{block: unblock}
auditMW := NewAuditLog(recorder, AuditConfig{})
handler := auditMW.Middleware(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}))
// Fire a request. Handler returns immediately; recorder goroutine is
// parked on the `unblock` channel inside RecordAPICall.
req := httptest.NewRequest(http.MethodGet, "/api/v1/certificates", nil)
rr := httptest.NewRecorder()
handler.ServeHTTP(rr, req)
// Start Flush in a goroutine — it must block on the WaitGroup until we
// release the recorder.
flushDone := make(chan error, 1)
go func() {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
flushDone <- auditMW.Flush(ctx)
}()
// Confirm Flush is actually blocked (not returning immediately).
select {
case err := <-flushDone:
t.Fatalf("Flush returned before recorder unblocked: err=%v", err)
case <-time.After(50 * time.Millisecond):
// expected: Flush is blocked on wg.Wait
}
// Release the recorder. Flush should now observe wg counter drop to 0
// and return nil.
close(unblock)
select {
case err := <-flushDone:
if err != nil {
t.Fatalf("expected nil from Flush after drain, got %v", err)
}
case <-time.After(2 * time.Second):
t.Fatal("Flush did not return after recorder unblocked")
}
// Verify the audit event was actually recorded (i.e., the goroutine
// completed its write — not just that Flush unblocked).
calls := recorder.getCalls()
if len(calls) != 1 {
t.Fatalf("expected 1 recorded audit call, got %d", len(calls))
}
if calls[0].Path != "/api/v1/certificates" {
t.Errorf("expected path /api/v1/certificates, got %s", calls[0].Path)
}
}
// TestAuditLog_FlushTimeoutReturnsErrAuditFlushTimeout verifies that Flush
// respects its context: when in-flight goroutines exceed the shutdown budget,
// Flush returns an error wrapping ErrAuditFlushTimeout plus ctx.Err(). The
// caller can then decide whether to proceed with teardown anyway.
func TestAuditLog_FlushTimeoutReturnsErrAuditFlushTimeout(t *testing.T) {
// Recorder will never unblock on its own — we unblock at end of test for
// a clean race-safe teardown.
unblock := make(chan struct{})
recorder := &mockAuditRecorder{block: unblock}
auditMW := NewAuditLog(recorder, AuditConfig{})
handler := auditMW.Middleware(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}))
req := httptest.NewRequest(http.MethodPost, "/api/v1/certificates", nil)
rr := httptest.NewRecorder()
handler.ServeHTTP(rr, req)
// Flush with a tiny deadline — must time out.
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
defer cancel()
err := auditMW.Flush(ctx)
if err == nil {
// Release the blocked goroutine before failing so the race detector
// doesn't trip on teardown.
close(unblock)
t.Fatal("expected Flush to return an error on timeout, got nil")
}
if !errors.Is(err, ErrAuditFlushTimeout) {
close(unblock)
t.Fatalf("expected error to wrap ErrAuditFlushTimeout, got %v", err)
}
if !errors.Is(err, context.DeadlineExceeded) {
close(unblock)
t.Fatalf("expected error to wrap context.DeadlineExceeded, got %v", err)
}
// Race-safe teardown: unblock the recorder goroutine so it exits cleanly
// before the test returns. The goroutine itself is still detached and
// will record to the mock even after Flush timed out — that's the
// documented behavior (Flush surfaces the timeout; caller decides).
close(unblock)
}