zoobzio December 10, 2025

Architecture

Understanding flux internals helps you reason about behavior, timing, and failure modes.

Pipeline Overview

┌──────────┐    ┌───────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐
│  Watcher │───▶│  Debounce │───▶│  Codec   │───▶│ Validate │───▶│ Callback │
│          │    │           │    │          │    │          │    │          │
│ []byte   │    │ coalesce  │    │ unmarshal│    │ struct   │    │ prev,curr│
└──────────┘    └───────────┘    └──────────┘    └──────────┘    └──────────┘
                                       │              │               │
                                       ▼              ▼               ▼
                                   Transform     Validation       Apply
                                   Failed        Failed           Failed
                                   (signal)      (signal)         (signal)

Event Flow

Start Sequence

err := capacitor.Start(ctx)
  1. Start() calls watcher.Watch(ctx)
  2. Watcher emits current value immediately
  3. First value processed synchronously (no debounce)
  4. State transitions: Loading → Healthy (or Empty on failure)
  5. Start() returns (error if initial load failed)
  6. Background goroutine begins watching for changes

Change Sequence

  1. Watcher emits new value
  2. Debounce timer starts/resets
  3. After debounce duration, value is processed
  4. Codec unmarshals bytes to struct
  5. Validator checks struct tags
  6. Callback receives (prev, curr)
  7. State updated, signals emitted
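
The stages after debouncing can be sketched with the standard library alone. This is a minimal illustration, not flux's implementation: the Config type, the hand-written validate function (standing in for tag-based validation), and the process helper are all hypothetical names, and encoding/json is assumed as the codec.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Config is a hypothetical configuration type used only for this sketch.
type Config struct {
	Port int `json:"port"`
}

// validate is a hand-written stand-in for tag-based struct validation.
func validate(c *Config) error {
	if c.Port < 1 || c.Port > 65535 {
		return errors.New("port out of range")
	}
	return nil
}

// process runs one raw value through unmarshal, validation, and the callback.
// On failure it returns prev unchanged plus the name of the stage that failed.
func process(raw []byte, prev *Config, apply func(prev, curr *Config) error) (*Config, string, error) {
	curr := &Config{}
	if err := json.Unmarshal(raw, curr); err != nil {
		return prev, "transform", err
	}
	if err := validate(curr); err != nil {
		return prev, "validation", err
	}
	if err := apply(prev, curr); err != nil {
		return prev, "apply", err
	}
	return curr, "", nil
}

func main() {
	apply := func(prev, curr *Config) error { return nil }
	var current *Config
	for _, raw := range []string{`{"port": 8080}`, `{"port": `, `{"port": 0}`} {
		next, stage, err := process([]byte(raw), current, apply)
		if err != nil {
			fmt.Printf("%s failed: %v (previous config kept)\n", stage, err)
			continue
		}
		current = next
		fmt.Printf("applied: %+v\n", *current)
	}
}
```

Each failing stage returns early and leaves the previous value in place, which is why a bad update never reaches the callback.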

Debouncing

Rapid changes are coalesced to prevent thrashing:

Time:     0ms    10ms   20ms   30ms              130ms
Events:   E1     E2     E3     E4
Timer:    start  reset  reset  reset ──100ms──▶  fire → process E4

Each new event resets the timer, so only the most recent value is processed, once a full debounce window has passed with no further events.

// Default: 100ms
flux.New[Config](watcher, callback).Debounce(200*time.Millisecond)
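
The reset-on-every-event behavior can be sketched with a plain time.Timer. This is an illustration under assumptions, not flux's code: the debounce function and runDemo helper are hypothetical, values are strings for brevity, and the sketch simply drops any pending value when the input channel closes.

```go
package main

import (
	"fmt"
	"time"
)

// debounce calls process with the latest value from in once window has
// elapsed with no newer value. Each arrival replaces the pending value and
// restarts the timer. It returns when in is closed, dropping anything pending.
func debounce(in <-chan string, window time.Duration, process func(string)) {
	var (
		pending string
		timer   *time.Timer
		timerC  <-chan time.Time // nil while nothing is pending
	)
	for {
		select {
		case v, ok := <-in:
			if !ok {
				if timer != nil {
					timer.Stop()
				}
				return
			}
			pending = v
			if timer != nil {
				timer.Stop()
			}
			timer = time.NewTimer(window)
			timerC = timer.C
		case <-timerC:
			process(pending)
			timerC = nil
		}
	}
}

// runDemo queues four events back to back and collects what gets processed.
func runDemo() []string {
	in := make(chan string, 4)
	for _, e := range []string{"E1", "E2", "E3", "E4"} {
		in <- e
	}
	out := make(chan string, 4)
	done := make(chan struct{})
	go func() {
		debounce(in, 50*time.Millisecond, func(v string) { out <- v })
		close(done)
	}()
	got := []string{<-out} // blocks until the window elapses
	close(in)
	<-done
	close(out)
	for v := range out {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(runDemo())
}
```

Because the four events arrive well inside one window, only E4 reaches the process function.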

First Value Exception

The first value from Start() is processed immediately without debouncing. This ensures initial configuration loads without delay.

Goroutine Model

Main Goroutine                    Watch Goroutine
──────────────                    ───────────────
capacitor.Start(ctx)
    │
    ├─▶ watcher.Watch(ctx)
    │       │
    │       └─▶ returns <-chan []byte
    │
    ├─▶ receive first value
    ├─▶ process synchronously
    ├─▶ spawn watch goroutine ─────▶ for { select { ... } }
    │                                     │
    └─▶ return                            ├─▶ receive value
                                          ├─▶ debounce
                                          └─▶ process

A single background goroutine handles all subsequent changes. The debounce timer runs within this goroutine.
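
The split between the caller's goroutine and the watch goroutine might look like the following sketch. The start function is hypothetical, debouncing is omitted to keep the focus on the handoff, and the choice to keep watching even when the first value fails is an assumption based on the failure-handling rules described for later values.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// start processes the first value on the caller's goroutine, then hands the
// channel to a single background goroutine. The returned channel is closed
// when that goroutine exits.
func start(ctx context.Context, values <-chan string, process func(string) error) (<-chan struct{}, error) {
	var first string
	select {
	case v, ok := <-values:
		if !ok {
			return nil, errors.New("watcher closed before emitting a value")
		}
		first = v
	case <-ctx.Done():
		return nil, ctx.Err()
	}
	err := process(first) // synchronous, no debounce

	done := make(chan struct{})
	go func() {
		defer close(done)
		for {
			select {
			case v, ok := <-values:
				if !ok {
					return
				}
				_ = process(v) // a real loop would debounce before this call
			case <-ctx.Done():
				return
			}
		}
	}()
	return done, err
}

// runDemo feeds three values and records the order in which they are handled.
func runDemo() ([]string, error) {
	values := make(chan string, 3)
	values <- "v1"
	values <- "v2"
	values <- "v3"
	close(values)

	var seen []string
	done, err := start(context.Background(), values, func(v string) error {
		seen = append(seen, v)
		return nil
	})
	if err != nil {
		return nil, err
	}
	<-done // wait for the watch goroutine before reading seen
	return seen, nil
}

func main() {
	fmt.Println(runDemo())
}
```

Since only one goroutine calls process at a time, the callback itself needs no locking in this model.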

State Machine

State is stored atomically and accessed lock-free:

type Capacitor[T any] struct {
    state     atomic.Int32
    current   atomic.Pointer[T]
    lastError atomic.Pointer[error]
    // ...
}

Thread Safety

  • State() - Always safe to call
  • Current() - Always safe to call
  • LastError() - Always safe to call
  • Start() - Call once only (enforced by mutex)
  • Process() - Sync mode only, single-threaded
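
To show why the read accessors are safe from any goroutine, here is a small sketch built on sync/atomic (Go 1.19 or later). The holder type, the numeric state constants, and the publish method are hypothetical stand-ins for the Capacitor fields shown earlier, not the library's definitions.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Config is a hypothetical configuration type used only for this sketch.
type Config struct{ Port int }

// Hypothetical state values; the real constants may differ.
const (
	stateLoading int32 = iota
	stateHealthy
)

// holder is a cut-down stand-in for the state-carrying fields of a Capacitor.
type holder struct {
	state   atomic.Int32
	current atomic.Pointer[Config]
}

// publish stores the config before the state, so a reader that observes
// stateHealthy is guaranteed to find a non-nil config.
func (h *holder) publish(c *Config) {
	h.current.Store(c)
	h.state.Store(stateHealthy)
}

// State and Current are single atomic loads and never block.
func (h *holder) State() int32     { return h.state.Load() }
func (h *holder) Current() *Config { return h.current.Load() }

// runDemo lets four readers poll while one writer publishes a config.
func runDemo() (int32, int) {
	var h holder
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < 1000; j++ {
				if h.State() == stateHealthy && h.Current() == nil {
					panic("healthy without a config")
				}
			}
		}()
	}
	h.publish(&Config{Port: 8080})
	wg.Wait()
	return h.State(), h.Current().Port
}

func main() {
	state, port := runDemo()
	fmt.Println(state == stateHealthy, port)
}
```

Readers never take a lock, so a slow callback on the watch goroutine cannot stall callers that only want the current value.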

Signal Emission

Flux emits capitan signals at key points:

Signal                       When
CapacitorStarted             Start() called
CapacitorChangeReceived      Watcher emits value
CapacitorTransformFailed     Codec unmarshal failed
CapacitorValidationFailed    Struct validation failed
CapacitorApplyFailed         Callback returned error
CapacitorApplySucceeded      Callback succeeded
CapacitorStateChanged        State transition occurred
CapacitorStopped             Watch goroutine exited

Field Keys

Signals carry typed fields:

flux.KeyError     // Error message (string)
flux.KeyOldState  // Previous state (string)
flux.KeyNewState  // Current state (string)
flux.KeyState     // State at stop (string)
flux.KeyDebounce  // Debounce duration (time.Duration)

Failure Handling

Unmarshal Failure

Value arrives → Codec.Unmarshal fails → CapacitorTransformFailed emitted
                                      → State: Degraded (or Empty)
                                      → Previous config retained

Validation Failure

Struct created → validate.Struct fails → CapacitorValidationFailed emitted
                                       → State: Degraded (or Empty)
                                       → Previous config retained

Callback Failure

Valid struct → callback returns error → CapacitorApplyFailed emitted
                                      → State: Degraded (or Empty)
                                      → Previous config retained

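In all cases, the Capacitor continues watching, and a subsequent valid value returns it to Healthy. The state is Degraded when a previously applied configuration is still in place, and Empty when no valid configuration has been loaded yet, in which case there is nothing to retain.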
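
The transitions can be summarized as a small pure function. This sketch assumes that a failure yields Degraded when a valid configuration was applied earlier and Empty otherwise; the State type, its constants, and the next function are hypothetical names chosen for illustration.

```go
package main

import "fmt"

// State is a hypothetical enumeration of the four states named in this page.
type State int

const (
	Loading State = iota
	Healthy
	Degraded
	Empty
)

func (s State) String() string {
	return [...]string{"Loading", "Healthy", "Degraded", "Empty"}[s]
}

// next returns the state after one processing attempt. hasConfig reports
// whether a valid configuration had been applied before this attempt.
func next(hasConfig, failed bool) State {
	switch {
	case !failed:
		return Healthy
	case hasConfig:
		return Degraded
	default:
		return Empty
	}
}

func main() {
	state := Loading
	hasConfig := false
	fmt.Println(state)
	// Initial load fails, then a good value, then a bad one, then recovery.
	for _, failed := range []bool{true, false, true, false} {
		state = next(hasConfig, failed)
		if !failed {
			hasConfig = true
		}
		fmt.Println(state)
	}
}
```

Under these assumptions the program prints Loading, Empty, Healthy, Degraded, Healthy, one per line.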

Sync Mode

For testing, sync mode disables the background goroutine:

capacitor := flux.New[Config](
    watcher,
    callback,
).SyncMode()

// Start processes the first value only
if err := capacitor.Start(ctx); err != nil {
    // initial load failed; handle or fail the test here
}

// Manually process subsequent values
capacitor.Process(ctx)  // Process next available value

This makes tests deterministic: each value is processed only when the test asks for it, so nothing depends on timing.

Clock Abstraction

Debouncing uses clockz for time operations:

fake := clockz.NewFakeClock()

capacitor := flux.New[Config](
    watcher,
    callback,
).Clock(fake)

// Advance time manually in tests
fake.Advance(100 * time.Millisecond)
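
To illustrate why an injectable clock removes sleeps from tests, here is a stdlib-only sketch of the idea. The Clock interface and manualClock type are hypothetical and deliberately minimal; the clockz interfaces may look different.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Clock is a hypothetical minimal interface for code that needs timers.
type Clock interface {
	After(d time.Duration) <-chan time.Time
}

type waiter struct {
	at time.Time
	ch chan time.Time
}

// manualClock only moves forward when Advance is called.
type manualClock struct {
	mu      sync.Mutex
	now     time.Time
	waiters []waiter
}

// After registers a timer that fires once the clock reaches now+d.
func (c *manualClock) After(d time.Duration) <-chan time.Time {
	c.mu.Lock()
	defer c.mu.Unlock()
	ch := make(chan time.Time, 1)
	c.waiters = append(c.waiters, waiter{at: c.now.Add(d), ch: ch})
	return ch
}

// Advance moves the clock forward and fires every timer that is now due.
func (c *manualClock) Advance(d time.Duration) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.now = c.now.Add(d)
	kept := c.waiters[:0]
	for _, w := range c.waiters {
		if w.at.After(c.now) {
			kept = append(kept, w)
			continue
		}
		w.ch <- c.now // buffered, so this never blocks
	}
	c.waiters = kept
}

// fired reports, without blocking, whether a timer channel has a value.
func fired(ch <-chan time.Time) bool {
	select {
	case <-ch:
		return true
	default:
		return false
	}
}

// runDemo checks a 100ms timer just before and exactly at its deadline.
func runDemo() (early, onTime bool) {
	clock := &manualClock{}
	var _ Clock = clock // manualClock satisfies the interface
	timer := clock.After(100 * time.Millisecond)
	clock.Advance(99 * time.Millisecond)
	early = fired(timer)
	clock.Advance(1 * time.Millisecond)
	onTime = fired(timer)
	return early, onTime
}

func main() {
	fmt.Println(runDemo())
}
```

The timer stays silent at 99ms and fires at 100ms without the test ever waiting on the wall clock.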

Next Steps