config
import "github.com/open-outbox/relay/internal/config"Package config handles the centralized configuration for the Outbox Relay. It follows a “Cloud-Native” priority hierarchy: 1. System Environment Variables (Highest Priority) 2. Local .env file overrides (Development) 3. Hardcoded Sanity Defaults (Fallback)
Constants
Section titled “Constants”DefaultRetryJitter prevents “Thundering Herd” issues by staggering retries.
const DefaultRetryJitter = 0.15DefaultTableName is the standard table name for the events if none is provided.
const DefaultTableName = "openoutbox_events"type Config
Section titled “type Config”Config represents the application’s entire configuration state. It uses mapstructure tags for automatic type conversion from environment strings.
type Config struct { // StorageType determines the type of outbox storage (e.g., "postgres", "mysql"). // Default: "postgres" StorageType string `mapstructure:"STORAGE_TYPE"`
// TableName is the name of the outbox table. // This allows users to avoid naming collisions in shared databases. // Default: "openoutbox_events" StorageTableName string `mapstructure:"STORAGE_TABLE_NAME"`
// PublisherType determines the message broker or output // (e.g., "nats", "kafka", "redis", "stdout"). // Default: "stdout" PublisherType string `mapstructure:"PUBLISHER_TYPE"`
// StorageURL is the connection string for the outbox storage in common // database URL format (e.g., "postgres://user:pass@host:5432/db"). StorageURL string `mapstructure:"STORAGE_URL"`
// PublisherURL is the address or connection string for the message broker (e.g., "kafka:9092"). PublisherURL string `mapstructure:"PUBLISHER_URL"`
// PollInterval defines how frequently the engine checks the outbox table for new events. // Default: "500ms" PollInterval time.Duration `mapstructure:"POLL_INTERVAL"`
// BatchSize is the maximum number of events to process in a single iteration. // Default: 100 BatchSize int `mapstructure:"BATCH_SIZE"`
// LeaseTimeout is the duration an event remains locked for processing before // being considered "stuck" and eligible for reaping. // Default: "3m" LeaseTimeout time.Duration `mapstructure:"LEASE_TIMEOUT"`
// ReapBatchSize is the number of stuck (expired lease) events to reset per cleanup cycle. // Default: 100 ReapBatchSize int `mapstructure:"REAP_BATCH_SIZE"`
// PublisherConnectRetryInterval defines how long the engine waits between // attempts to connect to the message broker (NATS, Kafka, etc.) during startup. // This prevents the relay from crashing if the broker is still booting up. // Default: "5s" PublisherConnectRetryInterval time.Duration `mapstructure:"PUBLISHER_CONNECT_RETRY_INTERVAL"`
// HealthCheckInterval defines the frequency of background health probes. // This determines how quickly the engine detects outages and flips the // relay state to Paused or Error. // Default: "5s" HealthCheckInterval time.Duration `mapstructure:"HEALTH_CHECK_INTERVAL"`
// EnableStats determines whether the engine performs background database // scans to calculate backlog metrics (e.g., PENDING counts and oldest age). // In high-scale environments with millions of rows, disabling this prevents // performance degradation and database I/O saturation. // Default: "true" EnableStats bool `mapstructure:"ENABLE_STATS"`
// ServerPort is the address/port for the HTTP health check and metrics server. // Default: ":8080" ServerPort string `mapstructure:"SERVER_PORT"`
// Environment specifies the mode (development/production) which affects logging and safety checks. // Default: "production" Environment Environment `mapstructure:"ENVIRONMENT"`
// RELAY_ID is a unique identifier for this instance, used for // locking events in the database to prevent collisions. // Default: os.Hostname() RelayID string `mapstructure:"RELAY_ID"`
// RetryMaxAttempts is the maximum number of times an event will be // retried before being marked as DEAD. // Default: 25 RetryMaxAttempts int `mapstructure:"RETRY_MAX_ATTEMPTS"`
// RetryBaseDelay is the starting delay for the exponential backoff strategy. // Default: "1s" RetryBaseDelay time.Duration `mapstructure:"RETRY_BASE_DELAY"`
// RetryMaxDelay is the upper limit for any single retry delay. // Default: "24h" RetryMaxDelay time.Duration `mapstructure:"RETRY_MAX_DELAY"`
// RetryJitter is the randomization factor (0.0 to 1.0) applied to retry delays to // prevent "Thundering Herd" issues. // 0.15 RetryJitter float64 `mapstructure:"RETRY_JITTER"`
// NatsPublishTimeout is the maximum time to wait for the NATS publisher to publish a message to // the NATs broker. // Default: "5s" NatsPublishTimeout time.Duration `mapstructure:"NATS_PUBLISH_TIMEOUT"`
// NatsConnectionTimeout is the maximum time to wait for the initial connection // and handshake with the NATS server. // Default: 5s NatsConnectionTimeout time.Duration `mapstructure:"NATS_CONNECTION_TIMEOUT"`
// RedisConnectionTimeout is the maximum time to wait for the redis client to connect to the // redis server. // Default: "5s" RedisConnectionTimeout time.Duration `mapstructure:"REDIS_CONNECTION_TIMEOUT"`
// RedisWriteTimeout is the maximum time allowed for each write operation (XADD) // to complete during the engine loop. // Default: "1s" RedisWriteTimeout time.Duration `mapstructure:"REDIS_WRITE_TIMEOUT"`
// KafkaMaxAttempts is the number of write attempts... // Default: 5 KafkaMaxAttempts int `mapstructure:"KAFKA_MAX_ATTEMPTS"`
// KafkaWriteTimeout is the deadline for writing a message batch to Kafka. // Default: "10s" KafkaWriteTimeout time.Duration `mapstructure:"KAFKA_WRITE_TIMEOUT"`
// KafkaReadTimeout is the deadline for reading a response (like ACKs) from the Kafka broker. // Default: "10s" KafkaReadTimeout time.Duration `mapstructure:"KAFKA_READ_TIMEOUT"`
// KafkaConnectionTimeout is the maximum time allowed to establish the initial // TCP connection and handshake with the Kafka brokers. // Default: 10s KafkaConnectionTimeout time.Duration `mapstructure:"KAFKA_CONNECTION_TIMEOUT"`
// KafkaBatchSize defines how many messages the writer collects before flushing to the broker. // // In this Relay, setting this to 1 is critical to bypass the Kafka client's internal // buffering. Since the Relay Engine already batches events at the database level, // increasing this value will introduce unnecessary latency (double-batching). // // Default: 1 // Note: Higher values may significantly decrease delivery speed in single-publish mode. KafkaBatchSize int `mapstructure:"KAFKA_BATCH_SIZE"`
// KafkaBatchBytes is the maximum total size of a batch in bytes before the Kafka writer flushes. // Default: 10485760 (10MB) KafkaBatchBytes int64 `mapstructure:"KAFKA_BATCH_BYTES"`
// KafkaBatchTimeout is the maximum time to wait before flushing a partial batch to Kafka. // Default: "10ms" KafkaBatchTimeout time.Duration `mapstructure:"KAFKA_BATCH_TIMEOUT"`
// KafkaAsync enables non-blocking writes to the Kafka broker. // // In an Outbox Relay, this is typically set to false to ensure "At-Least-Once" // delivery. Enabling async mode increases throughput but risks data loss, // as the relay may mark events as delivered before the broker acknowledges them. // // Default: false // // Caution: Only enable if your system can tolerate potential message loss // in exchange for maximum publishing throughput. KafkaAsync bool `mapstructure:"KAFKA_ASYNC"`
// KafkaCompression specifies the compression algorithm used for // messages (none, gzip, snappy, lz4, zstd). // Default: "snappy" KafkaCompression string `mapstructure:"KAFKA_COMPRESSION"`
// KafkaRequiredAcks defines the number of brokers that must acknowledge a // write before the relay considers the message successfully delivered. // // Supported values: // "all" - (Highest Durability) Waits for all in-sync replicas to acknowledge. // "one" - (Balanced) Waits only for the leader to acknowledge. // "none" - (Maximum Speed) Does not wait for any acknowledgment (risks data loss). // // Default: "all" // // Note: In an Outbox Relay, "all" is the recommended setting to maintain // strict "At-Least-Once" delivery guarantees. KafkaRequiredAcks string `mapstructure:"KAFKA_REQUIRED_ACKS"`
// RemoteConfigProvider determines the external source for configuration. // Supported providers: "etcd3", "consul", or "firestore". // Default: "" RemoteConfigProvider string `mapstructure:"REMOTE_CONFIG_PROVIDER"`
// RemoteConfigEndpoint is the network address of the configuration provider // (e.g., "127.0.0.1:8500" for Consul or "etcd-cluster:2379"). // Default: "" RemoteConfigEndpoint string `mapstructure:"REMOTE_CONFIG_ENDPOINT"`
// RemoteConfigPath specifies the key or file path within the provider // where the configuration blob is stored (e.g., "/config/relay.yaml"). // Default: "" RemoteConfigPath string `mapstructure:"REMOTE_CONFIG_PATH"`
// RemoteConfigType defines the format of the remote configuration blob. // This ensures the engine knows how to parse the incoming data (e.g., "yaml", "json"). // Default: "yaml" RemoteConfigType string `mapstructure:"REMOTE_CONFIG_TYPE"`}func Load
Section titled “func Load”func Load() (*Config, error)Load initializes the Config struct by merging defaults, environment variables, and an optional .env file. It prioritizes system environment variables to ensure compatibility with Docker and Kubernetes secrets.
type Environment
Section titled “type Environment”Environment represents the operational mode of the relay.
type Environment stringconst ( // Development mode enables extra logging and relaxed security checks. Development Environment = "development" // Production mode optimizes for performance and strict error handling. Production Environment = "production")