storage
import "github.com/open-outbox/relay/internal/storage"
Package storage provides the persistence layer implementations for the Outbox Relay.
It defines the logic for interacting with various databases to manage the lifecycle of outbox events. The core responsibilities of this package include:
- Claiming batches of pending events using row-level locking (e.g., FOR UPDATE SKIP LOCKED).
- Updating event status after successful or failed delivery.
- Reaping “stuck” events whose processing leases have expired.
- Providing metrics and health statistics about the outbox table.
The primary implementation is PostgreSQL, which leverages pgx for high-performance connection pooling and advanced SQL features like UNNEST for batch updates.
All storage implementations must satisfy the relay.Storage interface to ensure pluggability within the relay engine.
- func ValidateTableName(name string) error
- type Postgres
- func NewPostgres(pool *pgxpool.Pool, tableName string, logger *zap.Logger) (*Postgres, error)
- func (p *Postgres) ClaimBatch(ctx context.Context, relayID string, batchSize int, buf []relay.Event) ([]relay.Event, error)
- func (p *Postgres) Close(_ context.Context) error
- func (p *Postgres) GetStats(ctx context.Context) (relay.Stats, error)
- func (p *Postgres) MarkDeliveredBatch(ctx context.Context, ids []uuid.UUID, relayID string) error
- func (p *Postgres) MarkFailedBatch(ctx context.Context, failures []relay.FailedEvent, relayID string) error
- func (p *Postgres) Ping(ctx context.Context) error
- func (p *Postgres) Prune(ctx context.Context, opts relay.PruneOptions) (relay.PruneResult, error)
- func (p *Postgres) ReapExpiredLeases(ctx context.Context, leaseTimeout time.Duration, limit int) (int64, error)
func ValidateTableName
func ValidateTableName(name string) error
ValidateTableName checks that the name is safe to interpolate into SQL (guarding against SQL injection) and that it follows standard database naming conventions.
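To illustrate the kind of check described here, the following is a minimal sketch and not the package's actual implementation. The accepted pattern, the 63-byte limit (PostgreSQL's default identifier length), and the name validateTableNameSketch are all assumptions made for this example.

```go
package main

import (
	"errors"
	"fmt"
	"regexp"
)

// identPattern is an assumed rule: a letter or underscore followed by
// letters, digits, or underscores. The real function may accept more or less.
var identPattern = regexp.MustCompile(`^[A-Za-z_][A-Za-z0-9_]*$`)

// maxIdentLen mirrors PostgreSQL's default identifier limit of 63 bytes.
const maxIdentLen = 63

// validateTableNameSketch is a hypothetical stand-in for ValidateTableName.
func validateTableNameSketch(name string) error {
	if name == "" {
		return errors.New("table name must not be empty")
	}
	if len(name) > maxIdentLen {
		return fmt.Errorf("table name exceeds %d bytes", maxIdentLen)
	}
	if !identPattern.MatchString(name) {
		return fmt.Errorf("table name %q contains unsupported characters", name)
	}
	return nil
}

func main() {
	for _, name := range []string{"outbox_events", "outbox; DROP TABLE users"} {
		fmt.Printf("%q -> %v\n", name, validateTableNameSketch(name))
	}
}
```

A check like this matters because table names cannot be passed as bind parameters, so they end up interpolated into the query text.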
type Postgres
Postgres is a PostgreSQL-backed implementation of the relay.Storage interface. It uses the jackc/pgx/v5 library for efficient connection pooling and PostgreSQL-specific optimizations.
type Postgres struct {
	// contains filtered or unexported fields
}
func NewPostgres
func NewPostgres(pool *pgxpool.Pool, tableName string, logger *zap.Logger) (*Postgres, error)
NewPostgres creates a new Postgres storage instance using the provided pgx connection pool.
func (*Postgres) ClaimBatch
func (p *Postgres) ClaimBatch(ctx context.Context, relayID string, batchSize int, buf []relay.Event) ([]relay.Event, error)
ClaimBatch atomically selects and locks a batch of pending events for the current relay instance.
It uses a Common Table Expression (CTE) with FOR UPDATE SKIP LOCKED to ensure that:
1. Multiple relay instances can process the table concurrently without colliding.
2. Only events that are PENDING and past their available_at time are selected.
3. Selected events are immediately marked as DELIVERING to “lease” them to this instance.
func (*Postgres) Close
func (p *Postgres) Close(_ context.Context) error
Close gracefully shuts down the underlying pgx connection pool.
func (*Postgres) GetStats
func (p *Postgres) GetStats(ctx context.Context) (relay.Stats, error)
GetStats retrieves high-level metrics from the outbox table, such as the total number of pending events and the age of the oldest message in the queue.
func (*Postgres) MarkDeliveredBatch
func (p *Postgres) MarkDeliveredBatch(ctx context.Context, ids []uuid.UUID, relayID string) error
MarkDeliveredBatch updates a set of events to the DELIVERED status. It requires the relayID to match the current lock, so that an instance cannot accidentally mark an event as delivered after its lease has already been reaped.
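The effect of the relayID guard can be modeled in memory. The sketch below is not the package's code: the event struct, the markDelivered helper, and the choice to clear the lock on success are hypothetical, and a slice stands in for the PostgreSQL table.

```go
package main

import "fmt"

// event is a simplified, hypothetical in-memory row.
type event struct {
	ID       int
	Status   string // "PENDING", "DELIVERING", or "DELIVERED"
	LockedBy string // relay ID holding the lease, empty when unleased
}

// markDelivered mimics an UPDATE guarded by "status is DELIVERING and
// locked_by matches relayID". It returns how many rows were changed.
func markDelivered(rows []event, ids []int, relayID string) int {
	want := make(map[int]bool, len(ids))
	for _, id := range ids {
		want[id] = true
	}
	changed := 0
	for i := range rows {
		r := &rows[i]
		if want[r.ID] && r.Status == "DELIVERING" && r.LockedBy == relayID {
			r.Status, r.LockedBy = "DELIVERED", ""
			changed++
		}
	}
	return changed
}

func main() {
	// Event 1 was claimed by relay-a, reaped, and then re-claimed by relay-b.
	rows := []event{{ID: 1, Status: "DELIVERING", LockedBy: "relay-b"}}

	fmt.Println(markDelivered(rows, []int{1}, "relay-a")) // stale owner: prints 0
	fmt.Println(markDelivered(rows, []int{1}, "relay-b")) // current owner: prints 1
}
```

In this model the late acknowledgement from relay-a changes nothing, which leaves relay-b free to finish its own delivery attempt.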
func (*Postgres) MarkFailedBatch
func (p *Postgres) MarkFailedBatch(ctx context.Context, failures []relay.FailedEvent, relayID string) error
MarkFailedBatch updates multiple events that failed during publishing.
It uses the PostgreSQL UNNEST function to perform a single atomic batch update, which is significantly more efficient than individual UPDATE statements. It updates the status, increases the attempt count, and sets the next available_at time.
func (*Postgres) Ping
func (p *Postgres) Ping(ctx context.Context) error
Ping checks the health of the Postgres connection pool. It ensures that the database is reachable and accepting commands. This is used primarily by the health check endpoint for liveness probes.
func (*Postgres) Prune
func (p *Postgres) Prune(ctx context.Context, opts relay.PruneOptions) (relay.PruneResult, error)
Prune removes historical event data from the outbox based on the provided age thresholds.
It handles two types of cleanup:
- DELIVERED: Successfully processed events that are no longer needed for auditing.
- DEAD: Events that failed all retry attempts and have been quarantined.
If opts.DryRun is true, it performs a non-destructive count of the records that meet the criteria. Otherwise, it executes the deletions within a transaction to ensure consistency and returns the total number of rows removed.
func (*Postgres) ReapExpiredLeases
func (p *Postgres) ReapExpiredLeases(ctx context.Context, leaseTimeout time.Duration, limit int) (int64, error)
ReapExpiredLeases identifies and resets events that have been stuck in the DELIVERING state for longer than the specified leaseTimeout.
This is a critical recovery mechanism. If a relay instance crashes while processing a batch, this function allows other instances to eventually pick up those “orphaned” events by moving them back to the PENDING state.