Core Concepts
Flux has five primitives: Capacitor, Watcher, Validator, State, and Codec.
Capacitor
The central type. A Capacitor watches a source, processes changes, and delivers valid configurations to your callback.
capacitor := flux.New[Config](
watcher, // Data source
func(ctx context.Context, prev, curr Config) error { // Callback
return app.Reconfigure(curr)
},
)
The generic parameter [Config] must implement the Validator interface. Flux deserializes to this type and calls Validate() before invoking your callback.
Callback Signature
func(ctx context.Context, prev, curr T) error
- ctx - Context for cancellation and deadlines
- prev - Previous valid config (zero value on first load)
- curr - New valid config
- Return nil to accept, error to reject
The callback only runs after successful deserialization and validation. If it returns an error, the Capacitor enters Degraded state and retains the previous config.
Lifecycle
// Create
capacitor := flux.New[Config](watcher, callback)
// Start watching (blocks until first value processed)
err := capacitor.Start(ctx)
// Access current config
cfg, ok := capacitor.Current()
// Check health
state := capacitor.State()
lastErr := capacitor.LastError()
Start() blocks until the first configuration is processed. If the initial load fails, it returns that error but keeps watching in the background.
Watcher
A Watcher observes a data source and emits raw bytes when changes occur.
type Watcher interface {
Watch(ctx context.Context) (<-chan []byte, error)
}
Contract:
- Emit current value immediately upon Watch() being called
- Emit subsequent values when the source changes
- Close the channel when context is cancelled
Built-in Watchers
ChannelWatcher - Wraps an existing channel (useful for testing):
ch := make(chan []byte, 1)
ch <- []byte(`{"port": 8080}`)
watcher := flux.NewChannelWatcher(ch) // Async (uses internal goroutine)
// Or, for deterministic tests, use the sync variant instead:
// watcher := flux.NewSyncChannelWatcher(ch)
Provider Watchers
Each provider in pkg/ implements Watcher for a specific backend:
import "github.com/zoobz-io/flux/file"
import "github.com/zoobz-io/flux/redis"
import "github.com/zoobz-io/flux/consul"
file.New("/etc/myapp/config.json")
redis.New(client, "myapp:config")
consul.New(client, "myapp/config")
See Providers Guide for all options.
Validator
Configuration types must implement the Validator interface:
type Validator interface {
Validate() error
}
This gives you full control over validation logic:
type Config struct {
Port int `json:"port"`
Host string `json:"host"`
}
func (c Config) Validate() error {
if c.Port < 1 || c.Port > 65535 {
return fmt.Errorf("port must be between 1 and 65535, got %d", c.Port)
}
if c.Host == "" {
return errors.New("host is required")
}
return nil
}
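A hand-written Validate() can also report every problem at once rather than stopping at the first. The sketch below uses the standard library's errors.Join (Go 1.20+). The Config type is the same illustrative struct as above, redeclared so the example stands alone.

```go
package main

import (
	"errors"
	"fmt"
)

type Config struct {
	Port int    `json:"port"`
	Host string `json:"host"`
}

// Validate collects every problem instead of returning on the first one.
func (c Config) Validate() error {
	var errs []error
	if c.Port < 1 || c.Port > 65535 {
		errs = append(errs, fmt.Errorf("port must be between 1 and 65535, got %d", c.Port))
	}
	if c.Host == "" {
		errs = append(errs, errors.New("host is required"))
	}
	// errors.Join returns nil when there is nothing to join.
	return errors.Join(errs...)
}

func main() {
	fmt.Println(Config{Port: 8080, Host: "localhost"}.Validate())
	fmt.Println(Config{}.Validate())
}
```

The joined error's message lists each failure on its own line, so a rejected config shows all its problems in one log entry.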
For struct tag validation, you can integrate libraries like go-playground/validator within your Validate() method:
import "github.com/go-playground/validator/v10"
var validate = validator.New()
func (c Config) Validate() error {
return validate.Struct(c)
}
State
A Capacitor is always in exactly one of four states:
const (
StateLoading State = iota // Waiting for first value
StateHealthy // Valid config active
StateDegraded // Update failed, previous retained
StateEmpty // No valid config ever obtained
)
Transitions
┌─────────┐   success   ┌─────────┐
│ Loading │────────────▶│ Healthy │◀──┐
└─────────┘             └─────────┘   │
     │                       │        │
     │ failure        failure│        │ success
     ▼                       ▼        │
┌─────────┐             ┌─────────┐   │
│  Empty  │             │Degraded │───┘
└─────────┘             └─────────┘
- Loading → Healthy: First config loaded successfully
- Loading → Empty: First config failed (parse, validation, or callback error)
- Healthy → Degraded: Update failed, previous config retained
- Degraded → Healthy: Valid config arrived, recovered
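The four transitions listed above fit in a small pure function. This is a sketch for reasoning about the state machine, not Flux's internal code. `next` is a hypothetical helper, the constants are local copies, and any combination the list does not mention (for example, anything leaving Empty) is treated as a no-op here, which is an assumption.

```go
package main

import "fmt"

type State int

const (
	StateLoading State = iota
	StateHealthy
	StateDegraded
	StateEmpty
)

func (s State) String() string {
	return [...]string{"Loading", "Healthy", "Degraded", "Empty"}[s]
}

// next applies the documented transitions. ok reports whether the latest
// value was parsed, validated, and accepted by the callback.
func next(s State, ok bool) State {
	switch {
	case s == StateLoading && ok:
		return StateHealthy
	case s == StateLoading && !ok:
		return StateEmpty
	case s == StateHealthy && !ok:
		return StateDegraded
	case s == StateDegraded && ok:
		return StateHealthy
	default:
		return s // not covered by the documented list: left unchanged (assumption)
	}
}

func main() {
	s := StateLoading
	for _, ok := range []bool{true, false, false, true} {
		s = next(s, ok)
		fmt.Println(s)
	}
}
```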
Checking State
switch capacitor.State() {
case flux.StateHealthy:
cfg, _ := capacitor.Current()
// Use cfg
case flux.StateDegraded:
cfg, _ := capacitor.Current() // Previous valid config
log.Printf("degraded: %v", capacitor.LastError())
case flux.StateEmpty:
log.Fatalf("no config: %v", capacitor.LastError())
}
Codec
A Codec handles deserialization from raw bytes to your struct.
type Codec interface {
Unmarshal(data []byte, v any) error
}
Built-in Codecs
JSONCodec (default):
capacitor := flux.New[Config](watcher, callback)
// Uses JSON by default
YAMLCodec:
capacitor := flux.New[Config](
watcher,
callback,
).Codec(flux.YAMLCodec{})
Custom Codec
type TOMLCodec struct{}
func (TOMLCodec) Unmarshal(data []byte, v any) error {
return toml.Unmarshal(data, v)
}
capacitor := flux.New[Config](
watcher,
callback,
).Codec(TOMLCodec{})
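A custom codec does not need a third-party format. As a standard-library-only sketch, the hypothetical `StrictJSONCodec` below rejects unknown fields, so a misspelled key fails at the decode step instead of being silently ignored. The Codec interface is redeclared locally so the example compiles on its own.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// Codec mirrors the interface shown above.
type Codec interface {
	Unmarshal(data []byte, v any) error
}

// StrictJSONCodec is a hypothetical codec that rejects unknown JSON fields.
type StrictJSONCodec struct{}

func (StrictJSONCodec) Unmarshal(data []byte, v any) error {
	dec := json.NewDecoder(bytes.NewReader(data))
	dec.DisallowUnknownFields()
	return dec.Decode(v)
}

// Compile-time check that StrictJSONCodec satisfies Codec.
var _ Codec = StrictJSONCodec{}

type Config struct {
	Port int `json:"port"`
}

func main() {
	var cfg Config
	codec := StrictJSONCodec{}

	err := codec.Unmarshal([]byte(`{"port": 8080}`), &cfg)
	fmt.Println(cfg.Port, err)

	// "prot" is a typo for "port", so decoding fails.
	err = codec.Unmarshal([]byte(`{"prot": 8080}`), &cfg)
	fmt.Println(err != nil)
}
```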
Options
Capacitor accepts pipeline options as constructor arguments and instance configuration via chainable methods:
capacitor := flux.New[Config](
watcher,
callback,
// Pipeline options (retry, backoff, circuit breaker, etc.)
flux.WithRetry[Config](3),
flux.WithCircuitBreaker[Config](5, 30*time.Second),
).Debounce(200*time.Millisecond). // Instance configuration
Codec(flux.YAMLCodec{}). // Use YAML instead of JSON
SyncMode(). // Disable async (for testing)
Clock(fakeClock) // Custom clock (for testing)
Next Steps
- Architecture - Internals and event flow
- Testing Guide - Sync mode and fake clocks
- State Management - Recovery patterns