zoobzio December 10, 2025 Edit this page

Providers Reference

API reference for all provider packages in github.com/zoobz-io/flux/.

pkg/file

File watcher using fsnotify.

import "github.com/zoobz-io/flux/file"

New

func New(path string, opts ...Option) *Watcher

Creates a watcher for the given file path.

Options

None currently defined.

Example

capacitor := flux.New[Config](
    file.New("/etc/myapp/config.json"),
    callback,
)

pkg/redis

Redis watcher using keyspace notifications.

import "github.com/zoobz-io/flux/redis"

New

func New(client *redis.Client, key string, opts ...Option) *Watcher

Creates a watcher for the given Redis key.

Requires keyspace notifications enabled:

redis-cli CONFIG SET notify-keyspace-events KEA

Options

None currently defined.

Example

client := redis.NewClient(&redis.Options{Addr: "localhost:6379"})

capacitor := flux.New[Config](
    fluxredis.New(client, "myapp:config"),
    callback,
)

pkg/consul

Consul KV watcher using blocking queries.

import "github.com/zoobz-io/flux/consul"

New

func New(client *api.Client, key string, opts ...Option) *Watcher

Creates a watcher for the given Consul KV key.

Options

None currently defined.

Example

client, _ := api.NewClient(api.DefaultConfig())

capacitor := flux.New[Config](
    consul.New(client, "myapp/config"),
    callback,
)

pkg/etcd

etcd watcher using the native Watch API.

import "github.com/zoobz-io/flux/etcd"

New

func New(client *clientv3.Client, key string, opts ...Option) *Watcher

Creates a watcher for the given etcd key.

Options

None currently defined.

Example

client, _ := clientv3.New(clientv3.Config{
    Endpoints: []string{"localhost:2379"},
})

capacitor := flux.New[Config](
    etcd.New(client, "/myapp/config"),
    callback,
)

pkg/nats

NATS JetStream KV watcher.

import "github.com/zoobz-io/flux/nats"

New

func New(kv jetstream.KeyValue, key string, opts ...Option) *Watcher

Creates a watcher for the given NATS KV key.

Options

None currently defined.

Example

nc, _ := nats.Connect("nats://localhost:4222")
js, _ := jetstream.New(nc)
kv, _ := js.KeyValue(ctx, "config")

capacitor := flux.New[Config](
    fluxnats.New(kv, "myapp"),
    callback,
)

pkg/kubernetes

Kubernetes ConfigMap/Secret watcher.

import "github.com/zoobz-io/flux/kubernetes"

New

func New(client kubernetes.Interface, namespace, name, key string, opts ...Option) *Watcher

Creates a watcher for a ConfigMap or Secret. The key specifies which data key within the resource to watch.

ResourceType

type ResourceType int

const (
    ConfigMap ResourceType = iota
    Secret
)

Options

WithResourceType

func WithResourceType(rt ResourceType) Option

Sets the resource type. Default: ConfigMap.

Example

config, _ := rest.InClusterConfig()
client, _ := kubernetes.NewForConfig(config)

// ConfigMap
capacitor := flux.New[Config](
    k8s.New(client, "default", "myapp-config", "config.json"),
    callback,
)

// Secret
capacitor := flux.New[Config](
    k8s.New(client, "default", "myapp-secret", "config.json",
        k8s.WithResourceType(k8s.Secret),
    ),
    callback,
)

pkg/zookeeper

ZooKeeper node watcher.

import "github.com/zoobz-io/flux/zookeeper"

New

func New(conn *zk.Conn, path string, opts ...Option) *Watcher

Creates a watcher for the given ZooKeeper node path.

Options

None currently defined.

Example

conn, _, _ := zk.Connect([]string{"localhost:2181"}, 5*time.Second)

capacitor := flux.New[Config](
    zookeeper.New(conn, "/config/myapp"),
    callback,
)

pkg/firestore

Firestore document watcher using realtime listeners.

import "github.com/zoobz-io/flux/firestore"

New

func New(client *firestore.Client, collection, document string, opts ...Option) *Watcher

Creates a watcher for the given Firestore document.

Options

WithField

func WithField(field string) Option

Sets a specific field to extract. Default: extracts "data" field.

Helper Functions

CreateDocument

func CreateDocument(ctx context.Context, client *firestore.Client, collection, document string, data []byte) error

Creates a document with the expected structure.

UpdateDocument

func UpdateDocument(ctx context.Context, client *firestore.Client, collection, document string, data []byte) error

Updates a document's data field.

Example

client, _ := firestore.NewClient(ctx, "my-project")

capacitor := flux.New[Config](
    fluxfs.New(client, "config", "myapp"),
    callback,
)

// Or with specific field
capacitor := flux.New[Config](
    fluxfs.New(client, "config", "myapp", fluxfs.WithField("settings")),
    callback,
)

Document structure: By default expects a data field:

{"data": "{\"port\": 8080}"}

Common Patterns

All providers implement the same interface:

type Watcher interface {
    Watch(ctx context.Context) (<-chan []byte, error)
}

All providers:

  • Emit current value immediately on Watch()
  • Emit subsequent values on change
  • Close channel when context is cancelled
  • Return error if watching cannot start

Next Steps