Custom Watcher
Recipe: Build a custom watcher for configuration sources not covered by built-in providers.
The Interface
type Watcher interface {
Watch(ctx context.Context) (<-chan []byte, error)
}
Requirements
- Emit current value immediately - First emission enables initial load
- Emit on change - Subsequent emissions trigger the pipeline
- Close channel on shutdown - When context is cancelled
- Return error if watching cannot start - Invalid config, connection failure
Example: HTTP Polling Watcher
Poll an HTTP endpoint at intervals:
package httpconfig
import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"time"
)
type Watcher struct {
url string
interval time.Duration
client *http.Client
}
func New(url string, interval time.Duration) *Watcher {
return &Watcher{
url: url,
interval: interval,
client: &http.Client{Timeout: 10 * time.Second},
}
}
func (w *Watcher) Watch(ctx context.Context) (<-chan []byte, error) {
// Fetch initial value to validate endpoint
initial, err := w.fetch(ctx)
if err != nil {
return nil, fmt.Errorf("initial fetch failed: %w", err)
}
out := make(chan []byte)
go func() {
defer close(out)
// Emit initial value
select {
case out <- initial:
case <-ctx.Done():
return
}
current := initial
ticker := time.NewTicker(w.interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
next, err := w.fetch(ctx)
if err != nil {
continue // Skip failed fetches
}
if !bytes.Equal(current, next) {
current = next
select {
case out <- current:
case <-ctx.Done():
return
}
}
}
}
}()
return out, nil
}
func (w *Watcher) fetch(ctx context.Context) ([]byte, error) {
req, err := http.NewRequestWithContext(ctx, "GET", w.url, nil)
if err != nil {
return nil, err
}
resp, err := w.client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("unexpected status: %d", resp.StatusCode)
}
return io.ReadAll(resp.Body)
}
Usage
type Config struct {
Port int `json:"port"`
}
func (c Config) Validate() error {
if c.Port < 1 || c.Port > 65535 {
return errors.New("port must be between 1 and 65535")
}
return nil
}
capacitor := flux.New[Config](
httpconfig.New("https://config.example.com/app.json", 30*time.Second),
func(ctx context.Context, prev, curr Config) error {
return app.Apply(curr)
},
)
Example: AWS Parameter Store Watcher
Watch AWS SSM Parameter Store:
package ssm
import (
"context"
"time"
"github.com/aws/aws-sdk-go-v2/service/ssm"
)
type Watcher struct {
client *ssm.Client
name string
interval time.Duration
}
func New(client *ssm.Client, name string, interval time.Duration) *Watcher {
return &Watcher{
client: client,
name: name,
interval: interval,
}
}
func (w *Watcher) Watch(ctx context.Context) (<-chan []byte, error) {
initial, err := w.fetch(ctx)
if err != nil {
return nil, err
}
out := make(chan []byte)
go func() {
defer close(out)
select {
case out <- initial:
case <-ctx.Done():
return
}
current := string(initial)
ticker := time.NewTicker(w.interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
next, err := w.fetch(ctx)
if err != nil {
continue
}
if string(next) != current {
current = string(next)
select {
case out <- next:
case <-ctx.Done():
return
}
}
}
}
}()
return out, nil
}
func (w *Watcher) fetch(ctx context.Context) ([]byte, error) {
out, err := w.client.GetParameter(ctx, &ssm.GetParameterInput{
Name: &w.name,
WithDecryption: aws.Bool(true),
})
if err != nil {
return nil, err
}
return []byte(*out.Parameter.Value), nil
}
Example: gRPC Streaming Watcher
Watch a gRPC streaming endpoint:
package grpcconfig
import (
"context"
pb "myapp/proto/config"
)
type Watcher struct {
client pb.ConfigServiceClient
key string
}
func New(client pb.ConfigServiceClient, key string) *Watcher {
return &Watcher{client: client, key: key}
}
func (w *Watcher) Watch(ctx context.Context) (<-chan []byte, error) {
stream, err := w.client.WatchConfig(ctx, &pb.WatchRequest{Key: w.key})
if err != nil {
return nil, err
}
out := make(chan []byte)
go func() {
defer close(out)
for {
resp, err := stream.Recv()
if err != nil {
return
}
select {
case out <- resp.Value:
case <-ctx.Done():
return
}
}
}()
return out, nil
}
Common Patterns
Deduplication
Only emit when value changes:
if !bytes.Equal(current, next) {
current = next
out <- current
}
Backoff on Error
Don't spam a failing source:
backoff := time.Second
maxBackoff := time.Minute
for {
data, err := fetch(ctx)
if err != nil {
time.Sleep(backoff)
backoff = min(backoff*2, maxBackoff)
continue
}
backoff = time.Second // Reset on success
// ...
}
Graceful Send
Always respect context during channel sends:
select {
case out <- value:
case <-ctx.Done():
return
}
Options Pattern
type Option func(*Watcher)
func WithInterval(d time.Duration) Option {
return func(w *Watcher) {
w.interval = d
}
}
func WithClient(c *http.Client) Option {
return func(w *Watcher) {
w.client = c
}
}
func New(url string, opts ...Option) *Watcher {
w := &Watcher{
url: url,
interval: 30 * time.Second,
client: http.DefaultClient,
}
for _, opt := range opts {
opt(w)
}
return w
}
Testing Custom Watchers
func TestHTTPWatcher(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(`{"port": 8080}`))
}))
defer server.Close()
watcher := httpconfig.New(server.URL, 10*time.Millisecond)
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
ch, err := watcher.Watch(ctx)
require.NoError(t, err)
// Should receive initial value
val := <-ch
assert.Equal(t, `{"port": 8080}`, string(val))
}
Contributing a Provider
If your watcher is generally useful, consider contributing it to pkg/:
- Create
pkg/yourprovider/ - Add
go.modwith dependencies - Implement
Watcherinterface - Add tests using testcontainers if applicable
- Add
README.mdwith usage examples - Update
go.workto include new module
Next Steps
- Providers Guide - Existing providers for reference
- Architecture - How watchers integrate with Capacitor