zoobzio December 10, 2025 Edit this page

Custom Watcher

Recipe: Build a custom watcher for configuration sources not covered by built-in providers.

The Interface

type Watcher interface {
    Watch(ctx context.Context) (<-chan []byte, error)
}

Requirements

  1. Emit current value immediately - First emission enables initial load
  2. Emit on change - Subsequent emissions trigger the pipeline
  3. Close channel on shutdown - When context is cancelled
  4. Return error if watching cannot start - Invalid config, connection failure

Example: HTTP Polling Watcher

Poll an HTTP endpoint at intervals:

package httpconfig

import (
    "bytes"
    "context"
    "fmt"
    "io"
    "net/http"
    "time"
)

type Watcher struct {
    url      string
    interval time.Duration
    client   *http.Client
}

func New(url string, interval time.Duration) *Watcher {
    return &Watcher{
        url:      url,
        interval: interval,
        client:   &http.Client{Timeout: 10 * time.Second},
    }
}

func (w *Watcher) Watch(ctx context.Context) (<-chan []byte, error) {
    // Fetch initial value to validate endpoint
    initial, err := w.fetch(ctx)
    if err != nil {
        return nil, fmt.Errorf("initial fetch failed: %w", err)
    }

    out := make(chan []byte)

    go func() {
        defer close(out)

        // Emit initial value
        select {
        case out <- initial:
        case <-ctx.Done():
            return
        }

        current := initial
        ticker := time.NewTicker(w.interval)
        defer ticker.Stop()

        for {
            select {
            case <-ctx.Done():
                return
            case <-ticker.C:
                next, err := w.fetch(ctx)
                if err != nil {
                    continue // Skip failed fetches
                }
                if !bytes.Equal(current, next) {
                    current = next
                    select {
                    case out <- current:
                    case <-ctx.Done():
                        return
                    }
                }
            }
        }
    }()

    return out, nil
}

func (w *Watcher) fetch(ctx context.Context) ([]byte, error) {
    req, err := http.NewRequestWithContext(ctx, "GET", w.url, nil)
    if err != nil {
        return nil, err
    }

    resp, err := w.client.Do(req)
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()

    if resp.StatusCode != http.StatusOK {
        return nil, fmt.Errorf("unexpected status: %d", resp.StatusCode)
    }

    return io.ReadAll(resp.Body)
}

Usage

type Config struct {
    Port int `json:"port"`
}

func (c Config) Validate() error {
    if c.Port < 1 || c.Port > 65535 {
        return errors.New("port must be between 1 and 65535")
    }
    return nil
}

capacitor := flux.New[Config](
    httpconfig.New("https://config.example.com/app.json", 30*time.Second),
    func(ctx context.Context, prev, curr Config) error {
        return app.Apply(curr)
    },
)

Example: AWS Parameter Store Watcher

Watch AWS SSM Parameter Store:

package ssm

import (
    "context"
    "time"

    "github.com/aws/aws-sdk-go-v2/service/ssm"
)

type Watcher struct {
    client   *ssm.Client
    name     string
    interval time.Duration
}

func New(client *ssm.Client, name string, interval time.Duration) *Watcher {
    return &Watcher{
        client:   client,
        name:     name,
        interval: interval,
    }
}

func (w *Watcher) Watch(ctx context.Context) (<-chan []byte, error) {
    initial, err := w.fetch(ctx)
    if err != nil {
        return nil, err
    }

    out := make(chan []byte)

    go func() {
        defer close(out)

        select {
        case out <- initial:
        case <-ctx.Done():
            return
        }

        current := string(initial)
        ticker := time.NewTicker(w.interval)
        defer ticker.Stop()

        for {
            select {
            case <-ctx.Done():
                return
            case <-ticker.C:
                next, err := w.fetch(ctx)
                if err != nil {
                    continue
                }
                if string(next) != current {
                    current = string(next)
                    select {
                    case out <- next:
                    case <-ctx.Done():
                        return
                    }
                }
            }
        }
    }()

    return out, nil
}

func (w *Watcher) fetch(ctx context.Context) ([]byte, error) {
    out, err := w.client.GetParameter(ctx, &ssm.GetParameterInput{
        Name:           &w.name,
        WithDecryption: aws.Bool(true),
    })
    if err != nil {
        return nil, err
    }
    return []byte(*out.Parameter.Value), nil
}

Example: gRPC Streaming Watcher

Watch a gRPC streaming endpoint:

package grpcconfig

import (
    "context"

    pb "myapp/proto/config"
)

type Watcher struct {
    client pb.ConfigServiceClient
    key    string
}

func New(client pb.ConfigServiceClient, key string) *Watcher {
    return &Watcher{client: client, key: key}
}

func (w *Watcher) Watch(ctx context.Context) (<-chan []byte, error) {
    stream, err := w.client.WatchConfig(ctx, &pb.WatchRequest{Key: w.key})
    if err != nil {
        return nil, err
    }

    out := make(chan []byte)

    go func() {
        defer close(out)

        for {
            resp, err := stream.Recv()
            if err != nil {
                return
            }

            select {
            case out <- resp.Value:
            case <-ctx.Done():
                return
            }
        }
    }()

    return out, nil
}

Common Patterns

Deduplication

Only emit when value changes:

if !bytes.Equal(current, next) {
    current = next
    out <- current
}

Backoff on Error

Don't spam a failing source:

backoff := time.Second
maxBackoff := time.Minute

for {
    data, err := fetch(ctx)
    if err != nil {
        time.Sleep(backoff)
        backoff = min(backoff*2, maxBackoff)
        continue
    }
    backoff = time.Second  // Reset on success
    // ...
}

Graceful Send

Always respect context during channel sends:

select {
case out <- value:
case <-ctx.Done():
    return
}

Options Pattern

type Option func(*Watcher)

func WithInterval(d time.Duration) Option {
    return func(w *Watcher) {
        w.interval = d
    }
}

func WithClient(c *http.Client) Option {
    return func(w *Watcher) {
        w.client = c
    }
}

func New(url string, opts ...Option) *Watcher {
    w := &Watcher{
        url:      url,
        interval: 30 * time.Second,
        client:   http.DefaultClient,
    }
    for _, opt := range opts {
        opt(w)
    }
    return w
}

Testing Custom Watchers

func TestHTTPWatcher(t *testing.T) {
    server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        w.Write([]byte(`{"port": 8080}`))
    }))
    defer server.Close()

    watcher := httpconfig.New(server.URL, 10*time.Millisecond)

    ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
    defer cancel()

    ch, err := watcher.Watch(ctx)
    require.NoError(t, err)

    // Should receive initial value
    val := <-ch
    assert.Equal(t, `{"port": 8080}`, string(val))
}

Contributing a Provider

If your watcher is generally useful, consider contributing it to pkg/:

  1. Create pkg/yourprovider/
  2. Add go.mod with dependencies
  3. Implement Watcher interface
  4. Add tests using testcontainers if applicable
  5. Add README.md with usage examples
  6. Update go.work to include new module

Next Steps