publishers
import "github.com/open-outbox/relay/internal/publishers"Package publishers provides concrete implementations for various message delivery backends.
including:
- Kafka: For publishing messages to Apache Kafka topics.
- Nats: For publishing messages to NATS JetStream streams.
- Redis: For publishing messages to Redis Streams.
- Stdout: A basic publisher for logging events to standard output, useful for debugging.
- Null: A no-op publisher, primarily used for performance benchmarking the relay engine without external system latency.
Each publisher implementation is designed to handle message serialization, transport-specific configurations, and robust error handling. A key aspect is determining if a publishing error is transient (retryable) or permanent, which informs the relay engine’s retry policies.
This package also includes an `InstrumentedPublisher` which wraps any `relay.Publisher` to add OpenTelemetry metrics and tracing automatically.
- type InstrumentedPublisher
- func NewInstrumentedPublisher(p relay.Publisher, tel telemetry.Telemetry, relayID string) *InstrumentedPublisher
- func (ip *InstrumentedPublisher) Close(ctx context.Context) error
- func (ip *InstrumentedPublisher) Connect(ctx context.Context) error
- func (ip *InstrumentedPublisher) Ping(ctx context.Context) error
- func (ip *InstrumentedPublisher) Publish(ctx context.Context, event relay.Event) error
- type Kafka
- func NewKafka(cfg KafkaConfig) (*Kafka, error)
- func (k *Kafka) Close(_ context.Context) error
- func (k *Kafka) Connect(ctx context.Context) error
- func (k *Kafka) Ping(ctx context.Context) error
- func (k *Kafka) Publish(ctx context.Context, event relay.Event) error
- func (k *Kafka) PublishBatch(ctx context.Context, events []relay.Event) error
- type KafkaConfig
- type Nats
- type Null
- type Redis
- func NewRedis(url string, writeTimeout, connectionTimeout time.Duration) (*Redis, error)
- func (r *Redis) Close(_ context.Context) error
- func (r *Redis) Connect(ctx context.Context) error
- func (r *Redis) Ping(ctx context.Context) error
- func (r *Redis) Publish(ctx context.Context, event relay.Event) error
- type Stdout
type InstrumentedPublisher
Section titled “type InstrumentedPublisher”InstrumentedPublisher is a decorator that wraps a base relay.Publisher to provide transparent observability. It automatically records OpenTelemetry traces, Prometheus metrics, and structured logs for every publish operation.
type InstrumentedPublisher struct { // contains filtered or unexported fields}func NewInstrumentedPublisher
Section titled “func NewInstrumentedPublisher”func NewInstrumentedPublisher(p relay.Publisher, tel telemetry.Telemetry, relayID string) *InstrumentedPublisherNewInstrumentedPublisher returns a new publisher wrapper configured with the provided telemetry components.
func (*InstrumentedPublisher) Close
Section titled “func (*InstrumentedPublisher) Close”func (ip *InstrumentedPublisher) Close(ctx context.Context) errorClose closes the underlying publisher.
func (*InstrumentedPublisher) Connect
Section titled “func (*InstrumentedPublisher) Connect”func (ip *InstrumentedPublisher) Connect(ctx context.Context) errorConnect establishes the connection to the underlying message broker. can initialize the connection through the instrumentation layer.
func (*InstrumentedPublisher) Ping
Section titled “func (*InstrumentedPublisher) Ping”func (ip *InstrumentedPublisher) Ping(ctx context.Context) errorPing verifies the connectivity of the underlying publisher to the message broker.
func (*InstrumentedPublisher) Publish
Section titled “func (*InstrumentedPublisher) Publish”func (ip *InstrumentedPublisher) Publish(ctx context.Context, event relay.Event) errorPublish wraps the underlying publisher’s Publish method. It creates a new trace span, records the start time for latency metrics, and captures any errors. It ensures that delivery metrics include both the event type and the final outcome (success/failure) for granular monitoring.
type Kafka
Section titled “type Kafka”Kafka is a publisher that writes messages to an Apache Kafka cluster. It implements the relay.Publisher interface.
type Kafka struct { // contains filtered or unexported fields}func NewKafka
Section titled “func NewKafka”func NewKafka(cfg KafkaConfig) (*Kafka, error)NewKafka initializes a new Kafka writer with strict ordering and safety. It handles the parsing of broker URLs (stripping kafka:// prefixes) and configures the underlying writer with a Hash balancer to ensure messages with the same PartitionKey are always routed to the same Kafka partition.
func (*Kafka) Close
Section titled “func (*Kafka) Close”func (k *Kafka) Close(_ context.Context) errorClose gracefully shuts down the Kafka publisher. It blocks until all buffered messages are flushed or the context expires.
func (*Kafka) Connect
Section titled “func (*Kafka) Connect”func (k *Kafka) Connect(ctx context.Context) errorConnect satisfies the relay.Publisher interface. It initializes the Kafka writer using the stored configuration.
func (*Kafka) Ping
Section titled “func (*Kafka) Ping”func (k *Kafka) Ping(ctx context.Context) errorPing verifies the connectivity to the Kafka brokers by attempting to fetch metadata or checking the underlying connection state.
func (*Kafka) Publish
Section titled “func (*Kafka) Publish”func (k *Kafka) Publish(ctx context.Context, event relay.Event) errorPublish sends a single event to Kafka. It maps the domain event to a Kafka message, using the Event.Type as the topic. If the operation fails, it wraps the error in a relay.PublishError, classifying it as retryable based on the Kafka error code.
func (*Kafka) PublishBatch
Section titled “func (*Kafka) PublishBatch”func (k *Kafka) PublishBatch(ctx context.Context, events []relay.Event) errorPublishBatch writes a slice of events to Kafka in a single transaction/request. This is highly efficient for high-volume relays. If any individual message mapping fails (e.g., malformed headers), the entire batch operation returns an error immediately. The segmentio driver handles the actual transport level batching and acknowledgment.
type KafkaConfig
Section titled “type KafkaConfig”KafkaConfig holds the configuration for the Kafka publisher. It maps directly to the settings used by the segmentio/kafka-go writer, allowing for fine-grained control over batching, timeouts, and durability.
Note: In the context of this relay, BatchSize is typically set to 1 to ensure the relay’s internal batching logic remains the primary driver of delivery frequency.
type KafkaConfig struct { Brokers []string MaxAttempts int WriteTimeout time.Duration ReadTimeout time.Duration ConnectionTimeout time.Duration BatchSize int BatchBytes int64 BatchTimeout time.Duration Async bool Compression kafka.Compression RequiredAcks kafka.RequiredAcks}type Nats
Section titled “type Nats”Nats is a JetStream-powered publisher for At-Least-Once delivery. It implements the relay.Publisher interface by publishing messages to NATS subjects that are backed by a JetStream stream for durability.
type Nats struct { // contains filtered or unexported fields}func NewNats
Section titled “func NewNats”func NewNats(url string, publishTimeout, connectionTimeout time.Duration) (*Nats, error)NewNats establishes a connection to a NATS server and initializes a JetStream context. It sets a client name “Open-Outbox-Relay” on the connection to facilitate easier identification in NATS monitoring tools.
func (*Nats) Close
Section titled “func (*Nats) Close”func (n *Nats) Close(_ context.Context) errorClose gracefully shuts down the NATS connection.
func (*Nats) Connect
Section titled “func (*Nats) Connect”func (n *Nats) Connect(_ context.Context) errorConnect establishes the connection to the NATS server.
func (*Nats) Ping
Section titled “func (*Nats) Ping”func (n *Nats) Ping(_ context.Context) errorPing verifies the connectivity to the NATS server. It checks if the underlying connection is active and capable of communicating with the NATS cluster.
func (*Nats) Publish
Section titled “func (*Nats) Publish”func (n *Nats) Publish(ctx context.Context, event relay.Event) errorPublish sends an event to NATS JetStream. It maps the domain Event.Type to the NATS subject and translates JSON headers into NATS message headers. It automatically sets the “Nats-Msg-Id” header using the Event ID to enable JetStream’s built-in idempotent publishing (message deduplication).
If the connection is closed or the publish fails, it returns a relay.PublishError, classifying the failure as retryable based on NATS-specific error codes.
type Null
Section titled “type Null”Null represents a publisher that does nothing. It is primarily used for performance benchmarking the engine’s orchestration and storage overhead without transport latency.
type Null struct{}func NewNull
Section titled “func NewNull”func NewNull() *NullNewNull creates a new instance of the Null publisher.
func (*Null) Close
Section titled “func (*Null) Close”func (n *Null) Close(_ context.Context) errorClose does nothing on the Null publisher
func (*Null) Connect
Section titled “func (*Null) Connect”func (n *Null) Connect(_ context.Context) errorConnect satisfies the relay.Publisher interface. For the Null publisher, this is a no-op that always reports success, allowing the engine to start immediately without a physical broker.
func (*Null) Ping
Section titled “func (*Null) Ping”func (n *Null) Ping(_ context.Context) errorPing does nothing on the Null publisher
func (*Null) Publish
Section titled “func (*Null) Publish”func (n *Null) Publish(_ context.Context, _ relay.Event) errorPublish satisfies the relay.Publisher interface. It effectively “black holes” the event and immediately reports success.
type Redis
Section titled “type Redis”Redis is a publisher that pushes events into Redis Streams using the XADD command. It implements the relay.Publisher interface.
type Redis struct { // contains filtered or unexported fields}func NewRedis
Section titled “func NewRedis”func NewRedis(url string, writeTimeout, connectionTimeout time.Duration) (*Redis, error)NewRedis establishes a connection to a Redis server. It validates the connection with a Ping before returning. It accepts redis URLs like “redis://<user>:<pass>@localhost:6379/0” .
func (*Redis) Close
Section titled “func (*Redis) Close”func (r *Redis) Close(_ context.Context) errorClose gracefully shuts down the Redis client and its underlying connection pool.
func (*Redis) Connect
Section titled “func (*Redis) Connect”func (r *Redis) Connect(ctx context.Context) errorConnect establishes the connection to the Redis server. It parses the URL, configures the client, and verifies connectivity with a Ping.
func (*Redis) Ping
Section titled “func (*Redis) Ping”func (r *Redis) Ping(ctx context.Context) errorPing verifies the connectivity to the Redis server. It ensures the client can communicate with the storage backend and that the connection pool is healthy.
func (*Redis) Publish
Section titled “func (*Redis) Publish”func (r *Redis) Publish(ctx context.Context, event relay.Event) errorPublish appends the event to a Redis Stream. It uses the Event.Type as the stream name. The event ID and Payload are stored as fields within the stream entry.
If publishing fails, it classifies the error as retryable if it indicates a transient issue like network disruption or a cluster resharding event.
type Stdout
Section titled “type Stdout”Stdout represents a publisher that writes event data to the console. It is primarily used for local development, debugging, and piping output to other CLI tools.
type Stdout struct{}func NewStdout
Section titled “func NewStdout”func NewStdout() *StdoutNewStdout creates a new instance of the Stdout publisher.
func (*Stdout) Close
Section titled “func (*Stdout) Close”func (s *Stdout) Close(_ context.Context) errorClose does nothing on the Stdout publisher
func (*Stdout) Connect
Section titled “func (*Stdout) Connect”func (s *Stdout) Connect(_ context.Context) errorConnect satisfies the relay.Publisher interface. For the Stdout publisher, this is a no-op that always reports success, as the standard output stream is assumed to be available at runtime.
func (*Stdout) Ping
Section titled “func (*Stdout) Ping”func (s *Stdout) Ping(_ context.Context) errorPing does nothing on the Stdout publisher
func (*Stdout) Publish
Section titled “func (*Stdout) Publish”func (s *Stdout) Publish(_ context.Context, event relay.Event) errorPublish satisfies the relay.Publisher interface. It formats the event as a string and writes it to standard output.