# State Management
Flux maintains a state machine that tracks configuration health. This guide covers state transitions, failure handling, and recovery patterns.
## State Overview
| State | Has Config | Has Error | Meaning |
|---|---|---|---|
| Loading | No | No | Waiting for first value |
| Healthy | Yes | No | Valid config active |
| Degraded | Yes | Yes | Update failed, previous retained |
| Empty | No | Yes | No valid config ever obtained |
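The table is exhaustive: the state is a pure function of the two booleans. As an illustrative sketch of that mapping (the `State` type and constants below are stand-ins for this example, not flux's actual definitions):

```go
package main

import "fmt"

// State mirrors the four states in the table above -- illustration only.
type State int

const (
    StateLoading  State = iota // no config, no error
    StateHealthy               // config held, no error
    StateDegraded              // config held, but the last update errored
    StateEmpty                 // no config, and at least one error
)

func (s State) String() string {
    return [...]string{"Loading", "Healthy", "Degraded", "Empty"}[s]
}

// deriveState encodes the table: the state is fully determined by
// whether a config is held and whether the last update errored.
func deriveState(hasConfig, hasError bool) State {
    switch {
    case hasConfig && hasError:
        return StateDegraded
    case hasConfig:
        return StateHealthy
    case hasError:
        return StateEmpty
    default:
        return StateLoading
    }
}

func main() {
    fmt.Println(deriveState(true, true)) // Degraded
}
```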
## Checking State

```go
switch capacitor.State() {
case flux.StateLoading:
    log.Println("waiting for configuration...")
case flux.StateHealthy:
    cfg, _ := capacitor.Current()
    log.Printf("using config: %+v", cfg)
case flux.StateDegraded:
    cfg, _ := capacitor.Current()
    err := capacitor.LastError()
    log.Printf("using fallback config %+v, error: %v", cfg, err)
case flux.StateEmpty:
    err := capacitor.LastError()
    log.Fatalf("no valid config: %v", err)
}
```
## Initial Load Failure

When the first configuration load fails:

```go
if err := capacitor.Start(ctx); err != nil {
    // State is Empty; the capacitor keeps watching for a valid config.
    switch capacitor.State() {
    case flux.StateEmpty:
        // Option 1: fail fast
        log.Fatalf("cannot start without config: %v", err)

        // Option 2: use defaults
        useDefaults()
        log.Printf("using defaults, waiting for config: %v", err)

        // Option 3: wait for recovery
        go waitForHealthy(capacitor)
    }
}
```
### Wait for Recovery

```go
func waitForHealthy[T any](c *flux.Capacitor[T]) {
    ticker := time.NewTicker(5 * time.Second)
    defer ticker.Stop()
    for range ticker.C {
        if c.State() == flux.StateHealthy {
            log.Println("configuration recovered")
            return
        }
        log.Printf("waiting for valid config: %v", c.LastError())
    }
}
```
## Degraded State

When an update fails but a previous config exists:

```go
capitan.Hook(flux.CapacitorStateChanged, func(ctx context.Context, e *capitan.Event) {
    newState, _ := flux.KeyNewState.From(e)
    if newState == flux.StateDegraded.String() {
        alerting.Send(alert{
            Severity: "warning",
            Message:  "config update failed, using previous version",
        })
    }
})
```
### Monitoring Degraded Duration

```go
func monitorState[T any](c *flux.Capacitor[T]) {
    ticker := time.NewTicker(10 * time.Second)
    defer ticker.Stop()
    for range ticker.C {
        state := c.State()
        metrics.Gauge("config_state", stateToNumber(state))
        if state == flux.StateDegraded {
            metrics.Increment("config_degraded_seconds", 10)
        }
    }
}

func stateToNumber(s flux.State) float64 {
    switch s {
    case flux.StateHealthy:
        return 1
    case flux.StateDegraded:
        return 0.5
    case flux.StateEmpty:
        return 0
    default:
        return -1
    }
}
```
## Automatic Recovery

Recovery happens automatically when a valid config arrives:

```go
capitan.Hook(flux.CapacitorStateChanged, func(ctx context.Context, e *capitan.Event) {
    oldState, _ := flux.KeyOldState.From(e)
    newState, _ := flux.KeyNewState.From(e)
    if oldState == flux.StateDegraded.String() && newState == flux.StateHealthy.String() {
        log.Println("configuration recovered from degraded state")
        alerting.Resolve("config_degraded")
    }
})
```
## Error Classification

Different signals fire for different error types:

```go
// Parse errors (JSON/YAML syntax)
capitan.Hook(flux.CapacitorTransformFailed, func(ctx context.Context, e *capitan.Event) {
    err, _ := flux.KeyError.From(e)
    log.Printf("config parse error: %s", err)
    metrics.Increment("config_parse_errors")
})

// Validation errors (Validate() returned an error)
capitan.Hook(flux.CapacitorValidationFailed, func(ctx context.Context, e *capitan.Event) {
    err, _ := flux.KeyError.From(e)
    log.Printf("config validation error: %s", err)
    metrics.Increment("config_validation_errors")
})

// Callback errors (application-level)
capitan.Hook(flux.CapacitorApplyFailed, func(ctx context.Context, e *capitan.Event) {
    err, _ := flux.KeyError.From(e)
    log.Printf("config apply error: %s", err)
    metrics.Increment("config_apply_errors")
})
```
## Graceful Degradation

Design your application to handle degraded state:

```go
type Application struct {
    capacitor *flux.Capacitor[Config]
}

func (a *Application) Config() Config {
    cfg, ok := a.capacitor.Current()
    if !ok {
        // No valid config yet - return safe defaults
        return Config{
            MaxConnections: 10,
            Timeout:        30 * time.Second,
        }
    }
    return cfg
}

func (a *Application) IsHealthy() bool {
    return a.capacitor.State() == flux.StateHealthy
}
```
## Health Check Endpoint

```go
func (a *Application) HealthHandler(w http.ResponseWriter, r *http.Request) {
    state := a.capacitor.State()
    response := struct {
        Status string `json:"status"`
        Config string `json:"config"`
        Error  string `json:"error,omitempty"`
    }{
        Status: state.String(),
    }
    w.Header().Set("Content-Type", "application/json") // must be set before WriteHeader
    switch state {
    case flux.StateHealthy:
        w.WriteHeader(http.StatusOK)
        response.Config = "current"
    case flux.StateDegraded:
        w.WriteHeader(http.StatusOK) // still serving on the previous config
        response.Config = "previous"
        if err := a.capacitor.LastError(); err != nil {
            response.Error = err.Error()
        }
    case flux.StateEmpty:
        w.WriteHeader(http.StatusServiceUnavailable)
        response.Config = "none"
        if err := a.capacitor.LastError(); err != nil {
            response.Error = err.Error()
        }
    default: // StateLoading
        w.WriteHeader(http.StatusServiceUnavailable)
        response.Config = "loading"
    }
    json.NewEncoder(w).Encode(response)
}
```
## Error History

Track recent errors with `ErrorHistorySize`:

```go
capacitor := flux.New[Config](
    watcher,
    callback,
).ErrorHistorySize(10) // keep the last 10 errors

capacitor.Start(ctx)

// After some failures...
for _, err := range capacitor.ErrorHistory() {
    log.Printf("error: %v", err)
}
```
When enabled, `ErrorHistory()` returns errors oldest-first. Use `LastError()` for just the most recent error, regardless of the history setting.
## Circuit Breaker

Protect against cascading failures with `WithCircuitBreaker`:

```go
capacitor := flux.New[Config](
    watcher,
    callback,
    flux.WithCircuitBreaker[Config](5, 30*time.Second),
)
```

After 5 consecutive failures the circuit opens and rejects further processing until 30 seconds have passed. The circuit breaker has three states:

- Closed: normal operation, requests pass through
- Open: after threshold failures, requests are rejected immediately
- Half-Open: after the recovery timeout, one request is allowed through to test recovery

The circuit breaker protects the entire pipeline. Any success resets the failure counter and closes the circuit.
## Metrics Integration

Integrate with your metrics system via `Metrics`:

```go
type MyMetrics struct{}

func (m *MyMetrics) OnStateChange(from, to flux.State) {
    metrics.Gauge("config_state", stateToNumber(to))
    metrics.Increment("config_state_changes")
}

func (m *MyMetrics) OnProcessSuccess(duration time.Duration) {
    metrics.Histogram("config_process_duration_ms", duration.Milliseconds())
    metrics.Increment("config_applies_total")
}

func (m *MyMetrics) OnProcessFailure(stage string, duration time.Duration) {
    metrics.Increment("config_failures_total", "stage", stage)
}

func (m *MyMetrics) OnChangeReceived() {
    metrics.Increment("config_changes_received_total")
}

// Usage
capacitor := flux.New[Config](
    watcher,
    callback,
).Metrics(&MyMetrics{})
```

The stages passed to `OnProcessFailure` are `"unmarshal"`, `"validate"`, and `"pipeline"`.
## Shutdown Handling

Handle graceful shutdown with `OnStop`:

```go
capacitor := flux.New[Config](
    watcher,
    callback,
).OnStop(func(finalState flux.State) {
    log.Printf("capacitor stopped in state: %s", finalState)
    metrics.Gauge("config_state", -1) // mark as stopped
})
```

The callback is invoked when the watch goroutine exits, i.e. when the context is cancelled.
## State History

Track all transitions for debugging:

```go
type StateChange struct {
    Timestamp time.Time
    From      string
    To        string
}

var (
    stateHistory []StateChange
    mu           sync.Mutex
)

capitan.Hook(flux.CapacitorStateChanged, func(ctx context.Context, e *capitan.Event) {
    oldState, _ := flux.KeyOldState.From(e)
    newState, _ := flux.KeyNewState.From(e)
    mu.Lock()
    stateHistory = append(stateHistory, StateChange{
        Timestamp: time.Now(),
        From:      oldState,
        To:        newState,
    })
    mu.Unlock()
})
```
## Next Steps
- Best Practices - Production patterns
- Testing Guide - Testing state transitions