zoobzio December 10, 2025

Core Concepts

Flux has five primitives: Capacitor, Watcher, Validator, State, and Codec.

Capacitor

Capacitor is the central type: it watches a source, processes changes, and delivers valid configurations to your callback.

capacitor := flux.New[Config](
    watcher,                                          // Data source
    func(ctx context.Context, prev, curr Config) error {  // Callback
        return app.Reconfigure(curr)
    },
)

The generic parameter [Config] must implement the Validator interface. Flux deserializes to this type and calls Validate() before invoking your callback.

Callback Signature

func(ctx context.Context, prev, curr T) error
  • ctx - Context for cancellation and deadlines
  • prev - Previous valid config (zero value on first load)
  • curr - New valid config
  • Return nil to accept, error to reject

The callback runs only after successful deserialization and validation. If it returns an error, the Capacitor retains the previous config and enters the Degraded state (or Empty, if the failure occurs on the first load).

Lifecycle

// Create
capacitor := flux.New[Config](watcher, callback)

// Start watching (blocks until first value processed)
err := capacitor.Start(ctx)

// Access current config
cfg, ok := capacitor.Current()

// Check health
state := capacitor.State()
lastErr := capacitor.LastError()

Start() blocks until the first configuration has been processed. If that initial load fails, Start() returns the error, but the Capacitor keeps watching in the background.

Watcher

A Watcher observes a data source and emits raw bytes when changes occur.

type Watcher interface {
    Watch(ctx context.Context) (<-chan []byte, error)
}

Contract:

  1. Emit current value immediately upon Watch() being called
  2. Emit subsequent values when the source changes
  3. Close the channel when context is cancelled

Built-in Watchers

ChannelWatcher - Wraps an existing channel (useful for testing):

ch := make(chan []byte, 1)
ch <- []byte(`{"port": 8080}`)

watcher := flux.NewChannelWatcher(ch)         // Async (uses internal goroutine)
// Or, for deterministic tests, use the synchronous variant instead:
// watcher := flux.NewSyncChannelWatcher(ch)

Provider Watchers

Each provider in pkg/ implements Watcher for a specific backend:

import (
    "github.com/zoobz-io/flux/consul"
    "github.com/zoobz-io/flux/file"
    "github.com/zoobz-io/flux/redis"
)

file.New("/etc/myapp/config.json")
redis.New(client, "myapp:config")
consul.New(client, "myapp/config")

See Providers Guide for all options.

Validator

Configuration types must implement the Validator interface:

type Validator interface {
    Validate() error
}

This gives you full control over validation logic:

type Config struct {
    Port int    `json:"port"`
    Host string `json:"host"`
}

func (c Config) Validate() error {
    if c.Port < 1 || c.Port > 65535 {
        return fmt.Errorf("port must be between 1 and 65535, got %d", c.Port)
    }
    if c.Host == "" {
        return errors.New("host is required")
    }
    return nil
}

For struct tag validation, you can integrate libraries like go-playground/validator within your Validate() method:

import "github.com/go-playground/validator/v10"

var validate = validator.New()

func (c Config) Validate() error {
    return validate.Struct(c)
}

State

Capacitor maintains one of four states:

const (
    StateLoading  State = iota  // Waiting for first value
    StateHealthy                // Valid config active
    StateDegraded               // Update failed, previous retained
    StateEmpty                  // No valid config ever obtained
)

Transitions

┌─────────┐   success   ┌─────────┐
│ Loading │────────────▶│ Healthy │◀──┐
└─────────┘             └─────────┘   │
     │                       │        │
     │ failure        failure│        │ success
     ▼                       ▼        │
┌─────────┐             ┌─────────┐───┘
│  Empty  │             │Degraded │
└─────────┘             └─────────┘
  • Loading → Healthy: First config loaded successfully
  • Loading → Empty: First config failed (parse, validation, or callback error)
  • Healthy → Degraded: Update failed, previous config retained
  • Degraded → Healthy: Valid config arrived, recovered
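
The four transitions listed above can be modeled as a small pure function. This is a sketch for reasoning about the state machine, not Flux's internal code: `next` is a hypothetical helper, and the State constants are redeclared locally to mirror the ones above. The list does not describe transitions out of Empty, so the sketch leaves that state, and any other unlisted combination, unchanged rather than guessing.

```go
package main

// State mirrors the constants shown above, redeclared locally for this sketch.
type State int

const (
    StateLoading State = iota
    StateHealthy
    StateDegraded
    StateEmpty
)

// next is a hypothetical helper (not part of Flux) that returns the state
// after one update attempt; ok reports whether the update succeeded.
func next(s State, ok bool) State {
    switch {
    case s == StateLoading && ok:
        return StateHealthy // first config loaded successfully
    case s == StateLoading && !ok:
        return StateEmpty // first config failed
    case s == StateHealthy && !ok:
        return StateDegraded // update failed, previous config retained
    case s == StateDegraded && ok:
        return StateHealthy // valid config arrived, recovered
    default:
        return s // combinations not listed above: state left unchanged
    }
}
```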

Checking State

switch capacitor.State() {
case flux.StateHealthy:
    cfg, _ := capacitor.Current()
    // Use cfg
case flux.StateDegraded:
    cfg, _ := capacitor.Current()  // Previous valid config
    log.Printf("degraded: %v", capacitor.LastError())
    // Keep using cfg
case flux.StateEmpty:
    log.Fatalf("no config: %v", capacitor.LastError())
}

Codec

A Codec handles deserialization from raw bytes to your struct.

type Codec interface {
    Unmarshal(data []byte, v any) error
}

Built-in Codecs

JSONCodec (default):

capacitor := flux.New[Config](watcher, callback)
// Uses JSON by default

YAMLCodec:

capacitor := flux.New[Config](
    watcher,
    callback,
).Codec(flux.YAMLCodec{})

Custom Codec

type TOMLCodec struct{}

func (TOMLCodec) Unmarshal(data []byte, v any) error {
    return toml.Unmarshal(data, v)
}

capacitor := flux.New[Config](
    watcher,
    callback,
).Codec(TOMLCodec{})
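
Because a Codec is only an Unmarshal method, a custom codec can also tighten the default behavior. The sketch below is a hypothetical `StrictJSONCodec` (not a Flux built-in) that rejects fields the target struct does not declare, using the standard library's `json.Decoder`. This can catch typos in config keys that plain JSON decoding would silently ignore.

```go
package main

import (
    "bytes"
    "encoding/json"
)

// StrictJSONCodec is a hypothetical codec (not shipped with Flux) that fails
// on fields the target struct does not declare.
type StrictJSONCodec struct{}

// Unmarshal matches the Codec interface shown above. It decodes the first
// JSON value in data and returns an error if that value has unknown fields.
func (StrictJSONCodec) Unmarshal(data []byte, v any) error {
    dec := json.NewDecoder(bytes.NewReader(data))
    dec.DisallowUnknownFields()
    return dec.Decode(v)
}
```

It would be attached the same way as the TOMLCodec above, by passing `StrictJSONCodec{}` to `.Codec(...)`.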

Options

Capacitor accepts pipeline options as constructor arguments and instance configuration via chainable methods:

capacitor := flux.New[Config](
    watcher,
    callback,
    // Pipeline options (retry, backoff, circuit breaker, etc.)
    flux.WithRetry[Config](3),
    flux.WithCircuitBreaker[Config](5, 30*time.Second),
).Debounce(200*time.Millisecond).  // Instance configuration
  Codec(flux.YAMLCodec{}).          // Use YAML instead of JSON
  SyncMode().                       // Disable async (for testing)
  Clock(fakeClock)                  // Custom clock (for testing)

Next Steps