Multi-Source Configuration
Recipe: Combine multiple configuration sources with priority-based merging.
The Scenario
An application needs configuration from multiple sources:
- Defaults - Hardcoded fallback values
- File - Local config file
- Remote - Consul/etcd for dynamic overrides
Sources are listed from lowest to highest priority: later sources override earlier ones.
The Pattern
┌──────────┐   ┌──────────┐   ┌──────────┐
│ Defaults │   │   File   │   │  Remote  │
└────┬─────┘   └────┬─────┘   └────┬─────┘
     │              │              │
     ▼              ▼              ▼
    [T]            [T]            [T]
     │              │              │
     └──────────────┼──────────────┘
                    │
                    ▼
               ┌──────────┐
               │ Reducer  │
               └────┬─────┘
                    │
                    ▼
                   [T]
Using Compose
// Config is the configuration value produced by every source and by the
// merge. Each field is read from the JSON key named in its struct tag:
// "port", "timeout", "max_retries" and "feature_flag".
type Config struct {
	Port        int           `json:"port"`
	Timeout     time.Duration `json:"timeout"`
	MaxRetries  int           `json:"max_retries"`
	FeatureFlag bool          `json:"feature_flag"`
}
// Validate checks the configuration and returns the first problem found:
// Port must lie in 1..65535 and MaxRetries must not be negative.
// It returns nil when both checks pass.
func (c Config) Validate() error {
	switch {
	case c.Port < 1, c.Port > 65535:
		return errors.New("port must be between 1 and 65535")
	case c.MaxRetries < 0:
		return errors.New("max_retries must be non-negative")
	default:
		return nil
	}
}
capacitor := flux.Compose[Config](
	// Reducer: merge sources with priority.
	// curr holds one Config per watcher, in the order the watchers are
	// listed below: curr[0] = defaults, curr[1] = file, curr[2] = remote.
	func(ctx context.Context, prev, curr []Config) (Config, error) {
		merged := curr[0]

		// Apply file, then remote. Each later source overrides every
		// non-zero value it sets, so remote wins over file and file wins
		// over defaults for all numeric fields, not just Port.
		for _, src := range curr[1:3] {
			if src.Port != 0 {
				merged.Port = src.Port
			}
			if src.Timeout != 0 {
				merged.Timeout = src.Timeout
			}
			if src.MaxRetries != 0 {
				merged.MaxRetries = src.MaxRetries
			}
		}

		// A bool has no "unset" value, so the flag is always taken from
		// remote; copying it only when true would make it impossible for
		// remote to switch a flag off again.
		merged.FeatureFlag = curr[2].FeatureFlag

		return merged, nil
	},
	// Sources in priority order (lowest to highest)
	[]flux.Watcher{
		defaultsWatcher,
		file.New("config.json"),
		consul.New(client, "myapp/config"),
	},
)
Complete Example
package main
import (
	"context"
	"errors"
	"log"
	"os"
	"os/signal"
	"time"

	"github.com/hashicorp/consul/api"
	"github.com/zoobz-io/flux"
	"github.com/zoobz-io/flux/consul"
	"github.com/zoobz-io/flux/file"
)
// Config is the application configuration. The same type is decoded from
// each source and returned by the merge; fields map to the JSON keys
// "port", "timeout", "max_retries" and "feature_flag".
type Config struct {
	Port        int           `json:"port"`
	Timeout     time.Duration `json:"timeout"`
	MaxRetries  int           `json:"max_retries"`
	FeatureFlag bool          `json:"feature_flag"`
}
// Validate checks the configuration and returns the first violated rule,
// tested in this order: Port must lie in 1..65535, Timeout must be at
// least one second, and MaxRetries must lie in 0..10. It returns nil when
// every rule holds.
func (c Config) Validate() error {
	switch {
	case c.Port < 1, c.Port > 65535:
		return errors.New("port must be between 1 and 65535")
	case c.Timeout < time.Second:
		return errors.New("timeout must be at least 1s")
	case c.MaxRetries < 0, c.MaxRetries > 10:
		return errors.New("max_retries must be between 0 and 10")
	}
	return nil
}
// main composes three configuration sources (in-memory defaults, a local
// file and a Consul key), starts the capacitor, logs the merged result once
// and then blocks until the process receives an interrupt.
func main() {
	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
	defer cancel()

	// Consul client. Fail fast: without a client the remote source cannot
	// be constructed at all.
	consulClient, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatalf("failed to create consul client: %v", err)
	}

	// Defaults via channel watcher. The channel is buffered so the single
	// send below does not block before the watcher starts reading.
	//
	// timeout is written in nanoseconds (30s): time.Duration is an integer
	// type, and encoding/json rejects a string such as "30s" for it.
	// NOTE(review): assumes sources are decoded with encoding/json — confirm.
	defaultsCh := make(chan []byte, 1)
	defaultsCh <- []byte(`{
"port": 8080,
"timeout": 30000000000,
"max_retries": 3,
"feature_flag": false
}`)

	// Watchers are listed from lowest to highest priority; mergeConfigs
	// relies on this order.
	capacitor := flux.Compose[Config](
		mergeConfigs,
		[]flux.Watcher{
			flux.NewChannelWatcher(defaultsCh),
			file.New("config.json"),
			consul.New(consulClient, "myapp/config"),
		},
	)

	if err := capacitor.Start(ctx); err != nil {
		log.Fatalf("failed to start: %v", err)
	}

	// NOTE(review): the second result of Current is discarded here, after
	// Start has already succeeded — confirm it carries nothing that still
	// needs checking at this point.
	cfg, _ := capacitor.Current()
	log.Printf("merged config: port=%d, timeout=%s, feature=%v",
		cfg.Port, cfg.Timeout, cfg.FeatureFlag)

	// Keep running until interrupted.
	<-ctx.Done()
}
// mergeConfigs reduces the per-source configs into a single Config.
//
// curr must hold at least three entries, ordered lowest to highest
// priority: curr[0] = defaults, curr[1] = file, curr[2] = remote. The file
// config is laid over the defaults and the remote config over that result,
// so a later source overrides every non-zero numeric field it sets.
// FeatureFlag is always taken from the remote config. prev and ctx are not
// used.
//
// An error (rather than an index-out-of-range panic) is returned when curr
// has fewer than three entries.
func mergeConfigs(ctx context.Context, prev, curr []Config) (Config, error) {
	if len(curr) < 3 {
		return Config{}, errors.New("merge configs: expected defaults, file and remote sources")
	}

	// Start with defaults, then apply file and remote in priority order.
	merged := overlayNonZero(curr[0], curr[1])
	remoteConfig := curr[2]
	merged = overlayNonZero(merged, remoteConfig)

	// Feature flags always come from remote: a bool has no "unset" value,
	// so false must be able to override as well.
	merged.FeatureFlag = remoteConfig.FeatureFlag

	return merged, nil
}

// overlayNonZero returns base with every non-zero numeric field of over
// (Port, Timeout, MaxRetries) copied on top. FeatureFlag is left untouched.
func overlayNonZero(base, over Config) Config {
	if over.Port != 0 {
		base.Port = over.Port
	}
	if over.Timeout != 0 {
		base.Timeout = over.Timeout
	}
	if over.MaxRetries != 0 {
		base.MaxRetries = over.MaxRetries
	}
	return base
}
Detecting Changes
The reducer receives both the previous and the current slice of per-source configs, so it can compare them to see what changed:
// mergeConfigs merges the current per-source configs and logs when the
// remote source's feature flag differs from its previous value.
//
// prev is nil on initial load, so the comparison is skipped then. The
// length checks also skip it (instead of panicking on an out-of-range
// index) whenever either slice has no remote entry at index 2.
func mergeConfigs(ctx context.Context, prev, curr []Config) (Config, error) {
	merged := merge(curr...)

	// Index of the remote source in prev and curr.
	const remote = 2

	// Check what changed
	if len(prev) > remote && len(curr) > remote {
		if prev[remote].FeatureFlag != curr[remote].FeatureFlag {
			log.Printf("feature flag changed via remote")
		}
	}

	return merged, nil
}
Handling Source Errors
Individual source failures don't block other sources:
// Check which sources had errors
errs := capacitor.SourceErrors()
for _, se := range errs {
log.Printf("source %d error: %v", se.Index, se.Error)
}
// Overall state
switch capacitor.State() {
case flux.StateHealthy:
// All sources valid
case flux.StateDegraded:
// Some source failed, using previous merged config
case flux.StateEmpty:
// Never got valid config from all sources
}
Partial Source Updates
Each source updates independently. When any source changes, all sources are re-merged:
Time 0: defaults=✓ file=✓ remote=✓ → merge all → Config
Time 1: defaults=✓ file=✓ remote=✓* → merge all → Config (remote changed)
Time 2: defaults=✓ file=✓* remote=✓ → merge all → Config (file changed)
Testing
// TestCompose feeds one JSON document into each of three channel-backed
// sources and checks that the merged config takes its port from the file
// source and its feature flag from the remote source.
func TestCompose(t *testing.T) {
	// One buffered channel per source, pre-loaded with that source's
	// document so the sends do not block.
	defaultsCh := make(chan []byte, 1)
	fileCh := make(chan []byte, 1)
	remoteCh := make(chan []byte, 1)
	defaultsCh <- []byte(`{"port": 8080}`)
	fileCh <- []byte(`{"port": 9090}`)
	remoteCh <- []byte(`{"feature_flag": true}`)

	// Reducer: defaults first, file port on top, flag always from remote.
	reduce := func(ctx context.Context, prev, curr []Config) (Config, error) {
		out := curr[0]
		if filePort := curr[1].Port; filePort != 0 {
			out.Port = filePort
		}
		out.FeatureFlag = curr[2].FeatureFlag
		return out, nil
	}

	sources := []flux.Watcher{
		flux.NewSyncChannelWatcher(defaultsCh),
		flux.NewSyncChannelWatcher(fileCh),
		flux.NewSyncChannelWatcher(remoteCh),
	}
	capacitor := flux.Compose[Config](reduce, sources).SyncMode()

	require.NoError(t, capacitor.Start(context.Background()))

	got, _ := capacitor.Current()
	assert.Equal(t, 9090, got.Port) // From file
	assert.True(t, got.FeatureFlag) // From remote
}
Use Cases
| Pattern | Sources | Priority |
|---|---|---|
| Defaults + File | Channel, File | File wins |
| Local + Remote | File, Consul/etcd | Remote wins |
| Environment Layers | Dev, Staging, Prod | Later wins |
| Feature Flags | Base, Overrides | Overrides win |
Next Steps
- Custom Watcher - Build your own source
- Providers Guide - All available providers