Documentation
¶
Overview ¶
Example ¶
package main
import (
"context"
"errors"
"fmt"
"io"
"github.com/bsm/bfs"
"github.com/bsm/feedx"
"github.com/bsm/feedx/internal/testdata"
)
type message = testdata.MockMessage
func main() {
	ctx := context.TODO()

	// create a mock object
	obj := bfs.NewInMemObject("todos.ndjson")
	defer obj.Close()

	// create a producer
	pcr := feedx.NewProducerForRemote(obj)
	defer pcr.Close()

	// produce two messages under version 101
	status, err := pcr.Produce(ctx, 101, nil, func(w *feedx.Writer) error {
		return errors.Join(
			w.Encode(&message{Name: "Jane", Height: 175}),
			w.Encode(&message{Name: "Joe", Height: 172}),
		)
	})
	if err != nil {
		panic(err)
	}
	fmt.Printf("PRODUCED skipped:%v version:%v->%v items:%v\n", status.Skipped, status.LocalVersion, status.RemoteVersion, status.NumItems)

	// create a consumer
	csm := feedx.NewConsumerForRemote(obj)
	defer csm.Close()

	// consume data, decoding until io.EOF marks the end of the feed
	var msgs []*message
	status, err = csm.Consume(ctx, nil, func(r *feedx.Reader) error {
		for {
			var msg message
			if err := r.Decode(&msg); err != nil {
				if errors.Is(err, io.EOF) {
					break
				}
				return err
			}
			msgs = append(msgs, &msg)
		}
		return nil
	})
	if err != nil {
		panic(err)
	}
	fmt.Printf("CONSUMED skipped:%v version:%v->%v items:%v\n", status.Skipped, status.LocalVersion, status.RemoteVersion, status.NumItems)
	fmt.Printf("DATA [%q, %q]\n", msgs[0].Name, msgs[1].Name)
}
Output: PRODUCED skipped:false version:101->0 items:2 CONSUMED skipped:false version:0->101 items:2 DATA ["Jane", "Joe"]
Index ¶
- Variables
- type AfterHook
- type BeforeHook
- type Compression
- type ConsumeFunc
- type Consumer
- type CronJob
- type Format
- type FormatDecoder
- type FormatEncoder
- type IncrementalProduceFunc
- type IncrementalProducer
- type ProduceFunc
- type Producer
- type Reader
- type ReaderOptions
- type Scheduler
- func (s *Scheduler) AfterSync(hooks ...AfterHook) *Scheduler
- func (s *Scheduler) BeforeSync(hooks ...BeforeHook) *Scheduler
- func (s *Scheduler) Consume(ctx context.Context, remoteURL string, cfn ConsumeFunc) (*CronJob, error)
- func (s *Scheduler) ConsumeWith(csm Consumer, cfn ConsumeFunc) (*CronJob, error)
- func (s *Scheduler) Produce(ctx context.Context, remoteURL string, pfn ProduceFunc) (*CronJob, error)
- func (s *Scheduler) ProduceIncrementally(ctx context.Context, remoteURL string, pfn IncrementalProduceFunc) (*CronJob, error)
- func (s *Scheduler) ProduceIncrementallyWith(pcr *IncrementalProducer, pfn IncrementalProduceFunc) (*CronJob, error)
- func (s *Scheduler) ProduceWith(pcr *Producer, pfn ProduceFunc) (*CronJob, error)
- func (s *Scheduler) WithContext(ctx context.Context) *Scheduler
- func (s *Scheduler) WithReaderOptions(opt *ReaderOptions) *Scheduler
- func (s *Scheduler) WithVersionCheck(fn VersionCheck) *Scheduler
- func (s *Scheduler) WithWriterOptions(opt *WriterOptions) *Scheduler
- type Status
- type VersionCheck
- type Writer
- type WriterOptions
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var CBORFormat = cborFormat{}
CBORFormat provides a Format implementation for CBOR.
var ErrNotModified = errors.New("feedx: not modified")
ErrNotModified is used to signal that something has not been modified.
var FlateCompression = flateCompression{}
FlateCompression supports flate compression format.
var GZipCompression = gzipCompression{}
GZipCompression supports gzip compression format.
var JSONFormat = jsonFormat{}
JSONFormat provides a Format implementation for JSON.
var NoCompression = noCompression{}
NoCompression is just a pass-through without compression.
var ProtobufFormat = protobufFormat{}
ProtobufFormat provides a Format implementation for Protobuf.
var ZstdCompression = zstdCompression{}
ZstdCompression supports zstd compression format.
Functions ¶
This section is empty.
Types ¶
type BeforeHook ¶ added in v0.14.0
BeforeHook callbacks are run before jobs are started. Each receives the local version before sync as an argument and may return false to abort the cycle.
type Compression ¶
type Compression interface {
// NewReader wraps a reader.
NewReader(io.Reader) (io.ReadCloser, error)
// NewWriter wraps a writer.
NewWriter(io.Writer) (io.WriteCloser, error)
}
Compression represents the data compression.
func DetectCompression ¶
func DetectCompression(name string) Compression
DetectCompression detects the compression type from a URL path or file name.
type ConsumeFunc ¶ added in v0.4.0
ConsumeFunc is a callback invoked by consumers.
type Consumer ¶
type Consumer interface {
// Consume initiates a sync attempt. It will consume the remote feed only if it has changed since
// last invocation.
Consume(context.Context, *ReaderOptions, ConsumeFunc) (*Status, error)
// Version indicates the most recently consumed version.
Version() int64
// Close stops the underlying sync process.
Close() error
}
Consumer manages data retrieval from a remote feed. It queries the feed in regular intervals, continuously retrieving new updates.
func NewConsumer ¶
NewConsumer starts a new feed consumer.
func NewConsumerForRemote ¶ added in v0.4.0
NewConsumerForRemote starts a new feed consumer with a remote.
func NewIncrementalConsumer ¶ added in v0.14.0
NewIncrementalConsumer starts a new incremental feed consumer.
func NewIncrementalConsumerForBucket ¶ added in v0.14.0
NewIncrementalConsumerForBucket starts a new incremental feed consumer with a bucket.
type CronJob ¶ added in v0.14.0
type CronJob struct {
// contains filtered or unexported fields
}
CronJob runs in regular intervals until it's stopped.
type Format ¶
type Format interface {
// NewDecoder wraps a decoder around a reader.
NewDecoder(io.Reader) (FormatDecoder, error)
// NewEncoder wraps an encoder around a writer.
NewEncoder(io.Writer) (FormatEncoder, error)
}
Format represents the data format.
func DetectFormat ¶
DetectFormat detects the data format from a URL path or file name. May return nil.
type FormatDecoder ¶
type FormatDecoder interface {
// Decode decodes the next message into an interface.
Decode(v interface{}) error
io.Closer
}
FormatDecoder methods
type FormatEncoder ¶
type FormatEncoder interface {
// Encode encodes the value to the stream.
Encode(v interface{}) error
io.Closer
}
FormatEncoder methods
type IncrementalProduceFunc ¶ added in v0.14.0
type IncrementalProduceFunc func(remoteVersion int64) ProduceFunc
IncrementalProduceFunc returns a ProduceFunc closure around an incremental version.
type IncrementalProducer ¶ added in v0.14.0
type IncrementalProducer struct {
// contains filtered or unexported fields
}
IncrementalProducer pushes incremental feeds to a remote bucket location.
func NewIncrementalProducer ¶ added in v0.14.0
func NewIncrementalProducer(ctx context.Context, bucketURL string) (*IncrementalProducer, error)
NewIncrementalProducer inits a new incremental feed producer.
func NewIncrementalProducerForBucket ¶ added in v0.14.0
func NewIncrementalProducerForBucket(bucket bfs.Bucket) *IncrementalProducer
NewIncrementalProducerForBucket starts a new incremental feed producer for a bucket.
func (*IncrementalProducer) Close ¶ added in v0.14.0
func (p *IncrementalProducer) Close() (err error)
Close stops the producer.
func (*IncrementalProducer) Produce ¶ added in v0.14.0
func (p *IncrementalProducer) Produce(ctx context.Context, version int64, opt *WriterOptions, pfn IncrementalProduceFunc) (*Status, error)
type ProduceFunc ¶ added in v0.6.0
ProduceFunc is a callback which is run by the producer on every iteration.
type Producer ¶ added in v0.6.0
type Producer struct {
// contains filtered or unexported fields
}
Producer instances push data feeds to remote locations.
func NewProducer ¶ added in v0.6.0
NewProducer inits a new feed producer.
func NewProducerForRemote ¶ added in v0.6.0
NewProducerForRemote starts a new feed producer with a remote.
func (*Producer) Produce ¶ added in v0.14.0
func (p *Producer) Produce(ctx context.Context, version int64, opt *WriterOptions, pfn ProduceFunc) (*Status, error)
type Reader ¶ added in v0.4.0
type Reader struct {
// contains filtered or unexported fields
}
Reader reads data from a remote feed.
func MultiReader ¶ added in v0.14.0
MultiReader inits a new reader for multiple remotes. Remotes are read sequentially as if concatenated. Once all remotes are fully read, Read will return EOF.
func (*Reader) Decode ¶ added in v0.4.0
Decode decodes the next formatted value from the feed. At end of feed, Decode returns io.EOF.
type ReaderOptions ¶ added in v0.4.0
type ReaderOptions struct {
// Format specifies the data format.
// Default: auto-detected from URL path.
Format Format
// Compression specifies the compression type.
// Default: auto-detected from URL path.
Compression Compression
}
ReaderOptions configure the reader instance.
type Scheduler ¶ added in v0.14.0
type Scheduler struct {
// contains filtered or unexported fields
}
Scheduler runs cronjobs in regular intervals.
func (*Scheduler) BeforeSync ¶ added in v0.14.0
func (s *Scheduler) BeforeSync(hooks ...BeforeHook) *Scheduler
BeforeSync adds custom before hooks.
func (*Scheduler) Consume ¶ added in v0.14.0
func (s *Scheduler) Consume(ctx context.Context, remoteURL string, cfn ConsumeFunc) (*CronJob, error)
Consume starts a consumer job.
func (*Scheduler) ConsumeWith ¶ added in v0.16.0
func (s *Scheduler) ConsumeWith(csm Consumer, cfn ConsumeFunc) (*CronJob, error)
ConsumeWith starts a consumer job with an existing consumer.
Example ¶
package main
import (
"context"
"fmt"
"time"
"github.com/bsm/bfs"
"github.com/bsm/feedx"
)
func main() {
ctx := context.TODO()
// create a mock object
obj := bfs.NewInMemObject("todos.ndjson")
defer obj.Close()
// create a consumer
csm := feedx.NewConsumerForRemote(obj)
defer csm.Close()
// schedule an hourly consume job with before/after hooks
job, err := feedx.Every(time.Hour).
WithContext(ctx).
BeforeSync(func(_ int64) bool {
fmt.Println("1. Before sync")
return true
}).
AfterSync(func(_ *feedx.Status, err error) {
fmt.Printf("3. After sync - error:%v", err)
}).
ConsumeWith(csm, func(_ *feedx.Reader) error {
fmt.Println("2. Consuming feed")
return nil
})
if err != nil {
panic(err)
}
// stop the cron job
_ = job.Close()
}
Output: 1. Before sync 2. Consuming feed 3. After sync - error:<nil>
func (*Scheduler) Produce ¶ added in v0.14.0
func (s *Scheduler) Produce(ctx context.Context, remoteURL string, pfn ProduceFunc) (*CronJob, error)
Produce starts a producer job.
func (*Scheduler) ProduceIncrementally ¶ added in v0.14.0
func (s *Scheduler) ProduceIncrementally(ctx context.Context, remoteURL string, pfn IncrementalProduceFunc) (*CronJob, error)
ProduceIncrementally starts an incremental producer job.
func (*Scheduler) ProduceIncrementallyWith ¶ added in v0.16.0
func (s *Scheduler) ProduceIncrementallyWith(pcr *IncrementalProducer, pfn IncrementalProduceFunc) (*CronJob, error)
ProduceIncrementallyWith starts an incremental producer job with an existing producer.
func (*Scheduler) ProduceWith ¶ added in v0.16.0
func (s *Scheduler) ProduceWith(pcr *Producer, pfn ProduceFunc) (*CronJob, error)
ProduceWith starts a producer job with an existing producer.
Example ¶
package main
import (
"context"
"fmt"
"time"
"github.com/bsm/bfs"
"github.com/bsm/feedx"
)
func main() {
ctx := context.TODO()
// create a mock object
obj := bfs.NewInMemObject("todos.ndjson")
defer obj.Close()
// create a producer
pcr := feedx.NewProducerForRemote(obj)
defer pcr.Close()
// schedule an hourly produce job with hooks and a custom version check
job, err := feedx.Every(time.Hour).
WithContext(ctx).
BeforeSync(func(_ int64) bool {
fmt.Println("2. Before sync")
return true
}).
AfterSync(func(_ *feedx.Status, err error) {
fmt.Printf("4. After sync - error:%v", err)
}).
WithVersionCheck(func(_ context.Context) (int64, error) {
fmt.Println("1. Retrieve latest version")
return 101, nil
}).
ProduceWith(pcr, func(w *feedx.Writer) error {
fmt.Println("3. Producing feed")
return nil
})
if err != nil {
panic(err)
}
// stop the cron job
_ = job.Close()
}
Output: 1. Retrieve latest version 2. Before sync 3. Producing feed 4. After sync - error:<nil>
func (*Scheduler) WithContext ¶ added in v0.14.0
WithContext sets a custom context for the run.
func (*Scheduler) WithReaderOptions ¶ added in v0.14.0
func (s *Scheduler) WithReaderOptions(opt *ReaderOptions) *Scheduler
WithReaderOptions sets custom reader options for consumers.
func (*Scheduler) WithVersionCheck ¶ added in v0.14.0
func (s *Scheduler) WithVersionCheck(fn VersionCheck) *Scheduler
WithVersionCheck sets a custom version check for producers.
func (*Scheduler) WithWriterOptions ¶ added in v0.14.0
func (s *Scheduler) WithWriterOptions(opt *WriterOptions) *Scheduler
WithWriterOptions sets custom writer options for producers.
type Status ¶ added in v0.14.0
type Status struct {
// Skipped indicates that the sync was skipped, because there were no new changes.
Skipped bool
// LocalVersion indicates the local version before sync.
LocalVersion int64
// RemoteVersion indicates the remote version before sync.
RemoteVersion int64
// NumItems returns the number of items processed, either read or written.
NumItems int64
}
Status is returned by sync processes.
type VersionCheck ¶ added in v0.14.0
VersionCheck callbacks return the latest local version.
type Writer ¶ added in v0.4.0
type Writer struct {
// contains filtered or unexported fields
}
Writer encodes feeds to remote locations.
func (*Writer) NumWritten ¶ added in v0.4.0
NumWritten returns the number of written values.
type WriterOptions ¶ added in v0.4.0
type WriterOptions struct {
// Format specifies the data format.
// Default: auto-detected from URL path.
Format Format
// Compression specifies the compression type.
// Default: auto-detected from URL path.
Compression Compression
// Version provides an optional version which is stored with the remote metadata.
// Default: 0
Version int64
}
WriterOptions configure the producer instance.