feedx

package module
v0.17.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 18, 2025 License: Apache-2.0 Imports: 19 Imported by: 1

README

Feedx

Go reference GitHub release Go Report Card License

Feed-based data exchange between services.

Usage (Ruby)

require 'bfs/s3'
require 'feedx'

# Init a new producer with an S3 destination
# NOTE(review): destination path says users.json.gz but the relation is Post — confirm
relation = Post.includes(:author)
producer = Feedx::Producer.new relation, 's3://my-bucket/feeds/users.json.gz'

# Push a new feed every hour (perform writes the full relation each cycle)
loop do
  producer.perform
  sleep(3600)
end

Documentation

Overview

Example
package main

import (
	"context"
	"errors"
	"fmt"
	"io"

	"github.com/bsm/bfs"
	"github.com/bsm/feedx"
	"github.com/bsm/feedx/internal/testdata"
)

type message = testdata.MockMessage

func main() {
	ctx := context.TODO()

	// create a mock in-memory object to act as the remote feed destination
	obj := bfs.NewInMemObject("todos.ndjson")
	defer obj.Close()

	pcr := feedx.NewProducerForRemote(obj)
	defer pcr.Close()

	// produce version 101 with two records; errors.Join surfaces either Encode failure
	status, err := pcr.Produce(ctx, 101, nil, func(w *feedx.Writer) error {
		return errors.Join(
			w.Encode(&message{Name: "Jane", Height: 175}),
			w.Encode(&message{Name: "Joe", Height: 172}),
		)
	})
	if err != nil {
		panic(err)
	}

	fmt.Printf("PRODUCED skipped:%v version:%v->%v items:%v\n", status.Skipped, status.LocalVersion, status.RemoteVersion, status.NumItems)

	// create a consumer for the same remote object
	csm := feedx.NewConsumerForRemote(obj)
	defer csm.Close()

	// consume data, decoding records until io.EOF marks the end of the feed
	var msgs []*message
	status, err = csm.Consume(context.TODO(), nil, func(r *feedx.Reader) error {
		for {
			var msg message
			if err := r.Decode(&msg); err != nil {
				if errors.Is(err, io.EOF) {
					break
				}
				return err
			}
			msgs = append(msgs, &msg)
		}

		return nil
	})
	if err != nil {
		panic(err)
	}

	fmt.Printf("CONSUMED skipped:%v version:%v->%v items:%v\n", status.Skipped, status.LocalVersion, status.RemoteVersion, status.NumItems)
	fmt.Printf("DATA     [%q, %q]\n", msgs[0].Name, msgs[1].Name)

}
Output:

PRODUCED skipped:false version:101->0 items:2
CONSUMED skipped:false version:0->101 items:2
DATA     ["Jane", "Joe"]

Index

Examples

Constants

This section is empty.

Variables

View Source
var CBORFormat = cborFormat{}

CBORFormat provides a Format implementation for CBOR.

View Source
var ErrNotModified = errors.New("feedx: not modified")

ErrNotModified is used to signal that something has not been modified.

View Source
var FlateCompression = flateCompression{}

FlateCompression supports flate compression format.

View Source
var GZipCompression = gzipCompression{}

GZipCompression supports gzip compression format.

View Source
var JSONFormat = jsonFormat{}

JSONFormat provides a Format implementation for JSON.

View Source
var NoCompression = noCompression{}

NoCompression is just a pass-through without compression.

View Source
var ProtobufFormat = protobufFormat{}

ProtobufFormat provides a Format implementation for Protobuf.

View Source
var ZstdCompression = zstdCompression{}

ZstdCompression supports zstd compression format.

Functions

This section is empty.

Types

type AfterHook added in v0.14.0

type AfterHook func(*Status, error)

AfterHook callbacks are run after jobs have finished.

type BeforeHook added in v0.14.0

type BeforeHook func(version int64) bool

BeforeHook callbacks are run before jobs are started. It receives the local version before sync as an argument and may return false to abort the cycle.

type Compression

type Compression interface {
	// NewReader wraps a reader.
	NewReader(io.Reader) (io.ReadCloser, error)
	// NewWriter wraps a writer.
	NewWriter(io.Writer) (io.WriteCloser, error)
}

Compression represents the data compression.

func DetectCompression

func DetectCompression(name string) Compression

DetectCompression detects the compression type from a URL path or file name.

type ConsumeFunc added in v0.4.0

type ConsumeFunc func(*Reader) error

ConsumeFunc is a callback invoked by consumers.

type Consumer

type Consumer interface {
	// Consume initiates a sync attempt. It will consume the remote feed only if it has changed since
	// last invocation.
	Consume(context.Context, *ReaderOptions, ConsumeFunc) (*Status, error)

	// Version indicates the most recently consumed version.
	Version() int64

	// Close stops the underlying sync process.
	Close() error
}

Consumer manages data retrieval from a remote feed. It queries the feed in regular intervals, continuously retrieving new updates.

func NewConsumer

func NewConsumer(ctx context.Context, remoteURL string) (Consumer, error)

NewConsumer starts a new feed consumer.

func NewConsumerForRemote added in v0.4.0

func NewConsumerForRemote(remote *bfs.Object) Consumer

NewConsumerForRemote starts a new feed consumer with a remote.

func NewIncrementalConsumer added in v0.14.0

func NewIncrementalConsumer(ctx context.Context, bucketURL string) (Consumer, error)

NewIncrementalConsumer starts a new incremental feed consumer.

func NewIncrementalConsumerForBucket added in v0.14.0

func NewIncrementalConsumerForBucket(bucket bfs.Bucket) Consumer

NewIncrementalConsumerForBucket starts a new incremental feed consumer with a bucket.

type CronJob added in v0.14.0

type CronJob struct {
	// contains filtered or unexported fields
}

CronJob runs in regular intervals until it's stopped.

func (*CronJob) Close added in v0.17.0

func (j *CronJob) Close() error

Close stops the job and waits until it is complete.

type Format

type Format interface {
	// NewDecoder wraps a decoder around a reader.
	NewDecoder(io.Reader) (FormatDecoder, error)
	// NewEncoder wraps an encoder around a writer.
	NewEncoder(io.Writer) (FormatEncoder, error)
}

Format represents the data format.

func DetectFormat

func DetectFormat(name string) Format

DetectFormat detects the data format from a URL path or file name. May return nil.

type FormatDecoder

type FormatDecoder interface {
	// Decode decodes the next message into an interface.
	Decode(v interface{}) error

	io.Closer
}

FormatDecoder decodes formatted values from a stream.

type FormatEncoder

type FormatEncoder interface {
	// Encode encodes the value to the stream.
	Encode(v interface{}) error

	io.Closer
}

FormatEncoder encodes values to a stream.

type IncrementalProduceFunc added in v0.14.0

type IncrementalProduceFunc func(remoteVersion int64) ProduceFunc

IncrementalProduceFunc returns a ProduceFunc closure around an incremental version.

type IncrementalProducer added in v0.14.0

type IncrementalProducer struct {
	// contains filtered or unexported fields
}

IncrementalProducer pushes incremental feeds to a remote bucket location.

func NewIncrementalProducer added in v0.14.0

func NewIncrementalProducer(ctx context.Context, bucketURL string) (*IncrementalProducer, error)

NewIncrementalProducer inits a new incremental feed producer.

func NewIncrementalProducerForBucket added in v0.14.0

func NewIncrementalProducerForBucket(bucket bfs.Bucket) *IncrementalProducer

NewIncrementalProducerForBucket starts a new incremental feed producer for a bucket.

func (*IncrementalProducer) Close added in v0.14.0

func (p *IncrementalProducer) Close() (err error)

Close stops the producer.

func (*IncrementalProducer) Produce added in v0.14.0

func (p *IncrementalProducer) Produce(ctx context.Context, version int64, opt *WriterOptions, pfn IncrementalProduceFunc) (*Status, error)

type ProduceFunc added in v0.6.0

type ProduceFunc func(*Writer) error

ProduceFunc is a callback which is run by the producer on every iteration.

type Producer added in v0.6.0

type Producer struct {
	// contains filtered or unexported fields
}

Producer instances push data feeds to remote locations.

func NewProducer added in v0.6.0

func NewProducer(ctx context.Context, remoteURL string) (*Producer, error)

NewProducer inits a new feed producer.

func NewProducerForRemote added in v0.6.0

func NewProducerForRemote(remote *bfs.Object) *Producer

NewProducerForRemote starts a new feed producer with a remote.

func (*Producer) Close added in v0.6.0

func (p *Producer) Close() error

Close stops the producer.

func (*Producer) Produce added in v0.14.0

func (p *Producer) Produce(ctx context.Context, version int64, opt *WriterOptions, pfn ProduceFunc) (*Status, error)

type Reader added in v0.4.0

type Reader struct {
	// contains filtered or unexported fields
}

Reader reads data from a remote feed.

func MultiReader added in v0.14.0

func MultiReader(ctx context.Context, remotes []*bfs.Object, opt *ReaderOptions) *Reader

MultiReader inits a new reader for multiple remotes. Remotes are read sequentially as if concatenated. Once all remotes are fully read, Read will return EOF.

func NewReader added in v0.4.0

func NewReader(ctx context.Context, remote *bfs.Object, opt *ReaderOptions) (*Reader, error)

NewReader inits a new reader.

func (*Reader) Close added in v0.4.0

func (r *Reader) Close() (err error)

Close closes the reader.

func (*Reader) Decode added in v0.4.0

func (r *Reader) Decode(v interface{}) error

Decode decodes the next formatted value from the feed. At end of feed, Decode returns io.EOF.

func (*Reader) NumRead added in v0.4.0

func (r *Reader) NumRead() int64

NumRead returns the number of read values.

func (*Reader) Read added in v0.8.0

func (r *Reader) Read(p []byte) (int, error)

Read reads raw bytes from the feed. At end of feed, Read returns 0, io.EOF.

func (*Reader) Version added in v0.14.0

func (r *Reader) Version() (int64, error)

Version returns the version of the remote feed.

type ReaderOptions added in v0.4.0

type ReaderOptions struct {
	// Format specifies the format
	// Default: auto-detected from URL path.
	Format Format

	// Compression specifies the compression type.
	// Default: auto-detected from URL path.
	Compression Compression
}

ReaderOptions configure the reader instance.

type Scheduler added in v0.14.0

type Scheduler struct {
	// contains filtered or unexported fields
}

Scheduler runs cronjobs in regular intervals.

func Every added in v0.14.0

func Every(interval time.Duration) *Scheduler

Every creates a scheduler.

func (*Scheduler) AfterSync added in v0.14.0

func (s *Scheduler) AfterSync(hooks ...AfterHook) *Scheduler

AfterSync adds custom after hooks.

func (*Scheduler) BeforeSync added in v0.14.0

func (s *Scheduler) BeforeSync(hooks ...BeforeHook) *Scheduler

BeforeSync adds custom before hooks.

func (*Scheduler) Consume added in v0.14.0

func (s *Scheduler) Consume(ctx context.Context, remoteURL string, cfn ConsumeFunc) (*CronJob, error)

Consume starts a consumer job.

func (*Scheduler) ConsumeWith added in v0.16.0

func (s *Scheduler) ConsumeWith(csm Consumer, cfn ConsumeFunc) (*CronJob, error)

ConsumeWith starts a consumer job with an existing consumer.

Example
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/bsm/bfs"
	"github.com/bsm/feedx"
)

func main() {
	ctx := context.TODO()

	// create a mock in-memory object to act as the remote feed source
	obj := bfs.NewInMemObject("todos.ndjson")
	defer obj.Close()

	// create a consumer for the remote object
	csm := feedx.NewConsumerForRemote(obj)
	defer csm.Close()

	// schedule an hourly consumer job with before/after hooks
	job, err := feedx.Every(time.Hour).
		WithContext(ctx).
		BeforeSync(func(_ int64) bool {
			fmt.Println("1. Before sync")
			return true
		}).
		AfterSync(func(_ *feedx.Status, err error) {
			fmt.Printf("3. After sync - error:%v", err)
		}).
		ConsumeWith(csm, func(_ *feedx.Reader) error {
			fmt.Println("2. Consuming feed")
			return nil
		})
	if err != nil {
		panic(err)
	}

	// Close stops the job and waits for the in-flight cycle to finish
	_ = job.Close()

}
Output:

1. Before sync
2. Consuming feed
3. After sync - error:<nil>

func (*Scheduler) Produce added in v0.14.0

func (s *Scheduler) Produce(ctx context.Context, remoteURL string, pfn ProduceFunc) (*CronJob, error)

Produce starts a producer job.

func (*Scheduler) ProduceIncrementally added in v0.14.0

func (s *Scheduler) ProduceIncrementally(ctx context.Context, remoteURL string, pfn IncrementalProduceFunc) (*CronJob, error)

ProduceIncrementally starts an incremental producer job.

func (*Scheduler) ProduceIncrementallyWith added in v0.16.0

func (s *Scheduler) ProduceIncrementallyWith(pcr *IncrementalProducer, pfn IncrementalProduceFunc) (*CronJob, error)

ProduceIncrementallyWith starts an incremental producer job with an existing producer.

func (*Scheduler) ProduceWith added in v0.16.0

func (s *Scheduler) ProduceWith(pcr *Producer, pfn ProduceFunc) (*CronJob, error)

ProduceWith starts a producer job with an existing producer.

Example
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/bsm/bfs"
	"github.com/bsm/feedx"
)

func main() {
	ctx := context.TODO()

	// create a mock in-memory object to act as the remote feed destination
	obj := bfs.NewInMemObject("todos.ndjson")
	defer obj.Close()

	// create a producer for the remote object
	pcr := feedx.NewProducerForRemote(obj)
	defer pcr.Close()

	// schedule an hourly producer job; the version check runs first,
	// then the before hook, the produce callback, and the after hook
	job, err := feedx.Every(time.Hour).
		WithContext(ctx).
		BeforeSync(func(_ int64) bool {
			fmt.Println("2. Before sync")
			return true
		}).
		AfterSync(func(_ *feedx.Status, err error) {
			fmt.Printf("4. After sync - error:%v", err)
		}).
		WithVersionCheck(func(_ context.Context) (int64, error) {
			fmt.Println("1. Retrieve latest version")
			return 101, nil
		}).
		ProduceWith(pcr, func(w *feedx.Writer) error {
			fmt.Println("3. Producing feed")
			return nil
		})
	if err != nil {
		panic(err)
	}

	// Close stops the job and waits for the in-flight cycle to finish
	_ = job.Close()

}
Output:

1. Retrieve latest version
2. Before sync
3. Producing feed
4. After sync - error:<nil>

func (*Scheduler) WithContext added in v0.14.0

func (s *Scheduler) WithContext(ctx context.Context) *Scheduler

WithContext sets a custom context for the run.

func (*Scheduler) WithReaderOptions added in v0.14.0

func (s *Scheduler) WithReaderOptions(opt *ReaderOptions) *Scheduler

WithReaderOptions sets custom reader options for consumers.

func (*Scheduler) WithVersionCheck added in v0.14.0

func (s *Scheduler) WithVersionCheck(fn VersionCheck) *Scheduler

WithVersionCheck sets a custom version check for producers.

func (*Scheduler) WithWriterOptions added in v0.14.0

func (s *Scheduler) WithWriterOptions(opt *WriterOptions) *Scheduler

WithWriterOptions sets custom writer options for producers.

type Status added in v0.14.0

type Status struct {
	// Skipped indicates that the sync was skipped because there were no new changes.
	Skipped bool
	// LocalVersion indicates the local version before sync.
	LocalVersion int64
	// RemoteVersion indicates the remote version before sync.
	RemoteVersion int64
	// NumItems returns the number of items processed, either read or written.
	NumItems int64
}

Status is returned by sync processes.

type VersionCheck added in v0.14.0

type VersionCheck func(context.Context) (int64, error)

VersionCheck callbacks return the latest local version.

type Writer added in v0.4.0

type Writer struct {
	// contains filtered or unexported fields
}

Writer encodes feeds to remote locations.

func NewWriter added in v0.4.0

func NewWriter(ctx context.Context, remote *bfs.Object, opt *WriterOptions) *Writer

NewWriter inits a new feed writer.

func (*Writer) Commit added in v0.5.0

func (w *Writer) Commit() error

Commit closes the writer and persists the contents.

func (*Writer) Discard added in v0.5.0

func (w *Writer) Discard() error

Discard closes the writer and discards the contents.

func (*Writer) Encode added in v0.4.0

func (w *Writer) Encode(v interface{}) error

Encode appends a value to the feed.

func (*Writer) NumWritten added in v0.4.0

func (w *Writer) NumWritten() int64

NumWritten returns the number of written values.

func (*Writer) Write added in v0.8.0

func (w *Writer) Write(p []byte) (int, error)

Write writes raw bytes to the feed.

func (*Writer) WriteString added in v0.8.0

func (w *Writer) WriteString(s string) (int, error)

WriteString writes a raw string to the feed.

type WriterOptions added in v0.4.0

type WriterOptions struct {
	// Format specifies the format
	// Default: auto-detected from URL path.
	Format Format

	// Compression specifies the compression type.
	// Default: auto-detected from URL path.
	Compression Compression

	// Provides an optional version which is stored with the remote metadata.
	// Default: 0
	Version int64
}

WriterOptions configure the producer instance.

Directories

Path Synopsis
ext
parquet module

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL