zoobzio December 10, 2025 Edit this page

Multi-Source Configuration

Recipe: Combine multiple configuration sources with priority-based merging.

The Scenario

An application needs configuration from multiple sources:

  1. Defaults - Hardcoded fallback values
  2. File - Local config file
  3. Remote - Consul/etcd for dynamic overrides

Later sources override earlier ones.

The Pattern

┌──────────┐   ┌──────────┐   ┌──────────┐
│ Defaults │   │   File   │   │  Remote  │
└────┬─────┘   └────┬─────┘   └────┬─────┘
     │              │              │
     ▼              ▼              ▼
   [T]            [T]            [T]
     │              │              │
     └──────────────┼──────────────┘
                    │
                    ▼
              ┌──────────┐
              │  Reducer │
              └────┬─────┘
                   │
                   ▼
                 [T]

Using Compose

// Config is the value type merged from all sources; it is the type argument
// given to flux.Compose in this recipe.
type Config struct {
    // Port must be within 1-65535 to pass Validate.
    Port        int           `json:"port"`
    // Timeout is read from the "timeout" JSON key.
    Timeout     time.Duration `json:"timeout"`
    // MaxRetries must be non-negative to pass Validate.
    MaxRetries  int           `json:"max_retries"`
    // FeatureFlag is read from the "feature_flag" JSON key.
    FeatureFlag bool          `json:"feature_flag"`
}

// Validate returns an error describing the first violated constraint, or nil
// when the port is in 1-65535 and max_retries is not negative.
func (c Config) Validate() error {
    switch {
    case c.Port < 1, c.Port > 65535:
        return errors.New("port must be between 1 and 65535")
    case c.MaxRetries < 0:
        return errors.New("max_retries must be non-negative")
    }
    return nil
}

capacitor := flux.Compose[Config](
    // Reducer: merge sources with priority.
    func(ctx context.Context, prev, curr []Config) (Config, error) {
        // curr[0] = defaults, curr[1] = file, curr[2] = remote.
        // Report a malformed slice as an error instead of panicking on an
        // out-of-range index.
        if len(curr) < 3 {
            return Config{}, errors.New("expected defaults, file, and remote configs")
        }
        defaults, fromFile, remote := curr[0], curr[1], curr[2]

        merged := defaults

        // File overrides defaults. A zero value means "not set in this
        // source", so it never overrides.
        if fromFile.Port != 0 {
            merged.Port = fromFile.Port
        }
        if fromFile.Timeout != 0 {
            merged.Timeout = fromFile.Timeout
        }
        if fromFile.MaxRetries != 0 {
            merged.MaxRetries = fromFile.MaxRetries
        }

        // Remote overrides file.
        if remote.Port != 0 {
            merged.Port = remote.Port
        }
        // A bool cannot distinguish "unset" from "false", so the flag is
        // always taken from remote; guarding on its value would make it
        // impossible for remote to switch the feature off.
        merged.FeatureFlag = remote.FeatureFlag

        return merged, nil
    },
    // Sources in priority order (lowest to highest).
    []flux.Watcher{
        defaultsWatcher,
        file.New("config.json"),
        consul.New(client, "myapp/config"),
    },
)

Complete Example

package main

import (
    "context"
    "errors"
    "log"
    "os"
    "os/signal"
    "time"

    "github.com/hashicorp/consul/api"
    "github.com/zoobz-io/flux"
    "github.com/zoobz-io/flux/consul"
    "github.com/zoobz-io/flux/file"
)

// Config is the merged application configuration; it is the type argument
// given to flux.Compose in main.
type Config struct {
    // Port must be within 1-65535 to pass Validate.
    Port        int           `json:"port"`
    // Timeout must be at least one second to pass Validate.
    Timeout     time.Duration `json:"timeout"`
    // MaxRetries must be within 0-10 to pass Validate.
    MaxRetries  int           `json:"max_retries"`
    // FeatureFlag is always copied from the remote source by mergeConfigs.
    FeatureFlag bool          `json:"feature_flag"`
}

// Validate checks port, timeout, and max_retries in that order and returns an
// error for the first value that is out of range, or nil if all are valid.
func (c Config) Validate() error {
    switch {
    case c.Port < 1 || c.Port > 65535:
        return errors.New("port must be between 1 and 65535")
    case c.Timeout < time.Second:
        return errors.New("timeout must be at least 1s")
    case c.MaxRetries < 0 || c.MaxRetries > 10:
        return errors.New("max_retries must be between 0 and 10")
    }
    return nil
}

// main wires three configuration sources (in-memory defaults, a local file,
// and Consul) into one composed capacitor, logs the merged result, and then
// blocks until the process receives an interrupt.
func main() {
    ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
    defer cancel()

    // Consul client. Fail fast rather than handing an unusable client to the
    // Consul watcher below.
    consulClient, err := api.NewClient(api.DefaultConfig())
    if err != nil {
        log.Fatalf("failed to create consul client: %v", err)
    }

    // Defaults via channel watcher. The channel is buffered so the send below
    // does not block before anything is reading from it.
    //
    // NOTE(review): plain encoding/json cannot decode the string "30s" into a
    // time.Duration field — confirm how flux decodes this payload.
    defaultsCh := make(chan []byte, 1)
    defaultsCh <- []byte(`{
        "port": 8080,
        "timeout": "30s",
        "max_retries": 3,
        "feature_flag": false
    }`)

    // Watchers are listed lowest priority first; mergeConfigs relies on this
    // order when it indexes the slice it receives.
    capacitor := flux.Compose[Config](
        mergeConfigs,
        []flux.Watcher{
            flux.NewChannelWatcher(defaultsCh),
            file.New("config.json"),
            consul.New(consulClient, "myapp/config"),
        },
    )

    if err := capacitor.Start(ctx); err != nil {
        log.Fatalf("failed to start: %v", err)
    }

    // Start returned nil above, so read and report the merged value.
    cfg, _ := capacitor.Current()
    log.Printf("merged config: port=%d, timeout=%s, feature=%v",
        cfg.Port, cfg.Timeout, cfg.FeatureFlag)

    // Keep running until interrupted.
    <-ctx.Done()
}

// mergeConfigs reduces the per-source configs into one Config.
//
// curr must hold at least three entries in priority order: curr[0] defaults,
// curr[1] file, curr[2] remote. A shorter slice yields an error rather than an
// index-out-of-range panic. prev is not consulted.
//
// Port, Timeout, and MaxRetries from the file replace the defaults only when
// they are non-zero; a non-zero remote Port then replaces the result.
// FeatureFlag is always taken from remote.
func mergeConfigs(ctx context.Context, prev, curr []Config) (Config, error) {
    if len(curr) < 3 {
        return Config{}, errors.New("mergeConfigs: expected defaults, file, and remote configs")
    }

    // Start with defaults
    merged := curr[0]

    // Apply file config (non-zero values only)
    fileConfig := curr[1]
    if fileConfig.Port != 0 {
        merged.Port = fileConfig.Port
    }
    if fileConfig.Timeout != 0 {
        merged.Timeout = fileConfig.Timeout
    }
    if fileConfig.MaxRetries != 0 {
        merged.MaxRetries = fileConfig.MaxRetries
    }

    // Apply remote config (highest priority)
    remoteConfig := curr[2]
    if remoteConfig.Port != 0 {
        merged.Port = remoteConfig.Port
    }
    // Feature flags always from remote, so that remote can also turn a flag
    // off; a bool has no "unset" value to test for.
    merged.FeatureFlag = remoteConfig.FeatureFlag

    return merged, nil
}

Detecting Changes

The reducer receives both previous and current slices:

// mergeConfigs merges curr via merge and logs when the remote source's
// feature flag differs from the previous round.
//
// prev is nil on the initial load. The comparison is skipped whenever either
// slice is too short to contain the remote entry at index 2, so a short or
// empty prev can never cause an index-out-of-range panic.
func mergeConfigs(ctx context.Context, prev, curr []Config) (Config, error) {
    merged := merge(curr...)

    // Check what changed, but only when both rounds include the remote entry.
    if len(prev) > 2 && len(curr) > 2 && prev[2].FeatureFlag != curr[2].FeatureFlag {
        log.Printf("feature flag changed via remote")
    }

    return merged, nil
}

Handling Source Errors

Individual source failures don't block other sources:

// Check which sources had errors
errs := capacitor.SourceErrors()
for _, se := range errs {
    log.Printf("source %d error: %v", se.Index, se.Error)
}

// Overall state
switch capacitor.State() {
case flux.StateHealthy:
    // All sources valid
case flux.StateDegraded:
    // Some source failed, using previous merged config
case flux.StateEmpty:
    // Never got valid config from all sources
}

Partial Source Updates

Each source updates independently. When any source changes, all sources are re-merged:

Time 0:  defaults=✓  file=✓  remote=✓  → merge all → Config
Time 1:  defaults=✓  file=✓  remote=✓* → merge all → Config (remote changed)
Time 2:  defaults=✓  file=✓* remote=✓  → merge all → Config (file changed)

Testing

func TestCompose(t *testing.T) {
    defaultsCh := make(chan []byte, 1)
    fileCh := make(chan []byte, 1)
    remoteCh := make(chan []byte, 1)

    defaultsCh <- []byte(`{"port": 8080}`)
    fileCh <- []byte(`{"port": 9090}`)
    remoteCh <- []byte(`{"feature_flag": true}`)

    var merged Config

    capacitor := flux.Compose[Config](
        func(ctx context.Context, prev, curr []Config) (Config, error) {
            m := curr[0]
            if curr[1].Port != 0 {
                m.Port = curr[1].Port
            }
            m.FeatureFlag = curr[2].FeatureFlag
            return m, nil
        },
        []flux.Watcher{
            flux.NewSyncChannelWatcher(defaultsCh),
            flux.NewSyncChannelWatcher(fileCh),
            flux.NewSyncChannelWatcher(remoteCh),
        },
    ).SyncMode()

    err := capacitor.Start(context.Background())
    require.NoError(t, err)

    merged, _ = capacitor.Current()
    assert.Equal(t, 9090, merged.Port)         // From file
    assert.True(t, merged.FeatureFlag)         // From remote
}

Use Cases

| Pattern | Sources | Priority |
|---|---|---|
| Defaults + File | Channel, File | File wins |
| Local + Remote | File, Consul/etcd | Remote wins |
| Environment Layers | Dev, Staging, Prod | Later wins |
| Feature Flags | Base, Overrides | Overrides win |

Next Steps