zoobzio December 11, 2025 Edit this page

API Reference

Complete API reference for github.com/zoobz-io/flux.

Types

Watcher

Interface for configuration sources.

type Watcher interface {
    Watch(ctx context.Context) (<-chan []byte, error)
}

Implementations must emit the current value immediately upon Watch() being called, then emit subsequent values when the source changes. The channel closes when context is cancelled.

Request

Carries configuration data through the processing pipeline.

type Request[T Validator] struct {
    Previous T      // Last successfully applied configuration
    Current  T      // Newly parsed and validated configuration
    Raw      []byte // Original bytes from watcher
}

Reducer

Merges multiple configuration sources into a single configuration.

type Reducer[T Validator] func(ctx context.Context, prev, curr []T) (T, error)

Receives the previous merged values (nil on first call) and the current parsed values from each source in the same order as the sources were provided.

Capacitor

Watches a single source for configuration changes.

type Capacitor[T Validator] struct {
    // unexported fields
}

New

func New[T Validator](
    watcher Watcher,
    fn func(ctx context.Context, prev, curr T) error,
    opts ...Option[T],
) *Capacitor[T]

Creates a Capacitor that watches a source for configuration changes. The type T must implement the Validator interface. The watcher emits raw bytes, which are unmarshaled to type T using the configured codec (JSON by default). Validation is performed by calling T.Validate(). On success, the callback is invoked with the previous and current configuration values.

Start

func (c *Capacitor[T]) Start(ctx context.Context) error

Begins watching. Blocks until the first value is processed, then continues asynchronously. Returns error if initial load fails (but continues watching). Can only be called once.

State

func (c *Capacitor[T]) State() State

Returns the current state: StateLoading, StateHealthy, StateDegraded, or StateEmpty.

Current

func (c *Capacitor[T]) Current() (T, bool)

Returns the current valid configuration and true, or zero value and false if none exists.

LastError

func (c *Capacitor[T]) LastError() error

Returns the last error, or nil if no error occurred.

ErrorHistory

func (c *Capacitor[T]) ErrorHistory() []error

Returns recent errors (oldest first) if error history is enabled via ErrorHistorySize. Returns nil if not enabled.

Process

func (c *Capacitor[T]) Process(ctx context.Context) bool

Manually processes the next value. Only available in sync mode. Returns true if a value was processed.

CompositeCapacitor

Watches multiple sources and merges their configurations.

type CompositeCapacitor[T Validator] struct {
    // unexported fields
}

Compose

func Compose[T Validator](
    reducer Reducer[T],
    sources []Watcher,
    opts ...Option[T],
) *CompositeCapacitor[T]

Creates a CompositeCapacitor. The type T must implement the Validator interface. Each source emits raw bytes, unmarshaled and validated to type T. The reducer receives previous and current slices (in source order) and returns the merged configuration.

Methods

CompositeCapacitor has the same methods as Capacitor: Start, State, Current, LastError, ErrorHistory, Process.

Additional method:

func (c *CompositeCapacitor[T]) SourceErrors() []SourceError

Returns errors from individual sources for granular diagnostics.

State

Configuration health state.

type State int

const (
    StateLoading  State = iota  // Waiting for first value
    StateHealthy                // Valid config active
    StateDegraded               // Update failed, previous retained
    StateEmpty                  // No valid config ever obtained
)

String

func (s State) String() string

Returns string representation: "loading", "healthy", "degraded", or "empty".

Codec

Interface for configuration deserialization.

type Codec interface {
    Unmarshal(data []byte, v any) error
    ContentType() string
}

JSONCodec

JSON deserialization codec (default).

type JSONCodec struct{}

func (JSONCodec) Unmarshal(data []byte, v any) error

YAMLCodec

YAML deserialization codec.

type YAMLCodec struct{}

func (YAMLCodec) Unmarshal(data []byte, v any) error

SourceError

Error from a specific source in CompositeCapacitor.

type SourceError struct {
    Index int
    Error error
}

Instance Configuration

Instance configuration methods are chainable and configure the Capacitor or CompositeCapacitor behavior. These methods can be called on the returned instance after calling New() or Compose().

Debounce

func (c *Capacitor[T]) Debounce(d time.Duration) *Capacitor[T]

Sets debounce duration. Changes within this duration are coalesced. Default: 100ms.

Codec

func (c *Capacitor[T]) Codec(codec Codec) *Capacitor[T]

Sets the codec for deserialization. Default: JSONCodec{}.

SyncMode

func (c *Capacitor[T]) SyncMode() *Capacitor[T]

Enables synchronous processing for testing. Disables async goroutines and debouncing.

Clock

func (c *Capacitor[T]) Clock(clock clockz.Clock) *Capacitor[T]

Sets a custom clock for time operations. Use with clockz.FakeClock for testing debounce behavior.

StartupTimeout

func (c *Capacitor[T]) StartupTimeout(d time.Duration) *Capacitor[T]

Sets the maximum duration to wait for the initial configuration value from the watcher. If the watcher fails to emit within this duration, Start() returns an error. Default: no timeout (wait indefinitely).

Metrics

func (c *Capacitor[T]) Metrics(provider MetricsProvider) *Capacitor[T]

Sets a metrics provider for observability integration. The provider receives callbacks on state changes, processing success/failure, and change events. See MetricsProvider.

OnStop

func (c *Capacitor[T]) OnStop(fn func(State)) *Capacitor[T]

Sets a callback invoked when the capacitor stops watching. The callback receives the final state. Useful for graceful shutdown scenarios.

ErrorHistorySize

func (c *Capacitor[T]) ErrorHistorySize(n int) *Capacitor[T]

Sets the number of recent errors to retain. When set, ErrorHistory() returns up to this many recent errors (oldest first). Use 0 (default) to only track the most recent error via LastError().

Pipeline Options

Pipeline options wrap the processing pipeline with middleware at the boundary level. These are powered by pipz.

WithRetry

func WithRetry[T Validator](maxAttempts int) Option[T]

Wraps the pipeline with retry logic. Failed operations are retried immediately up to maxAttempts times.

WithBackoff

func WithBackoff[T Validator](maxAttempts int, baseDelay time.Duration) Option[T]

Wraps the pipeline with exponential backoff retry logic. Failed operations are retried with increasing delays: baseDelay, 2*baseDelay, 4*baseDelay, etc.

WithTimeout

func WithTimeout[T Validator](d time.Duration) Option[T]

Wraps the pipeline with a timeout. If processing takes longer than the specified duration, the operation fails with a timeout error.

WithFallback

func WithFallback[T Validator](fallbacks ...pipz.Chainable[*Request[T]]) Option[T]

Wraps the pipeline with fallback processors. If the primary pipeline (including the callback) fails, each fallback is tried in order until one succeeds.

WithCircuitBreaker

func WithCircuitBreaker[T Validator](failures int, recovery time.Duration) Option[T]

Wraps the pipeline with circuit breaker protection. After failures consecutive failures, the circuit opens and rejects further requests until recovery time has passed.

The circuit breaker has three states:

  • Closed: Normal operation, requests pass through
  • Open: After threshold failures, requests are rejected immediately
  • Half-Open: After recovery timeout, one request is allowed to test recovery

WithErrorHandler

func WithErrorHandler[T Validator](handler pipz.Chainable[*pipz.Error[*Request[T]]]) Option[T]

Adds error observation to the pipeline. Errors are passed to the handler for logging, metrics, or alerting, but the error still propagates. Use this for observability, not recovery.

WithPipeline

func WithPipeline[T Validator](identity pipz.Identity) Option[T]

Wraps the entire processing pipeline with a pipz.Pipeline for correlated tracing. Each Process() call generates a unique execution ID, while the pipeline ID remains stable (derived from the identity).

Use pipz.ExecutionIDFromContext and pipz.PipelineIDFromContext in middleware or signal handlers to extract correlation IDs for observability.

This option should typically be applied last (outermost) to ensure all nested processors have access to the correlation context.

Example:

var configPipelineID = pipz.NewIdentity("myapp:config", "Configuration pipeline")

capacitor := flux.New[Config](
    watcher,
    callback,
    flux.WithRetry[Config](3),
    flux.WithPipeline[Config](configPipelineID),
)

WithMiddleware

func WithMiddleware[T Validator](processors ...pipz.Chainable[*Request[T]]) Option[T]

Wraps the pipeline with a sequence of processors. Processors execute in order, with the wrapped pipeline (callback) last. Use the Use* functions to create processors for common patterns, or provide custom pipz.Chainable implementations directly.

Example:

var (
    logID    = pipz.NewIdentity("myapp:log", "Logs config changes")
    enrichID = pipz.NewIdentity("myapp:enrich", "Enriches config")
    markID   = pipz.NewIdentity("myapp:mark", "Marks config")
)

flux.New[Config](
    watcher,
    callback,
    flux.WithMiddleware(
        flux.UseEffect[Config](logID, logFn),
        flux.UseApply[Config](enrichID, enrichFn),
        flux.UseRateLimit[Config](10, 5,
            flux.UseTransform[Config](markID, markFn),
        ),
    ),
    flux.WithCircuitBreaker[Config](5, 30*time.Second),
)

Middleware Processors

These functions create processors for use inside WithMiddleware. They transform or observe the request as it flows through the pipeline.

Adapters

Adapters convert functions into pipz processors.

UseTransform

func UseTransform[T Validator](identity pipz.Identity, fn func(context.Context, *Request[T]) *Request[T]) pipz.Chainable[*Request[T]]

Creates a processor that transforms the request. Cannot fail. Use for pure transformations that always succeed.

UseApply

func UseApply[T Validator](identity pipz.Identity, fn func(context.Context, *Request[T]) (*Request[T], error)) pipz.Chainable[*Request[T]]

Creates a processor that can transform the request and fail. Use for operations like enrichment, validation, or transformation that may produce errors.

UseEffect

func UseEffect[T Validator](identity pipz.Identity, fn func(context.Context, *Request[T]) error) pipz.Chainable[*Request[T]]

Creates a processor that performs a side effect. The request passes through unchanged. Use for logging, metrics, or notifications.

UseMutate

func UseMutate[T Validator](identity pipz.Identity, transformer func(context.Context, *Request[T]) *Request[T], condition func(context.Context, *Request[T]) bool) pipz.Chainable[*Request[T]]

Creates a processor that conditionally transforms the request. The transformer is only applied if the condition returns true.

UseEnrich

func UseEnrich[T Validator](identity pipz.Identity, fn func(context.Context, *Request[T]) (*Request[T], error)) pipz.Chainable[*Request[T]]

Creates a processor that attempts optional enhancement. If the enrichment fails, the error is logged but processing continues with the original request. Use for non-critical enhancements.

Wrapping Processors

These wrap another processor with reliability logic.

UseRetry

func UseRetry[T Validator](maxAttempts int, processor pipz.Chainable[*Request[T]]) pipz.Chainable[*Request[T]]

Wraps a processor with retry logic. Failed operations are retried immediately up to maxAttempts times.

UseBackoff

func UseBackoff[T Validator](maxAttempts int, baseDelay time.Duration, processor pipz.Chainable[*Request[T]]) pipz.Chainable[*Request[T]]

Wraps a processor with exponential backoff retry logic. Failed operations are retried with increasing delays.

UseTimeout

func UseTimeout[T Validator](d time.Duration, processor pipz.Chainable[*Request[T]]) pipz.Chainable[*Request[T]]

Wraps a processor with a deadline. If processing takes longer than the specified duration, the operation fails.

UseFallback

func UseFallback[T Validator](primary pipz.Chainable[*Request[T]], fallbacks ...pipz.Chainable[*Request[T]]) pipz.Chainable[*Request[T]]

Wraps a processor with fallback alternatives. If the primary fails, each fallback is tried in order.

UseFilter

func UseFilter[T Validator](identity pipz.Identity, condition func(context.Context, *Request[T]) bool, processor pipz.Chainable[*Request[T]]) pipz.Chainable[*Request[T]]

Wraps a processor with a condition. If the condition returns false, the request passes through unchanged.

UseRateLimit

func UseRateLimit[T Validator](rate float64, burst int, processor pipz.Chainable[*Request[T]]) pipz.Chainable[*Request[T]]

Wraps a processor with rate limiting. Uses a token bucket algorithm with the specified rate (tokens per second) and burst size. When tokens are exhausted, requests wait for availability.

MetricsProvider

Interface for metrics integration.

type MetricsProvider interface {
    OnStateChange(from, to State)
    OnProcessSuccess(duration time.Duration)
    OnProcessFailure(stage string, duration time.Duration)
    OnChangeReceived()
}
  • OnStateChange - Called on state transitions
  • OnProcessSuccess - Called after successful processing with duration
  • OnProcessFailure - Called on failure with stage name ("unmarshal", "validate", "pipeline", or "reducer" for Compose) and duration
  • OnChangeReceived - Called when raw data arrives from watcher

NoOpMetricsProvider

type NoOpMetricsProvider struct{}

A no-op implementation for testing or when metrics are not needed.

Validator

Interface that configuration types must implement.

type Validator interface {
    Validate() error
}

Return nil for valid configuration, or an error describing the validation failure.

Built-in Watchers

NewChannelWatcher

func NewChannelWatcher(ch <-chan []byte) *ChannelWatcher

Creates a watcher that forwards values from the given channel through an internal goroutine.

NewSyncChannelWatcher

func NewSyncChannelWatcher(ch <-chan []byte) *ChannelWatcher

Creates a watcher that returns the source channel directly without an intermediate goroutine. Use with SyncMode() for deterministic testing.

Constants

DefaultDebounce

const DefaultDebounce = 100 * time.Millisecond

Default debounce duration for change processing.

Next Steps