mirror of
https://github.com/shankar0123/certctl.git
synced 2026-06-07 15:11:29 +00:00
asyncpoll: shared bounded-polling Poller + DigiCert refactor (Phase 1)
Phase 1 of the #5 acquisition-readiness fix from the 2026-05-01 issuer coverage audit. Pre-fix, four async-CA connectors (DigiCert, Sectigo, Entrust, GlobalSign) had GetOrderStatus paths that polled the upstream on every scheduler tick with no exponential backoff, no max-retry cap, and no deadline. The scheduler's tick rate (typically 30s) was the only throttle — an unready order got hit every 30s indefinitely, and a 429 from a rate-limited upstream produced "retry on the next tick" which re-fanned-out the same call. This commit ships the shared infrastructure (asyncpoll package) and refactors DigiCert as the reference. Sectigo / Entrust / GlobalSign follow the same mechanical pattern; they land in Phase 2. Phase 1 (this commit): - internal/connector/issuer/asyncpoll/asyncpoll.go: shared Poller with exponential backoff (5s → 15s → 45s → 2m15s → 5m capped), ±20% jitter, configurable MaxWait deadline (default 10m), and ctx-aware cancellation. - Result enum: StillPending / Done / Failed. PollFunc returns (Result, err); Poll handles the wait loop, deadline check, and ctx propagation. - ErrMaxWait sentinel for callers that want to distinguish "deadline exhausted" from "fn errored". - asyncpoll_test.go: 11 tests covering happy path, transient error keep-polling, Failed terminates immediately, MaxWait timeout, MaxWait+lastErr wrap, ctx cancel, multiplicative backoff, jitter bounds (statistical), pct=0 deterministic, defaults applied. - DigiCert refactor: GetOrderStatus now wraps pollOrderOnce in asyncpoll.Poll. Status-code triage: 2xx + parse + status="issued" → Done with cert; 2xx + parse + status="pending" → StillPending; 2xx + parse + status="rejected"/"denied" → Done with status="failed"; 2xx + parse fail → Failed (permanent); 4xx (not 429) → Failed (404 = order doesn't exist); 429 / 5xx / network → StillPending. - Config.PollMaxWaitSeconds (env: CERTCTL_DIGICERT_POLL_MAX_WAIT_SECONDS) exposes the per-call deadline knob; default 600 (10m). 
- Test helper buildDigicertConnector + GetOrderStatus_Pending test set PollMaxWaitSeconds=1 so async-pending tests don't block 10 minutes on the production default. Phase 2 (separate follow-up commit, not in this PR): - Sectigo refactor (collectNotReady sentinel maps to StillPending). - Entrust refactor (approval-pending → longer per-issuer MaxWait). - GlobalSign refactor (serial-tracking; same Poller). - Per-connector cadence integration tests against fake HTTP servers. - docs/async-polling.md + docs/connectors.md updates. Audit reference: cowork/issuer-coverage-audit-2026-05-01/RESULTS.md Top-10 fix #5 — Phase 1.
This commit is contained in:
@@ -0,0 +1,216 @@
|
|||||||
|
// Copyright (c) certctl
|
||||||
|
// SPDX-License-Identifier: BSL-1.1
|
||||||
|
|
||||||
|
// Package asyncpoll provides bounded polling for async-CA issuer
|
||||||
|
// connectors (DigiCert, Sectigo, Entrust, GlobalSign).
|
||||||
|
//
|
||||||
|
// Closes the #5 acquisition-readiness blocker from the 2026-05-01
|
||||||
|
// issuer coverage audit. Pre-fix, each async-CA connector had its own
|
||||||
|
// GetOrderStatus path that polled the upstream CA on every scheduler
|
||||||
|
// tick with no exponential backoff, no max-retry cap, and no deadline.
|
||||||
|
// The scheduler's tick rate (typically 30s) was the only throttle —
|
||||||
|
// an unready order got hit every 30s indefinitely, and a 429 from a
|
||||||
|
// rate-limited upstream produced "retry on the next tick" which
|
||||||
|
// re-fanned-out the same call.
|
||||||
|
//
|
||||||
|
// This package consolidates the four implementations behind a single
|
||||||
|
// Poller with:
|
||||||
|
//
|
||||||
|
//   - Exponential backoff: 5s → 15s → 45s → 2m15s → 5m capped (default).
|
||||||
|
//   - Optional ±JitterPct jitter at every wait (DefaultJitterPct is ±20%; the zero value disables it) so multiple certctl instances don't
|
||||||
|
// synchronize on the upstream CA's rate-limit window.
|
||||||
|
// - MaxWait deadline (default 10m) — a hard cap on how long a
|
||||||
|
// single Poll call blocks before returning StillPending. The
|
||||||
|
// scheduler can re-enqueue the job for a future tick if the
|
||||||
|
// operator's policy allows further attempts.
|
||||||
|
// - ctx-aware cancellation — propagates the caller's deadline /
|
||||||
|
// cancel through every wait.
|
||||||
|
//
|
||||||
|
// Issuer-specific HTTP request shapes live in the PollFunc closure
|
||||||
|
// passed to Poll; the backoff math is shared.
|
||||||
|
package asyncpoll
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"math/rand/v2"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Result is the outcome of one poll attempt.
type Result int

const (
	// StillPending — the upstream is still working on the order;
	// keep polling. The Poller waits, then invokes the PollFunc
	// again (subject to MaxWait).
	StillPending Result = iota

	// Done — the order succeeded. The Poller returns immediately
	// with the (Done, nil) tuple to the caller.
	Done

	// Failed — permanent failure (rejected, denied, malformed
	// response). The Poller returns immediately with (Failed, err)
	// to the caller; no further polling.
	Failed
)

// String returns the constant's name ("StillPending", "Done" or
// "Failed") so a Result is readable in logs and test output. Values
// outside the declared set render as "Result(N)". Formatting with %d
// still prints the underlying integer.
func (r Result) String() string {
	switch r {
	case StillPending:
		return "StillPending"
	case Done:
		return "Done"
	case Failed:
		return "Failed"
	default:
		return fmt.Sprintf("Result(%d)", int(r))
	}
}
|
||||||
|
|
||||||
|
// PollFunc runs ONE poll attempt and reports the outcome.
//
// Returning (StillPending, nil) signals "transient; keep polling"
// and the Poller waits with the backoff schedule.
//
// Returning (StillPending, err) ALSO keeps polling — useful when
// the upstream returned a transient HTTP error (5xx, network blip)
// that the caller wants logged but not treated as fatal. Poll keeps
// only the most recent such err and mentions it in the error it
// returns if MaxWait or ctx cancellation ends the loop.
//
// Returning (Done, nil) signals success. Poll returns (Done, nil)
// regardless of any error returned alongside Done.
//
// Returning (Failed, err) signals permanent failure; err must be
// non-nil so the caller can include it in the upstream-facing
// status message.
//
// Returning any other Result value makes Poll stop and report
// Failed with an "unknown Result" error.
type PollFunc func(ctx context.Context) (Result, error)
|
||||||
|
|
||||||
|
// Config holds the backoff knobs. All fields are optional. For
// MaxWait, InitialWait and MaxBackoff a zero or negative value falls
// back to the package default documented inline. JitterPct is the
// exception: its zero value means "no jitter" (see below).
type Config struct {
	// MaxWait — hard cap on total wall-clock time inside Poll. After
	// this expires, Poll returns (StillPending, ErrMaxWait). Default
	// 10 minutes; issuer connectors expose their own knob for it
	// (e.g. the DigiCert Config.PollMaxWaitSeconds field).
	MaxWait time.Duration

	// InitialWait — first backoff (after the first poll attempt).
	// Default 5 seconds.
	InitialWait time.Duration

	// MaxBackoff — cap on per-iteration wait. Default 5 minutes.
	// Backoff schedule: InitialWait → 3× → 3× → ... capped at
	// MaxBackoff (so 5s → 15s → 45s → 2m15s → 5m → 5m → ... by
	// default).
	MaxBackoff time.Duration

	// JitterPct — fractional jitter applied to every wait, ±value.
	// Poll does NOT substitute DefaultJitterPct for the zero value:
	// 0 means no jitter (deterministic timing, handy in tests), and
	// a negative value is treated as 0. Callers that want the ±20%
	// spread must set JitterPct: DefaultJitterPct explicitly.
	JitterPct float64
}
|
||||||
|
|
||||||
|
// ErrMaxWait is returned (alongside StillPending) when the total
// wall-clock time inside Poll exceeded Config.MaxWait. Callers can
// errors.Is against this sentinel to distinguish "deadline exhausted"
// from "fn errored".
//
// When the last poll attempt also reported a transient error, Poll
// returns an error that wraps ErrMaxWait and appends that error's
// text, so compare with errors.Is rather than ==.
var ErrMaxWait = errors.New("asyncpoll: MaxWait deadline exceeded")
|
||||||
|
|
||||||
|
// Defaults. Exported so per-issuer tests can reference the same
// schedule without duplicating constants.
//
// Poll substitutes DefaultMaxWait, DefaultInitialWait and
// DefaultMaxBackoff when the matching Config field is zero or
// negative. DefaultJitterPct is NOT applied automatically — a zero
// Config.JitterPct means no jitter — so callers that want the ±20%
// spread pass it explicitly.
const (
	DefaultMaxWait     = 10 * time.Minute
	DefaultInitialWait = 5 * time.Second
	DefaultMaxBackoff  = 5 * time.Minute
	DefaultJitterPct   = 0.2
)
|
||||||
|
|
||||||
|
// Poll runs fn with exponential backoff + jitter until Done, Failed,
|
||||||
|
// MaxWait, or ctx cancellation.
|
||||||
|
//
|
||||||
|
// On Done — returns (Done, nil). The cert is ready; caller proceeds.
|
||||||
|
//
|
||||||
|
// On Failed — returns (Failed, fnErr). Permanent; no retry.
|
||||||
|
//
|
||||||
|
// On MaxWait timeout — returns (StillPending, ErrMaxWait). The
|
||||||
|
// upstream isn't done yet but the deadline exhausted. Scheduler
|
||||||
|
// can re-enqueue.
|
||||||
|
//
|
||||||
|
// On ctx cancel — returns (StillPending, ctx.Err()). Caller's
|
||||||
|
// deadline / shutdown signal won.
|
||||||
|
//
|
||||||
|
// On fn returning (StillPending, transientErr) — the err is logged
|
||||||
|
// by the closure (not by Poll), and Poll continues with the
|
||||||
|
// backoff schedule. The transient err is preserved as the last
|
||||||
|
// error in case MaxWait or ctx-cancel later fires.
|
||||||
|
func Poll(ctx context.Context, cfg Config, fn PollFunc) (Result, error) {
|
||||||
|
if cfg.MaxWait <= 0 {
|
||||||
|
cfg.MaxWait = DefaultMaxWait
|
||||||
|
}
|
||||||
|
if cfg.InitialWait <= 0 {
|
||||||
|
cfg.InitialWait = DefaultInitialWait
|
||||||
|
}
|
||||||
|
if cfg.MaxBackoff <= 0 {
|
||||||
|
cfg.MaxBackoff = DefaultMaxBackoff
|
||||||
|
}
|
||||||
|
if cfg.JitterPct < 0 {
|
||||||
|
cfg.JitterPct = 0
|
||||||
|
}
|
||||||
|
|
||||||
|
deadline := time.Now().Add(cfg.MaxWait)
|
||||||
|
wait := cfg.InitialWait
|
||||||
|
var lastErr error
|
||||||
|
|
||||||
|
for {
|
||||||
|
result, err := fn(ctx)
|
||||||
|
switch result {
|
||||||
|
case Done:
|
||||||
|
return Done, nil
|
||||||
|
case Failed:
|
||||||
|
return Failed, err
|
||||||
|
case StillPending:
|
||||||
|
lastErr = err // may be nil (clean keep-polling) or a transient err
|
||||||
|
default:
|
||||||
|
return Failed, fmt.Errorf("asyncpoll: PollFunc returned unknown Result %d", result)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Compute the next wait with jitter. wait is the cumulative
|
||||||
|
// backoff base; jittered is what actually sleeps.
|
||||||
|
jittered := jitterDuration(wait, cfg.JitterPct)
|
||||||
|
|
||||||
|
// If the next wait would push us past the deadline, return
|
||||||
|
// StillPending now rather than sleeping uselessly.
|
||||||
|
now := time.Now()
|
||||||
|
remaining := deadline.Sub(now)
|
||||||
|
if remaining <= 0 {
|
||||||
|
if lastErr != nil {
|
||||||
|
return StillPending, fmt.Errorf("%w (last err: %v)", ErrMaxWait, lastErr)
|
||||||
|
}
|
||||||
|
return StillPending, ErrMaxWait
|
||||||
|
}
|
||||||
|
if jittered > remaining {
|
||||||
|
jittered = remaining
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sleep, but respect ctx cancellation.
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
if lastErr != nil {
|
||||||
|
return StillPending, fmt.Errorf("%w (last err: %v)", ctx.Err(), lastErr)
|
||||||
|
}
|
||||||
|
return StillPending, ctx.Err()
|
||||||
|
case <-time.After(jittered):
|
||||||
|
}
|
||||||
|
|
||||||
|
// Multiplicative backoff (3×) capped at MaxBackoff.
|
||||||
|
wait *= 3
|
||||||
|
if wait > cfg.MaxBackoff {
|
||||||
|
wait = cfg.MaxBackoff
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// jitterDuration applies ±pct jitter to base. Returned duration is
|
||||||
|
// always positive (a base of 0 returns 0 regardless of pct).
|
||||||
|
//
|
||||||
|
// Visible for testing — the test asserts the bounded envelope rather
|
||||||
|
// than the exact value.
|
||||||
|
func jitterDuration(base time.Duration, pct float64) time.Duration {
|
||||||
|
if base <= 0 || pct <= 0 {
|
||||||
|
return base
|
||||||
|
}
|
||||||
|
// rand/v2's Float64 returns [0, 1); we want [-pct, +pct].
|
||||||
|
delta := (rand.Float64()*2 - 1) * pct
|
||||||
|
jittered := time.Duration(float64(base) * (1 + delta))
|
||||||
|
if jittered < 0 {
|
||||||
|
jittered = 0
|
||||||
|
}
|
||||||
|
return jittered
|
||||||
|
}
|
||||||
@@ -0,0 +1,276 @@
|
|||||||
|
// Copyright (c) certctl
|
||||||
|
// SPDX-License-Identifier: BSL-1.1
|
||||||
|
|
||||||
|
package asyncpoll
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"sync/atomic"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TestPoll_DoneOnFirstAttempt asserts the trivial happy path: fn
|
||||||
|
// returns Done immediately, Poll returns Done with no waiting.
|
||||||
|
func TestPoll_DoneOnFirstAttempt(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
calls := atomic.Int64{}
|
||||||
|
start := time.Now()
|
||||||
|
res, err := Poll(context.Background(), Config{InitialWait: 100 * time.Millisecond, JitterPct: 0}, func(ctx context.Context) (Result, error) {
|
||||||
|
calls.Add(1)
|
||||||
|
return Done, nil
|
||||||
|
})
|
||||||
|
elapsed := time.Since(start)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Poll: unexpected err: %v", err)
|
||||||
|
}
|
||||||
|
if res != Done {
|
||||||
|
t.Fatalf("Poll: want Done, got %d", res)
|
||||||
|
}
|
||||||
|
if calls.Load() != 1 {
|
||||||
|
t.Errorf("Poll: want 1 fn call, got %d", calls.Load())
|
||||||
|
}
|
||||||
|
if elapsed > 50*time.Millisecond {
|
||||||
|
t.Errorf("Poll: should not have waited, elapsed=%v", elapsed)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestPoll_DoneAfterPending asserts the standard async-CA shape:
|
||||||
|
// first 2 calls return StillPending, third returns Done. Poll waits
|
||||||
|
// the configured backoff between calls.
|
||||||
|
func TestPoll_DoneAfterPending(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
calls := atomic.Int64{}
|
||||||
|
res, err := Poll(context.Background(), Config{
|
||||||
|
InitialWait: 10 * time.Millisecond,
|
||||||
|
MaxBackoff: 50 * time.Millisecond,
|
||||||
|
MaxWait: 1 * time.Second,
|
||||||
|
JitterPct: 0, // deterministic for assertion
|
||||||
|
}, func(ctx context.Context) (Result, error) {
|
||||||
|
n := calls.Add(1)
|
||||||
|
if n < 3 {
|
||||||
|
return StillPending, nil
|
||||||
|
}
|
||||||
|
return Done, nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Poll: unexpected err: %v", err)
|
||||||
|
}
|
||||||
|
if res != Done {
|
||||||
|
t.Fatalf("Poll: want Done, got %d", res)
|
||||||
|
}
|
||||||
|
if calls.Load() != 3 {
|
||||||
|
t.Errorf("Poll: want 3 fn calls, got %d", calls.Load())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestPoll_FailedTerminatesImmediately — Failed is permanent; Poll
|
||||||
|
// returns the err and stops polling immediately.
|
||||||
|
func TestPoll_FailedTerminatesImmediately(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
calls := atomic.Int64{}
|
||||||
|
sentinel := errors.New("permanent: order rejected")
|
||||||
|
res, err := Poll(context.Background(), Config{InitialWait: 100 * time.Millisecond, JitterPct: 0}, func(ctx context.Context) (Result, error) {
|
||||||
|
calls.Add(1)
|
||||||
|
return Failed, sentinel
|
||||||
|
})
|
||||||
|
if !errors.Is(err, sentinel) {
|
||||||
|
t.Errorf("Poll: want sentinel, got %v", err)
|
||||||
|
}
|
||||||
|
if res != Failed {
|
||||||
|
t.Fatalf("Poll: want Failed, got %d", res)
|
||||||
|
}
|
||||||
|
if calls.Load() != 1 {
|
||||||
|
t.Errorf("Poll: Failed must terminate on first call, got %d", calls.Load())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestPoll_TransientErrKeepPolling — fn returns (StillPending, err)
|
||||||
|
// for transient HTTP errors; Poll continues until Done.
|
||||||
|
func TestPoll_TransientErrKeepPolling(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
calls := atomic.Int64{}
|
||||||
|
res, err := Poll(context.Background(), Config{
|
||||||
|
InitialWait: 5 * time.Millisecond,
|
||||||
|
MaxBackoff: 20 * time.Millisecond,
|
||||||
|
MaxWait: 1 * time.Second,
|
||||||
|
JitterPct: 0,
|
||||||
|
}, func(ctx context.Context) (Result, error) {
|
||||||
|
n := calls.Add(1)
|
||||||
|
if n < 3 {
|
||||||
|
return StillPending, fmt.Errorf("transient 503 attempt %d", n)
|
||||||
|
}
|
||||||
|
return Done, nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Poll: transient errs should be swallowed on Done, got: %v", err)
|
||||||
|
}
|
||||||
|
if res != Done {
|
||||||
|
t.Fatalf("Poll: want Done, got %d", res)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestPoll_MaxWaitTimeout — fn never returns Done; Poll respects
|
||||||
|
// MaxWait and returns (StillPending, ErrMaxWait).
|
||||||
|
func TestPoll_MaxWaitTimeout(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
calls := atomic.Int64{}
|
||||||
|
res, err := Poll(context.Background(), Config{
|
||||||
|
InitialWait: 5 * time.Millisecond,
|
||||||
|
MaxBackoff: 10 * time.Millisecond,
|
||||||
|
MaxWait: 50 * time.Millisecond,
|
||||||
|
JitterPct: 0,
|
||||||
|
}, func(ctx context.Context) (Result, error) {
|
||||||
|
calls.Add(1)
|
||||||
|
return StillPending, nil
|
||||||
|
})
|
||||||
|
if !errors.Is(err, ErrMaxWait) {
|
||||||
|
t.Errorf("Poll: want ErrMaxWait, got %v", err)
|
||||||
|
}
|
||||||
|
if res != StillPending {
|
||||||
|
t.Fatalf("Poll: want StillPending, got %d", res)
|
||||||
|
}
|
||||||
|
if calls.Load() < 2 {
|
||||||
|
t.Errorf("Poll: should have called fn at least twice in 50ms, got %d", calls.Load())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestPoll_MaxWaitWithLastErr — when MaxWait fires AND the last
|
||||||
|
// fn call returned a transient err, the err chain wraps both signals
|
||||||
|
// so operators can see "we exhausted the deadline AND the last
|
||||||
|
// upstream attempt was a 503."
|
||||||
|
func TestPoll_MaxWaitWithLastErr(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
transient := errors.New("transient 503")
|
||||||
|
res, err := Poll(context.Background(), Config{
|
||||||
|
InitialWait: 5 * time.Millisecond,
|
||||||
|
MaxWait: 30 * time.Millisecond,
|
||||||
|
JitterPct: 0,
|
||||||
|
}, func(ctx context.Context) (Result, error) {
|
||||||
|
return StillPending, transient
|
||||||
|
})
|
||||||
|
if !errors.Is(err, ErrMaxWait) {
|
||||||
|
t.Errorf("Poll: want ErrMaxWait in chain, got %v", err)
|
||||||
|
}
|
||||||
|
if res != StillPending {
|
||||||
|
t.Errorf("Poll: want StillPending, got %d", res)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestPoll_ContextCancelPropagated — caller cancels ctx mid-poll;
|
||||||
|
// Poll returns (StillPending, ctx.Err()).
|
||||||
|
func TestPoll_ContextCancelPropagated(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
time.Sleep(20 * time.Millisecond)
|
||||||
|
cancel()
|
||||||
|
}()
|
||||||
|
|
||||||
|
res, err := Poll(ctx, Config{
|
||||||
|
InitialWait: 5 * time.Millisecond,
|
||||||
|
MaxWait: 5 * time.Second, // far past the cancel
|
||||||
|
JitterPct: 0,
|
||||||
|
}, func(ctx context.Context) (Result, error) {
|
||||||
|
return StillPending, nil
|
||||||
|
})
|
||||||
|
if !errors.Is(err, context.Canceled) {
|
||||||
|
t.Errorf("Poll: want context.Canceled, got %v", err)
|
||||||
|
}
|
||||||
|
if res != StillPending {
|
||||||
|
t.Errorf("Poll: want StillPending, got %d", res)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestPoll_BackoffMultiplicative — assert the backoff grows
|
||||||
|
// multiplicatively (3× per iteration, capped). We measure the
|
||||||
|
// elapsed wall-clock between fn calls.
|
||||||
|
func TestPoll_BackoffMultiplicative(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
var prevCall time.Time
|
||||||
|
gaps := []time.Duration{}
|
||||||
|
calls := atomic.Int64{}
|
||||||
|
|
||||||
|
_, _ = Poll(context.Background(), Config{
|
||||||
|
InitialWait: 10 * time.Millisecond,
|
||||||
|
MaxBackoff: 200 * time.Millisecond,
|
||||||
|
MaxWait: 1 * time.Second,
|
||||||
|
JitterPct: 0,
|
||||||
|
}, func(ctx context.Context) (Result, error) {
|
||||||
|
now := time.Now()
|
||||||
|
if !prevCall.IsZero() {
|
||||||
|
gaps = append(gaps, now.Sub(prevCall))
|
||||||
|
}
|
||||||
|
prevCall = now
|
||||||
|
if calls.Add(1) >= 4 {
|
||||||
|
return Done, nil
|
||||||
|
}
|
||||||
|
return StillPending, nil
|
||||||
|
})
|
||||||
|
|
||||||
|
if len(gaps) < 3 {
|
||||||
|
t.Fatalf("expected at least 3 gaps, got %d", len(gaps))
|
||||||
|
}
|
||||||
|
// First gap ~= 10ms, second ~= 30ms, third ~= 90ms (3×).
|
||||||
|
// Tolerate +/- a millisecond or two for scheduler noise.
|
||||||
|
if gaps[0] < 8*time.Millisecond || gaps[0] > 20*time.Millisecond {
|
||||||
|
t.Errorf("gap[0] (initial): want ~10ms, got %v", gaps[0])
|
||||||
|
}
|
||||||
|
if gaps[1] < 25*time.Millisecond || gaps[1] > 45*time.Millisecond {
|
||||||
|
t.Errorf("gap[1] (3×): want ~30ms, got %v", gaps[1])
|
||||||
|
}
|
||||||
|
if gaps[2] < 80*time.Millisecond || gaps[2] > 110*time.Millisecond {
|
||||||
|
t.Errorf("gap[2] (9×): want ~90ms, got %v", gaps[2])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestJitterDuration_Bounds — jitter envelope must stay within
|
||||||
|
// [base*(1-pct), base*(1+pct)]. Run many iterations; if any falls
|
||||||
|
// outside, the test fails. (Statistical test — false-positive rate
|
||||||
|
// is ~0 for the chosen seed pattern of crypto/rand-backed math/rand/v2.)
|
||||||
|
func TestJitterDuration_Bounds(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
base := 100 * time.Millisecond
|
||||||
|
pct := 0.2
|
||||||
|
for i := 0; i < 1000; i++ {
|
||||||
|
got := jitterDuration(base, pct)
|
||||||
|
min := time.Duration(float64(base) * (1 - pct))
|
||||||
|
max := time.Duration(float64(base) * (1 + pct))
|
||||||
|
if got < min || got > max {
|
||||||
|
t.Errorf("iter %d: jitter %v outside [%v, %v]", i, got, min, max)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestJitterDuration_PctZero — pct=0 returns base unchanged
|
||||||
|
// (deterministic mode for tests).
|
||||||
|
func TestJitterDuration_PctZero(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
base := 100 * time.Millisecond
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
got := jitterDuration(base, 0)
|
||||||
|
if got != base {
|
||||||
|
t.Errorf("iter %d: pct=0 should return base, got %v", i, got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestPoll_DefaultsApplied — zero-value Config falls back to package
|
||||||
|
// defaults; Poll runs without panic.
|
||||||
|
func TestPoll_DefaultsApplied(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
// MaxWait will be 10m (the default); we Done immediately so the
|
||||||
|
// test runs in microseconds regardless.
|
||||||
|
res, err := Poll(context.Background(), Config{}, func(ctx context.Context) (Result, error) {
|
||||||
|
return Done, nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Poll with defaults: unexpected err: %v", err)
|
||||||
|
}
|
||||||
|
if res != Done {
|
||||||
|
t.Errorf("Poll with defaults: want Done, got %d", res)
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -38,6 +38,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/shankar0123/certctl/internal/connector/issuer"
|
"github.com/shankar0123/certctl/internal/connector/issuer"
|
||||||
|
"github.com/shankar0123/certctl/internal/connector/issuer/asyncpoll"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Config represents the DigiCert CertCentral issuer connector configuration.
|
// Config represents the DigiCert CertCentral issuer connector configuration.
|
||||||
@@ -59,6 +60,25 @@ type Config struct {
|
|||||||
// Default: "https://www.digicert.com/services/v2".
|
// Default: "https://www.digicert.com/services/v2".
|
||||||
// Set via CERTCTL_DIGICERT_BASE_URL environment variable.
|
// Set via CERTCTL_DIGICERT_BASE_URL environment variable.
|
||||||
BaseURL string `json:"base_url"`
|
BaseURL string `json:"base_url"`
|
||||||
|
|
||||||
|
// PollMaxWaitSeconds caps how long GetOrderStatus blocks doing
|
||||||
|
// internal exponential-backoff polling before returning
|
||||||
|
// StillPending to the caller. Default 600 (10 minutes); 0 falls
|
||||||
|
// back to the asyncpoll package default. Bound only on the
|
||||||
|
// per-call wall-clock; the caller (scheduler) can re-invoke on
|
||||||
|
// the next tick if its policy allows.
|
||||||
|
//
|
||||||
|
// Set via CERTCTL_DIGICERT_POLL_MAX_WAIT_SECONDS. Audit fix #5.
|
||||||
|
PollMaxWaitSeconds int `json:"poll_max_wait_seconds,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// pollMaxWait returns the configured PollMaxWait as a time.Duration,
|
||||||
|
// or the asyncpoll package default if unset.
|
||||||
|
func (c *Config) pollMaxWait() time.Duration {
|
||||||
|
if c.PollMaxWaitSeconds <= 0 {
|
||||||
|
return asyncpoll.DefaultMaxWait
|
||||||
|
}
|
||||||
|
return time.Duration(c.PollMaxWaitSeconds) * time.Second
|
||||||
}
|
}
|
||||||
|
|
||||||
// Connector implements the issuer.Connector interface for DigiCert CertCentral.
|
// Connector implements the issuer.Connector interface for DigiCert CertCentral.
|
||||||
@@ -328,57 +348,124 @@ func (c *Connector) RevokeCertificate(ctx context.Context, request issuer.Revoca
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetOrderStatus checks the status of a DigiCert certificate order.
|
// GetOrderStatus checks the status of a DigiCert certificate order
|
||||||
// If the order is "issued", downloads the certificate and returns it.
|
// using bounded internal polling (asyncpoll.Poll). One call blocks
|
||||||
// If still "pending", returns pending status for continued polling.
|
// for up to PollMaxWait (default 10m) doing exponential backoff;
|
||||||
|
// returns Done with the cert, Failed with the rejection reason, or
|
||||||
|
// StillPending if the deadline expires (caller can re-invoke).
|
||||||
|
//
|
||||||
|
// Audit fix #5: previously this method made one HTTP call per
|
||||||
|
// scheduler tick. Under load that pile-drives the upstream rate
|
||||||
|
// limit. asyncpoll wraps the one-shot logic with backoff + jitter.
|
||||||
func (c *Connector) GetOrderStatus(ctx context.Context, orderID string) (*issuer.OrderStatus, error) {
|
func (c *Connector) GetOrderStatus(ctx context.Context, orderID string) (*issuer.OrderStatus, error) {
|
||||||
c.logger.Debug("checking DigiCert order status", "order_id", orderID)
|
c.logger.Debug("checking DigiCert order status", "order_id", orderID)
|
||||||
|
|
||||||
|
// Closure-scoped accumulators — Poll passes back only the Result;
|
||||||
|
// the cert / pending message land here for the wrapper to return.
|
||||||
|
var done *issuer.OrderStatus
|
||||||
|
var lastPendingMsg string
|
||||||
|
|
||||||
|
cfg := asyncpoll.Config{MaxWait: c.config.pollMaxWait()}
|
||||||
|
|
||||||
|
res, err := asyncpoll.Poll(ctx, cfg, func(ctx context.Context) (asyncpoll.Result, error) {
|
||||||
|
status, result, pollErr := c.pollOrderOnce(ctx, orderID)
|
||||||
|
if status != nil {
|
||||||
|
switch result {
|
||||||
|
case asyncpoll.Done:
|
||||||
|
done = status
|
||||||
|
case asyncpoll.StillPending:
|
||||||
|
if status.Message != nil {
|
||||||
|
lastPendingMsg = *status.Message
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return result, pollErr
|
||||||
|
})
|
||||||
|
|
||||||
|
now := time.Now()
|
||||||
|
switch res {
|
||||||
|
case asyncpoll.Done:
|
||||||
|
return done, nil
|
||||||
|
case asyncpoll.Failed:
|
||||||
|
// Permanent failure — surface the upstream's error to the
|
||||||
|
// caller so handler middleware / scheduler can mark the job
|
||||||
|
// failed with the actual reason.
|
||||||
|
return nil, err
|
||||||
|
default: // StillPending — MaxWait or ctx cancel
|
||||||
|
msg := lastPendingMsg
|
||||||
|
if msg == "" {
|
||||||
|
msg = fmt.Sprintf("order %s still pending after PollMaxWait", orderID)
|
||||||
|
}
|
||||||
|
return &issuer.OrderStatus{
|
||||||
|
OrderID: orderID,
|
||||||
|
Status: "pending",
|
||||||
|
Message: &msg,
|
||||||
|
UpdatedAt: now,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// pollOrderOnce makes one HTTP GET against the DigiCert order-status
|
||||||
|
// endpoint and translates the response into an asyncpoll.Result plus
|
||||||
|
// (when applicable) a populated OrderStatus. Used by GetOrderStatus
|
||||||
|
// as the per-iteration closure for asyncpoll.Poll.
|
||||||
|
func (c *Connector) pollOrderOnce(ctx context.Context, orderID string) (*issuer.OrderStatus, asyncpoll.Result, error) {
|
||||||
statusURL := fmt.Sprintf("%s/order/certificate/%s", c.config.BaseURL, orderID)
|
statusURL := fmt.Sprintf("%s/order/certificate/%s", c.config.BaseURL, orderID)
|
||||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, statusURL, nil)
|
req, err := http.NewRequestWithContext(ctx, http.MethodGet, statusURL, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to create status request: %w", err)
|
return nil, asyncpoll.Failed, fmt.Errorf("failed to create status request: %w", err)
|
||||||
}
|
}
|
||||||
req.Header.Set("X-DC-DEVKEY", c.config.APIKey)
|
req.Header.Set("X-DC-DEVKEY", c.config.APIKey)
|
||||||
req.Header.Set("Content-Type", "application/json")
|
req.Header.Set("Content-Type", "application/json")
|
||||||
|
|
||||||
resp, err := c.httpClient.Do(req)
|
resp, err := c.httpClient.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("DigiCert status request failed: %w", err)
|
// Transient network error — keep polling. Caller's MaxWait
|
||||||
|
// will eventually fire if it persists.
|
||||||
|
return nil, asyncpoll.StillPending, fmt.Errorf("DigiCert status request failed: %w", err)
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
|
|
||||||
respBody, err := io.ReadAll(resp.Body)
|
respBody, err := io.ReadAll(resp.Body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to read status response: %w", err)
|
return nil, asyncpoll.StillPending, fmt.Errorf("failed to read status response: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Status-code triage:
|
||||||
|
// 2xx → fall through to body parse below.
|
||||||
|
// 429 → StillPending (rate limited; retry with backoff).
|
||||||
|
// 5xx → StillPending (upstream unhealthy; transient).
|
||||||
|
// other 4xx → Failed (permanent client error: 400 bad
|
||||||
|
// request, 401 auth, 403 forbidden, 404 order
|
||||||
|
// doesn't exist). No amount of polling fixes
|
||||||
|
// these.
|
||||||
if resp.StatusCode != http.StatusOK {
|
if resp.StatusCode != http.StatusOK {
|
||||||
return nil, fmt.Errorf("DigiCert order status returned %d: %s", resp.StatusCode, string(respBody))
|
err := fmt.Errorf("DigiCert order status returned %d: %s", resp.StatusCode, string(respBody))
|
||||||
|
if resp.StatusCode == http.StatusTooManyRequests || resp.StatusCode >= 500 {
|
||||||
|
return nil, asyncpoll.StillPending, err
|
||||||
|
}
|
||||||
|
return nil, asyncpoll.Failed, err
|
||||||
}
|
}
|
||||||
|
|
||||||
var statusResp orderStatusResponse
|
var statusResp orderStatusResponse
|
||||||
if err := json.Unmarshal(respBody, &statusResp); err != nil {
|
if err := json.Unmarshal(respBody, &statusResp); err != nil {
|
||||||
return nil, fmt.Errorf("failed to parse status response: %w", err)
|
// Parse errors are permanent — the upstream's response shape
|
||||||
|
// changed or the body is corrupted. Retrying produces the
|
||||||
|
// same parse error.
|
||||||
|
return nil, asyncpoll.Failed, fmt.Errorf("failed to parse status response: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
|
|
||||||
switch statusResp.Status {
|
switch statusResp.Status {
|
||||||
case "issued":
|
case "issued":
|
||||||
if statusResp.Certificate.ID == 0 {
|
if statusResp.Certificate.ID == 0 {
|
||||||
return nil, fmt.Errorf("order is issued but certificate_id is missing")
|
return nil, asyncpoll.Failed, fmt.Errorf("order is issued but certificate_id is missing")
|
||||||
}
|
}
|
||||||
|
|
||||||
certPEM, chainPEM, serial, notBefore, notAfter, err := c.downloadCertificate(ctx, statusResp.Certificate.ID)
|
certPEM, chainPEM, serial, notBefore, notAfter, err := c.downloadCertificate(ctx, statusResp.Certificate.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to download certificate: %w", err)
|
return nil, asyncpoll.Failed, fmt.Errorf("failed to download certificate: %w", err)
|
||||||
}
|
}
|
||||||
|
c.logger.Info("DigiCert order completed", "order_id", orderID, "serial", serial)
|
||||||
c.logger.Info("DigiCert order completed",
|
|
||||||
"order_id", orderID,
|
|
||||||
"serial", serial)
|
|
||||||
|
|
||||||
return &issuer.OrderStatus{
|
return &issuer.OrderStatus{
|
||||||
OrderID: orderID,
|
OrderID: orderID,
|
||||||
Status: "completed",
|
Status: "completed",
|
||||||
@@ -388,7 +475,7 @@ func (c *Connector) GetOrderStatus(ctx context.Context, orderID string) (*issuer
|
|||||||
NotBefore: &notBefore,
|
NotBefore: &notBefore,
|
||||||
NotAfter: &notAfter,
|
NotAfter: &notAfter,
|
||||||
UpdatedAt: now,
|
UpdatedAt: now,
|
||||||
}, nil
|
}, asyncpoll.Done, nil
|
||||||
|
|
||||||
case "pending", "processing":
|
case "pending", "processing":
|
||||||
msg := fmt.Sprintf("order %s is %s", orderID, statusResp.Status)
|
msg := fmt.Sprintf("order %s is %s", orderID, statusResp.Status)
|
||||||
@@ -397,16 +484,20 @@ func (c *Connector) GetOrderStatus(ctx context.Context, orderID string) (*issuer
|
|||||||
Status: "pending",
|
Status: "pending",
|
||||||
Message: &msg,
|
Message: &msg,
|
||||||
UpdatedAt: now,
|
UpdatedAt: now,
|
||||||
}, nil
|
}, asyncpoll.StillPending, nil
|
||||||
|
|
||||||
case "rejected", "denied":
|
case "rejected", "denied":
|
||||||
|
// Completed-with-negative-answer. NOT a transient failure
|
||||||
|
// (the order won't un-reject itself), but also not a
|
||||||
|
// caller-facing Go error — wrap in OrderStatus{Status:"failed"}
|
||||||
|
// so the scheduler sees a definitive completion.
|
||||||
msg := fmt.Sprintf("order %s was %s", orderID, statusResp.Status)
|
msg := fmt.Sprintf("order %s was %s", orderID, statusResp.Status)
|
||||||
return &issuer.OrderStatus{
|
return &issuer.OrderStatus{
|
||||||
OrderID: orderID,
|
OrderID: orderID,
|
||||||
Status: "failed",
|
Status: "failed",
|
||||||
Message: &msg,
|
Message: &msg,
|
||||||
UpdatedAt: now,
|
UpdatedAt: now,
|
||||||
}, nil
|
}, asyncpoll.Done, nil
|
||||||
|
|
||||||
default:
|
default:
|
||||||
msg := fmt.Sprintf("unknown order status: %s", statusResp.Status)
|
msg := fmt.Sprintf("unknown order status: %s", statusResp.Status)
|
||||||
@@ -415,7 +506,7 @@ func (c *Connector) GetOrderStatus(ctx context.Context, orderID string) (*issuer
|
|||||||
Status: "pending",
|
Status: "pending",
|
||||||
Message: &msg,
|
Message: &msg,
|
||||||
UpdatedAt: now,
|
UpdatedAt: now,
|
||||||
}, nil
|
}, asyncpoll.StillPending, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -19,7 +19,11 @@ import (
|
|||||||
func buildDigicertConnector(t *testing.T, baseURL string) *digicert.Connector {
|
func buildDigicertConnector(t *testing.T, baseURL string) *digicert.Connector {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
c := digicert.New(nil, slog.Default())
|
c := digicert.New(nil, slog.Default())
|
||||||
cfg := digicert.Config{APIKey: "k", OrgID: "1", ProductType: "ssl_basic", BaseURL: baseURL}
|
// PollMaxWaitSeconds=1 keeps async-pending tests fast — pending
|
||||||
|
// status returns within ~1s instead of the 10-minute production
|
||||||
|
// default. Tests that exercise issued/failed/parse-error paths
|
||||||
|
// don't block on the wait.
|
||||||
|
cfg := digicert.Config{APIKey: "k", OrgID: "1", ProductType: "ssl_basic", BaseURL: baseURL, PollMaxWaitSeconds: 1}
|
||||||
raw, _ := json.Marshal(cfg)
|
raw, _ := json.Marshal(cfg)
|
||||||
if err := c.ValidateConfig(context.Background(), raw); err != nil {
|
if err := c.ValidateConfig(context.Background(), raw); err != nil {
|
||||||
t.Fatalf("ValidateConfig: %v", err)
|
t.Fatalf("ValidateConfig: %v", err)
|
||||||
|
|||||||
@@ -289,9 +289,10 @@ func TestDigiCertConnector(t *testing.T) {
|
|||||||
defer srv.Close()
|
defer srv.Close()
|
||||||
|
|
||||||
config := &digicert.Config{
|
config := &digicert.Config{
|
||||||
APIKey: "dc-test-key",
|
APIKey: "dc-test-key",
|
||||||
OrgID: "12345",
|
OrgID: "12345",
|
||||||
BaseURL: srv.URL,
|
BaseURL: srv.URL,
|
||||||
|
PollMaxWaitSeconds: 1, // keep async-pending tests fast
|
||||||
}
|
}
|
||||||
connector := digicert.New(config, logger)
|
connector := digicert.New(config, logger)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user