storage

import "github.com/open-outbox/relay/internal/storage"

Package storage provides the persistence layer implementations for the Outbox Relay.

It defines the logic for interacting with various databases to manage the lifecycle of outbox events. The core responsibilities of this package include:

  • Claiming batches of pending events using row-level locking (e.g., FOR UPDATE SKIP LOCKED).
  • Updating event status after successful or failed delivery.
  • Reaping “stuck” events whose processing leases have expired.
  • Providing metrics and health statistics about the outbox table.

The primary implementation is PostgreSQL, which leverages pgx for high-performance connection pooling and advanced SQL features like UNNEST for batch updates.

All storage implementations must satisfy the relay.Storage interface to ensure pluggability within the relay engine.

func ValidateTableName(name string) error

ValidateTableName checks that the name is safe to interpolate into SQL statements (guarding against SQL injection) and that it conforms to standard database naming conventions.
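The exact rules this function enforces are not listed here. As a rough sketch of the idea, the check below assumes a table name is an unquoted identifier, optionally schema-qualified, with each part at most 63 bytes (PostgreSQL's default identifier limit). The name validateTableNameSketch and the pattern are illustrative assumptions, not the package's actual implementation.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// identPart matches one unquoted identifier: a letter or underscore followed
// by letters, digits, or underscores. The pattern is an assumption made for
// this sketch.
var identPart = regexp.MustCompile(`^[A-Za-z_][A-Za-z0-9_]*$`)

// maxIdentLen is PostgreSQL's default identifier length limit in bytes.
const maxIdentLen = 63

// validateTableNameSketch is a hypothetical validator. It accepts "table" or
// "schema.table" and rejects anything that could change the meaning of a
// statement once the name is interpolated into it.
func validateTableNameSketch(name string) error {
	if name == "" {
		return fmt.Errorf("table name is empty")
	}
	parts := strings.Split(name, ".")
	if len(parts) > 2 {
		return fmt.Errorf("table name %q has too many qualifiers", name)
	}
	for _, part := range parts {
		if len(part) > maxIdentLen {
			return fmt.Errorf("identifier %q exceeds %d bytes", part, maxIdentLen)
		}
		if !identPart.MatchString(part) {
			return fmt.Errorf("identifier %q contains invalid characters", part)
		}
	}
	return nil
}
```

Validation of this kind matters because table names cannot be passed as bind parameters; they have to be spliced into the query text.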

Postgres is a PostgreSQL-backed implementation of the relay.Storage interface. It uses the jackc/pgx/v5 library for efficient connection pooling and PostgreSQL-specific optimizations.

type Postgres struct {
// contains filtered or unexported fields
}

func NewPostgres(pool *pgxpool.Pool, tableName string, logger *zap.Logger) (*Postgres, error)

NewPostgres creates a new Postgres storage instance using the provided pgx connection pool.

func (p *Postgres) ClaimBatch(ctx context.Context, relayID string, batchSize int, buf []relay.Event) ([]relay.Event, error)

ClaimBatch atomically selects and locks a batch of pending events for the current relay instance.

It uses a Common Table Expression (CTE) with FOR UPDATE SKIP LOCKED to ensure that:

  1. Multiple relay instances can process the table concurrently without colliding.
  2. Only events that are PENDING and past their available_at time are selected.
  3. Selected events are immediately marked as DELIVERING to “lease” them to this instance.

func (p *Postgres) Close(_ context.Context) error

Close gracefully shuts down the underlying pgx connection pool.

func (p *Postgres) GetStats(ctx context.Context) (relay.Stats, error)

GetStats retrieves high-level metrics from the outbox table, such as the total number of pending events and the age of the oldest message in the queue.

func (p *Postgres) MarkDeliveredBatch(ctx context.Context, ids []uuid.UUID, relayID string) error

MarkDeliveredBatch updates a set of events to the DELIVERED status. It requires the relayID to match the current lock, so that an instance cannot mark an event as delivered after its lease has already been reaped.

func (p *Postgres) MarkFailedBatch(ctx context.Context, failures []relay.FailedEvent, relayID string) error

MarkFailedBatch updates multiple events that failed during publishing.

It uses the PostgreSQL UNNEST function to perform a single atomic batch update, which avoids the round trip of issuing one UPDATE statement per event. It updates the status, increments the attempt count, and sets the next available_at time.
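A sketch of the UNNEST technique follows. The failures are transposed into parallel arrays, one per column, and PostgreSQL expands them back into rows that are joined against the outbox table. The failedEvent type, its fields, the column names, and both helpers are hypothetical; the real relay.FailedEvent fields are not shown in this documentation.

```go
package main

import (
	"strings"
	"time"
)

// failedEvent is a hypothetical stand-in for relay.FailedEvent.
type failedEvent struct {
	ID          string    // event UUID in text form
	Status      string    // next status, e.g. PENDING for a retry or DEAD
	AvailableAt time.Time // earliest time the event may be claimed again
}

// markFailedTemplate sketches a one-statement batch update.
// $1..$3 are parallel arrays and $4 is the relay ID. Column names are assumed.
const markFailedTemplate = `
UPDATE {table} AS o
SET status       = f.status,
    attempts     = o.attempts + 1,
    available_at = f.available_at
FROM UNNEST($1::uuid[], $2::text[], $3::timestamptz[])
     AS f(id, status, available_at)
WHERE o.id = f.id
  AND o.locked_by = $4`

// markFailedQuery substitutes an already-validated table name.
func markFailedQuery(table string) string {
	return strings.ReplaceAll(markFailedTemplate, "{table}", table)
}

// unnestArgs transposes a slice of failures into one slice per column, in the
// order the UNNEST call above expects them.
func unnestArgs(failures []failedEvent) (ids, statuses []string, availableAt []time.Time) {
	ids = make([]string, 0, len(failures))
	statuses = make([]string, 0, len(failures))
	availableAt = make([]time.Time, 0, len(failures))
	for _, f := range failures {
		ids = append(ids, f.ID)
		statuses = append(statuses, f.Status)
		availableAt = append(availableAt, f.AvailableAt)
	}
	return ids, statuses, availableAt
}
```

The three slices would then be passed as the first three bind parameters, so the number of parameters stays fixed regardless of batch size.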

func (p *Postgres) Ping(ctx context.Context) error

Ping checks the health of the Postgres connection pool. It ensures that the database is reachable and accepting commands. This is used primarily by the health check endpoint for liveness probes.

func (p *Postgres) Prune(ctx context.Context, opts relay.PruneOptions) (relay.PruneResult, error)

Prune removes historical event data from the outbox based on the provided age thresholds.

It handles two types of cleanup:

  1. DELIVERED: Successfully processed events that are no longer needed for auditing.
  2. DEAD: Events that failed all retry attempts and have been quarantined.

If opts.DryRun is true, it performs a non-destructive count of the records that meet the criteria. Otherwise, it executes the deletions within a transaction to ensure consistency and returns the total number of rows removed.

func (p *Postgres) ReapExpiredLeases(ctx context.Context, leaseTimeout time.Duration, limit int) (int64, error)

ReapExpiredLeases identifies and resets events that have been stuck in the DELIVERING state for longer than the specified leaseTimeout.

This is a critical recovery mechanism. If a relay instance crashes while processing a batch, this function allows other instances to eventually pick up those “orphaned” events by moving them back to the PENDING state.