zoobzio December 10, 2025 Edit this page

State Management

Flux maintains a state machine that tracks configuration health. This guide covers state transitions, failure handling, and recovery patterns.

State Overview

StateHas ConfigHas ErrorMeaning
LoadingNoNoWaiting for first value
HealthyYesNoValid config active
DegradedYesYesUpdate failed, previous retained
EmptyNoYesNo valid config ever obtained

Checking State

switch capacitor.State() {
case flux.StateLoading:
    log.Println("waiting for configuration...")

case flux.StateHealthy:
    cfg, _ := capacitor.Current()
    log.Printf("using config: %+v", cfg)

case flux.StateDegraded:
    cfg, _ := capacitor.Current()
    err := capacitor.LastError()
    log.Printf("using fallback config, error: %v", err)

case flux.StateEmpty:
    err := capacitor.LastError()
    log.Fatalf("no valid config: %v", err)
}

Initial Load Failure

When the first configuration fails:

err := capacitor.Start(ctx)
if err != nil {
    // State is Empty
    // Capacitor continues watching for valid config

    switch capacitor.State() {
    case flux.StateEmpty:
        // Option 1: Fatal
        log.Fatalf("cannot start without config: %v", err)

        // Option 2: Use defaults
        useDefaults()
        log.Printf("using defaults, waiting for config: %v", err)

        // Option 3: Wait for recovery
        go waitForHealthy(capacitor)
    }
}

Wait for Recovery

func waitForHealthy[T any](c *flux.Capacitor[T]) {
    ticker := time.NewTicker(5 * time.Second)
    defer ticker.Stop()

    for range ticker.C {
        if c.State() == flux.StateHealthy {
            log.Println("configuration recovered")
            return
        }
        log.Printf("waiting for valid config: %v", c.LastError())
    }
}

Degraded State

When an update fails but previous config exists:

capitan.Hook(flux.CapacitorStateChanged, func(ctx context.Context, e *capitan.Event) {
    newState, _ := flux.KeyNewState.From(e)

    if newState == flux.StateDegraded.String() {
        alerting.Send(alert{
            Severity: "warning",
            Message:  "config update failed, using previous version",
        })
    }
})

Monitoring Degraded Duration

func monitorState[T any](c *flux.Capacitor[T]) {
    ticker := time.NewTicker(10 * time.Second)
    defer ticker.Stop()

    for range ticker.C {
        state := c.State()
        metrics.Gauge("config_state", stateToNumber(state))

        if state == flux.StateDegraded {
            metrics.Increment("config_degraded_seconds", 10)
        }
    }
}

func stateToNumber(s flux.State) float64 {
    switch s {
    case flux.StateHealthy:  return 1
    case flux.StateDegraded: return 0.5
    case flux.StateEmpty:    return 0
    default:                 return -1
    }
}

Automatic Recovery

Recovery happens automatically when a valid config arrives:

capitan.Hook(flux.CapacitorStateChanged, func(ctx context.Context, e *capitan.Event) {
    old, _ := flux.KeyOldState.From(e)
    new, _ := flux.KeyNewState.From(e)

    if old == flux.StateDegraded.String() && new == flux.StateHealthy.String() {
        log.Println("configuration recovered from degraded state")
        alerting.Resolve("config_degraded")
    }
})

Error Classification

Different signals for different error types:

// Parse errors (JSON/YAML syntax)
capitan.Hook(flux.CapacitorTransformFailed, func(ctx context.Context, e *capitan.Event) {
    err, _ := flux.KeyError.From(e)
    log.Printf("config parse error: %s", err)
    metrics.Increment("config_parse_errors")
})

// Validation errors (Validate() method returned error)
capitan.Hook(flux.CapacitorValidationFailed, func(ctx context.Context, e *capitan.Event) {
    err, _ := flux.KeyError.From(e)
    log.Printf("config validation error: %s", err)
    metrics.Increment("config_validation_errors")
})

// Callback errors (application-level)
capitan.Hook(flux.CapacitorApplyFailed, func(ctx context.Context, e *capitan.Event) {
    err, _ := flux.KeyError.From(e)
    log.Printf("config apply error: %s", err)
    metrics.Increment("config_apply_errors")
})

Graceful Degradation

Design your application to handle degraded state:

type Application struct {
    capacitor *flux.Capacitor[Config]
}

func (a *Application) Config() Config {
    cfg, ok := a.capacitor.Current()
    if !ok {
        // No valid config - return safe defaults
        return Config{
            MaxConnections: 10,
            Timeout:        30 * time.Second,
        }
    }
    return cfg
}

func (a *Application) IsHealthy() bool {
    return a.capacitor.State() == flux.StateHealthy
}

Health Check Endpoint

func (a *Application) HealthHandler(w http.ResponseWriter, r *http.Request) {
    state := a.capacitor.State()

    response := struct {
        Status string `json:"status"`
        Config string `json:"config"`
        Error  string `json:"error,omitempty"`
    }{
        Status: state.String(),
    }

    switch state {
    case flux.StateHealthy:
        w.WriteHeader(http.StatusOK)
        response.Config = "current"

    case flux.StateDegraded:
        w.WriteHeader(http.StatusOK)  // Still serving
        response.Config = "previous"
        if err := a.capacitor.LastError(); err != nil {
            response.Error = err.Error()
        }

    case flux.StateEmpty:
        w.WriteHeader(http.StatusServiceUnavailable)
        response.Config = "none"
        if err := a.capacitor.LastError(); err != nil {
            response.Error = err.Error()
        }

    default:
        w.WriteHeader(http.StatusServiceUnavailable)
        response.Config = "loading"
    }

    json.NewEncoder(w).Encode(response)
}

Error History

Track recent errors with ErrorHistorySize:

capacitor := flux.New[Config](
    watcher,
    callback,
).ErrorHistorySize(10)  // Keep last 10 errors

capacitor.Start(ctx)

// After some failures...
for _, err := range capacitor.ErrorHistory() {
    log.Printf("error: %v", err)
}

When enabled, ErrorHistory() returns errors oldest-first. Use LastError() for just the most recent error regardless of history setting.

Circuit Breaker

Protect against cascading failures with WithCircuitBreaker:

capacitor := flux.New[Config](
    watcher,
    callback,
    flux.WithCircuitBreaker[Config](5, 30*time.Second),
)

After 5 consecutive failures, the circuit opens and rejects further processing until 30 seconds have passed. The circuit breaker has three states:

  • Closed: Normal operation, requests pass through
  • Open: After threshold failures, requests are rejected immediately
  • Half-Open: After recovery timeout, one request is allowed to test recovery

The circuit breaker protects the entire pipeline. Any success resets the failure counter and closes the circuit.

Metrics Integration

Integrate with your metrics system via Metrics:

type MyMetrics struct{}

func (m *MyMetrics) OnStateChange(from, to flux.State) {
    metrics.Gauge("config_state", stateToNumber(to))
    metrics.Increment("config_state_changes")
}

func (m *MyMetrics) OnProcessSuccess(duration time.Duration) {
    metrics.Histogram("config_process_duration_ms", duration.Milliseconds())
    metrics.Increment("config_applies_total")
}

func (m *MyMetrics) OnProcessFailure(stage string, duration time.Duration) {
    metrics.Increment("config_failures_total", "stage", stage)
}

func (m *MyMetrics) OnChangeReceived() {
    metrics.Increment("config_changes_received_total")
}

// Usage
capacitor := flux.New[Config](
    watcher,
    callback,
).Metrics(&MyMetrics{})

Stages passed to OnProcessFailure: "unmarshal", "validate", "pipeline".

Shutdown Handling

Handle graceful shutdown with OnStop:

capacitor := flux.New[Config](
    watcher,
    callback,
).OnStop(func(finalState flux.State) {
    log.Printf("capacitor stopped in state: %s", finalState)
    metrics.Gauge("config_state", -1)  // Mark as stopped
})

The callback is invoked when the watch goroutine exits (context cancelled).

State History

Track all transitions for debugging:

type StateChange struct {
    Timestamp time.Time
    From      string
    To        string
}

var stateHistory []StateChange
var mu sync.Mutex

capitan.Hook(flux.CapacitorStateChanged, func(ctx context.Context, e *capitan.Event) {
    old, _ := flux.KeyOldState.From(e)
    new, _ := flux.KeyNewState.From(e)

    mu.Lock()
    stateHistory = append(stateHistory, StateChange{
        Timestamp: time.Now(),
        From:      old,
        To:        new,
    })
    mu.Unlock()
})

Next Steps