Providers Reference
API reference for all provider packages in github.com/zoobz-io/flux/.
pkg/file
File watcher using fsnotify.
import "github.com/zoobz-io/flux/file"
New
func New(path string, opts ...Option) *Watcher
Creates a watcher for the given file path.
Options
None currently defined.
Example
capacitor := flux.New[Config](
file.New("/etc/myapp/config.json"),
callback,
)
pkg/redis
Redis watcher using keyspace notifications.
import fluxredis "github.com/zoobz-io/flux/redis"
New
func New(client *redis.Client, key string, opts ...Option) *Watcher
Creates a watcher for the given Redis key.
Requires keyspace notifications to be enabled on the Redis server:
redis-cli CONFIG SET notify-keyspace-events KEA
Options
None currently defined.
Example
client := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
capacitor := flux.New[Config](
fluxredis.New(client, "myapp:config"),
callback,
)
pkg/consul
Consul KV watcher using blocking queries.
import "github.com/zoobz-io/flux/consul"
New
func New(client *api.Client, key string, opts ...Option) *Watcher
Creates a watcher for the given Consul KV key.
Options
None currently defined.
Example
client, _ := api.NewClient(api.DefaultConfig())
capacitor := flux.New[Config](
consul.New(client, "myapp/config"),
callback,
)
pkg/etcd
etcd watcher using the native Watch API.
import "github.com/zoobz-io/flux/etcd"
New
func New(client *clientv3.Client, key string, opts ...Option) *Watcher
Creates a watcher for the given etcd key.
Options
None currently defined.
Example
client, _ := clientv3.New(clientv3.Config{
Endpoints: []string{"localhost:2379"},
})
capacitor := flux.New[Config](
etcd.New(client, "/myapp/config"),
callback,
)
pkg/nats
NATS JetStream KV watcher.
import fluxnats "github.com/zoobz-io/flux/nats"
New
func New(kv jetstream.KeyValue, key string, opts ...Option) *Watcher
Creates a watcher for the given NATS KV key.
Options
None currently defined.
Example
nc, _ := nats.Connect("nats://localhost:4222")
js, _ := jetstream.New(nc)
kv, _ := js.KeyValue(ctx, "config")
capacitor := flux.New[Config](
fluxnats.New(kv, "myapp"),
callback,
)
pkg/kubernetes
Kubernetes ConfigMap/Secret watcher.
import k8s "github.com/zoobz-io/flux/kubernetes"
New
func New(client kubernetes.Interface, namespace, name, key string, opts ...Option) *Watcher
Creates a watcher for a ConfigMap or Secret. The key specifies which data key within the resource to watch.
ResourceType
type ResourceType int
const (
ConfigMap ResourceType = iota
Secret
)
Options
WithResourceType
func WithResourceType(rt ResourceType) Option
Sets the resource type. Default: ConfigMap.
Example
config, _ := rest.InClusterConfig()
client, _ := kubernetes.NewForConfig(config)
// ConfigMap
capacitor := flux.New[Config](
k8s.New(client, "default", "myapp-config", "config.json"),
callback,
)
// Secret
capacitor := flux.New[Config](
k8s.New(client, "default", "myapp-secret", "config.json",
k8s.WithResourceType(k8s.Secret),
),
callback,
)
pkg/zookeeper
ZooKeeper node watcher.
import "github.com/zoobz-io/flux/zookeeper"
New
func New(conn *zk.Conn, path string, opts ...Option) *Watcher
Creates a watcher for the given ZooKeeper node path.
Options
None currently defined.
Example
conn, _, _ := zk.Connect([]string{"localhost:2181"}, 5*time.Second)
capacitor := flux.New[Config](
zookeeper.New(conn, "/config/myapp"),
callback,
)
pkg/firestore
Firestore document watcher using realtime listeners.
import fluxfs "github.com/zoobz-io/flux/firestore"
New
func New(client *firestore.Client, collection, document string, opts ...Option) *Watcher
Creates a watcher for the given Firestore document.
Options
WithField
func WithField(field string) Option
Sets the document field to extract. Default: the "data" field.
Helper Functions
CreateDocument
func CreateDocument(ctx context.Context, client *firestore.Client, collection, document string, data []byte) error
Creates a document with the expected structure.
UpdateDocument
func UpdateDocument(ctx context.Context, client *firestore.Client, collection, document string, data []byte) error
Updates a document's data field.
Example
client, _ := firestore.NewClient(ctx, "my-project")
capacitor := flux.New[Config](
fluxfs.New(client, "config", "myapp"),
callback,
)
// Or with specific field
capacitor := flux.New[Config](
fluxfs.New(client, "config", "myapp", fluxfs.WithField("settings")),
callback,
)
Document structure: by default, the watcher expects a "data" field whose value is the serialized configuration:
{"data": "{\"port\": 8080}"}
Common Patterns
All providers implement the same interface:
type Watcher interface {
Watch(ctx context.Context) (<-chan []byte, error)
}
All providers:
- Emit current value immediately on Watch()
- Emit subsequent values on change
- Close channel when context is cancelled
- Return error if watching cannot start
Next Steps
- Providers Guide - Usage guidance
- Custom Watcher - Build your own