pipe

package
v0.0.0-...-4712b44 Latest
Warning

This package is not in the latest version of its module.

Published: Oct 20, 2023 License: MIT Imports: 18 Imported by: 9

Documentation

Overview

Package pipe provides BGP message processing with callbacks.


Constants

This section is empty.

Variables

var (
	ErrInClosed = errors.New("input channel closed")
	ErrStopped  = errors.New("pipe stopped")
)
var (
	// pipe has finished starting
	EVENT_START = "bgpfix/pipe.START"

	// pipe is about to stop
	EVENT_STOP = "bgpfix/pipe.STOP"

	// could not parse the message before its callback
	EVENT_PARSE = "bgpfix/pipe.PARSE"

	// valid OPEN with a bigger message timestamp (seconds) made it to output
	EVENT_OPEN = "bgpfix/pipe.OPEN"

	// KEEPALIVE with a bigger message timestamp (seconds) made it to output
	EVENT_ALIVE = "bgpfix/pipe.ALIVE"

	// UPDATE with a bigger message timestamp (seconds) made it to output
	EVENT_UPDATE = "bgpfix/pipe.UPDATE"

	// session established (OPEN+KEEPALIVE made it to both sides)
	EVENT_ESTABLISHED = "bgpfix/pipe.ESTABLISHED"
)

A collection of events generated internally by the pipe.
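The OPEN, ALIVE and UPDATE events fire only when a message carries a bigger timestamp (in seconds) than the last one seen. The following is a minimal sketch of such a gate, assuming an atomic compare-and-swap loop over a stored UNIX timestamp. The `lastSeen` type and its `advance` method are hypothetical names for illustration, not part of the package.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// lastSeen is a hypothetical stand-in for a field such as Direction.LastUpdate.
type lastSeen struct {
	sec atomic.Int64 // UNIX timestamp in seconds
}

// advance stores sec if it is bigger than the stored value and reports
// whether it did so, i.e. whether an event would be announced.
func (l *lastSeen) advance(sec int64) bool {
	for {
		old := l.sec.Load()
		if sec <= old {
			return false
		}
		if l.sec.CompareAndSwap(old, sec) {
			return true
		}
	}
}

func main() {
	var l lastSeen
	fmt.Println(l.advance(1700000000)) // true: first message
	fmt.Println(l.advance(1700000000)) // false: same second
	fmt.Println(l.advance(1700000001)) // true: timestamp advanced
}
```

With this kind of gate, a burst of messages within one second would announce at most one event per message type.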

var DefaultOptions = Options{
	Logger: &log.Logger,
	Caps:   true,
	Rbuf:   10,
	Rproc:  1,
	Lbuf:   10,
	Lproc:  1,
}

Default BGP pipe options

Functions

This section is empty.

Types

type Action

type Action byte

Action corresponds to m.Action values

const (
	// The default, zero action: keep processing as-is.
	ACTION_CONTINUE Action = 0

	// Keep the message for later use, do not re-use its memory.
	//
	// You must use this if you wish to re-inject the message,
	// or keep reference to some value inside the msg.
	//
	// Once set, you must not remove this action from a message
	// unless you know you are the sole owner of this message.
	ACTION_BORROW Action = 1 << iota

	// Drop the message immediately from the pipe.
	//
	// If you want to re-inject the message later, set ACTION_BORROW too,
	// and keep in mind the message will try to re-start after where
	// you dropped it, unless you call Context.Clear on it.
	ACTION_DROP

	// Accept the message immediately and write to pipe output.
	ACTION_ACCEPT
)

func (*Action) Add

func (ac *Action) Add(a Action)

Add adds a to action ac

func (*Action) Clear

func (ac *Action) Clear()

Clear clears all bits except for BORROW

func (Action) Is

func (ac Action) Is(a Action) bool

Is returns true iff a is set in ac

func (Action) Not

func (ac Action) Not(a Action) bool

Not returns true iff a is NOT set in ac
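To make the bit-flag semantics concrete, here is a self-contained sketch that re-declares the Action type locally with the same constant block. The method bodies are assumptions inferred from the doc comments above, not a copy of the package source.

```go
package main

import "fmt"

// Action mirrors the documented pipe.Action type. The method bodies are
// assumptions inferred from the doc comments, not the package source.
type Action byte

const (
	ACTION_CONTINUE Action = 0
	ACTION_BORROW   Action = 1 << iota // iota is 1 here, so the value is 2
	ACTION_DROP                        // 4
	ACTION_ACCEPT                      // 8
)

// Add sets the bits of a in ac.
func (ac *Action) Add(a Action) { *ac |= a }

// Clear drops every bit except ACTION_BORROW.
func (ac *Action) Clear() { *ac &= ACTION_BORROW }

// Is reports whether any bit of a is set in ac.
func (ac Action) Is(a Action) bool { return ac&a != 0 }

// Not reports whether no bit of a is set in ac.
func (ac Action) Not(a Action) bool { return ac&a == 0 }

func main() {
	var ac Action
	ac.Add(ACTION_BORROW)
	ac.Add(ACTION_DROP)
	fmt.Println(ac.Is(ACTION_DROP), ac.Not(ACTION_ACCEPT)) // true true

	ac.Clear()
	fmt.Println(ac.Is(ACTION_DROP), ac.Is(ACTION_BORROW)) // false true
}
```

Note that because ACTION_CONTINUE occupies the first line of the constant block, the first flag value is 2 rather than 1.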

type Callback

type Callback struct {
	Id      int          // optional callback id number (zero means none)
	Name    string       // optional name
	Order   int          // the lower the order, the sooner callback is run
	Enabled *atomic.Bool // if non-nil, disables the callback unless true

	Pre  bool // run before non-pre callbacks?
	Raw  bool // if true, do not parse the message (which may already be parsed, but for other reasons)
	Post bool // run after non-post callbacks?

	Dst   msg.Dst      // if non-zero, limits the direction
	Types []msg.Type   // if non-empty, limits message types
	Func  CallbackFunc // the function to call
}

Callback represents a function to call for matching BGP messages

type CallbackFunc

type CallbackFunc func(m *msg.Msg) (add_action Action)

CallbackFunc processes message m. Optionally returns an Action to add to m's pipe.Context.Action.
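The Pre, Post and Order fields suggest a three-stage ordering. Below is a rough sketch of one way to sort callbacks accordingly, using a reduced local `callback` type. The tie-breaking (Pre wins over Post, stable order among equals) is an assumption, not documented behavior.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// callback is a reduced, hypothetical stand-in for pipe.Callback.
type callback struct {
	Name  string
	Order int
	Pre   bool
	Post  bool
}

// stage maps the Pre/Post flags to a coarse rank: Pre first, Post last.
func stage(cb callback) int {
	switch {
	case cb.Pre:
		return 0
	case cb.Post:
		return 2
	default:
		return 1
	}
}

// sortCallbacks orders callbacks by stage, then by ascending Order.
func sortCallbacks(cbs []callback) {
	sort.SliceStable(cbs, func(i, j int) bool {
		si, sj := stage(cbs[i]), stage(cbs[j])
		if si != sj {
			return si < sj
		}
		return cbs[i].Order < cbs[j].Order
	})
}

// names returns the callback names in their current order.
func names(cbs []callback) string {
	out := make([]string, 0, len(cbs))
	for _, cb := range cbs {
		out = append(out, cb.Name)
	}
	return strings.Join(out, " ")
}

func main() {
	cbs := []callback{
		{Name: "log", Post: true},
		{Name: "filter", Order: 10},
		{Name: "stamp", Pre: true},
		{Name: "rewrite", Order: 5},
	}
	sortCallbacks(cbs)
	fmt.Println(names(cbs)) // stamp rewrite filter log
}
```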

type Direction

type Direction struct {
	// parent Pipe
	Pipe *Pipe

	// opposite pipe direction from the parent pipe
	Opposite *Direction

	// destination of messages flowing in this Direction
	Dst msg.Dst

	// In is the pipe input, where you write incoming messages.
	// Get new messages using Pipe.Get().
	In chan *msg.Msg

	// Out is the pipe output, where you read processed messages.
	// Dispose used messages using Pipe.Put().
	Out chan *msg.Msg

	// UNIX timestamp (seconds) of the last valid OPEN message
	LastOpen atomic.Int64

	// UNIX timestamp (seconds) of the last KEEPALIVE message
	LastAlive atomic.Int64

	// UNIX timestamp (seconds) of the last UPDATE message
	LastUpdate atomic.Int64

	// the last valid OPEN message that updated LastOpen
	Open atomic.Pointer[msg.Open]
	// contains filtered or unexported fields
}

Direction represents a particular direction of messages in a Pipe

func (*Direction) CloseInput

func (d *Direction) CloseInput()

CloseInput safely closes the Input channel. The Output channel will eventually be closed too, after all queued messages have been processed.

func (*Direction) CloseOutput

func (d *Direction) CloseOutput()

CloseOutput safely closes the Output channel. Input handlers will keep running until Input is closed.
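Closing a Go channel twice panics, so a "safe" close needs some guard. The sketch below shows one common approach, assuming a sync.Once per channel. The local `direction` type and `closeInput` method are hypothetical, and the package may use a different mechanism.

```go
package main

import (
	"fmt"
	"sync"
)

// direction is a hypothetical stand-in holding only an input channel.
type direction struct {
	In     chan int
	inOnce sync.Once // guards close(In)
}

// closeInput closes In at most once, so repeated calls do not panic.
func (d *direction) closeInput() {
	d.inOnce.Do(func() { close(d.In) })
}

func main() {
	d := &direction{In: make(chan int, 1)}
	d.In <- 7
	d.closeInput()
	d.closeInput() // second call is a no-op

	v, ok := <-d.In
	fmt.Println(v, ok) // 7 true: queued value is still delivered
	_, ok = <-d.In
	fmt.Println(ok) // false: channel is closed and drained
}
```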

func (*Direction) Process

func (d *Direction) Process(input chan *msg.Msg, wg *sync.WaitGroup)

Process reads input, runs all callbacks on incoming messages, and forwards the result to d.Out.

The d.Out channel *can* be closed anytime, which will cause the resultant messages to be dropped on the floor (and possibly re-used).

Exits when input closes and is emptied. wg may be nil.
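The processing loop described above can be sketched with plain channels. This is a simplified model, assuming local `message` and `callbackFunc` stand-ins, and it does not handle an output channel that is closed mid-flight, which the real Process tolerates.

```go
package main

import (
	"fmt"
	"sync"
)

// message and callbackFunc are simplified stand-ins for msg.Msg and CallbackFunc.
type message struct {
	Text string
	Drop bool
}

type callbackFunc func(m *message)

// process reads input until it is closed and drained, runs the callbacks on
// each message, and forwards messages that were not dropped. wg may be nil.
func process(input <-chan *message, out chan<- *message, cbs []callbackFunc, wg *sync.WaitGroup) {
	if wg != nil {
		defer wg.Done()
	}
	for m := range input {
		for _, cb := range cbs {
			if cb(m); m.Drop {
				break // stop running callbacks on a dropped message
			}
		}
		if !m.Drop {
			out <- m
		}
	}
}

// run pushes texts through one processor and returns what reached the output.
func run(texts []string, cbs []callbackFunc) []string {
	input := make(chan *message, len(texts))
	out := make(chan *message, len(texts))
	for _, t := range texts {
		input <- &message{Text: t}
	}
	close(input)

	var wg sync.WaitGroup
	wg.Add(1)
	go process(input, out, cbs, &wg)
	wg.Wait()
	close(out)

	var res []string
	for m := range out {
		res = append(res, m.Text)
	}
	return res
}

func main() {
	dropKeepalive := func(m *message) { m.Drop = m.Text == "KEEPALIVE" }
	fmt.Println(run([]string{"OPEN", "KEEPALIVE", "UPDATE"}, []callbackFunc{dropKeepalive}))
	// prints [OPEN UPDATE]
}
```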

func (*Direction) ProcessMsg

func (d *Direction) ProcessMsg(m *msg.Msg)

ProcessMsg runs all callbacks on message m and forwards the result to d.Out.

func (*Direction) Read

func (d *Direction) Read(dst []byte) (int, error)

Read reads d.Out and writes raw BGP data to dst. Must not be used concurrently. TODO: stats

func (*Direction) Stats

func (d *Direction) Stats() *Stats

Stats returns dir statistics. FIXME: concurrent access

func (*Direction) Write

func (d *Direction) Write(src []byte) (n int, err error)

Write implements io.Writer and reads all BGP messages from src into dir.In. Copies bytes from src. Consumes what it can, buffers the remainder if needed. Returns n equal to len(src). May block if dir.In is full.

In case of a non-nil err, call Write(nil) to retry using the buffered remainder, until it returns a nil err.

Must not be used concurrently.
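The "consume what it can, buffer the remainder" behavior can be illustrated with a small framer. This is a sketch under stated assumptions: it relies only on the BGP header layout from RFC 4271 (16-byte marker, 2-byte length, 1-byte type), does not validate the marker, and the `framer` type and `feed` method are hypothetical names rather than the package's internals.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

const (
	headLen = 19   // 16-byte marker + 2-byte length + 1-byte type (RFC 4271)
	maxLen  = 4096 // maximum message length without extensions
)

var errBadLen = errors.New("invalid BGP message length")

// framer splits a byte stream into whole BGP messages.
type framer struct {
	buf []byte // bytes of an incomplete trailing message
}

// feed copies src behind the buffered remainder and returns every complete
// message found; leftover bytes stay buffered for the next call.
func (f *framer) feed(src []byte) ([][]byte, error) {
	f.buf = append(f.buf, src...)
	var msgs [][]byte
	for len(f.buf) >= headLen {
		n := int(binary.BigEndian.Uint16(f.buf[16:18]))
		if n < headLen || n > maxLen {
			return msgs, errBadLen
		}
		if len(f.buf) < n {
			break // wait for more data
		}
		msgs = append(msgs, append([]byte(nil), f.buf[:n]...))
		f.buf = f.buf[n:]
	}
	return msgs, nil
}

// keepalive builds a raw KEEPALIVE message (type 4, header only).
func keepalive() []byte {
	b := make([]byte, headLen)
	for i := 0; i < 16; i++ {
		b[i] = 0xff
	}
	binary.BigEndian.PutUint16(b[16:18], headLen)
	b[18] = 4
	return b
}

func main() {
	stream := append(keepalive(), keepalive()...)
	var f framer
	first, err := f.feed(stream[:30])
	fmt.Println(len(first), len(f.buf), err) // 1 11 <nil>
	second, err := f.feed(stream[30:])
	fmt.Println(len(second), len(f.buf), err) // 1 0 <nil>
}
```

Feeding 30 bytes yields one full message and leaves 11 bytes buffered; the next 8 bytes complete the second message.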

func (*Direction) WriteMsg

func (d *Direction) WriteMsg(m *msg.Msg) (err error)

WriteMsg safely sends m to d.In, returning an error instead of a panic if d.In is closed.
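Sending on a closed channel panics in Go. One way to turn that panic into an error, shown here as a sketch, is a deferred recover around the send. The `safeSend` helper and the local `errInClosed` value are illustrative assumptions modeled on ErrInClosed, not the package's actual code.

```go
package main

import (
	"errors"
	"fmt"
)

// errInClosed is a local stand-in for pipe.ErrInClosed.
var errInClosed = errors.New("input channel closed")

// safeSend sends v to ch and converts the "send on closed channel" panic
// into an error return.
func safeSend(ch chan int, v int) (err error) {
	defer func() {
		if recover() != nil {
			err = errInClosed
		}
	}()
	ch <- v
	return nil
}

func main() {
	ch := make(chan int, 1)
	fmt.Println(safeSend(ch, 1)) // <nil>
	close(ch)
	fmt.Println(safeSend(ch, 2)) // input channel closed
}
```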

func (*Direction) WriteTo

func (d *Direction) WriteTo(w io.Writer) (int64, error)

WriteTo implements io.WriterTo interface, writing raw BGP data to w
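As an illustration of the io.WriterTo pattern, the sketch below drains an output channel into a writer until the channel closes. It assumes the channel already carries raw bytes, whereas the real method has to marshal each message first; the `direction` type here is a local stand-in.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// direction is a hypothetical stand-in whose Out carries raw message bytes.
type direction struct {
	Out chan []byte
}

// WriteTo writes every message from d.Out to w until Out is closed or a
// write fails, returning the total number of bytes written.
func (d *direction) WriteTo(w io.Writer) (int64, error) {
	var total int64
	for raw := range d.Out {
		n, err := w.Write(raw)
		total += int64(n)
		if err != nil {
			return total, err
		}
	}
	return total, nil
}

// compile-time check that the sketch satisfies io.WriterTo
var _ io.WriterTo = (*direction)(nil)

// drain queues msgs, closes Out, and collects what WriteTo produced.
func drain(msgs ...[]byte) (int64, string) {
	d := &direction{Out: make(chan []byte, len(msgs))}
	for _, m := range msgs {
		d.Out <- m
	}
	close(d.Out)
	var buf bytes.Buffer
	n, _ := d.WriteTo(&buf)
	return n, buf.String()
}

func main() {
	n, s := drain([]byte("ab"), []byte("cde"))
	fmt.Println(n, s) // 5 abcde
}
```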

type Event

type Event struct {
	Pipe *Pipe     `json:"-"`              // parent pipe
	Seq  uint64    `json:"seq,omitempty"`  // event sequence number
	Time time.Time `json:"time,omitempty"` // event timestamp

	Type  string   `json:"type"`  // type, usually "lib/pkg.NAME"
	Dst   msg.Dst  `json:"dst"`   // optional destination
	Msg   *msg.Msg `json:"-"`     // optional message that caused the event
	Error error    `json:"err"`   // optional error value
	Value any      `json:"value"` // optional value, type-specific
}

Event represents an arbitrary event for a BGP pipe. Seq and Time will be set by the handler if non-zero.

func (*Event) String

func (ev *Event) String() string

String returns the event Type, or "(nil)" if ev is nil

type Handler

type Handler struct {
	Id      int          // optional handler id number (zero means none)
	Name    string       // optional name
	Order   int          // the lower the order, the sooner handler is run
	Enabled *atomic.Bool // if non-nil, disables the handler unless true

	Pre  bool // run before non-pre handlers?
	Post bool // run after non-post handlers?

	Dst   msg.Dst     // if non-zero, limits the direction
	Types []string    // if non-empty, limits event types
	Func  HandlerFunc // the function to call
}

Handler represents a function to call for matching pipe events

type HandlerFunc

type HandlerFunc func(ev *Event) (keep_event bool)

HandlerFunc handles event ev. If it returns false, the parent Handler is unregistered for all Types.
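The keep_event return value effectively makes one-shot handlers possible. Here is a minimal dispatch sketch, assuming a simple slice of handlers filtered in place. The `bus`, `handler` and `event` types are hypothetical and omit ordering, direction filters and the Enabled switch.

```go
package main

import "fmt"

// event and handler are reduced stand-ins for pipe.Event and pipe.Handler.
type event struct{ Type string }

type handler struct {
	Types []string // empty means every event type
	Func  func(ev *event) bool
}

// matches reports whether h is interested in event type t.
func (h *handler) matches(t string) bool {
	if len(h.Types) == 0 {
		return true
	}
	for _, ht := range h.Types {
		if ht == t {
			return true
		}
	}
	return false
}

type bus struct{ handlers []*handler }

// emit runs matching handlers in order and unregisters those returning false.
func (b *bus) emit(ev *event) {
	kept := b.handlers[:0]
	for _, h := range b.handlers {
		if h.matches(ev.Type) && !h.Func(ev) {
			continue // handler asked to be removed
		}
		kept = append(kept, h)
	}
	b.handlers = kept
}

func main() {
	calls := 0
	b := &bus{}
	b.handlers = append(b.handlers, &handler{
		Types: []string{"demo/pkg.START"},
		Func:  func(ev *event) bool { calls++; return false }, // one-shot
	})
	b.emit(&event{Type: "demo/pkg.START"})
	b.emit(&event{Type: "demo/pkg.START"})
	fmt.Println(calls, len(b.handlers)) // 1 0
}
```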

type Options

type Options struct {
	Logger  *zerolog.Logger // if nil logging is disabled
	MsgPool *sync.Pool      // optional pool for msg.Msg

	Caps bool // overwrite pipe.Caps using OPEN messages?

	Rbuf     int  // R channels buffer length
	Rproc    int  // number of R input processors
	Rreverse bool // reverse the order in R?

	Lbuf     int  // L channels buffer length
	Lproc    int  // number of L input processors
	Lreverse bool // reverse the order in L?

	Callbacks []*Callback // message callbacks
	Handlers  []*Handler  // event handlers
}

BGP pipe options

func (*Options) AddCallback

func (o *Options) AddCallback(cbf CallbackFunc, tpl *Callback) *Callback

AddCallback adds a callback function using tpl as its template (if non-nil). It returns the added Callback, which can be further configured.

func (*Options) AddHandler

func (o *Options) AddHandler(hdf HandlerFunc, tpl *Handler) *Handler

AddHandler adds a handler function using tpl as its template (if non-nil). It returns the added Handler, which can be further configured.

func (*Options) OnEstablished

func (o *Options) OnEstablished(hdf HandlerFunc) *Handler

OnEstablished requests hdf to be called when the BGP session is established.

func (*Options) OnEvent

func (o *Options) OnEvent(hdf HandlerFunc, types ...string) *Handler

OnEvent requests hdf to be called for the given event types. If no types are provided, hdf is called on *every* event.

func (*Options) OnEventPost

func (o *Options) OnEventPost(hdf HandlerFunc, types ...string) *Handler

OnEventPost is like OnEvent but requests to run hdf after other handlers

func (*Options) OnEventPre

func (o *Options) OnEventPre(hdf HandlerFunc, types ...string) *Handler

OnEventPre is like OnEvent but requests to run hdf before other handlers

func (*Options) OnMsg

func (o *Options) OnMsg(cbf CallbackFunc, dst msg.Dst, types ...msg.Type) *Callback

OnMsg adds a callback for all messages of given types (or all types if not specified).

func (*Options) OnMsgPost

func (o *Options) OnMsgPost(cbf CallbackFunc, dst msg.Dst, types ...msg.Type) *Callback

OnMsgPost is like OnMsg but requests to run cbf after other callbacks

func (*Options) OnMsgPre

func (o *Options) OnMsgPre(cbf CallbackFunc, dst msg.Dst, types ...msg.Type) *Callback

OnMsgPre is like OnMsg but requests to run cbf before other callbacks

func (*Options) OnParseError

func (o *Options) OnParseError(hdf HandlerFunc) *Handler

OnParseError requests hdf to be called on a BGP message parse error.

func (*Options) OnStart

func (o *Options) OnStart(hdf HandlerFunc) *Handler

OnStart requests hdf to be called after the pipe starts.

func (*Options) OnStop

func (o *Options) OnStop(hdf HandlerFunc) *Handler

OnStop requests hdf to be called when the pipe stops.

type Pipe

type Pipe struct {
	*zerolog.Logger

	Options Options    // pipe options; modify before Start()
	Caps    caps.Caps  // BGP capability context; always thread-safe
	L       *Direction // messages for L; call Start() before use
	R       *Direction // messages for R; call Start() before use

	// generic Key-Value store, always thread-safe
	KV *xsync.MapOf[string, any]
	// contains filtered or unexported fields
}

Pipe processes BGP messages exchanged between two BGP peers, L (for "left" or "local") and R (for "right" or "remote"), allowing for building callback-based pipelines in both directions, plus an internal event system. It can also track OPEN messages to find out the negotiated BGP capabilities on a session.

Write messages destined for R to Pipe.R.In, and read the results from Pipe.R.Out. Similarly, write messages destined for L to Pipe.L.In, and read the results from Pipe.L.Out.

Use NewPipe() to get a new object and modify its Pipe.Options. Then call Pipe.Start() to start the message flow.

func NewPipe

func NewPipe(ctx context.Context) *Pipe

NewPipe returns a new pipe, which can be configured through its Options. To start/stop the pipe, call Start() and Stop().

func (*Pipe) Event

func (p *Pipe) Event(et string, args ...any) (sent bool)

Event announces a new event of type et to the pipe, with optional arguments. The first msg.Dst argument is used as ev.Dst. The first *msg.Msg is used as ev.Msg and borrowed (ACTION_BORROW is added). All error arguments are joined together into a single ev.Error. The remaining arguments are used as ev.Value. Returns true iff the event was queued for processing.
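The argument handling described above maps naturally onto a type switch. The sketch below is an approximation with local `dst`, `message` and `event` stand-ins; how leftover arguments are packed into the value (a single item versus a slice) is an assumption, and queueing is left out entirely.

```go
package main

import (
	"errors"
	"fmt"
)

// dst, message and event are simplified stand-ins for msg.Dst, msg.Msg
// and pipe.Event.
type dst int

type message struct{ Borrowed bool }

type event struct {
	Type  string
	Dst   dst
	Msg   *message
	Error error
	Value any
}

var errDemo = errors.New("demo error")

// newEvent sorts args by type: first dst, first *message (marked borrowed),
// all errors joined, and everything else collected as the value.
func newEvent(et string, args ...any) *event {
	ev := &event{Type: et}
	var errs []error
	var rest []any
	haveDst := false
	for _, arg := range args {
		switch v := arg.(type) {
		case dst:
			if haveDst {
				rest = append(rest, v)
			} else {
				ev.Dst, haveDst = v, true
			}
		case *message:
			if ev.Msg != nil || v == nil {
				rest = append(rest, v)
			} else {
				ev.Msg, v.Borrowed = v, true
			}
		case error:
			errs = append(errs, v)
		default:
			rest = append(rest, v)
		}
	}
	ev.Error = errors.Join(errs...) // nil when there are no errors
	switch len(rest) {
	case 0:
	case 1:
		ev.Value = rest[0]
	default:
		ev.Value = rest
	}
	return ev
}

func main() {
	m := &message{}
	ev := newEvent("demo/pkg.NOTE", dst(2), m, errDemo, "hello")
	fmt.Println(ev.Dst, m.Borrowed, ev.Error, ev.Value)
	// prints: 2 true demo error hello
}
```

errors.Join requires Go 1.20 or newer.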

func (*Pipe) Get

func (p *Pipe) Get() (m *msg.Msg)

Get returns empty msg from pool, or a new msg object

func (*Pipe) Put

func (p *Pipe) Put(m *msg.Msg)

Put resets msg and returns it to pool, which might free it
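Get and Put follow the usual sync.Pool recycling pattern (Options.MsgPool is a *sync.Pool). A minimal sketch of that pattern, assuming a local `message` type with a hypothetical `reset` method:

```go
package main

import (
	"fmt"
	"sync"
)

// message is a simplified stand-in for msg.Msg.
type message struct{ Data []byte }

// reset empties the message but keeps its buffer for re-use.
func (m *message) reset() { m.Data = m.Data[:0] }

// msgPool wraps a sync.Pool of messages.
type msgPool struct{ pool sync.Pool }

func newMsgPool() *msgPool {
	return &msgPool{pool: sync.Pool{New: func() any { return new(message) }}}
}

// get returns a recycled message if one is available, or a new one.
func (p *msgPool) get() *message { return p.pool.Get().(*message) }

// put resets m and hands it back; the pool may drop it at any time.
func (p *msgPool) put(m *message) {
	m.reset()
	p.pool.Put(m)
}

func main() {
	p := newMsgPool()
	m := p.get()
	m.Data = append(m.Data, 1, 2, 3)
	p.put(m) // m must not be used after this point

	m2 := p.get()
	fmt.Println(len(m2.Data)) // 0
}
```

This is also why ACTION_BORROW matters: a message returned to the pool may be handed out again, so any retained reference to it would observe unrelated data.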

func (*Pipe) Start

func (p *Pipe) Start()

Start starts the configured number of R/L message handlers in the background (see Options.Rproc and Options.Lproc), by default 1/1 (single-threaded, strictly ordered processing).

func (*Pipe) Started

func (p *Pipe) Started() bool

Started returns true iff Start() has already been called, i.e. the pipe is (being) started.

func (*Pipe) Stop

func (p *Pipe) Stop()

Stop stops all handlers and blocks till handlers finish. Pipe must not be used again past this point. Closes all input channels, which should eventually close all output channels, possibly after this function returns.

func (*Pipe) Stopped

func (p *Pipe) Stopped() bool

Stopped returns true iff Stop() has already been called, i.e. the pipe is (being) stopped.

func (*Pipe) Wait

func (p *Pipe) Wait()

Wait blocks until the pipe starts and stops completely.

type PipeContext

type PipeContext struct {
	Pipe *Pipe      // pipe processing the message
	Dir  *Direction // direction processing the message

	// Optional id of message source, by default 0 (disabled).
	// Allows for detecting own messages.
	SourceId int

	// Optional callback Id filter, by default 0 (disabled).
	// Allows for injecting messages at arbitrary pipe location.
	// Applies only to callbacks with a non-zero Id.
	// If >0, skip callback if its Id < SkipId (or Id > SkipId in reverse mode).
	// If <0, skip callback if its Id <= -SkipId (or Id >= -SkipId in reverse mode).
	SkipId int

	Callback *Callback // currently run callback
	Action   Action    // requested message actions
	// contains filtered or unexported fields
}

PipeContext tracks message processing progress in a pipe
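The SkipId rules in the struct comment are compact, so a small predicate may help. This sketch encodes them literally; the `skip` function name and signature are hypothetical.

```go
package main

import "fmt"

// skip reports whether a callback with the given id should be skipped for
// a message carrying skipId. reverse selects reverse-order processing.
func skip(id, skipId int, reverse bool) bool {
	if id == 0 || skipId == 0 {
		return false // filter disabled, or callback has no Id
	}
	if skipId > 0 {
		if reverse {
			return id > skipId
		}
		return id < skipId
	}
	limit := -skipId
	if reverse {
		return id >= limit
	}
	return id <= limit
}

func main() {
	for _, skipId := range []int{3, -3} {
		var run []int
		for id := 1; id <= 5; id++ {
			if !skip(id, skipId, false) {
				run = append(run, id)
			}
		}
		fmt.Println(skipId, run)
	}
	// prints:
	// 3 [3 4 5]
	// -3 [4 5]
}
```

In other words, a positive SkipId resumes processing at the callback with that Id, while a negative one resumes just after it.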

func Context

func Context(m *msg.Msg) *PipeContext

Context returns pipe Context inside message m, updating m.Value if needed.

func (*PipeContext) Clear

func (pc *PipeContext) Clear()

Clear resets pc, but preserves ACTION_BORROW if set.

func (*PipeContext) FromJSON

func (pc *PipeContext) FromJSON(src []byte) error

TODO

func (*PipeContext) HasKV

func (pc *PipeContext) HasKV() bool

HasKV returns true iff the context already has a Key-Value store.

func (*PipeContext) KV

func (pc *PipeContext) KV() map[string]any

KV returns a generic Key-Value store, creating it first if needed.
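HasKV exists so callers can check for stored values without forcing an allocation. A short sketch of the lazy-creation pattern, assuming a local `pipeContext` type with an unexported map field (the real field name is not shown in the documentation):

```go
package main

import "fmt"

// pipeContext is a reduced stand-in for pipe.PipeContext.
type pipeContext struct {
	kv map[string]any // created on first use
}

// HasKV reports whether the store was already created.
func (pc *pipeContext) HasKV() bool { return pc.kv != nil }

// KV returns the store, creating it first if needed.
func (pc *pipeContext) KV() map[string]any {
	if pc.kv == nil {
		pc.kv = make(map[string]any)
	}
	return pc.kv
}

func main() {
	var pc pipeContext
	fmt.Println(pc.HasKV()) // false

	pc.KV()["seen"] = true
	fmt.Println(pc.HasKV(), pc.KV()["seen"]) // true true
}
```

Unlike Pipe.KV, a plain map like this is not safe for concurrent use.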

func (*PipeContext) NoCallbacks

func (pc *PipeContext) NoCallbacks()

NoCallbacks requests the message to skip running any callbacks

func (*PipeContext) Reset

func (pc *PipeContext) Reset()

Reset resets pc to empty state

func (*PipeContext) ToJSON

func (pc *PipeContext) ToJSON(dst []byte) []byte

TODO

type Stats

type Stats struct {
	Parsed  uint64
	Short   uint64
	Garbled uint64
}

BGP dir statistics
