relay

```go
import "github.com/open-outbox/relay/internal/relay"
```

Package relay contains the core domain logic and orchestration for the Outbox Relay.
This package serves as the heart of the system, defining the fundamental interfaces and data models that allow the relay to remain agnostic of specific database or message broker implementations.
The primary component is the Engine, which coordinates the lifecycle of an event: claiming it from Storage, attempting delivery via a Publisher, and handling failures through a RetryPolicy.
Core Abstractions:
- Engine: Orchestrates the polling loop and background maintenance tasks.
- Storage: Defines how events are persisted, claimed, and updated in the database.
- Publisher: Defines how events are delivered to external systems.
- Event: The central data structure carrying the payload and delivery metadata.
By adhering to the Transactional Outbox pattern, this package ensures reliable message delivery with at-least-once guarantees.
- Variables
- func CreateNoopTelemetry() (telemetry.Telemetry, error)
- type Engine
- type EngineParams
- type Event
- type EventStatus
- type ExponentialBackoff
- type FailedEvent
- type MockPublisher
- type MockStorage
- func (m *MockStorage) ClaimBatch(ctx context.Context, relayID string, size int, buffer []Event) ([]Event, error)
- func (m *MockStorage) Close(ctx context.Context) error
- func (m *MockStorage) GetStats(ctx context.Context) (Stats, error)
- func (m *MockStorage) MarkDeliveredBatch(ctx context.Context, ids []uuid.UUID, relayID string) error
- func (m *MockStorage) MarkFailedBatch(ctx context.Context, failed []FailedEvent, relayID string) error
- func (m *MockStorage) Ping(ctx context.Context) error
- func (m *MockStorage) Prune(ctx context.Context, opts PruneOptions) (PruneResult, error)
- func (m *MockStorage) ReapExpiredLeases(ctx context.Context, leaseTimeout time.Duration, limit int) (int64, error)
- type PruneOptions
- type PruneResult
- type PublishError
- type Publisher
- type RetryPolicy
- type Server
- type State
- type Stats
- type Storage
Variables
ErrPublisherPaused is returned when the engine cannot proceed because the publisher (e.g., Kafka, NATS, Redis) is currently unreachable or down.

```go
var ErrPublisherPaused = errors.New("publisher is paused")
```

func CreateNoopTelemetry

```go
func CreateNoopTelemetry() (telemetry.Telemetry, error)
```

CreateNoopTelemetry initializes a Telemetry container with discarded logs, no-op metrics, and no-op traces. Useful for isolating logic in unit tests.
type Engine
Engine coordinates the movement of events from Storage to Publisher. It manages the polling loop and background maintenance tasks such as lease reaping, and ensures that events are processed according to the configured batching and retry policies.
```go
type Engine struct {
	// contains filtered or unexported fields
}
```

func NewEngine

```go
func NewEngine(storage Storage, publisher Publisher, params EngineParams, tel telemetry.Telemetry) (*Engine, error)
```

NewEngine initializes and returns a new Engine instance. It sets up the internal state, pre-allocates memory buffers for batching, and assigns a unique RelayID if one is not provided in the parameters.
func (*Engine) Start
```go
func (e *Engine) Start(ctx context.Context) error
```

Start initiates the relay’s operational loops in the background. It launches three concurrent processes:

1. A metrics watcher that periodically updates backlog statistics.
2. A lease reaper that recovers “stuck” events from crashed instances.
3. The main event processing loop that moves messages from storage to the publisher.

It blocks until the context is cancelled or a critical error occurs.
func (*Engine) Stop
```go
func (e *Engine) Stop(ctx context.Context) error
```

Stop performs a graceful shutdown of the Engine. It closes the underlying storage and publisher connections to ensure no data loss.
type EngineParams
EngineParams carries the engine’s identity and tuning configuration. It encapsulates all the operational parameters required to initialize and configure the relay engine’s behavior.
```go
type EngineParams struct {
	RelayID                       string
	Interval                      time.Duration
	BatchSize                     int
	LeaseTimeout                  time.Duration
	ReapBatchSize                 int
	PublisherConnectRetryInterval time.Duration
	HealthCheckInterval           time.Duration
	RetryPolicy                   RetryPolicy
	EnableBatchPublish            bool
	EnableStats                   bool
}
```

type Event
Event represents a single unit of work from the outbox table. It contains the message payload, metadata for routing, and delivery tracking information.
```go
type Event struct {
	// ID is the unique identifier for the event (UUID v4/v7 recommended).
	ID uuid.UUID `db:"event_id" json:"id"`
	// Type defines the subject/topic the message should be published to.
	Type string `db:"event_type" json:"type"`
	// PartitionKey is used for load balancing on the broker side if supported.
	PartitionKey *string `db:"partition_key" json:"partition_key"`
	// Payload is the raw message body.
	Payload []byte `db:"payload" json:"payload"`
	// Headers is a JSON blob containing custom message attributes/headers.
	Headers json.RawMessage `db:"headers" json:"headers"`

	// Attempts tracks the number of times this event has been tried for delivery.
	Attempts int `db:"attempts" json:"attempts"`

	// CreatedAt is the timestamp when the event was first inserted into the database.
	CreatedAt time.Time `db:"created_at" json:"created_at"`
}
```

func (Event) GetPartitionKey
```go
func (e Event) GetPartitionKey() string
```

GetPartitionKey safely dereferences the optional PartitionKey. It returns the value if present, or an empty string if the value was NULL in the database. This prevents nil-pointer panics when publishers access the key.
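A minimal sketch of the documented nil-safe behavior, using a trimmed local copy of Event:

```go
package main

import "fmt"

// Event is a trimmed local copy; only the optional key matters here.
type Event struct {
	PartitionKey *string
}

// GetPartitionKey mirrors the documented nil-safe dereference.
func (e Event) GetPartitionKey() string {
	if e.PartitionKey == nil {
		return "" // NULL in the database: no panic, just the zero value
	}
	return *e.PartitionKey
}

func main() {
	key := "customer-42"
	fmt.Printf("%q\n", Event{PartitionKey: &key}.GetPartitionKey()) // "customer-42"
	fmt.Printf("%q\n", Event{}.GetPartitionKey())                   // ""
}
```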
type EventStatus
EventStatus represents the lifecycle stage of an event in the outbox.
```go
type EventStatus string

const (
	// EventStatusPending indicates the event is ready to be picked up by a relay instance.
	EventStatusPending EventStatus = "PENDING"
	// EventStatusDelivering indicates the event is currently locked and being
	// processed by a relay instance.
	EventStatusDelivering EventStatus = "DELIVERING"
	// EventStatusDelivered indicates the event was successfully published to the message broker.
	EventStatusDelivered EventStatus = "DELIVERED"
	// EventStatusDead indicates the event failed delivery attempts beyond the
	// retry limit and is quarantined.
	EventStatusDead EventStatus = "DEAD"
)
```

type ExponentialBackoff
ExponentialBackoff implements a binary exponential backoff strategy with randomized jitter. This is the recommended policy for high-throughput production systems to avoid overwhelming downstream brokers after a failure.
```go
type ExponentialBackoff struct {
	// MaxAttempts is the hard limit for delivery attempts.
	MaxAttempts int
	// BaseDelay is the initial backoff duration (e.g., 1s).
	BaseDelay time.Duration
	// MaxDelay is the maximum duration any single backoff can reach.
	MaxDelay time.Duration
	// Jitter is a factor (0.0 to 1.0) used to randomize the backoff interval.
	Jitter float64
}
```

func (ExponentialBackoff) NextBackoff
```go
func (p ExponentialBackoff) NextBackoff(attempts int) (time.Duration, bool)
```

NextBackoff calculates the next delay using the formula: BaseDelay * 2^(attempts-1). It ensures the delay does not exceed MaxDelay and applies a random jitter to stagger retries across multiple relay instances.
type FailedEvent
FailedEvent is a container used to report processing failures back to the storage layer. It includes the updated status and scheduling information for the next retry.
```go
type FailedEvent struct {
	// ID of the event that failed.
	ID uuid.UUID `json:"id" db:"id"`
	// NewStatus is the state the event should transition to (PENDING or DEAD).
	NewStatus EventStatus `json:"new_status" db:"status"`
	// AvailableAt is the time when the event becomes eligible for retry.
	AvailableAt time.Time `json:"available_at" db:"available_at"`
	// Attempts is the incremented count of delivery tries.
	Attempts int `json:"attempts" db:"attempts"`
	// LastError captures the error message from the publisher for diagnostics.
	LastError string `json:"last_error" db:"last_error"`
}
```

type MockPublisher
MockPublisher is a test double that implements the Publisher interface. It is used to verify that events are correctly dispatched to messaging systems.
```go
type MockPublisher struct {
	mock.Mock
}
```

func (*MockPublisher) Close
```go
func (m *MockPublisher) Close(ctx context.Context) error
```

Close mocks the graceful shutdown of publisher resources.
func (*MockPublisher) Connect
```go
func (m *MockPublisher) Connect(ctx context.Context) error
```

Connect mocks the publisher’s connection setup.
func (*MockPublisher) Ping
```go
func (m *MockPublisher) Ping(ctx context.Context) error
```

Ping mocks the connectivity check for the messaging backend.
func (*MockPublisher) Publish
```go
func (m *MockPublisher) Publish(ctx context.Context, event Event) error
```

Publish mocks the dispatching of a single event to the target messaging system.
type MockStorage
MockStorage is a test double that implements the Storage interface. It allows for fine-grained control and assertions over database interactions.
```go
type MockStorage struct {
	mock.Mock
}
```

func (*MockStorage) ClaimBatch
```go
func (m *MockStorage) ClaimBatch(ctx context.Context, relayID string, size int, buffer []Event) ([]Event, error)
```

ClaimBatch mocks the retrieval and locking of a batch of events.
func (*MockStorage) Close
```go
func (m *MockStorage) Close(ctx context.Context) error
```

Close mocks the graceful shutdown of storage resources.
func (*MockStorage) GetStats
```go
func (m *MockStorage) GetStats(ctx context.Context) (Stats, error)
```

GetStats mocks the retrieval of operational metrics from the storage layer.
func (*MockStorage) MarkDeliveredBatch
```go
func (m *MockStorage) MarkDeliveredBatch(ctx context.Context, ids []uuid.UUID, relayID string) error
```

MarkDeliveredBatch mocks the finalization of successfully processed events.
func (*MockStorage) MarkFailedBatch
```go
func (m *MockStorage) MarkFailedBatch(ctx context.Context, failed []FailedEvent, relayID string) error
```

MarkFailedBatch mocks the recording of processing failures and retry metadata.
func (*MockStorage) Ping
```go
func (m *MockStorage) Ping(ctx context.Context) error
```

Ping mocks the connectivity check for the storage backend.
func (*MockStorage) Prune
```go
func (m *MockStorage) Prune(ctx context.Context, opts PruneOptions) (PruneResult, error)
```

Prune mocks the archival or deletion of old processed events.
func (*MockStorage) ReapExpiredLeases
```go
func (m *MockStorage) ReapExpiredLeases(ctx context.Context, leaseTimeout time.Duration, limit int) (int64, error)
```

ReapExpiredLeases mocks the recovery of events held by inactive relay instances.
type PruneOptions
PruneOptions defines the criteria for cleaning up old records.
```go
type PruneOptions struct {
	// DeliveredAge defines the duration threshold for DELIVERED events.
	// The string must follow the format "[number][unit]" where unit is:
	// 'd' for days, 'h' for hours, or 'm' for minutes (e.g., "7d", "24h", "60m").
	// An empty string or "0" indicates that no pruning should be performed
	// for this status.
	DeliveredAge string

	// DeadAge defines the duration threshold for DEAD events.
	// Follows the same format as DeliveredAge (e.g., "30d").
	// Use this to clear out "quarantined" events after a period of time.
	DeadAge string

	// DryRun, if true, instructs the storage implementation to calculate
	// and return the count of rows that meet the criteria without
	// actually performing the deletion.
	DryRun bool
}
```

type PruneResult
PruneResult provides feedback on the cleanup operation, returning the number of records affected by the maintenance task.
```go
type PruneResult struct {
	// DeliveredDeleted is the total number of events with status 'DELIVERED'
	// that were successfully removed from the storage.
	DeliveredDeleted int64

	// DeadDeleted is the total number of events with status 'DEAD'
	// that were successfully removed from the storage.
	DeadDeleted int64
}
```

type PublishError
PublishError is a specialized error type used by Publisher implementations to communicate the nature of a failure back to the Engine.
```go
type PublishError struct {
	// Err is the underlying error returned by the transport client.
	Err error
	// IsRetryable indicates if the failure is transient (e.g., network timeout)
	// or permanent (e.g., authentication failure, invalid subject).
	// This field determines whether the Engine will retry the event or move it to 'DEAD' status.
	IsRetryable bool
	// Code is a machine-readable string used for categorizing errors in metrics and logs.
	Code string // e.g., "BROKER_NACK", "AUTH_EXPIRED", "VALIDATION_ERROR"
}
```

func (*PublishError) Error
```go
func (e *PublishError) Error() string
```

func (*PublishError) Unwrap
```go
func (e *PublishError) Unwrap() error
```

type Publisher
Publisher is the common interface for all egress transports. It abstracts the specifics of message brokers (Kafka, NATS, Redis, etc.), providing a unified contract for the Relay Engine.
```go
type Publisher interface {
	// Connect establishes the initial connection to the message broker.
	// It should perform handshakes, authenticate, and initialize necessary
	// resources (like NATS JetStream contexts or Kafka writers).
	Connect(ctx context.Context) error

	// Publish sends a single event to the downstream system.
	// It should block until the broker provides a delivery acknowledgment
	// or the provided context is cancelled.
	Publish(ctx context.Context, event Event) error

	// Close performs a graceful shutdown of the publisher, ensuring any
	// buffered data is flushed and network resources are released.
	Close(ctx context.Context) error

	// Ping verifies the connectivity to the message broker (e.g., Kafka, NATS).
	// It ensures the publisher is authenticated and capable of sending messages.
	Ping(ctx context.Context) error
}
```

type RetryPolicy
RetryPolicy defines the contract for calculating backoff durations between event delivery attempts. It allows the relay engine to be flexible with different retry strategies (e.g., constant, linear, or exponential).
```go
type RetryPolicy interface {
	// NextBackoff returns the duration to wait before the next attempt and
	// a boolean indicating whether the retry limit has been reached.
	NextBackoff(attempts int) (time.Duration, bool)
}
```

type Server
Server is an HTTP server that exposes administrative and observability endpoints. It serves as a diagnostic window into the relay’s operation, providing metrics and health status.
```go
type Server struct {
	// contains filtered or unexported fields
}
```

func NewServer
```go
func NewServer(ctx context.Context, s Storage, p Publisher, addr string, logger *zap.Logger) *Server
```

NewServer creates an instrumented HTTP server. It uses otelhttp to automatically generate trace spans for every incoming request.
func (*Server) Start
```go
func (s *Server) Start(ctx context.Context) (err error)
```

Start runs the HTTP server. It blocks until the provided context is canceled or the underlying listener returns an error. When the context is canceled, it performs a graceful shutdown with a 5-second timeout.
type State
State represents the current operational state of the engine. It is reported via the StateGauge to provide observability into whether the engine is healthy, throttled, or failing.
```go
type State int64

const (
	// StateActive: Everything is fine. The engine is polling.
	StateActive State = 1
	// StatePaused: The publisher (Kafka/NATS) is down. We are standing by.
	StatePaused State = 2
	// StateError: A critical error occurred (like a DB connection failure).
	StateError State = 3
)
```

type Stats
Stats represents a snapshot of the outbox table’s current state.
```go
type Stats struct {
	// PendingCount is the total number of events currently in 'PENDING' status.
	PendingCount int64 `json:"pending_count"`
	// RetryingCount is the number of events in 'PENDING' status that have
	// failed at least once (attempts > 0).
	RetryingCount int64 `json:"retrying_count"`
	// OldestAgeSec is the age in seconds of the oldest event waiting to be processed.
	OldestAgeSec int64 `json:"oldest_age_sec"`
}
```

type Storage
Storage defines the contract for how the Relay reads and updates events. Implementations are responsible for managing the persistence of outbox events.
```go
type Storage interface {
	// ClaimBatch identifies and locks a set of pending events for a specific relay instance.
	// It transitions events to the 'DELIVERING' status and associates them with the relayID.
	// The 'buffer' parameter allows for reusing a slice to minimize allocations.
	ClaimBatch(ctx context.Context, relayID string, batchSize int, buffer []Event) ([]Event, error)

	// MarkDeliveredBatch moves events to the final 'DELIVERED' state.
	// It must verify that the events are still locked by the provided relayID
	// to prevent race conditions with the lease reaper.
	MarkDeliveredBatch(ctx context.Context, ids []uuid.UUID, relayID string) error

	// MarkFailedBatch handles events that encountered errors during publishing.
	// It updates event metadata (attempts, last_error) and determines if the event
	// should be retried (PENDING) or quarantined (DEAD).
	MarkFailedBatch(ctx context.Context, failures []FailedEvent, relayID string) error

	// ReapExpiredLeases identifies events stuck in the 'DELIVERING' state past their
	// lease duration and resets them to 'PENDING', allowing other instances to pick them up.
	ReapExpiredLeases(ctx context.Context, leaseTimeout time.Duration, limit int) (int64, error)

	// GetStats retrieves high-level operational metrics about the outbox table,
	// such as the current backlog size and the age of the oldest pending message.
	GetStats(ctx context.Context) (Stats, error)

	// Prune removes old DELIVERED and DEAD events from storage to maintain performance.
	// This is typically called by the CLI or a background maintenance job.
	Prune(ctx context.Context, opts PruneOptions) (PruneResult, error)

	// Close releases any resources held by the storage implementation, such as
	// database connection pools.
	Close(ctx context.Context) error

	// Ping verifies the connectivity to the underlying database. It should
	// return an error if the storage backend is unreachable or misconfigured.
	Ping(ctx context.Context) error
}
```