
storage

import "github.com/open-outbox/relay/internal/storage"

Package storage provides the persistence layer implementations for the Outbox Relay.

It defines the logic for interacting with various databases to manage the lifecycle of outbox events. The core responsibilities of this package include:

  • Claiming batches of pending events using row-level locking (e.g., FOR UPDATE SKIP LOCKED).
  • Updating event status after successful or failed delivery.
  • Reaping “stuck” events whose processing leases have expired.
  • Providing metrics and health statistics about the outbox table.

The primary implementation targets PostgreSQL. It uses pgx for high-performance connection pooling and relies on PostgreSQL features such as UNNEST for batch updates.

All storage implementations must satisfy the relay.Storage interface to ensure pluggability within the relay engine.

func ValidateTableName(name string) error

ValidateTableName ensures that name is safe to use in SQL statements (guarding against SQL injection) and follows standard database naming conventions.

Instrumented decorates a storage implementation with observability features. It records metrics, traces, and logs for all underlying storage operations.

type Instrumented struct {
// contains filtered or unexported fields
}

func NewInstrumented(s relay.Storage, tel telemetry.Telemetry, relayID string) *Instrumented

NewInstrumented initializes the storage decorator with the required telemetry providers. It scopes the logger to “storage” and preserves the relay identity for metric attribution.

func (i *Instrumented) ClaimBatch(ctx context.Context, batchSize int, buffer []relay.Event) ([]relay.Event, error)

ClaimBatch wraps the storage call to identify and lock events for processing. It records request latency and the actual number of events retrieved for observability.

func (i *Instrumented) Close(ctx context.Context) error

Close gracefully shuts down the underlying storage.

func (i *Instrumented) GetStats(ctx context.Context) (relay.Stats, error)

GetStats retrieves current backlog metrics and event counts from storage. It instruments the request so that the latency of health-monitoring queries can be tracked and kept within acceptable bounds.

func (i *Instrumented) MarkDeliveredBatch(ctx context.Context, ids []uuid.UUID) (int64, error)

MarkDeliveredBatch transitions a batch of events to the final delivered state. It records completion latency and result status to track storage reliability.

func (i *Instrumented) MarkFailedBatch(ctx context.Context, failures []relay.FailedEvent) (int64, error)

MarkFailedBatch updates events with failure details and schedules retries. It tracks the storage latency and records whether the update was successful.

func (i *Instrumented) Ping(ctx context.Context) error

Ping verifies the connectivity and health of the underlying storage system.

func (i *Instrumented) Prune(ctx context.Context, opts relay.PruneOptions) (relay.PruneResult, error)

Prune removes processed and failed events from storage based on the provided retention policy. It tracks the operation’s duration and success rate to monitor database maintenance health.

func (i *Instrumented) ReapExpiredLeases(ctx context.Context, leaseTimeout time.Duration, limit int) (int64, error)

ReapExpiredLeases recovers events stuck in ‘DELIVERING’ status due to worker failure. It instruments the cleanup process and records the number of recovered events, which helps monitor both the self-healing mechanism and the stability of relay workers.

Postgres is a PostgreSQL-backed implementation of the relay.Storage interface. It uses the jackc/pgx/v5 library for efficient connection pooling and PostgreSQL-specific optimizations.

type Postgres struct {
// contains filtered or unexported fields
}

func NewPostgres(pool *pgxpool.Pool, tableName string, relayID string, tel telemetry.Telemetry) (*Postgres, error)

NewPostgres creates a new Postgres storage instance using the provided pgx connection pool.

func (p *Postgres) ClaimBatch(ctx context.Context, batchSize int, buf []relay.Event) ([]relay.Event, error)

ClaimBatch atomically selects and locks a batch of pending events for the current relay instance.

It uses a Common Table Expression (CTE) with ‘FOR UPDATE SKIP LOCKED’ to ensure that:

  1. Multiple relay instances can process the table concurrently without colliding.
  2. Only events that are PENDING and past their available_at time are selected.
  3. Selected events are immediately marked as DELIVERING to “lease” them to this instance.

func (p *Postgres) Close(_ context.Context) error

Close gracefully shuts down the underlying pgx connection pool.

func (p *Postgres) GetStats(ctx context.Context) (relay.Stats, error)

GetStats retrieves high-level metrics from the outbox table, such as the total number of pending events and the age of the oldest message in the queue.

func (p *Postgres) MarkDeliveredBatch(ctx context.Context, ids []uuid.UUID) (int64, error)

MarkDeliveredBatch transitions a set of events to the DELIVERED status. It verifies the lock against the current relay instance to prevent updates on events that may have been reaped or re-assigned.

func (p *Postgres) MarkFailedBatch(ctx context.Context, failures []relay.FailedEvent) (int64, error)

MarkFailedBatch updates multiple events that failed during publishing.

It uses the PostgreSQL UNNEST function to perform a single atomic batch update, which is significantly more efficient than issuing a separate UPDATE statement per event. It updates the status, increments the attempt count, and sets the next available_at time.

func (p *Postgres) Ping(ctx context.Context) error

Ping checks the health of the Postgres connection pool. It ensures that the database is reachable and accepting commands. This is used primarily by the health check endpoint for liveness probes.

func (p *Postgres) Prune(ctx context.Context, opts relay.PruneOptions) (relay.PruneResult, error)

Prune removes historical event data from the outbox based on the provided age thresholds.

It handles two types of cleanup:

  1. DELIVERED: Successfully processed events that are no longer needed for auditing.
  2. DEAD: Events that failed all retry attempts and have been quarantined.

If opts.DryRun is true, it performs a non-destructive count of the records that meet the criteria. Otherwise, it executes the deletions within a transaction to ensure consistency and returns the total number of rows removed.

func (p *Postgres) ReapExpiredLeases(ctx context.Context, leaseTimeout time.Duration, limit int) (int64, error)

ReapExpiredLeases identifies and resets events that have been stuck in the DELIVERING state for longer than the specified leaseTimeout.

This is a critical recovery mechanism. If a relay instance crashes while processing a batch, this function allows other instances to eventually pick up those “orphaned” events by moving them back to the PENDING state.