zoobzio December 10, 2025 Edit this page

File Configuration

Recipe: Complete file-based configuration with observability, error handling, and graceful shutdown.

The Scenario

A web service reads its configuration from a YAML file. When the file changes, the service reloads the configuration without restarting.

Project Structure

myapp/
├── main.go
├── config.go
├── config.yaml
└── go.mod

Configuration Type

// config.go
package main

import (
    "errors"
    "fmt"
    "time"
)

// Config is the root configuration type. Its three sections are decoded from
// the "server", "database" and "features" keys of the YAML document.
type Config struct {
    Server   ServerConfig   `yaml:"server"`
    Database DatabaseConfig `yaml:"database"`
    Features FeatureFlags   `yaml:"features"`
}

// Validate checks the server section first and the database section second.
// It returns the first failure, wrapped with the name of the section it came
// from, or nil when both sections are valid. Feature flags are not validated.
func (c Config) Validate() error {
    serverErr := c.Server.Validate()
    if serverErr != nil {
        return fmt.Errorf("server: %w", serverErr)
    }

    databaseErr := c.Database.Validate()
    if databaseErr != nil {
        return fmt.Errorf("database: %w", databaseErr)
    }

    return nil
}

// ServerConfig holds the HTTP listener settings decoded from the "server"
// section of the configuration.
type ServerConfig struct {
    Port         int           `yaml:"port"`
    ReadTimeout  time.Duration `yaml:"read_timeout"`
    WriteTimeout time.Duration `yaml:"write_timeout"`
}

// Validate reports the first rule the server settings violate, or nil when
// they are all acceptable. Rules are checked in this order: the port must lie
// in 1..65535, then the read timeout and the write timeout must each be at
// least one second.
func (s ServerConfig) Validate() error {
    switch {
    case s.Port < 1 || s.Port > 65535:
        return errors.New("port must be between 1 and 65535")
    case s.ReadTimeout < time.Second:
        return errors.New("read_timeout must be at least 1s")
    case s.WriteTimeout < time.Second:
        return errors.New("write_timeout must be at least 1s")
    }
    return nil
}

// DatabaseConfig holds the connection settings decoded from the "database"
// section of the configuration.
type DatabaseConfig struct {
    URL             string `yaml:"url"`
    MaxConnections  int    `yaml:"max_connections"`
    IdleConnections int    `yaml:"idle_connections"`
}

// Validate reports the first rule the database settings violate, or nil when
// they are all acceptable. Rules are checked in this order: the URL must be
// non-empty, the connection limit must be at least 1, and the idle connection
// count must not be negative. The relationship between the two counts is not
// checked here.
func (d DatabaseConfig) Validate() error {
    switch {
    case d.URL == "":
        return errors.New("url is required")
    case d.MaxConnections < 1:
        return errors.New("max_connections must be at least 1")
    case d.IdleConnections < 0:
        return errors.New("idle_connections must be non-negative")
    }
    return nil
}

// FeatureFlags holds the boolean toggles decoded from the "features" section
// of the configuration. It has no validation rules of its own.
type FeatureFlags struct {
    EnableMetrics bool `yaml:"enable_metrics"`
    EnableTracing bool `yaml:"enable_tracing"`
    DebugMode     bool `yaml:"debug_mode"`
}

Config File

# config.yaml
server:
  port: 8080
  read_timeout: 30s
  write_timeout: 30s

database:
  url: postgres://localhost:5432/myapp
  max_connections: 100
  idle_connections: 10

features:
  enable_metrics: true
  enable_tracing: false
  debug_mode: false

Main Application

// main.go
package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "net/http"
    "os"
    "os/signal"
    "sync/atomic"
    "time"

    "github.com/zoobz-io/capitan"
    "github.com/zoobz-io/flux"
    "github.com/zoobz-io/flux/file"
)

// Application bundles the live configuration with the capacitor that keeps it
// up to date.
type Application struct {
    // config points at the most recently applied Config. The zero value holds
    // nil, so it stays nil until UpdateConfig stores a value.
    config    atomic.Pointer[Config]
    // capacitor is assigned in main; the HTTP handlers read its State and
    // LastError.
    capacitor *flux.Capacitor[Config]
}

// Config returns a copy of the most recently applied configuration.
//
// Until UpdateConfig has stored a value (for example when the initial load
// failed) there is nothing to return, so the zero Config is returned instead
// of dereferencing a nil pointer and panicking.
func (a *Application) Config() Config {
    current := a.config.Load()
    if current == nil {
        return Config{}
    }
    return *current
}

// UpdateConfig is the callback passed to flux.New in main. It rejects a
// configuration whose idle connection count exceeds its connection limit;
// otherwise it logs a one-line summary of the new values and publishes curr
// as the current configuration. Neither ctx nor prev is consulted.
func (a *Application) UpdateConfig(ctx context.Context, prev, curr Config) error {
    db := curr.Database

    // Validate business rules
    if db.IdleConnections > db.MaxConnections {
        return fmt.Errorf("idle_connections cannot exceed max_connections")
    }

    log.Printf(
        "config updated: port=%d, max_conn=%d, debug=%v",
        curr.Server.Port, db.MaxConnections, curr.Features.DebugMode,
    )

    // curr is this call's own copy, so publishing its address does not alias
    // a value owned by the caller.
    a.config.Store(&curr)
    return nil
}

// main wires the application together: it registers the logging hooks,
// creates a capacitor that watches config.yaml, starts the HTTP server once a
// configuration has been applied, and shuts the server down on interrupt.
func main() {
    app := &Application{}

    // Set up observability
    setupObservability()

    // Create capacitor
    app.capacitor = flux.New[Config](
        file.New("config.yaml"),
        app.UpdateConfig,
    ).Codec(flux.YAMLCodec{}).Debounce(100*time.Millisecond)

    // Start watching
    ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
    defer cancel()

    log.Println("starting configuration watcher...")

    if err := app.capacitor.Start(ctx); err != nil {
        log.Printf("WARNING: initial config failed: %v", err)
        log.Println("continuing to watch for valid configuration...")
    }

    logState(app.capacitor)

    // startServer takes its port and timeouts from the applied configuration.
    // If the initial load failed there is none yet, so hold off until the
    // watcher delivers a valid one instead of starting with no settings.
    if !waitForConfig(ctx, app) {
        log.Println("interrupted before a valid configuration was loaded")
        return
    }

    // Start HTTP server
    server := startServer(app)

    // Wait for shutdown
    <-ctx.Done()

    log.Println("shutting down...")
    shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer shutdownCancel()
    if err := server.Shutdown(shutdownCtx); err != nil {
        log.Printf("server shutdown: %v", err)
    }
}

// waitForConfig blocks until UpdateConfig has stored a configuration or ctx is
// done. It reports whether a configuration is available.
func waitForConfig(ctx context.Context, app *Application) bool {
    ticker := time.NewTicker(100 * time.Millisecond)
    defer ticker.Stop()

    for app.config.Load() == nil {
        select {
        case <-ctx.Done():
            return false
        case <-ticker.C:
        }
    }
    return true
}

// setupObservability registers one logging hook for each capacitor signal
// used in this recipe: state changes, change notifications, and the three
// failure signals (validation, transform, apply). The second value returned
// by each key lookup is discarded.
func setupObservability() {
    // logFailure builds a hook that logs the event's error value using the
    // given printf-style format.
    logFailure := func(format string) func(context.Context, *capitan.Event) {
        return func(_ context.Context, e *capitan.Event) {
            errMsg, _ := flux.KeyError.From(e)
            log.Printf(format, errMsg)
        }
    }

    capitan.Hook(flux.CapacitorStateChanged, func(_ context.Context, e *capitan.Event) {
        from, _ := flux.KeyOldState.From(e)
        to, _ := flux.KeyNewState.From(e)
        log.Printf("config state: %s -> %s", from, to)
    })

    capitan.Hook(flux.CapacitorChangeReceived, func(_ context.Context, _ *capitan.Event) {
        log.Println("config file change detected")
    })

    capitan.Hook(flux.CapacitorValidationFailed, logFailure("config validation failed: %s"))
    capitan.Hook(flux.CapacitorTransformFailed, logFailure("config parse failed: %s"))
    capitan.Hook(flux.CapacitorApplyFailed, logFailure("config apply failed: %s"))
}

// logState writes a single line describing the capacitor's current state.
// The degraded and empty states also include the capacitor's last error.
// A state other than the four handled here logs nothing.
func logState[T any](c *flux.Capacitor[T]) {
    switch state := c.State(); state {
    case flux.StateHealthy:
        log.Println("config status: HEALTHY")
    case flux.StateLoading:
        log.Println("config status: LOADING")
    case flux.StateDegraded:
        log.Printf("config status: DEGRADED (error: %v)", c.LastError())
    case flux.StateEmpty:
        log.Printf("config status: EMPTY (error: %v)", c.LastError())
    }
}

// startServer builds the HTTP server, starts serving in a background
// goroutine, and returns the server so the caller can shut it down.
//
// The listen address and the read/write timeouts are taken from the
// configuration once, at the time of the call; later configuration updates do
// not change them.
//
// Routes:
//   - /health reports the capacitor state (and its last error, if any) as
//     JSON, answering 200 when the state is healthy or degraded and 503
//     otherwise.
//   - /config returns the current configuration as JSON.
func startServer(app *Application) *http.Server {
    mux := http.NewServeMux()

    mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
        state := app.capacitor.State()
        response := map[string]any{
            "status": state.String(),
        }
        if err := app.capacitor.LastError(); err != nil {
            response["error"] = err.Error()
        }

        status := http.StatusServiceUnavailable
        if state == flux.StateHealthy || state == flux.StateDegraded {
            status = http.StatusOK
        }
        writeJSON(w, status, response)
    })

    // NOTE(review): this returns the whole Config, including Database.URL;
    // confirm that is acceptable to expose on this endpoint.
    mux.HandleFunc("/config", func(w http.ResponseWriter, r *http.Request) {
        writeJSON(w, http.StatusOK, app.Config())
    })

    cfg := app.Config()
    server := &http.Server{
        Addr:         fmt.Sprintf(":%d", cfg.Server.Port),
        Handler:      mux,
        ReadTimeout:  cfg.Server.ReadTimeout,
        WriteTimeout: cfg.Server.WriteTimeout,
    }

    go func() {
        log.Printf("server listening on :%d", cfg.Server.Port)
        // ListenAndServe always returns a non-nil error; ErrServerClosed is
        // the expected result of a Shutdown and is not worth reporting.
        if err := server.ListenAndServe(); err != http.ErrServerClosed {
            log.Printf("server error: %v", err)
        }
    }()

    return server
}

// writeJSON sends v as a JSON response with the given status code. The
// Content-Type header is set before the status line is written, because
// headers changed after WriteHeader are not sent. An encoding failure is
// logged; by then the status has already gone out, so it cannot be changed.
func writeJSON(w http.ResponseWriter, status int, v any) {
    w.Header().Set("Content-Type", "application/json")
    w.WriteHeader(status)
    if err := json.NewEncoder(w).Encode(v); err != nil {
        log.Printf("writing JSON response: %v", err)
    }
}

Running

go run .
# Output:
# starting configuration watcher...
# config file change detected
# config updated: port=8080, max_conn=100, debug=false
# config state: loading -> healthy
# config status: HEALTHY
# server listening on :8080

Testing Changes

# Valid change
sed -i 's/port: 8080/port: 9090/' config.yaml
# Output:
# config file change detected
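# config updated: port=9090, max_conn=100, debug=false
# (the stored Config now has port 9090; the already-running HTTP server keeps
#  listening on the port it was started with, because startServer reads it once)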

# Invalid change
sed -i 's/port: 9090/port: 99999/' config.yaml
# Output:
# config file change detected
# config validation failed: ...
# config state: healthy -> degraded

# Recovery
sed -i 's/port: 99999/port: 8080/' config.yaml
# Output:
# config file change detected
# config updated: port=8080, max_conn=100, debug=false
# config state: degraded -> healthy

Unit Testing

// TestConfigReload feeds two YAML documents through a synchronous channel
// watcher and checks that the apply callback sees each one in turn: the first
// after Start, the second after one explicit Process call.
func TestConfigReload(t *testing.T) {
    initial := []byte(`
server:
  port: 8080
  read_timeout: 30s
  write_timeout: 30s
database:
  url: postgres://localhost/test
  max_connections: 10
  idle_connections: 2
features:
  enable_metrics: false
`)

    updated := []byte(`
server:
  port: 9090
  read_timeout: 60s
  write_timeout: 60s
database:
  url: postgres://localhost/test
  max_connections: 20
  idle_connections: 5
features:
  enable_metrics: true
`)

    // The buffer holds both documents, so the sends below complete before
    // anything is receiving.
    updates := make(chan []byte, 2)
    updates <- initial
    updates <- updated

    // got records the configuration most recently handed to the callback.
    var got Config
    record := func(ctx context.Context, prev, curr Config) error {
        got = curr
        return nil
    }

    capacitor := flux.New[Config](
        flux.NewSyncChannelWatcher(updates),
        record,
    ).Codec(flux.YAMLCodec{}).SyncMode()

    ctx := context.Background()

    // Start is expected to apply the first queued document.
    startErr := capacitor.Start(ctx)
    require.NoError(t, startErr)
    assert.Equal(t, 8080, got.Server.Port)

    // One Process call is expected to apply the second queued document.
    capacitor.Process(ctx)
    assert.Equal(t, 9090, got.Server.Port)
    assert.True(t, got.Features.EnableMetrics)
}

Next Steps