mirror of
https://github.com/shankar0123/certctl.git
synced 2026-06-12 22:48:52 +00:00
Complete V1 scaffold
This commit is contained in:
@@ -0,0 +1,284 @@
|
||||
package postgres
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/shankar0123/certctl/internal/domain"
|
||||
)
|
||||
|
||||
// JobRepository implements repository.JobRepository
|
||||
type JobRepository struct {
|
||||
db *sql.DB
|
||||
}
|
||||
|
||||
// NewJobRepository creates a new JobRepository
|
||||
func NewJobRepository(db *sql.DB) *JobRepository {
|
||||
return &JobRepository{db: db}
|
||||
}
|
||||
|
||||
// List returns all jobs
|
||||
func (r *JobRepository) List(ctx context.Context) ([]*domain.Job, error) {
|
||||
rows, err := r.db.QueryContext(ctx, `
|
||||
SELECT id, type, certificate_id, target_id, status, attempts, max_attempts,
|
||||
last_error, scheduled_at, started_at, completed_at, created_at
|
||||
FROM jobs
|
||||
ORDER BY created_at DESC
|
||||
`)
|
||||
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to query jobs: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var jobs []*domain.Job
|
||||
for rows.Next() {
|
||||
job, err := scanJob(rows)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
jobs = append(jobs, job)
|
||||
}
|
||||
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, fmt.Errorf("error iterating job rows: %w", err)
|
||||
}
|
||||
|
||||
return jobs, nil
|
||||
}
|
||||
|
||||
// Get retrieves a job by ID
|
||||
func (r *JobRepository) Get(ctx context.Context, id string) (*domain.Job, error) {
|
||||
row := r.db.QueryRowContext(ctx, `
|
||||
SELECT id, type, certificate_id, target_id, status, attempts, max_attempts,
|
||||
last_error, scheduled_at, started_at, completed_at, created_at
|
||||
FROM jobs
|
||||
WHERE id = $1
|
||||
`, id)
|
||||
|
||||
job, err := scanJob(row)
|
||||
if err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
return nil, fmt.Errorf("job not found")
|
||||
}
|
||||
return nil, fmt.Errorf("failed to query job: %w", err)
|
||||
}
|
||||
|
||||
return job, nil
|
||||
}
|
||||
|
||||
// Create stores a new job
|
||||
func (r *JobRepository) Create(ctx context.Context, job *domain.Job) error {
|
||||
if job.ID == "" {
|
||||
job.ID = uuid.New().String()
|
||||
}
|
||||
|
||||
err := r.db.QueryRowContext(ctx, `
|
||||
INSERT INTO jobs (
|
||||
id, type, certificate_id, target_id, status, attempts, max_attempts,
|
||||
last_error, scheduled_at, started_at, completed_at, created_at
|
||||
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
|
||||
RETURNING id
|
||||
`, job.ID, job.Type, job.CertificateID, job.TargetID, job.Status, job.Attempts,
|
||||
job.MaxAttempts, job.LastError, job.ScheduledAt, job.StartedAt, job.CompletedAt,
|
||||
job.CreatedAt).Scan(&job.ID)
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create job: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Update modifies an existing job
|
||||
func (r *JobRepository) Update(ctx context.Context, job *domain.Job) error {
|
||||
result, err := r.db.ExecContext(ctx, `
|
||||
UPDATE jobs SET
|
||||
type = $1,
|
||||
certificate_id = $2,
|
||||
target_id = $3,
|
||||
status = $4,
|
||||
attempts = $5,
|
||||
max_attempts = $6,
|
||||
last_error = $7,
|
||||
scheduled_at = $8,
|
||||
started_at = $9,
|
||||
completed_at = $10
|
||||
WHERE id = $11
|
||||
`, job.Type, job.CertificateID, job.TargetID, job.Status, job.Attempts,
|
||||
job.MaxAttempts, job.LastError, job.ScheduledAt, job.StartedAt,
|
||||
job.CompletedAt, job.ID)
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to update job: %w", err)
|
||||
}
|
||||
|
||||
rows, err := result.RowsAffected()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get rows affected: %w", err)
|
||||
}
|
||||
|
||||
if rows == 0 {
|
||||
return fmt.Errorf("job not found")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Delete removes a job
|
||||
func (r *JobRepository) Delete(ctx context.Context, id string) error {
|
||||
result, err := r.db.ExecContext(ctx, "DELETE FROM jobs WHERE id = $1", id)
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to delete job: %w", err)
|
||||
}
|
||||
|
||||
rows, err := result.RowsAffected()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get rows affected: %w", err)
|
||||
}
|
||||
|
||||
if rows == 0 {
|
||||
return fmt.Errorf("job not found")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// ListByStatus returns jobs with a specific status
|
||||
func (r *JobRepository) ListByStatus(ctx context.Context, status domain.JobStatus) ([]*domain.Job, error) {
|
||||
rows, err := r.db.QueryContext(ctx, `
|
||||
SELECT id, type, certificate_id, target_id, status, attempts, max_attempts,
|
||||
last_error, scheduled_at, started_at, completed_at, created_at
|
||||
FROM jobs
|
||||
WHERE status = $1
|
||||
ORDER BY created_at DESC
|
||||
`, status)
|
||||
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to query jobs by status: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var jobs []*domain.Job
|
||||
for rows.Next() {
|
||||
job, err := scanJob(rows)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
jobs = append(jobs, job)
|
||||
}
|
||||
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, fmt.Errorf("error iterating job rows: %w", err)
|
||||
}
|
||||
|
||||
return jobs, nil
|
||||
}
|
||||
|
||||
// ListByCertificate returns all jobs for a certificate
|
||||
func (r *JobRepository) ListByCertificate(ctx context.Context, certID string) ([]*domain.Job, error) {
|
||||
rows, err := r.db.QueryContext(ctx, `
|
||||
SELECT id, type, certificate_id, target_id, status, attempts, max_attempts,
|
||||
last_error, scheduled_at, started_at, completed_at, created_at
|
||||
FROM jobs
|
||||
WHERE certificate_id = $1
|
||||
ORDER BY created_at DESC
|
||||
`, certID)
|
||||
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to query jobs for certificate: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var jobs []*domain.Job
|
||||
for rows.Next() {
|
||||
job, err := scanJob(rows)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
jobs = append(jobs, job)
|
||||
}
|
||||
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, fmt.Errorf("error iterating job rows: %w", err)
|
||||
}
|
||||
|
||||
return jobs, nil
|
||||
}
|
||||
|
||||
// UpdateStatus updates a job's status and optional error message
|
||||
func (r *JobRepository) UpdateStatus(ctx context.Context, id string, status domain.JobStatus, errMsg string) error {
|
||||
var lastError *string
|
||||
if errMsg != "" {
|
||||
lastError = &errMsg
|
||||
}
|
||||
|
||||
result, err := r.db.ExecContext(ctx, `
|
||||
UPDATE jobs SET status = $1, last_error = $2 WHERE id = $3
|
||||
`, status, lastError, id)
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to update job status: %w", err)
|
||||
}
|
||||
|
||||
rows, err := result.RowsAffected()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get rows affected: %w", err)
|
||||
}
|
||||
|
||||
if rows == 0 {
|
||||
return fmt.Errorf("job not found")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetPendingJobs returns jobs not yet processed of a specific type
|
||||
func (r *JobRepository) GetPendingJobs(ctx context.Context, jobType domain.JobType) ([]*domain.Job, error) {
|
||||
rows, err := r.db.QueryContext(ctx, `
|
||||
SELECT id, type, certificate_id, target_id, status, attempts, max_attempts,
|
||||
last_error, scheduled_at, started_at, completed_at, created_at
|
||||
FROM jobs
|
||||
WHERE type = $1 AND status = $2
|
||||
ORDER BY scheduled_at ASC
|
||||
`, jobType, domain.JobStatusPending)
|
||||
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to query pending jobs: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var jobs []*domain.Job
|
||||
for rows.Next() {
|
||||
job, err := scanJob(rows)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
jobs = append(jobs, job)
|
||||
}
|
||||
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, fmt.Errorf("error iterating job rows: %w", err)
|
||||
}
|
||||
|
||||
return jobs, nil
|
||||
}
|
||||
|
||||
// scanJob scans a job from a row or rows
|
||||
func scanJob(scanner interface {
|
||||
Scan(...interface{}) error
|
||||
}) (*domain.Job, error) {
|
||||
var job domain.Job
|
||||
err := scanner.Scan(&job.ID, &job.Type, &job.CertificateID, &job.TargetID,
|
||||
&job.Status, &job.Attempts, &job.MaxAttempts, &job.LastError,
|
||||
&job.ScheduledAt, &job.StartedAt, &job.CompletedAt, &job.CreatedAt)
|
||||
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to scan job: %w", err)
|
||||
}
|
||||
|
||||
return &job, nil
|
||||
}
|
||||
Reference in New Issue
Block a user