zoobzio December 10, 2025 Edit this page

Testing

Flux provides two mechanisms for deterministic testing:

  1. Sync Mode - Manual, step-by-step processing
  2. Fake Clock - Controlled time for debounce testing

Sync Mode

Sync mode disables async processing. You control exactly when values are processed.

Basic Usage

func TestConfigValidation(t *testing.T) {
    ch := make(chan []byte, 2)
    ch <- []byte(`{"port": 8080}`)
    ch <- []byte(`{"port": -1}`)  // Invalid

    var applied Config

    capacitor := flux.New[Config](
        flux.NewSyncChannelWatcher(ch),  // Use sync watcher for deterministic tests
        func(ctx context.Context, prev, curr Config) error {
            applied = curr
            return nil
        },
    ).SyncMode()

    ctx := context.Background()

    // Initial load - processed synchronously
    err := capacitor.Start(ctx)
    require.NoError(t, err)
    assert.Equal(t, 8080, applied.Port)
    assert.Equal(t, flux.StateHealthy, capacitor.State())

    // Second value - must call Process() explicitly
    capacitor.Process(ctx)
    assert.Equal(t, 8080, applied.Port)  // Unchanged - validation failed
    assert.Equal(t, flux.StateDegraded, capacitor.State())
}

Key Points

  • Use NewSyncChannelWatcher with SyncMode() for fully deterministic tests
  • Start() processes the first value synchronously
  • Process(ctx) processes the next available value
  • Process() returns true if a value was available
  • No background goroutines, no timing dependencies

Testing State Transitions

func TestStateTransitions(t *testing.T) {
    ch := make(chan []byte, 3)
    ch <- []byte(`{"valid": true}`)   // Healthy
    ch <- []byte(`{"valid": false}`)  // Degraded (callback fails)
    ch <- []byte(`{"valid": true}`)   // Recovery

    capacitor := flux.New[Config](
        flux.NewSyncChannelWatcher(ch),
        func(ctx context.Context, prev, curr Config) error {
            if !curr.Valid {
                return errors.New("invalid config")
            }
            return nil
        },
    ).SyncMode()

    ctx := context.Background()

    capacitor.Start(ctx)
    assert.Equal(t, flux.StateHealthy, capacitor.State())

    capacitor.Process(ctx)
    assert.Equal(t, flux.StateDegraded, capacitor.State())
    assert.Error(t, capacitor.LastError())

    capacitor.Process(ctx)
    assert.Equal(t, flux.StateHealthy, capacitor.State())
    assert.NoError(t, capacitor.LastError())
}

Fake Clock

For testing debounce behavior, use a fake clock:

import "github.com/zoobz-io/clockz"

func TestDebouncing(t *testing.T) {
    fake := clockz.NewFakeClock()

    ch := make(chan []byte, 3)

    var callCount int
    capacitor := flux.New[Config](
        flux.NewSyncChannelWatcher(ch),
        func(ctx context.Context, prev, curr Config) error {
            callCount++
            return nil
        },
    ).Clock(fake).Debounce(100*time.Millisecond)

    ctx := context.Background()

    // Send initial value
    ch <- []byte(`{"port": 8080}`)
    capacitor.Start(ctx)
    assert.Equal(t, 1, callCount)  // Initial processed immediately

    // Send rapid updates
    ch <- []byte(`{"port": 8081}`)
    ch <- []byte(`{"port": 8082}`)
    ch <- []byte(`{"port": 8083}`)

    // Advance time past debounce
    fake.Advance(150 * time.Millisecond)

    // Only last value processed
    assert.Equal(t, 2, callCount)
}

Testing Signals

Use capitan's testing helpers to verify signal emission:

import capitantesting "github.com/zoobz-io/capitan/testing"

func TestSignalEmission(t *testing.T) {
    c := capitan.New(capitan.WithSyncMode())
    defer c.Shutdown()

    capture := capitantesting.NewEventCapture()
    c.Observe(capture.Observer())

    ch := make(chan []byte, 1)
    ch <- []byte(`{"port": "invalid"}`)  // Wrong type

    capacitor := flux.New[Config](
        flux.NewSyncChannelWatcher(ch),
        func(ctx context.Context, prev, curr Config) error { return nil },
    ).SyncMode()

    capacitor.Start(context.Background())

    // Verify transform failed signal
    events := capture.Events()
    assert.Contains(t, events, flux.CapacitorTransformFailed)
}

Testing Prev/Curr Values

The callback receives both previous and current values:

func TestPrevCurrValues(t *testing.T) {
    ch := make(chan []byte, 2)
    ch <- []byte(`{"version": 1}`)
    ch <- []byte(`{"version": 2}`)

    var prevVersions, currVersions []int

    capacitor := flux.New[Config](
        flux.NewSyncChannelWatcher(ch),
        func(ctx context.Context, prev, curr Config) error {
            prevVersions = append(prevVersions, prev.Version)
            currVersions = append(currVersions, curr.Version)
            return nil
        },
    ).SyncMode()

    ctx := context.Background()
    capacitor.Start(ctx)
    capacitor.Process(ctx)

    assert.Equal(t, []int{0, 1}, prevVersions)  // Zero value, then 1
    assert.Equal(t, []int{1, 2}, currVersions)
}

Testing Validation Errors

type Config struct {
    Port int `json:"port" validate:"min=1,max=65535"`
}

func TestValidationError(t *testing.T) {
    ch := make(chan []byte, 1)
    ch <- []byte(`{"port": 99999}`)  // Exceeds max

    capacitor := flux.New[Config](
        flux.NewSyncChannelWatcher(ch),
        func(ctx context.Context, prev, curr Config) error { return nil },
    ).SyncMode()

    err := capacitor.Start(context.Background())

    assert.Error(t, err)
    assert.Contains(t, err.Error(), "validation failed")
    assert.Equal(t, flux.StateEmpty, capacitor.State())
}

Testing Parse Errors

func TestParseError(t *testing.T) {
    ch := make(chan []byte, 1)
    ch <- []byte(`{invalid json}`)

    capacitor := flux.New[Config](
        flux.NewSyncChannelWatcher(ch),
        func(ctx context.Context, prev, curr Config) error { return nil },
    ).SyncMode()

    err := capacitor.Start(context.Background())

    assert.Error(t, err)
    assert.Contains(t, err.Error(), "unmarshal failed")
    assert.Equal(t, flux.StateEmpty, capacitor.State())
}

Integration Testing with Providers

Provider packages include testcontainers-based tests. Run them:

# Requires Docker
go test ./pkg/redis/...
go test ./pkg/consul/...
go test ./pkg/etcd/...

For your own integration tests, follow the same pattern:

func TestRedisIntegration(t *testing.T) {
    if testing.Short() {
        t.Skip("skipping integration test")
    }

    // Set up testcontainer
    ctx := context.Background()
    container, _ := setupRedis(t)
    client := connectRedis(container)

    // Test your configuration flow
    capacitor := flux.New[Config](
        redis.New(client, "test:config"),
        func(ctx context.Context, prev, curr Config) error { return nil },
    )

    client.Set(ctx, "test:config", `{"port": 8080}`, 0)

    err := capacitor.Start(ctx)
    assert.NoError(t, err)
}

Next Steps