
publishers

import "github.com/open-outbox/relay/internal/publishers"

Package publishers provides concrete implementations for various message delivery backends, including:

  • Kafka: For publishing messages to Apache Kafka topics.
  • Nats: For publishing messages to NATS JetStream streams.
  • Redis: For publishing messages to Redis Streams.
  • Stdout: A basic publisher for logging events to standard output, useful for debugging.
  • Null: A no-op publisher, primarily used for performance benchmarking the relay engine without external system latency.

Each publisher implementation is designed to handle message serialization, transport-specific configurations, and robust error handling. A key aspect is determining if a publishing error is transient (retryable) or permanent, which informs the relay engine’s retry policies.
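The transient-versus-permanent distinction described above can be sketched as follows. This is a minimal illustration, not the package's actual code: the `PublishError` type here is a hypothetical stand-in for `relay.PublishError`, whose real fields are not shown in this reference, and `ShouldRetry` is an invented helper name.

```go
package main

import (
	"errors"
	"fmt"
)

// PublishError is a hypothetical stand-in for relay.PublishError.
// The field names are assumptions made for this sketch.
type PublishError struct {
	Err       error
	Retryable bool
}

func (e *PublishError) Error() string {
	return fmt.Sprintf("publish failed (retryable=%t): %v", e.Retryable, e.Err)
}

// Unwrap exposes the underlying cause to errors.Is and errors.As.
func (e *PublishError) Unwrap() error { return e.Err }

// ShouldRetry reports whether err carries a PublishError marked retryable.
// Errors that are not PublishErrors are treated as permanent in this sketch.
func ShouldRetry(err error) bool {
	var pe *PublishError
	if errors.As(err, &pe) {
		return pe.Retryable
	}
	return false
}

func main() {
	transient := &PublishError{Err: errors.New("broker unavailable"), Retryable: true}
	permanent := &PublishError{Err: errors.New("message too large"), Retryable: false}

	// Wrapping with %w keeps the classification reachable through errors.As.
	fmt.Println(ShouldRetry(fmt.Errorf("attempt 1: %w", transient)))
	fmt.Println(ShouldRetry(permanent))
}
```

A retry loop in the engine could consult a helper like this to decide between rescheduling an event and marking it as failed.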

This package also includes an `InstrumentedPublisher` which wraps any `relay.Publisher` to add OpenTelemetry metrics and tracing automatically.

InstrumentedPublisher is a decorator that wraps a base relay.Publisher to provide transparent observability. It automatically records OpenTelemetry traces, Prometheus metrics, and structured logs for every publish operation.

type InstrumentedPublisher struct {
    // contains filtered or unexported fields
}

func NewInstrumentedPublisher(p relay.Publisher, tel telemetry.Telemetry, relayID string) *InstrumentedPublisher

NewInstrumentedPublisher returns a new publisher wrapper configured with the provided telemetry components.

func (ip *InstrumentedPublisher) Close(ctx context.Context) error

Close closes the underlying publisher.

func (ip *InstrumentedPublisher) Connect(ctx context.Context) error

Connect establishes the connection to the underlying message broker by delegating to the wrapped publisher, so the connection can be initialized through the instrumentation layer.

func (ip *InstrumentedPublisher) Ping(ctx context.Context) error

Ping verifies the connectivity of the underlying publisher to the message broker.

func (ip *InstrumentedPublisher) Publish(ctx context.Context, event relay.Event) error

Publish wraps the underlying publisher’s Publish method. It creates a new trace span, records the start time for latency metrics, and captures any errors. It ensures that delivery metrics include both the event type and the final outcome (success/failure) for granular monitoring.
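The decorator behaviour described for Publish can be sketched with the standard library alone. Everything below is an assumption made for illustration: `Event`, `Publisher`, `Recorder`, and `counter` are reduced, hypothetical stand-ins, and the in-memory counter takes the place of the OpenTelemetry and logging components the real wrapper uses.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Event and Publisher are hypothetical, reduced stand-ins for relay.Event
// and relay.Publisher.
type Event struct {
	ID   string
	Type string
}

type Publisher interface {
	Publish(ctx context.Context, event Event) error
}

// Recorder is a hypothetical sink standing in for the telemetry components.
type Recorder interface {
	Observe(eventType, outcome string, elapsed time.Duration)
}

// Instrumented wraps a Publisher and reports the outcome of every call.
type Instrumented struct {
	next Publisher
	rec  Recorder
}

func (p *Instrumented) Publish(ctx context.Context, event Event) error {
	start := time.Now()
	err := p.next.Publish(ctx, event)

	outcome := "success"
	if err != nil {
		outcome = "failure"
	}
	p.rec.Observe(event.Type, outcome, time.Since(start))
	return err // the wrapped publisher's error is returned unchanged
}

// counter is an in-memory Recorder keyed by "type/outcome".
type counter map[string]int

func (c counter) Observe(eventType, outcome string, _ time.Duration) {
	c[eventType+"/"+outcome]++
}

// PublisherFunc adapts a plain function to the Publisher interface.
type PublisherFunc func(ctx context.Context, event Event) error

func (f PublisherFunc) Publish(ctx context.Context, event Event) error { return f(ctx, event) }

// runDemo publishes one failing and one succeeding event and returns the counts.
func runDemo() counter {
	counts := counter{}
	calls := 0
	flaky := PublisherFunc(func(context.Context, Event) error {
		calls++
		if calls == 1 {
			return errors.New("broker unavailable")
		}
		return nil
	})
	p := &Instrumented{next: flaky, rec: counts}
	_ = p.Publish(context.Background(), Event{ID: "1", Type: "order.created"})
	_ = p.Publish(context.Background(), Event{ID: "2", Type: "order.created"})
	return counts
}

func main() {
	fmt.Println(runDemo())
}
```

Because the wrapper satisfies the same interface as the publisher it wraps, callers do not need to know whether instrumentation is present.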

Kafka is a publisher that writes messages to an Apache Kafka cluster. It implements the relay.Publisher interface.

type Kafka struct {
    // contains filtered or unexported fields
}

func NewKafka(cfg KafkaConfig) (*Kafka, error)

NewKafka initializes a new Kafka writer with strict ordering and safety. It handles the parsing of broker URLs (stripping kafka:// prefixes) and configures the underlying writer with a Hash balancer to ensure messages with the same PartitionKey are always routed to the same Kafka partition.
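The two behaviours mentioned above, prefix stripping and key-based partition routing, can be illustrated in isolation. This is a simplified sketch under stated assumptions: `normalizeBrokers` and `partitionFor` are hypothetical helpers, and the FNV-1a modulo scheme only demonstrates the idea of hash partitioning. It is not claimed to reproduce the driver's balancer exactly.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"strings"
)

// normalizeBrokers strips an optional "kafka://" scheme from each address
// and drops empty entries. Hypothetical helper for illustration.
func normalizeBrokers(urls []string) []string {
	out := make([]string, 0, len(urls))
	for _, u := range urls {
		addr := strings.TrimPrefix(strings.TrimSpace(u), "kafka://")
		if addr != "" {
			out = append(out, addr)
		}
	}
	return out
}

// partitionFor maps a key to one of n partitions using FNV-1a.
// The same key always yields the same partition for a fixed n.
// n must be greater than zero.
func partitionFor(key string, n uint32) uint32 {
	h := fnv.New32a()
	_, _ = h.Write([]byte(key)) // hash.Hash writes never return an error
	return h.Sum32() % n
}

func main() {
	fmt.Println(normalizeBrokers([]string{"kafka://broker-1:9092", "broker-2:9092"}))

	// Two events sharing a partition key land on the same partition.
	fmt.Println(partitionFor("order-42", 6) == partitionFor("order-42", 6))
}
```

Routing by key is what preserves per-key ordering: all events for one aggregate go through a single partition, and Kafka orders messages within a partition.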

func (k *Kafka) Close(_ context.Context) error

Close gracefully shuts down the Kafka publisher. It blocks until all buffered messages are flushed. The signature discards its context argument, so a context deadline does not shorten the wait.

func (k *Kafka) Connect(ctx context.Context) error

Connect satisfies the relay.Publisher interface. It initializes the Kafka writer using the stored configuration.

func (k *Kafka) Ping(ctx context.Context) error

Ping verifies the connectivity to the Kafka brokers by attempting to fetch metadata or checking the underlying connection state.

func (k *Kafka) Publish(ctx context.Context, event relay.Event) error

Publish sends a single event to Kafka. It maps the domain event to a Kafka message, using the Event.Type as the topic. If the operation fails, it wraps the error in a relay.PublishError, classifying it as retryable based on the Kafka error code.

func (k *Kafka) PublishBatch(ctx context.Context, events []relay.Event) error

PublishBatch writes a slice of events to Kafka in a single write call, which is efficient for high-volume relays. If any individual message mapping fails (e.g., malformed headers), the entire batch operation returns an error immediately and nothing is written. The segmentio driver handles the actual transport-level batching and acknowledgment.
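The fail-fast mapping step can be sketched as below. The `Event` and `Message` types are hypothetical stand-ins (the real `relay.Event` fields and the driver's message type are not shown here), and the sketch assumes headers arrive as a JSON object of strings. The point is only the order of operations: every event is mapped before anything is handed to the writer.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Event is a hypothetical stand-in for relay.Event.
type Event struct {
	ID           string
	Type         string
	PartitionKey string
	Headers      []byte // assumed: JSON object of string headers
	Payload      []byte
}

// Message is a hypothetical stand-in for the driver's message type.
type Message struct {
	Topic   string
	Key     []byte
	Value   []byte
	Headers map[string]string
}

// toMessage maps one event, failing on malformed header JSON.
func toMessage(e Event) (Message, error) {
	headers := map[string]string{}
	if len(e.Headers) > 0 {
		if err := json.Unmarshal(e.Headers, &headers); err != nil {
			return Message{}, fmt.Errorf("event %s: malformed headers: %w", e.ID, err)
		}
	}
	return Message{
		Topic:   e.Type, // the event type doubles as the topic
		Key:     []byte(e.PartitionKey),
		Value:   e.Payload,
		Headers: headers,
	}, nil
}

// mapBatch converts every event up front and stops at the first failure,
// so a bad event never results in a partially written batch.
func mapBatch(events []Event) ([]Message, error) {
	msgs := make([]Message, 0, len(events))
	for i, e := range events {
		m, err := toMessage(e)
		if err != nil {
			return nil, fmt.Errorf("batch aborted at index %d: %w", i, err)
		}
		msgs = append(msgs, m)
	}
	return msgs, nil
}

func main() {
	_, err := mapBatch([]Event{
		{ID: "1", Type: "order.created", Headers: []byte(`{"trace-id":"abc"}`)},
		{ID: "2", Type: "order.paid", Headers: []byte(`{oops`)},
	})
	fmt.Println(err != nil)
}
```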

KafkaConfig holds the configuration for the Kafka publisher. It maps directly to the settings used by the segmentio/kafka-go writer, allowing for fine-grained control over batching, timeouts, and durability.

Note: In the context of this relay, BatchSize is typically set to 1 to ensure the relay’s internal batching logic remains the primary driver of delivery frequency.

type KafkaConfig struct {
    Brokers           []string
    MaxAttempts       int
    WriteTimeout      time.Duration
    ReadTimeout       time.Duration
    ConnectionTimeout time.Duration
    BatchSize         int
    BatchBytes        int64
    BatchTimeout      time.Duration
    Async             bool
    Compression       kafka.Compression
    RequiredAcks      kafka.RequiredAcks
}

Nats is a JetStream-powered publisher for at-least-once delivery. It implements the relay.Publisher interface by publishing messages to NATS subjects that are backed by a JetStream stream for durability.

type Nats struct {
    // contains filtered or unexported fields
}

func NewNats(url string, publishTimeout, connectionTimeout time.Duration) (*Nats, error)

NewNats establishes a connection to a NATS server and initializes a JetStream context. It sets the client name “Open-Outbox-Relay” on the connection so the relay is easy to identify in NATS monitoring tools.

func (n *Nats) Close(_ context.Context) error

Close gracefully shuts down the NATS connection.

func (n *Nats) Connect(_ context.Context) error

Connect establishes the connection to the NATS server.

func (n *Nats) Ping(_ context.Context) error

Ping verifies the connectivity to the NATS server. It checks if the underlying connection is active and capable of communicating with the NATS cluster.

func (n *Nats) Publish(ctx context.Context, event relay.Event) error

Publish sends an event to NATS JetStream. It maps the domain Event.Type to the NATS subject and translates JSON headers into NATS message headers. It automatically sets the “Nats-Msg-Id” header using the Event ID to enable JetStream’s built-in idempotent publishing (message deduplication).

If the connection is closed or the publish fails, it returns a relay.PublishError, classifying the failure as retryable based on NATS-specific error codes.
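The header translation and the effect of “Nats-Msg-Id” can be illustrated without a NATS server. This is a sketch under stated assumptions: `buildHeaders` and `dedupStream` are hypothetical names, headers are assumed to arrive as a JSON object of strings, and the in-memory set only imitates the idea of JetStream rejecting a repeated message ID within its duplicate window.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// buildHeaders decodes JSON headers into a multi-value header map (the same
// shape as nats.Header) and sets Nats-Msg-Id to the event ID.
func buildHeaders(eventID string, rawHeaders []byte) (map[string][]string, error) {
	h := map[string][]string{}
	if len(rawHeaders) > 0 {
		var decoded map[string]string
		if err := json.Unmarshal(rawHeaders, &decoded); err != nil {
			return nil, fmt.Errorf("decode headers: %w", err)
		}
		for k, v := range decoded {
			h[k] = []string{v}
		}
	}
	// Set last so the event ID always wins over a user-supplied value.
	h["Nats-Msg-Id"] = []string{eventID}
	return h, nil
}

// dedupStream imitates server-side deduplication: a message ID that was
// already stored is acknowledged but not stored again.
type dedupStream struct {
	seen map[string]bool
}

func newDedupStream() *dedupStream { return &dedupStream{seen: map[string]bool{}} }

// store reports whether the message was newly stored (true) or a duplicate (false).
func (s *dedupStream) store(msgID string) bool {
	if s.seen[msgID] {
		return false
	}
	s.seen[msgID] = true
	return true
}

func main() {
	h, err := buildHeaders("evt-1", []byte(`{"trace-id":"abc"}`))
	if err != nil {
		panic(err)
	}
	s := newDedupStream()
	id := h["Nats-Msg-Id"][0]
	fmt.Println(s.store(id)) // first delivery
	fmt.Println(s.store(id)) // retry of the same event
}
```

This is why a retried publish is safe: the relay may send the same event twice after a timeout, but the stream keeps only one copy as long as the retry falls inside the deduplication window.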

Null represents a publisher that does nothing. It is primarily used for performance benchmarking the engine’s orchestration and storage overhead without transport latency.

type Null struct{}

func NewNull() *Null

NewNull creates a new instance of the Null publisher.

func (n *Null) Close(_ context.Context) error

Close does nothing on the Null publisher.

func (n *Null) Connect(_ context.Context) error

Connect satisfies the relay.Publisher interface. For the Null publisher, this is a no-op that always reports success, allowing the engine to start immediately without a physical broker.

func (n *Null) Ping(_ context.Context) error

Ping does nothing on the Null publisher.

func (n *Null) Publish(_ context.Context, _ relay.Event) error

Publish satisfies the relay.Publisher interface. It effectively “black holes” the event and immediately reports success.
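A no-op publisher of this kind is small enough to sketch in full. The `Publisher` interface below is inferred from the four methods every type in this reference exposes; it is an assumption about `relay.Publisher`, not its confirmed definition, and `Event` and `publishAll` are hypothetical.

```go
package main

import (
	"context"
	"fmt"
)

// Event is a hypothetical, reduced stand-in for relay.Event.
type Event struct {
	ID   string
	Type string
}

// Publisher mirrors the method set shared by the publishers in this
// reference. The real relay.Publisher may differ.
type Publisher interface {
	Connect(ctx context.Context) error
	Publish(ctx context.Context, event Event) error
	Ping(ctx context.Context) error
	Close(ctx context.Context) error
}

// Null discards every event and always reports success.
type Null struct{}

func (Null) Connect(context.Context) error        { return nil }
func (Null) Publish(context.Context, Event) error { return nil }
func (Null) Ping(context.Context) error           { return nil }
func (Null) Close(context.Context) error          { return nil }

// Compile-time check that Null satisfies the interface.
var _ Publisher = Null{}

// publishAll pushes n synthetic events through p and returns how many
// were accepted. With Null, this measures pure loop and call overhead.
func publishAll(p Publisher, n int) int {
	ctx := context.Background()
	ok := 0
	for i := 0; i < n; i++ {
		if err := p.Publish(ctx, Event{ID: fmt.Sprint(i), Type: "bench"}); err == nil {
			ok++
		}
	}
	return ok
}

func main() {
	fmt.Println(publishAll(Null{}, 3))
}
```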

Redis is a publisher that pushes events into Redis Streams using the XADD command. It implements the relay.Publisher interface.

type Redis struct {
    // contains filtered or unexported fields
}

func NewRedis(url string, writeTimeout, connectionTimeout time.Duration) (*Redis, error)

NewRedis establishes a connection to a Redis server. It validates the connection with a Ping before returning. It accepts Redis URLs such as “redis://<user>:<pass>@localhost:6379/0”.

func (r *Redis) Close(_ context.Context) error

Close gracefully shuts down the Redis client and its underlying connection pool.

func (r *Redis) Connect(ctx context.Context) error

Connect establishes the connection to the Redis server. It parses the URL, configures the client, and verifies connectivity with a Ping.

func (r *Redis) Ping(ctx context.Context) error

Ping verifies the connectivity to the Redis server. It ensures the client can communicate with the storage backend and that the connection pool is healthy.

func (r *Redis) Publish(ctx context.Context, event relay.Event) error

Publish appends the event to a Redis Stream. It uses the Event.Type as the stream name. The event ID and Payload are stored as fields within the stream entry.

If publishing fails, it classifies the error as retryable if it indicates a transient issue like network disruption or a cluster resharding event.
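The stream entry layout and the retry classification can be sketched as follows. Several details are assumptions for illustration: the field names `id` and `payload`, the use of `*` to let the server assign the entry ID, and the exact set of error prefixes treated as transient. `Event`, `replyError`, `xaddArgs`, and `isRetryable` are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"strings"
)

// Event is a hypothetical, reduced stand-in for relay.Event.
type Event struct {
	ID      string
	Type    string
	Payload []byte
}

// xaddArgs builds the command: XADD <stream> * id <id> payload <payload>.
// The stream name is the event type; "*" asks Redis to generate the entry ID.
func xaddArgs(e Event) []string {
	return []string{"XADD", e.Type, "*", "id", e.ID, "payload", string(e.Payload)}
}

// replyError models an error reply returned by the Redis server.
type replyError string

func (e replyError) Error() string { return string(e) }

// isRetryable treats network-level errors and cluster or loading states as
// transient. Any other error is considered permanent in this sketch.
func isRetryable(err error) bool {
	if err == nil {
		return false
	}
	var netErr net.Error
	if errors.As(err, &netErr) {
		return true
	}
	msg := err.Error()
	for _, prefix := range []string{"MOVED ", "ASK ", "CLUSTERDOWN ", "TRYAGAIN ", "LOADING "} {
		if strings.HasPrefix(msg, prefix) {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(xaddArgs(Event{ID: "evt-1", Type: "order.created", Payload: []byte(`{"total":42}`)}))
	fmt.Println(isRetryable(replyError("MOVED 3999 127.0.0.1:6381")))
	fmt.Println(isRetryable(replyError("WRONGTYPE Operation against a key holding the wrong kind of value")))
}
```

A `MOVED` or `ASK` reply signals that a hash slot is being served by a different node, which is the resharding case mentioned above. A `WRONGTYPE` reply means the key is not a stream, so retrying cannot help.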

Stdout represents a publisher that writes event data to the console. It is primarily used for local development, debugging, and piping output to other CLI tools.

type Stdout struct{}

func NewStdout() *Stdout

NewStdout creates a new instance of the Stdout publisher.

func (s *Stdout) Close(_ context.Context) error

Close does nothing on the Stdout publisher.

func (s *Stdout) Connect(_ context.Context) error

Connect satisfies the relay.Publisher interface. For the Stdout publisher, this is a no-op that always reports success, as the standard output stream is assumed to be available at runtime.

func (s *Stdout) Ping(_ context.Context) error

Ping does nothing on the Stdout publisher.

func (s *Stdout) Publish(_ context.Context, event relay.Event) error

Publish satisfies the relay.Publisher interface. It formats the event as a string and writes it to standard output.
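A publisher of this kind can be sketched by writing one line per event to an `io.Writer`. The line format below is an assumption chosen for the example, since the reference does not specify the actual output format, and `Event`, `writeEvent`, and `formatEvent` are hypothetical names.

```go
package main

import (
	"fmt"
	"io"
	"os"
	"strings"
)

// Event is a hypothetical, reduced stand-in for relay.Event.
type Event struct {
	ID      string
	Type    string
	Payload []byte
}

// writeEvent writes one event per line, which keeps the output easy to
// pipe into line-oriented tools. The format is an assumption.
func writeEvent(w io.Writer, e Event) error {
	_, err := fmt.Fprintf(w, "id=%s type=%s payload=%s\n", e.ID, e.Type, e.Payload)
	return err
}

// formatEvent returns the exact line writeEvent would emit.
func formatEvent(e Event) string {
	var sb strings.Builder
	_ = writeEvent(&sb, e) // writes to a strings.Builder do not fail
	return sb.String()
}

func main() {
	e := Event{ID: "evt-1", Type: "order.created", Payload: []byte(`{"total":42}`)}
	if err := writeEvent(os.Stdout, e); err != nil {
		fmt.Fprintln(os.Stderr, "write failed:", err)
	}
}
```

Taking an `io.Writer` rather than writing to `os.Stdout` directly makes the formatting testable without capturing process output.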