API Reference
Complete API reference for github.com/zoobz-io/flux.
Types
Watcher
Interface for configuration sources.
type Watcher interface {
    Watch(ctx context.Context) (<-chan []byte, error)
}
Implementations must emit the current value as soon as Watch() is called, then emit subsequent values whenever the source changes. The channel closes when the context is cancelled.
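The contract above (emit immediately, close on cancellation) can be sketched with a source whose value never changes. This is a minimal illustration, not one of the library's watchers: the Watcher interface is redeclared locally so the program compiles on its own, and staticWatcher and firstValue are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
)

// Watcher mirrors the documented interface so the sketch is self-contained.
type Watcher interface {
	Watch(ctx context.Context) (<-chan []byte, error)
}

// staticWatcher is a hypothetical source whose value never changes.
type staticWatcher struct {
	data []byte
}

// Watch emits the current value right away and closes the channel
// once the context is cancelled.
func (w staticWatcher) Watch(ctx context.Context) (<-chan []byte, error) {
	out := make(chan []byte, 1)
	out <- w.data // initial value; the buffer keeps Watch from blocking
	go func() {
		defer close(out)
		<-ctx.Done()
	}()
	return out, nil
}

// firstValue reads the initial emission, cancels the context, and reports
// whether the channel was closed afterwards.
func firstValue(w Watcher) (string, bool) {
	ctx, cancel := context.WithCancel(context.Background())
	ch, err := w.Watch(ctx)
	if err != nil {
		cancel()
		return "", false
	}
	first := <-ch
	cancel()
	_, open := <-ch // blocks until the goroutine closes the channel
	return string(first), !open
}

func main() {
	v, closed := firstValue(staticWatcher{data: []byte(`{"port":8080}`)})
	fmt.Println(v, closed)
}
```

A real source would also send on the channel whenever the underlying data changes; that part is omitted here.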
Request
Carries configuration data through the processing pipeline.
type Request[T Validator] struct {
    Previous T      // Last successfully applied configuration
    Current  T      // Newly parsed and validated configuration
    Raw      []byte // Original bytes from watcher
}
Reducer
Merges multiple configuration sources into a single configuration.
type Reducer[T Validator] func(ctx context.Context, prev, curr []T) (T, error)
Receives the previous merged values (nil on first call) and the current parsed values from each source in the same order as the sources were provided.
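As an illustration of the signature, the sketch below shows a reducer in which later sources override the non-zero fields of earlier ones. Validator and Reducer are redeclared locally to keep the program standalone; Config, lastWins, and mergeExample are hypothetical, and the override policy is only one possible merge strategy.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Validator and Reducer mirror the documented declarations.
type Validator interface{ Validate() error }

type Reducer[T Validator] func(ctx context.Context, prev, curr []T) (T, error)

// Config is a hypothetical configuration type.
type Config struct {
	Host string
	Port int
}

func (c Config) Validate() error {
	if c.Port < 0 || c.Port > 65535 {
		return errors.New("port out of range")
	}
	return nil
}

// lastWins walks the sources in order; a non-zero field in a later source
// replaces the value taken from an earlier one. prev is ignored here.
func lastWins(_ context.Context, _ []Config, curr []Config) (Config, error) {
	var merged Config
	for _, c := range curr {
		if c.Host != "" {
			merged.Host = c.Host
		}
		if c.Port != 0 {
			merged.Port = c.Port
		}
	}
	return merged, merged.Validate()
}

// mergeExample merges a base source with an override source.
func mergeExample() (Config, error) {
	var r Reducer[Config] = lastWins
	return r(context.Background(), nil, []Config{
		{Host: "localhost", Port: 8080}, // first source
		{Port: 9090},                    // second source overrides the port
	})
}

func main() {
	cfg, err := mergeExample()
	fmt.Println(cfg.Host, cfg.Port, err)
}
```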
Capacitor
Watches a single source for configuration changes.
type Capacitor[T Validator] struct {
    // unexported fields
}
New
func New[T Validator](
    watcher Watcher,
    fn func(ctx context.Context, prev, curr T) error,
    opts ...Option[T],
) *Capacitor[T]
Creates a Capacitor that watches a source for configuration changes. The type T must implement the Validator interface. The watcher emits raw bytes, which are unmarshaled to type T using the configured codec (JSON by default). Validation is performed by calling T.Validate(). On success, the callback is invoked with the previous and current configuration values.
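The unmarshal, validate, callback sequence described above can be sketched in isolation. This is not the library's implementation: apply, applyTwice, and Config are hypothetical, the callback omits the context parameter for brevity, and JSON is hard-coded instead of going through a codec.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Validator mirrors the documented interface.
type Validator interface{ Validate() error }

// Config is a hypothetical configuration type.
type Config struct {
	Port int `json:"port"`
}

func (c Config) Validate() error {
	if c.Port <= 0 {
		return errors.New("port must be positive")
	}
	return nil
}

// apply is a stand-in for one processing step: unmarshal the raw bytes,
// validate the result, then hand prev and curr to the callback.
// On any failure the previous value is returned unchanged.
func apply[T Validator](raw []byte, prev T, fn func(prev, curr T) error) (T, error) {
	var curr T
	if err := json.Unmarshal(raw, &curr); err != nil {
		return prev, fmt.Errorf("unmarshal: %w", err)
	}
	if err := curr.Validate(); err != nil {
		return prev, fmt.Errorf("validate: %w", err)
	}
	if err := fn(prev, curr); err != nil {
		return prev, fmt.Errorf("callback: %w", err)
	}
	return curr, nil
}

// applyTwice feeds a valid payload followed by an invalid one and returns
// the retained port plus whether the second payload was rejected.
func applyTwice() (int, bool) {
	noop := func(prev, curr Config) error { return nil }
	cfg, _ := apply([]byte(`{"port":8080}`), Config{}, noop)
	cfg, err := apply([]byte(`{"port":-1}`), cfg, noop)
	return cfg.Port, err != nil
}

func main() {
	port, rejected := applyTwice()
	fmt.Println(port, rejected)
}
```

The invalid second payload leaves the first configuration in place, which is the behavior the StateDegraded description implies.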
Start
func (c *Capacitor[T]) Start(ctx context.Context) error
Begins watching. Blocks until the first value is processed, then continues asynchronously. Returns an error if the initial load fails; watching continues regardless. Can only be called once.
State
func (c *Capacitor[T]) State() State
Returns the current state: StateLoading, StateHealthy, StateDegraded, or StateEmpty.
Current
func (c *Capacitor[T]) Current() (T, bool)
Returns the current valid configuration and true, or the zero value and false if none exists.
LastError
func (c *Capacitor[T]) LastError() error
Returns the last error, or nil if no error occurred.
ErrorHistory
func (c *Capacitor[T]) ErrorHistory() []error
Returns recent errors (oldest first) if error history is enabled via ErrorHistorySize. Returns nil if not enabled.
Process
func (c *Capacitor[T]) Process(ctx context.Context) bool
Manually processes the next value. Only available in sync mode. Returns true if a value was processed.
CompositeCapacitor
Watches multiple sources and merges their configurations.
type CompositeCapacitor[T Validator] struct {
    // unexported fields
}
Compose
func Compose[T Validator](
    reducer Reducer[T],
    sources []Watcher,
    opts ...Option[T],
) *CompositeCapacitor[T]
Creates a CompositeCapacitor. The type T must implement the Validator interface. Each source emits raw bytes, which are unmarshaled to type T and validated. The reducer receives the previous and current slices (in source order) and returns the merged configuration.
Methods
CompositeCapacitor has the same methods as Capacitor: Start, State, Current, LastError, ErrorHistory, Process.
Additional method:
func (c *CompositeCapacitor[T]) SourceErrors() []SourceError
Returns errors from individual sources for granular diagnostics.
State
Configuration health state.
type State int
const (
    StateLoading  State = iota // Waiting for first value
    StateHealthy               // Valid config active
    StateDegraded              // Update failed, previous retained
    StateEmpty                 // No valid config ever obtained
)
String
func (s State) String() string
Returns string representation: "loading", "healthy", "degraded", or "empty".
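The sketch below redeclares State with its String method and adds a hypothetical next function showing how the four states could relate. The transition rule is an assumption inferred from the constant comments above, not something this reference specifies.

```go
package main

import "fmt"

// State mirrors the documented type and constants.
type State int

const (
	StateLoading State = iota
	StateHealthy
	StateDegraded
	StateEmpty
)

// String returns the documented names.
func (s State) String() string {
	switch s {
	case StateLoading:
		return "loading"
	case StateHealthy:
		return "healthy"
	case StateDegraded:
		return "degraded"
	case StateEmpty:
		return "empty"
	default:
		return "unknown"
	}
}

// next is an ASSUMED transition rule: success always leads to healthy;
// failure leads to degraded when a valid config already exists, and to
// empty when none was ever obtained.
func next(s State, ok bool) State {
	if ok {
		return StateHealthy
	}
	if s == StateHealthy || s == StateDegraded {
		return StateDegraded
	}
	return StateEmpty
}

func main() {
	s := StateLoading
	for _, ok := range []bool{false, true, false} {
		s = next(s, ok)
		fmt.Println(s) // prints empty, healthy, degraded on successive lines
	}
}
```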
Codec
Interface for configuration deserialization.
type Codec interface {
    Unmarshal(data []byte, v any) error
    ContentType() string
}
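A custom codec only needs these two methods. As a sketch, the hypothetical strictJSONCodec below rejects unknown fields using the standard library's json.Decoder. The Codec interface is redeclared locally, and the "application/json" return value is an assumption about what ContentType is expected to report.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// Codec mirrors the documented interface.
type Codec interface {
	Unmarshal(data []byte, v any) error
	ContentType() string
}

// strictJSONCodec is a hypothetical codec that fails on unknown fields,
// which helps catch typos in configuration keys.
type strictJSONCodec struct{}

func (strictJSONCodec) Unmarshal(data []byte, v any) error {
	dec := json.NewDecoder(bytes.NewReader(data))
	dec.DisallowUnknownFields()
	return dec.Decode(v)
}

// ContentType returns an assumed MIME type for JSON payloads.
func (strictJSONCodec) ContentType() string { return "application/json" }

// Config is a hypothetical configuration type.
type Config struct {
	Port int `json:"port"`
}

// decodePort runs raw through the codec and returns the decoded port.
func decodePort(c Codec, raw string) (int, error) {
	var cfg Config
	err := c.Unmarshal([]byte(raw), &cfg)
	return cfg.Port, err
}

func main() {
	port, err := decodePort(strictJSONCodec{}, `{"port":8080}`)
	fmt.Println(port, err)

	// "prot" is a misspelled key, so strict decoding reports an error.
	_, err = decodePort(strictJSONCodec{}, `{"port":8080,"prot":1}`)
	fmt.Println(err != nil)
}
```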
JSONCodec
JSON deserialization codec (default).
type JSONCodec struct{}
func (JSONCodec) Unmarshal(data []byte, v any) error
YAMLCodec
YAML deserialization codec.
type YAMLCodec struct{}
func (YAMLCodec) Unmarshal(data []byte, v any) error
SourceError
Error from a specific source in CompositeCapacitor.
type SourceError struct {
    Index int
    Error error
}
Instance Configuration
Instance configuration methods are chainable and configure the Capacitor or CompositeCapacitor behavior. These methods can be called on the returned instance after calling New() or Compose().
Debounce
func (c *Capacitor[T]) Debounce(d time.Duration) *Capacitor[T]
Sets debounce duration. Changes within this duration are coalesced. Default: 100ms.
Codec
func (c *Capacitor[T]) Codec(codec Codec) *Capacitor[T]
Sets the codec for deserialization. Default: JSONCodec{}.
SyncMode
func (c *Capacitor[T]) SyncMode() *Capacitor[T]
Enables synchronous processing for testing. Disables async goroutines and debouncing.
Clock
func (c *Capacitor[T]) Clock(clock clockz.Clock) *Capacitor[T]
Sets a custom clock for time operations. Use with clockz.FakeClock for testing debounce behavior.
StartupTimeout
func (c *Capacitor[T]) StartupTimeout(d time.Duration) *Capacitor[T]
Sets the maximum duration to wait for the initial configuration value from the watcher. If the watcher fails to emit within this duration, Start() returns an error. Default: no timeout (wait indefinitely).
Metrics
func (c *Capacitor[T]) Metrics(provider MetricsProvider) *Capacitor[T]
Sets a metrics provider for observability integration. The provider receives callbacks on state changes, processing success/failure, and change events. See MetricsProvider.
OnStop
func (c *Capacitor[T]) OnStop(fn func(State)) *Capacitor[T]
Sets a callback invoked when the capacitor stops watching. The callback receives the final state. Useful for graceful shutdown scenarios.
ErrorHistorySize
func (c *Capacitor[T]) ErrorHistorySize(n int) *Capacitor[T]
Sets the number of recent errors to retain. When set, ErrorHistory() returns up to this many recent errors (oldest first). Use 0 (default) to only track the most recent error via LastError().
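The retention behavior described above (bounded size, oldest first, nil when disabled) can be sketched with a small slice-backed buffer. errorHistory and historyMessages are hypothetical names; the library's internal storage may differ.

```go
package main

import (
	"errors"
	"fmt"
)

// errorHistory keeps at most size errors, oldest first.
// A size of 0 disables retention.
type errorHistory struct {
	size int
	errs []error
}

// add appends err, dropping the oldest entry when the buffer is full.
func (h *errorHistory) add(err error) {
	if h.size <= 0 {
		return
	}
	if len(h.errs) == h.size {
		h.errs = h.errs[1:]
	}
	h.errs = append(h.errs, err)
}

// snapshot returns a copy of the retained errors, or nil when disabled.
func (h *errorHistory) snapshot() []error {
	if h.size <= 0 {
		return nil
	}
	return append([]error(nil), h.errs...)
}

// historyMessages records msgs as errors and returns what is retained.
func historyMessages(size int, msgs ...string) []string {
	h := &errorHistory{size: size}
	for _, m := range msgs {
		h.add(errors.New(m))
	}
	var out []string
	for _, err := range h.snapshot() {
		out = append(out, err.Error())
	}
	return out
}

func main() {
	fmt.Println(historyMessages(2, "first", "second", "third")) // [second third]
}
```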
Pipeline Options
Pipeline options wrap the processing pipeline with middleware at the boundary level. These are powered by pipz.
WithRetry
func WithRetry[T Validator](maxAttempts int) Option[T]
Wraps the pipeline with retry logic. Failed operations are retried immediately up to maxAttempts times.
WithBackoff
func WithBackoff[T Validator](maxAttempts int, baseDelay time.Duration) Option[T]
Wraps the pipeline with exponential backoff retry logic. Failed operations are retried with increasing delays: baseDelay, 2*baseDelay, 4*baseDelay, etc.
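The delay sequence can be computed directly. The sketch below assumes that maxAttempts attempts leave maxAttempts-1 waits between them, which this reference does not state explicitly; backoffDelays is a hypothetical helper.

```go
package main

import (
	"fmt"
	"time"
)

// backoffDelays returns the waits between attempts: base, 2*base, 4*base, ...
// It ASSUMES one wait between each pair of consecutive attempts.
func backoffDelays(maxAttempts int, base time.Duration) []time.Duration {
	var delays []time.Duration
	d := base
	for i := 1; i < maxAttempts; i++ {
		delays = append(delays, d)
		d *= 2
	}
	return delays
}

func main() {
	fmt.Println(backoffDelays(4, 100*time.Millisecond)) // [100ms 200ms 400ms]
}
```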
WithTimeout
func WithTimeout[T Validator](d time.Duration) Option[T]
Wraps the pipeline with a timeout. If processing takes longer than the specified duration, the operation fails with a timeout error.
WithFallback
func WithFallback[T Validator](fallbacks ...pipz.Chainable[*Request[T]]) Option[T]
Wraps the pipeline with fallback processors. If the primary pipeline (including the callback) fails, each fallback is tried in order until one succeeds.
WithCircuitBreaker
func WithCircuitBreaker[T Validator](failures int, recovery time.Duration) Option[T]
Wraps the pipeline with circuit breaker protection. Once the number of consecutive failures reaches the failures threshold, the circuit opens and rejects further requests until the recovery duration has elapsed.
The circuit breaker has three states:
- Closed: Normal operation, requests pass through
- Open: After threshold failures, requests are rejected immediately
- Half-Open: After recovery timeout, one request is allowed to test recovery
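The three states can be sketched as a small state machine. This is an illustration of the general pattern, not the pipz implementation: breaker and simulateBreaker are hypothetical, time is passed in explicitly so the run is deterministic, and calls are assumed to be sequential (each allowed request is followed by record before the next allow).

```go
package main

import (
	"fmt"
	"time"
)

type breakerState int

const (
	closed breakerState = iota
	open
	halfOpen
)

// breaker is a hypothetical, single-goroutine circuit breaker.
type breaker struct {
	failures int           // consecutive-failure threshold
	recovery time.Duration // how long the circuit stays open
	state    breakerState
	count    int // consecutive failures seen so far
	openedAt time.Time
}

// allow reports whether a request may pass at time now.
func (b *breaker) allow(now time.Time) bool {
	if b.state == open {
		if now.Sub(b.openedAt) < b.recovery {
			return false
		}
		b.state = halfOpen // recovery elapsed: let one trial through
	}
	return true
}

// record feeds the outcome of an allowed request back into the breaker.
func (b *breaker) record(now time.Time, ok bool) {
	if ok {
		b.state, b.count = closed, 0
		return
	}
	b.count++
	if b.state == halfOpen || b.count >= b.failures {
		b.state, b.openedAt = open, now
	}
}

// simulateBreaker runs five requests and returns whether each was allowed.
func simulateBreaker() []bool {
	b := &breaker{failures: 2, recovery: time.Second}
	t0 := time.Unix(0, 0)
	var seen []bool
	step := func(at time.Time, ok bool) {
		allowed := b.allow(at)
		seen = append(seen, allowed)
		if allowed {
			b.record(at, ok)
		}
	}
	step(t0, false)                          // first failure
	step(t0, false)                          // second failure opens the circuit
	step(t0.Add(500*time.Millisecond), true) // rejected while open
	step(t0.Add(2*time.Second), true)        // half-open trial succeeds
	step(t0.Add(2*time.Second), false)       // closed again, so allowed
	return seen
}

func main() {
	fmt.Println(simulateBreaker()) // [true true false true true]
}
```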
WithErrorHandler
func WithErrorHandler[T Validator](handler pipz.Chainable[*pipz.Error[*Request[T]]]) Option[T]
Adds error observation to the pipeline. Errors are passed to the handler for logging, metrics, or alerting, but the error still propagates. Use this for observability, not recovery.
WithPipeline
func WithPipeline[T Validator](identity pipz.Identity) Option[T]
Wraps the entire processing pipeline with a pipz.Pipeline for correlated tracing. Each Process() call generates a unique execution ID, while the pipeline ID remains stable (derived from the identity).
Use pipz.ExecutionIDFromContext and pipz.PipelineIDFromContext in middleware or signal handlers to extract correlation IDs for observability.
This option should typically be applied last (outermost) to ensure all nested processors have access to the correlation context.
Example:
var configPipelineID = pipz.NewIdentity("myapp:config", "Configuration pipeline")

capacitor := flux.New[Config](
    watcher,
    callback,
    flux.WithRetry[Config](3),
    flux.WithPipeline[Config](configPipelineID),
)
WithMiddleware
func WithMiddleware[T Validator](processors ...pipz.Chainable[*Request[T]]) Option[T]
Wraps the pipeline with a sequence of processors. Processors execute in order, with the wrapped pipeline (callback) last. Use the Use* functions to create processors for common patterns, or provide custom pipz.Chainable implementations directly.
Example:
var (
    logID    = pipz.NewIdentity("myapp:log", "Logs config changes")
    enrichID = pipz.NewIdentity("myapp:enrich", "Enriches config")
    markID   = pipz.NewIdentity("myapp:mark", "Marks config")
)

flux.New[Config](
    watcher,
    callback,
    flux.WithMiddleware(
        flux.UseEffect[Config](logID, logFn),
        flux.UseApply[Config](enrichID, enrichFn),
        flux.UseRateLimit[Config](10, 5,
            flux.UseTransform[Config](markID, markFn),
        ),
    ),
    flux.WithCircuitBreaker[Config](5, 30*time.Second),
)
Middleware Processors
These functions create processors for use inside WithMiddleware. They transform or observe the request as it flows through the pipeline.
Adapters
Adapters convert functions into pipz processors.
UseTransform
func UseTransform[T Validator](identity pipz.Identity, fn func(context.Context, *Request[T]) *Request[T]) pipz.Chainable[*Request[T]]
Creates a processor that transforms the request. Cannot fail. Use for pure transformations that always succeed.
UseApply
func UseApply[T Validator](identity pipz.Identity, fn func(context.Context, *Request[T]) (*Request[T], error)) pipz.Chainable[*Request[T]]
Creates a processor that can transform the request and fail. Use for operations like enrichment, validation, or transformation that may produce errors.
UseEffect
func UseEffect[T Validator](identity pipz.Identity, fn func(context.Context, *Request[T]) error) pipz.Chainable[*Request[T]]
Creates a processor that performs a side effect. The request passes through unchanged. Use for logging, metrics, or notifications.
UseMutate
func UseMutate[T Validator](identity pipz.Identity, transformer func(context.Context, *Request[T]) *Request[T], condition func(context.Context, *Request[T]) bool) pipz.Chainable[*Request[T]]
Creates a processor that conditionally transforms the request. The transformer is only applied if the condition returns true.
UseEnrich
func UseEnrich[T Validator](identity pipz.Identity, fn func(context.Context, *Request[T]) (*Request[T], error)) pipz.Chainable[*Request[T]]
Creates a processor that attempts optional enhancement. If the enrichment fails, the error is logged but processing continues with the original request. Use for non-critical enhancements.
Wrapping Processors
These wrap another processor with reliability logic.
UseRetry
func UseRetry[T Validator](maxAttempts int, processor pipz.Chainable[*Request[T]]) pipz.Chainable[*Request[T]]
Wraps a processor with retry logic. Failed operations are retried immediately up to maxAttempts times.
UseBackoff
func UseBackoff[T Validator](maxAttempts int, baseDelay time.Duration, processor pipz.Chainable[*Request[T]]) pipz.Chainable[*Request[T]]
Wraps a processor with exponential backoff retry logic. Failed operations are retried with increasing delays.
UseTimeout
func UseTimeout[T Validator](d time.Duration, processor pipz.Chainable[*Request[T]]) pipz.Chainable[*Request[T]]
Wraps a processor with a deadline. If processing takes longer than the specified duration, the operation fails.
UseFallback
func UseFallback[T Validator](primary pipz.Chainable[*Request[T]], fallbacks ...pipz.Chainable[*Request[T]]) pipz.Chainable[*Request[T]]
Wraps a processor with fallback alternatives. If the primary fails, each fallback is tried in order.
UseFilter
func UseFilter[T Validator](identity pipz.Identity, condition func(context.Context, *Request[T]) bool, processor pipz.Chainable[*Request[T]]) pipz.Chainable[*Request[T]]
Wraps a processor with a condition. If the condition returns false, the request passes through unchanged.
UseRateLimit
func UseRateLimit[T Validator](rate float64, burst int, processor pipz.Chainable[*Request[T]]) pipz.Chainable[*Request[T]]
Wraps a processor with rate limiting. Uses a token bucket algorithm with the specified rate (tokens per second) and burst size. When tokens are exhausted, requests wait for availability.
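To make the rate and burst parameters concrete, here is a minimal token bucket sketch. It is not the limiter the library uses: bucket, reserve, and waits are hypothetical, and instead of sleeping, reserve returns how long a caller would have to wait, so the run is deterministic.

```go
package main

import (
	"fmt"
	"time"
)

// bucket is a hypothetical token bucket.
type bucket struct {
	rate   float64 // tokens added per second
	burst  float64 // maximum stored tokens
	tokens float64
	last   time.Time
}

// newBucket starts with a full bucket.
func newBucket(rate float64, burst int, now time.Time) *bucket {
	return &bucket{rate: rate, burst: float64(burst), tokens: float64(burst), last: now}
}

// reserve takes one token and returns how long the caller would need to
// wait for it. A negative balance represents tokens owed.
func (b *bucket) reserve(now time.Time) time.Duration {
	// Refill according to elapsed time, capped at the burst size.
	b.tokens += now.Sub(b.last).Seconds() * b.rate
	if b.tokens > b.burst {
		b.tokens = b.burst
	}
	b.last = now

	b.tokens--
	if b.tokens >= 0 {
		return 0
	}
	return time.Duration(-b.tokens / b.rate * float64(time.Second))
}

// waits issues three back-to-back requests against a bucket with
// rate 10 per second and burst 2.
func waits() []time.Duration {
	t0 := time.Unix(0, 0)
	b := newBucket(10, 2, t0)
	return []time.Duration{b.reserve(t0), b.reserve(t0), b.reserve(t0)}
}

func main() {
	// The first two requests fit in the burst; the third must wait.
	fmt.Println(waits())
}
```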
MetricsProvider
Interface for metrics integration.
type MetricsProvider interface {
    OnStateChange(from, to State)
    OnProcessSuccess(duration time.Duration)
    OnProcessFailure(stage string, duration time.Duration)
    OnChangeReceived()
}
- OnStateChange: Called on state transitions
- OnProcessSuccess: Called after successful processing, with the duration
- OnProcessFailure: Called on failure, with the stage name ("unmarshal", "validate", "pipeline", or "reducer" for Compose) and the duration
- OnChangeReceived: Called when raw data arrives from the watcher
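A provider can be as simple as a set of in-memory counters. In the sketch below, State and MetricsProvider are redeclared locally so the program stands alone; countingMetrics and simulateMetrics are hypothetical, and the order of calls in simulateMetrics is an assumption about how a capacitor might drive the callbacks.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// State and MetricsProvider mirror the documented declarations.
type State int

type MetricsProvider interface {
	OnStateChange(from, to State)
	OnProcessSuccess(duration time.Duration)
	OnProcessFailure(stage string, duration time.Duration)
	OnChangeReceived()
}

// countingMetrics is a hypothetical provider that tallies callbacks.
type countingMetrics struct {
	mu          sync.Mutex
	transitions int
	changes     int
	successes   int
	failures    map[string]int // keyed by stage name
}

func newCountingMetrics() *countingMetrics {
	return &countingMetrics{failures: make(map[string]int)}
}

func (m *countingMetrics) OnStateChange(from, to State) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.transitions++
}

func (m *countingMetrics) OnProcessSuccess(time.Duration) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.successes++
}

func (m *countingMetrics) OnProcessFailure(stage string, _ time.Duration) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.failures[stage]++
}

func (m *countingMetrics) OnChangeReceived() {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.changes++
}

// simulateMetrics replays one successful and one failed update.
func simulateMetrics() *countingMetrics {
	m := newCountingMetrics()
	var p MetricsProvider = m // compile-time check that the interface is met
	p.OnChangeReceived()
	p.OnProcessSuccess(2 * time.Millisecond)
	p.OnStateChange(0, 1)
	p.OnChangeReceived()
	p.OnProcessFailure("validate", time.Millisecond)
	p.OnStateChange(1, 2)
	return m
}

func main() {
	m := simulateMetrics()
	fmt.Println(m.changes, m.successes, m.failures["validate"], m.transitions) // 2 1 1 2
}
```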
NoOpMetricsProvider
type NoOpMetricsProvider struct{}
A no-op implementation for testing or when metrics are not needed.
Validator
Interface that configuration types must implement.
type Validator interface {
    Validate() error
}
Return nil for valid configuration, or an error describing the validation failure.
Built-in Watchers
NewChannelWatcher
func NewChannelWatcher(ch <-chan []byte) *ChannelWatcher
Creates a watcher that forwards values from the given channel through an internal goroutine.
NewSyncChannelWatcher
func NewSyncChannelWatcher(ch <-chan []byte) *ChannelWatcher
Creates a watcher that returns the source channel directly without an intermediate goroutine. Use with SyncMode() for deterministic testing.
Constants
DefaultDebounce
const DefaultDebounce = 100 * time.Millisecond
Default debounce duration for change processing.
Next Steps
- Fields Reference - Signals and keys for observability
- Providers Reference - Provider-specific API