Testing
Flux provides two mechanisms for deterministic testing:
- Sync Mode - Manual, step-by-step processing
- Fake Clock - Controlled time for debounce testing
Sync Mode
Sync mode disables async processing. You control exactly when values are processed.
Basic Usage
func TestConfigValidation(t *testing.T) {
    ch := make(chan []byte, 2)
    ch <- []byte(`{"port": 8080}`)
    ch <- []byte(`{"port": -1}`) // Invalid

    var applied Config
    capacitor := flux.New[Config](
        flux.NewSyncChannelWatcher(ch), // Use sync watcher for deterministic tests
        func(ctx context.Context, prev, curr Config) error {
            applied = curr
            return nil
        },
    ).SyncMode()

    ctx := context.Background()

    // Initial load - processed synchronously
    err := capacitor.Start(ctx)
    require.NoError(t, err)
    assert.Equal(t, 8080, applied.Port)
    assert.Equal(t, flux.StateHealthy, capacitor.State())

    // Second value - must call Process() explicitly
    capacitor.Process(ctx)
    assert.Equal(t, 8080, applied.Port) // Unchanged - validation failed
    assert.Equal(t, flux.StateDegraded, capacitor.State())
}
Key Points
- Use NewSyncChannelWatcher with SyncMode() for fully deterministic tests
- Start() processes the first value synchronously
- Process(ctx) processes the next available value
- Process() returns true if a value was available
- No background goroutines, no timing dependencies
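The "next available value" behaviour listed above can be pictured as a non-blocking channel receive. The following is a minimal sketch of that idea using only the standard library. The names syncSource, next, and drain are hypothetical and local to this example; this is not flux's implementation.

```go
package main

// syncSource is a hypothetical stand-in for a synchronous watcher.
// It only illustrates handing out at most one buffered value per call
// without blocking and without background goroutines.
type syncSource struct {
	ch <-chan []byte
}

// next performs a non-blocking receive. It reports false when no value
// is buffered (or the channel is closed) instead of waiting for one.
func (s syncSource) next() ([]byte, bool) {
	select {
	case v, ok := <-s.ch:
		return v, ok
	default:
		return nil, false
	}
}

// drain hands every buffered value to handle and returns how many
// values were processed.
func drain(s syncSource, handle func([]byte)) int {
	n := 0
	for {
		v, ok := s.next()
		if !ok {
			return n
		}
		handle(v)
		n++
	}
}
```

Since Process() reports whether a value was available, the equivalent loop against a real capacitor would presumably be `for capacitor.Process(ctx) {}`, which is handy when a test only cares about the final state.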
Testing State Transitions
func TestStateTransitions(t *testing.T) {
    ch := make(chan []byte, 3)
    ch <- []byte(`{"valid": true}`)  // Healthy
    ch <- []byte(`{"valid": false}`) // Degraded (callback fails)
    ch <- []byte(`{"valid": true}`)  // Recovery

    capacitor := flux.New[Config](
        flux.NewSyncChannelWatcher(ch),
        func(ctx context.Context, prev, curr Config) error {
            if !curr.Valid {
                return errors.New("invalid config")
            }
            return nil
        },
    ).SyncMode()

    ctx := context.Background()

    // Fail fast if the initial load does not succeed
    require.NoError(t, capacitor.Start(ctx))
    assert.Equal(t, flux.StateHealthy, capacitor.State())

    // Callback rejects the second value
    capacitor.Process(ctx)
    assert.Equal(t, flux.StateDegraded, capacitor.State())
    assert.Error(t, capacitor.LastError())

    // Third value is accepted again
    capacitor.Process(ctx)
    assert.Equal(t, flux.StateHealthy, capacitor.State())
    assert.NoError(t, capacitor.LastError())
}
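The transitions asserted on this page can be summarised as a small pure function. This is a sketch inferred only from those assertions (Healthy after a success, Degraded after a failure that follows a success, Empty when the very first value fails). The type and constant names are hypothetical, and flux may define further states or rules not covered here.

```go
package main

// state mirrors the three states asserted on this page. The names are
// local to this sketch; flux exposes its own constants.
type state int

const (
	stateEmpty    state = iota // no valid configuration has been applied yet
	stateHealthy               // the most recent value was applied
	stateDegraded              // the most recent value failed, an older one is still in effect
)

// step returns the state after one value has been processed.
// failed is true when parsing, validation, or the callback returned an error.
func step(current state, failed bool) state {
	switch {
	case !failed:
		return stateHealthy
	case current == stateEmpty:
		// Nothing valid to fall back on, so the state stays empty.
		return stateEmpty
	default:
		return stateDegraded
	}
}
```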
Fake Clock
For testing debounce behavior, use a fake clock:
import "github.com/zoobz-io/clockz"

func TestDebouncing(t *testing.T) {
    fake := clockz.NewFakeClock()
    ch := make(chan []byte, 3)

    var callCount int
    capacitor := flux.New[Config](
        flux.NewSyncChannelWatcher(ch),
        func(ctx context.Context, prev, curr Config) error {
            callCount++
            return nil
        },
    ).Clock(fake).Debounce(100*time.Millisecond)

    ctx := context.Background()

    // Send initial value
    ch <- []byte(`{"port": 8080}`)
    capacitor.Start(ctx)
    assert.Equal(t, 1, callCount) // Initial processed immediately

    // Send rapid updates
    ch <- []byte(`{"port": 8081}`)
    ch <- []byte(`{"port": 8082}`)
    ch <- []byte(`{"port": 8083}`)

    // Advance time past debounce
    fake.Advance(150 * time.Millisecond)

    // Only last value processed
    assert.Equal(t, 2, callCount)
}
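To see why three rapid updates result in a single callback, it can help to model debouncing with time as a plain counter that only moves when the test says so. The sketch below is a hypothetical, goroutine-free model written for this page. It does not use clockz or flux, and the debouncer type and its methods are assumptions made for illustration.

```go
package main

// debouncer is a hypothetical model of debouncing. Time is an integer
// count of milliseconds that only moves when advance is called, which
// is the property a fake clock provides in tests.
type debouncer struct {
	waitMs   int          // quiet period required before emitting
	nowMs    int          // current fake time
	deadline int          // fake time at which the pending value fires
	pending  *string      // latest value waiting to be emitted, if any
	emit     func(string) // called once per completed quiet period
}

// push records a new value and restarts the quiet period, so earlier
// pending values are overwritten rather than emitted.
func (d *debouncer) push(v string) {
	d.pending = &v
	d.deadline = d.nowMs + d.waitMs
}

// advance moves fake time forward and emits the pending value once
// the quiet period has fully elapsed.
func (d *debouncer) advance(ms int) {
	d.nowMs += ms
	if d.pending != nil && d.nowMs >= d.deadline {
		d.emit(*d.pending)
		d.pending = nil
	}
}
```

Pushing 8081, 8082, and 8083 and then advancing by less than the wait emits nothing; advancing past the wait emits only 8083. Because no wall-clock time is involved, the outcome is the same on every run.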
Testing Signals
Use capitan's testing helpers to verify signal emission:
import (
    "github.com/zoobz-io/capitan"
    capitantesting "github.com/zoobz-io/capitan/testing"
)

func TestSignalEmission(t *testing.T) {
    c := capitan.New(capitan.WithSyncMode())
    defer c.Shutdown()

    capture := capitantesting.NewEventCapture()
    c.Observe(capture.Observer())

    ch := make(chan []byte, 1)
    ch <- []byte(`{"port": "invalid"}`) // Wrong type

    capacitor := flux.New[Config](
        flux.NewSyncChannelWatcher(ch),
        func(ctx context.Context, prev, curr Config) error { return nil },
    ).SyncMode()

    // Start is expected to fail here, so its error is deliberately ignored
    capacitor.Start(context.Background())

    // Verify transform failed signal
    events := capture.Events()
    assert.Contains(t, events, flux.CapacitorTransformFailed)
}
Testing Prev/Curr Values
The callback receives both previous and current values:
func TestPrevCurrValues(t *testing.T) {
    ch := make(chan []byte, 2)
    ch <- []byte(`{"version": 1}`)
    ch <- []byte(`{"version": 2}`)

    var prevVersions, currVersions []int
    capacitor := flux.New[Config](
        flux.NewSyncChannelWatcher(ch),
        func(ctx context.Context, prev, curr Config) error {
            prevVersions = append(prevVersions, prev.Version)
            currVersions = append(currVersions, curr.Version)
            return nil
        },
    ).SyncMode()

    ctx := context.Background()
    capacitor.Start(ctx)
    capacitor.Process(ctx)

    assert.Equal(t, []int{0, 1}, prevVersions) // Zero value, then 1
    assert.Equal(t, []int{1, 2}, currVersions)
}
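A common reason to inspect both values is to react only to the fields that actually changed. The helper below is a minimal sketch of that pattern, assuming a Config with just the Port and Version fields used on this page. changedFields is a hypothetical name and not part of flux.

```go
package main

// Config is a local stand-in with the two fields used on this page.
type Config struct {
	Port    int
	Version int
}

// changedFields lists the names of fields whose values differ between
// prev and curr. On the first load prev is the zero value, so every
// non-zero field is reported as changed.
func changedFields(prev, curr Config) []string {
	var out []string
	if prev.Port != curr.Port {
		out = append(out, "port")
	}
	if prev.Version != curr.Version {
		out = append(out, "version")
	}
	return out
}
```

Inside a callback, a helper like this would let you, for example, rebind a listener only when "port" is in the result. In a test, asserting on the returned slice documents which fields each update was expected to touch.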
Testing Validation Errors
type Config struct {
    Port int `json:"port" validate:"min=1,max=65535"`
}

func TestValidationError(t *testing.T) {
    ch := make(chan []byte, 1)
    ch <- []byte(`{"port": 99999}`) // Exceeds max

    capacitor := flux.New[Config](
        flux.NewSyncChannelWatcher(ch),
        func(ctx context.Context, prev, curr Config) error { return nil },
    ).SyncMode()

    err := capacitor.Start(context.Background())
    assert.Error(t, err)
    assert.Contains(t, err.Error(), "validation failed")
    assert.Equal(t, flux.StateEmpty, capacitor.State())
}
Testing Parse Errors
func TestParseError(t *testing.T) {
    ch := make(chan []byte, 1)
    ch <- []byte(`{invalid json}`)

    capacitor := flux.New[Config](
        flux.NewSyncChannelWatcher(ch),
        func(ctx context.Context, prev, curr Config) error { return nil },
    ).SyncMode()

    err := capacitor.Start(context.Background())
    assert.Error(t, err)
    assert.Contains(t, err.Error(), "unmarshal failed")
    assert.Equal(t, flux.StateEmpty, capacitor.State())
}
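The validation and parse tests above distinguish two stages that can reject a value: decoding the raw bytes, then checking the decoded struct. The sketch below reproduces that two-stage shape with the standard library only, so the difference is easy to see in isolation. The decode and stage functions and both sentinel errors are hypothetical, the port range is checked by hand rather than through a validate tag, and none of this is flux's actual pipeline or error types.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Hypothetical sentinel errors, one per stage. Their messages reuse the
// substrings asserted in the tests above purely for familiarity.
var (
	errUnmarshal  = errors.New("unmarshal failed")
	errValidation = errors.New("validation failed")
)

// Config is a local stand-in; the range rule is enforced in decode.
type Config struct {
	Port int `json:"port"`
}

// decode parses raw JSON, then applies the port range 1..65535.
func decode(raw string) (Config, error) {
	var cfg Config
	if err := json.Unmarshal([]byte(raw), &cfg); err != nil {
		return Config{}, fmt.Errorf("%w: %v", errUnmarshal, err)
	}
	if cfg.Port < 1 || cfg.Port > 65535 {
		return Config{}, fmt.Errorf("%w: port %d out of range", errValidation, cfg.Port)
	}
	return cfg, nil
}

// stage names the stage that rejected the input, or "ok" on success.
func stage(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, errUnmarshal):
		return "unmarshal"
	case errors.Is(err, errValidation):
		return "validation"
	default:
		return "unknown"
	}
}
```

Note that a wrong JSON type such as `{"port": "invalid"}` is rejected at the unmarshal stage in this sketch, while `{"port": 99999}` decodes cleanly and is rejected only by the range check.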
Integration Testing with Providers
Provider packages include testcontainers-based tests. Run them with:
# Requires Docker
go test ./pkg/redis/...
go test ./pkg/consul/...
go test ./pkg/etcd/...
For your own integration tests, follow the same pattern:
func TestRedisIntegration(t *testing.T) {
    // Skipped when tests are run with -short
    if testing.Short() {
        t.Skip("skipping integration test")
    }

    // Set up testcontainer (setupRedis and connectRedis are your own helpers)
    ctx := context.Background()
    container, _ := setupRedis(t)
    client := connectRedis(container)

    // Test your configuration flow
    capacitor := flux.New[Config](
        redis.New(client, "test:config"),
        func(ctx context.Context, prev, curr Config) error { return nil },
    )

    // Seed the key before starting so the initial load finds a value
    client.Set(ctx, "test:config", `{"port": 8080}`, 0)

    err := capacitor.Start(ctx)
    assert.NoError(t, err)
}
Next Steps
- Providers Guide - Provider-specific details
- Architecture - How sync mode works internally