File Configuration
Recipe: Complete file-based configuration with observability, error handling, and graceful shutdown.
The Scenario
A web service that reads configuration from a YAML file. When the file changes, the service reloads without restart.
Project Structure
myapp/
├── main.go
├── config.go
├── config.yaml
└── go.mod
Configuration Type
// config.go
package main
import (
	"errors"
	"fmt"
	"time"
)
// Config is the root configuration document. Each field maps to the
// top-level YAML key named in its struct tag.
type Config struct {
	Server   ServerConfig   `yaml:"server"`
	Database DatabaseConfig `yaml:"database"`
	Features FeatureFlags   `yaml:"features"`
}

// Validate checks the server section first and the database section second.
// It returns the first failure, wrapped with the name of the section that
// produced it, or nil when both sections pass. Feature flags are not checked.
func (c Config) Validate() error {
	err := c.Server.Validate()
	if err != nil {
		return fmt.Errorf("server: %w", err)
	}

	err = c.Database.Validate()
	if err != nil {
		return fmt.Errorf("database: %w", err)
	}

	return nil
}
// ServerConfig holds the values read from the "server" section.
type ServerConfig struct {
	Port         int           `yaml:"port"`
	ReadTimeout  time.Duration `yaml:"read_timeout"`
	WriteTimeout time.Duration `yaml:"write_timeout"`
}

// Validate returns an error describing the first rule that is violated, or
// nil when every rule holds. Rules are checked in this order: the port range,
// then the read timeout, then the write timeout.
func (s ServerConfig) Validate() error {
	// Smallest timeout accepted for either direction.
	const minTimeout = time.Second

	switch {
	case s.Port < 1, s.Port > 65535:
		return errors.New("port must be between 1 and 65535")
	case s.ReadTimeout < minTimeout:
		return errors.New("read_timeout must be at least 1s")
	case s.WriteTimeout < minTimeout:
		return errors.New("write_timeout must be at least 1s")
	}
	return nil
}
// DatabaseConfig holds the values read from the "database" section.
type DatabaseConfig struct {
	URL             string `yaml:"url"`
	MaxConnections  int    `yaml:"max_connections"`
	IdleConnections int    `yaml:"idle_connections"`
}

// Validate returns an error describing the first rule that is violated, or
// nil when every rule holds. Rules are checked in this order: a non-empty
// URL, at least one connection allowed, and a non-negative idle count.
func (d DatabaseConfig) Validate() error {
	switch {
	case d.URL == "":
		return errors.New("url is required")
	case d.MaxConnections < 1:
		return errors.New("max_connections must be at least 1")
	case d.IdleConnections < 0:
		return errors.New("idle_connections must be non-negative")
	}
	return nil
}
// FeatureFlags groups the boolean switches read from the "features" section.
// Config.Validate does not check these fields.
type FeatureFlags struct {
// EnableMetrics is read from the enable_metrics key.
EnableMetrics bool `yaml:"enable_metrics"`
// EnableTracing is read from the enable_tracing key.
EnableTracing bool `yaml:"enable_tracing"`
// DebugMode is read from the debug_mode key; UpdateConfig includes it in its log line.
DebugMode bool `yaml:"debug_mode"`
}
Config File
# config.yaml
# Nested keys must be indented under their section; at column 0 they would be
# parsed as separate top-level keys and the sections would be empty.
server:
  port: 8080            # must be between 1 and 65535
  read_timeout: 30s     # must be at least 1s
  write_timeout: 30s    # must be at least 1s
database:
  url: postgres://localhost:5432/myapp   # required
  max_connections: 100  # must be at least 1
  idle_connections: 10  # non-negative, and must not exceed max_connections
features:
  enable_metrics: true
  enable_tracing: false
  debug_mode: false
Main Application
// main.go
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"net/http"
"os"
"os/signal"
"sync/atomic"
"time"
"github.com/zoobz-io/capitan"
"github.com/zoobz-io/flux"
"github.com/zoobz-io/flux/file"
)
// Application ties the currently stored configuration to the capacitor that
// delivers configuration updates.
type Application struct {
// config is the most recently stored configuration. UpdateConfig writes it
// and Config reads it.
config atomic.Pointer[Config]
// capacitor is assigned in main; logState and the /health handler read its
// state and last error.
capacitor *flux.Capacitor[Config]
}
// Config returns a copy of the most recently stored configuration.
//
// Until UpdateConfig has stored a value the underlying pointer is nil. In that
// case the zero Config is returned instead of dereferencing nil, so callers
// (startServer and the /config handler) do not panic when the initial load
// failed and the service is still waiting for a valid file.
func (a *Application) Config() Config {
	cfg := a.config.Load()
	if cfg == nil {
		return Config{}
	}
	return *cfg
}
// UpdateConfig is the callback handed to flux.New in main. It rejects a
// configuration whose idle connection count exceeds its maximum connection
// count; otherwise it logs a one-line summary and stores curr as the current
// configuration. ctx and prev are accepted but not used.
func (a *Application) UpdateConfig(ctx context.Context, prev, curr Config) error {
	db := curr.Database

	// Cross-field rule that the per-section Validate methods do not cover.
	if db.IdleConnections > db.MaxConnections {
		return fmt.Errorf("idle_connections cannot exceed max_connections")
	}

	log.Printf(
		"config updated: port=%d, max_conn=%d, debug=%v",
		curr.Server.Port, db.MaxConnections, curr.Features.DebugMode,
	)

	// curr is this call's own copy, so publishing its address is safe.
	a.config.Store(&curr)
	return nil
}
// main wires the application together: it registers the logging hooks, builds
// the capacitor over config.yaml, starts it, starts the HTTP server, and then
// blocks until an interrupt signal arrives, after which the server is given
// five seconds to shut down gracefully.
func main() {
	app := &Application{}

	// Set up observability
	setupObservability()

	// Create capacitor
	app.capacitor = flux.New[Config](
		file.New("config.yaml"),
		app.UpdateConfig,
	).Codec(flux.YAMLCodec{}).Debounce(100 * time.Millisecond)

	// Start watching
	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
	defer cancel()

	log.Println("starting configuration watcher...")
	if err := app.capacitor.Start(ctx); err != nil {
		// A bad initial file is not fatal: keep running and wait for a fix.
		log.Printf("WARNING: initial config failed: %v", err)
		log.Println("continuing to watch for valid configuration...")
	}
	logState(app.capacitor)

	// Start HTTP server
	server := startServer(app)

	// Wait for shutdown
	<-ctx.Done()
	// Stop intercepting the signal so a second interrupt gets the default
	// behaviour and can terminate the process during the grace period.
	cancel()
	log.Println("shutting down...")

	shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer shutdownCancel()
	if err := server.Shutdown(shutdownCtx); err != nil {
		// Typically the deadline expired with connections still open.
		log.Printf("graceful shutdown failed: %v", err)
	}
}
// setupObservability registers one logging hook per capacitor event: state
// transitions, detected changes, and the validation, transform and apply
// failures.
func setupObservability() {
	// failureLogger builds a hook that logs the event's error value using the
	// given format; the three failure hooks differ only in that format.
	failureLogger := func(format string) func(context.Context, *capitan.Event) {
		return func(_ context.Context, ev *capitan.Event) {
			errMsg, _ := flux.KeyError.From(ev)
			log.Printf(format, errMsg)
		}
	}

	capitan.Hook(flux.CapacitorStateChanged, func(_ context.Context, ev *capitan.Event) {
		from, _ := flux.KeyOldState.From(ev)
		to, _ := flux.KeyNewState.From(ev)
		log.Printf("config state: %s -> %s", from, to)
	})

	capitan.Hook(flux.CapacitorChangeReceived, func(context.Context, *capitan.Event) {
		log.Println("config file change detected")
	})

	capitan.Hook(flux.CapacitorValidationFailed, failureLogger("config validation failed: %s"))
	capitan.Hook(flux.CapacitorTransformFailed, failureLogger("config parse failed: %s"))
	capitan.Hook(flux.CapacitorApplyFailed, failureLogger("config apply failed: %s"))
}
// logState writes a single line describing the capacitor's current state.
// The degraded and empty states also include the capacitor's last error.
// A state other than the four handled here produces no output.
func logState[T any](c *flux.Capacitor[T]) {
	state := c.State()

	if state == flux.StateHealthy {
		log.Println("config status: HEALTHY")
		return
	}
	if state == flux.StateDegraded {
		log.Printf("config status: DEGRADED (error: %v)", c.LastError())
		return
	}
	if state == flux.StateEmpty {
		log.Printf("config status: EMPTY (error: %v)", c.LastError())
		return
	}
	if state == flux.StateLoading {
		log.Println("config status: LOADING")
	}
}
// startServer builds the HTTP server, starts it in a background goroutine and
// returns it so the caller can shut it down.
//
// Routes:
//   - /health reports the capacitor state (plus the last error, if any) as
//     JSON; it answers 200 while the state is healthy or degraded and 503
//     otherwise.
//   - /config returns the current configuration as JSON.
//
// The listen address and timeouts are read from the configuration once, here.
// Later reloads change what /config returns but do not rebind the listener.
func startServer(app *Application) *http.Server {
	mux := http.NewServeMux()

	mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
		state := app.capacitor.State()
		response := map[string]any{
			"status": state.String(),
		}
		if err := app.capacitor.LastError(); err != nil {
			response["error"] = err.Error()
		}

		status := http.StatusServiceUnavailable
		if state == flux.StateHealthy || state == flux.StateDegraded {
			status = http.StatusOK
		}
		writeJSON(w, status, response)
	})

	mux.HandleFunc("/config", func(w http.ResponseWriter, r *http.Request) {
		// NOTE(review): this serialises the whole Config, including
		// Database.URL, which may carry credentials. Confirm the endpoint is
		// not publicly reachable.
		writeJSON(w, http.StatusOK, app.Config())
	})

	cfg := app.Config()
	server := &http.Server{
		Addr:         fmt.Sprintf(":%d", cfg.Server.Port),
		Handler:      mux,
		ReadTimeout:  cfg.Server.ReadTimeout,
		WriteTimeout: cfg.Server.WriteTimeout,
	}

	go func() {
		log.Printf("server listening on :%d", cfg.Server.Port)
		// ErrServerClosed is the normal result after Shutdown; anything else
		// is a real failure.
		if err := server.ListenAndServe(); err != http.ErrServerClosed {
			log.Printf("server error: %v", err)
		}
	}()

	return server
}

// writeJSON sends v as a JSON body with the given status code. The
// Content-Type header is set before WriteHeader because headers changed after
// that call are not sent. Encoding failures are logged, not returned, since
// the status line has already been written by then.
func writeJSON(w http.ResponseWriter, status int, v any) {
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(status)
	if err := json.NewEncoder(w).Encode(v); err != nil {
		log.Printf("writing JSON response: %v", err)
	}
}
Running
go run .
# Output:
# starting configuration watcher...
# config file change detected
# config updated: port=8080, max_conn=100, debug=false
# config state: loading -> healthy
# config status: HEALTHY
# server listening on :8080
Testing Changes
# Valid change
sed -i 's/port: 8080/port: 9090/' config.yaml
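# Output:
# config file change detected
# config updated: port=9090, max_conn=100, debug=false
# Note: the HTTP listener keeps the port it bound at startup; only the stored configuration changes.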
# Invalid change
sed -i 's/port: 9090/port: 99999/' config.yaml
# Output:
# config file change detected
# config validation failed: ...
# config state: healthy -> degraded
# Recovery
sed -i 's/port: 99999/port: 8080/' config.yaml
# Output:
# config file change detected
# config updated: port=8080, max_conn=100, debug=false
# config state: degraded -> healthy
Unit Testing
// TestConfigReload feeds two YAML documents through a channel-backed watcher
// and checks that each one reaches the apply callback: the first during
// Start, the second on the following Process call.
//
// The nested keys in both fixtures are indented under their section. Without
// that indentation they would decode as top-level keys, leaving every section
// at its zero value, and the port assertions below could not pass.
func TestConfigReload(t *testing.T) {
	// Buffer both documents up front so the sends below do not block.
	ch := make(chan []byte, 2)
	ch <- []byte(`
server:
  port: 8080
  read_timeout: 30s
  write_timeout: 30s
database:
  url: postgres://localhost/test
  max_connections: 10
  idle_connections: 2
features:
  enable_metrics: false
`)
	ch <- []byte(`
server:
  port: 9090
  read_timeout: 60s
  write_timeout: 60s
database:
  url: postgres://localhost/test
  max_connections: 20
  idle_connections: 5
features:
  enable_metrics: true
`)

	// applied captures whatever configuration the capacitor last handed to
	// the callback.
	var applied Config
	capacitor := flux.New[Config](
		flux.NewSyncChannelWatcher(ch),
		func(ctx context.Context, prev, curr Config) error {
			applied = curr
			return nil
		},
	).Codec(flux.YAMLCodec{}).SyncMode()

	ctx := context.Background()

	// The first document is applied by Start.
	err := capacitor.Start(ctx)
	require.NoError(t, err)
	assert.Equal(t, 8080, applied.Server.Port)

	// The second document is applied by the next Process call.
	capacitor.Process(ctx)
	assert.Equal(t, 9090, applied.Server.Port)
	assert.True(t, applied.Features.EnableMetrics)
}
Next Steps
- Multi-Source - Combine multiple config sources
- Custom Watcher - Build your own provider