Architecture
Understanding flux internals helps you reason about behavior, timing, and failure modes.
Pipeline Overview
┌──────────┐ ┌───────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐
│ Watcher │───▶│ Debounce │───▶│ Codec │───▶│ Validate │───▶│ Callback │
│ │ │ │ │ │ │ │ │ │
│ []byte │ │ coalesce │ │ unmarshal│ │ struct │ │ prev,curr│
└──────────┘ └───────────┘ └──────────┘ └──────────┘ └──────────┘
│ │ │
▼ ▼ ▼
Transform Validation Apply
Failed Failed Failed
(signal) (signal) (signal)
Event Flow
Start Sequence
err := capacitor.Start(ctx)
Start()callswatcher.Watch(ctx)- Watcher emits current value immediately
- First value processed synchronously (no debounce)
- State transitions: Loading → Healthy (or Empty on failure)
Start()returns (error if initial load failed)- Background goroutine begins watching for changes
Change Sequence
- Watcher emits new value
- Debounce timer starts/resets
- After debounce duration, value is processed
- Codec unmarshals bytes to struct
- Validator checks struct tags
- Callback receives (prev, curr)
- State updated, signals emitted
Debouncing
Rapid changes are coalesced to prevent thrashing:
Time: 0ms 10ms 20ms 30ms 100ms 200ms
Events: E1 E2 E3 E4
Debounce: [--100ms--] [--100ms--] [--100ms--]→ Process E4
Only the last value is processed after the debounce window.
// Default: 100ms
flux.New[Config](watcher, callback).Debounce(200*time.Millisecond)
First Value Exception
The first value from Start() is processed immediately without debouncing. This ensures initial configuration loads without delay.
Goroutine Model
Main Goroutine Watch Goroutine
────────────── ───────────────
capacitor.Start(ctx)
│
├─▶ watcher.Watch(ctx)
│ │
│ └─▶ returns <-chan []byte
│
├─▶ receive first value
├─▶ process synchronously
├─▶ spawn watch goroutine ─────▶ for { select { ... } }
│ │
└─▶ return ├─▶ receive value
├─▶ debounce
└─▶ process
A single background goroutine handles all subsequent changes. The debounce timer runs within this goroutine.
State Machine
State is stored atomically and accessed lock-free:
type Capacitor[T any] struct {
state atomic.Int32
current atomic.Pointer[T]
lastError atomic.Pointer[error]
// ...
}
Thread Safety
State()- Always safe to callCurrent()- Always safe to callLastError()- Always safe to callStart()- Call once only (enforced by mutex)Process()- Sync mode only, single-threaded
Signal Emission
Flux emits capitan signals at key points:
| Signal | When |
|---|---|
CapacitorStarted | Start() called |
CapacitorChangeReceived | Watcher emits value |
CapacitorTransformFailed | Codec unmarshal failed |
CapacitorValidationFailed | Struct validation failed |
CapacitorApplyFailed | Callback returned error |
CapacitorApplySucceeded | Callback succeeded |
CapacitorStateChanged | State transition occurred |
CapacitorStopped | Watch goroutine exited |
Field Keys
Signals carry typed fields:
flux.KeyError // Error message (string)
flux.KeyOldState // Previous state (string)
flux.KeyNewState // Current state (string)
flux.KeyState // State at stop (string)
flux.KeyDebounce // Debounce duration (time.Duration)
Failure Handling
Unmarshal Failure
Value arrives → Codec.Unmarshal fails → CapacitorTransformFailed emitted
→ State: Degraded (or Empty)
→ Previous config retained
Validation Failure
Struct created → validate.Struct fails → CapacitorValidationFailed emitted
→ State: Degraded (or Empty)
→ Previous config retained
Callback Failure
Valid struct → callback returns error → CapacitorApplyFailed emitted
→ State: Degraded (or Empty)
→ Previous config retained
In all cases, the Capacitor continues watching. A subsequent valid value recovers the system.
Sync Mode
For testing, sync mode disables the background goroutine:
capacitor := flux.New[Config](
watcher,
callback,
).SyncMode()
// Start processes first value only
capacitor.Start(ctx)
// Manually process subsequent values
capacitor.Process(ctx) // Process next available value
This makes tests deterministic - no timing dependencies.
Clock Abstraction
Debouncing uses clockz for time operations:
fake := clockz.NewFakeClock()
capacitor := flux.New[Config](
watcher,
callback,
).Clock(fake)
// Advance time manually in tests
fake.Advance(100 * time.Millisecond)
Next Steps
- Testing Guide - Sync mode and fake clocks
- API Reference - Complete API documentation