Documentation ¶
Overview ¶
Package pipe provides BGP message processing with callbacks.
Index ¶
- Variables
- type Action
- type Callback
- type CallbackFunc
- type Direction
- func (d *Direction) CloseInput()
- func (d *Direction) CloseOutput()
- func (d *Direction) Process(input chan *msg.Msg, wg *sync.WaitGroup)
- func (d *Direction) ProcessMsg(m *msg.Msg)
- func (d *Direction) Read(dst []byte) (int, error)
- func (d *Direction) Stats() *Stats
- func (d *Direction) Write(src []byte) (n int, err error)
- func (d *Direction) WriteMsg(m *msg.Msg) (err error)
- func (d *Direction) WriteTo(w io.Writer) (int64, error)
- type Event
- type Handler
- type HandlerFunc
- type Options
- func (o *Options) AddCallback(cbf CallbackFunc, tpl *Callback) *Callback
- func (o *Options) AddHandler(hdf HandlerFunc, tpl *Handler) *Handler
- func (o *Options) OnEstablished(hdf HandlerFunc) *Handler
- func (o *Options) OnEvent(hdf HandlerFunc, types ...string) *Handler
- func (o *Options) OnEventPost(hdf HandlerFunc, types ...string) *Handler
- func (o *Options) OnEventPre(hdf HandlerFunc, types ...string) *Handler
- func (o *Options) OnMsg(cbf CallbackFunc, dst msg.Dst, types ...msg.Type) *Callback
- func (o *Options) OnMsgPost(cbf CallbackFunc, dst msg.Dst, types ...msg.Type) *Callback
- func (o *Options) OnMsgPre(cbf CallbackFunc, dst msg.Dst, types ...msg.Type) *Callback
- func (o *Options) OnParseError(hdf HandlerFunc) *Handler
- func (o *Options) OnStart(hdf HandlerFunc) *Handler
- func (o *Options) OnStop(hdf HandlerFunc) *Handler
- type Pipe
- type PipeContext
- type Stats
Constants ¶
This section is empty.
Variables ¶
var (
	ErrInClosed = errors.New("input channel closed")
	ErrStopped  = errors.New("pipe stopped")
)
var (
	// pipe has finished starting
	EVENT_START = "bgpfix/pipe.START"

	// pipe is about to stop
	EVENT_STOP = "bgpfix/pipe.STOP"

	// could not parse the message before its callback
	EVENT_PARSE = "bgpfix/pipe.PARSE"

	// valid OPEN with a bigger message timestamp (seconds) made it to output
	EVENT_OPEN = "bgpfix/pipe.OPEN"

	// KEEPALIVE with a bigger message timestamp (seconds) made it to output
	EVENT_ALIVE = "bgpfix/pipe.ALIVE"

	// UPDATE with a bigger message timestamp (seconds) made it to output
	EVENT_UPDATE = "bgpfix/pipe.UPDATE"

	// session established (OPEN+KEEPALIVE made it to both sides)
	EVENT_ESTABLISHED = "bgpfix/pipe.ESTABLISHED"
)
A collection of events generated internally by the pipe.
var DefaultOptions = Options{
	Logger: &log.Logger,
	Caps:   true,
	Rbuf:   10,
	Rproc:  1,
	Lbuf:   10,
	Lproc:  1,
}
Default BGP pipe options
Functions ¶
This section is empty.
Types ¶
type Action ¶
type Action byte
Action corresponds to m.Action values
const (
	// The default, zero action: keep processing as-is.
	ACTION_CONTINUE Action = 0

	// Keep the message for later use, do not re-use its memory.
	//
	// You must use this if you wish to re-inject the message,
	// or keep reference to some value inside the msg.
	//
	// Once set, you must not remove this action from a message
	// unless you know you are the sole owner of this message.
	ACTION_BORROW Action = 1 << iota

	// Drop the message immediately from the pipe.
	//
	// If you want to re-inject the message later, set ACTION_BORROW too,
	// and keep in mind the message will try to re-start after where
	// you dropped it, unless you call Context.Clear on it.
	ACTION_DROP

	// Accept the message immediately and write to pipe output.
	ACTION_ACCEPT
)
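The actions are bit flags, so several can be requested at once. The sketch below re-declares the const block locally (it does not import the package) to show the resulting values and how flags combine. The Has helper is hypothetical and only for illustration.

```go
package main

import "fmt"

// Action is a local stand-in that mirrors the const block above;
// it is not imported from the pipe package.
type Action byte

const (
	ACTION_CONTINUE Action = 0
	ACTION_BORROW   Action = 1 << iota // iota is 1 on this line, so the value is 2
	ACTION_DROP                        // 1 << 2 = 4
	ACTION_ACCEPT                      // 1 << 3 = 8
)

// Has reports whether every bit of a non-zero flag is set in a.
// Hypothetical helper, not part of the documented API.
func Has(a, flag Action) bool { return a&flag == flag && flag != 0 }

func main() {
	// Request a drop, but keep the message memory for later re-injection.
	a := ACTION_DROP | ACTION_BORROW
	fmt.Println(ACTION_BORROW, ACTION_DROP, ACTION_ACCEPT)
	fmt.Println(Has(a, ACTION_BORROW), Has(a, ACTION_DROP), Has(a, ACTION_ACCEPT))
}
```

Because ACTION_CONTINUE is zero, it cannot be tested with a bit mask; compare the whole value against zero instead.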
type Callback ¶
type Callback struct {
Id int // optional callback id number (zero means none)
Name string // optional name
Order int // the lower the order, the sooner callback is run
Enabled *atomic.Bool // if non-nil, disables the callback unless true
Pre bool // run before non-pre callbacks?
Raw bool // if true, do not parse the message (which may already be parsed, but for other reasons)
Post bool // run after non-post callbacks?
Dst msg.Dst // if non-zero, limits the direction
Types []msg.Type // if non-empty, limits message types
Func CallbackFunc // the function to call
}
Callback represents a function to call for matching BGP messages
type CallbackFunc ¶
CallbackFunc processes message m. Optionally returns an Action to add to m's pipe.Context.Action.
type Direction ¶
type Direction struct {
// parent Pipe
Pipe *Pipe
// opposite pipe direction from the parent pipe
Opposite *Direction
// destination of messages flowing in this Direction
Dst msg.Dst
// In is the pipe input, where you write incoming messages.
// Get new messages using Pipe.Get().
In chan *msg.Msg
// Out is the pipe output, where you read processed messages.
// Dispose used messages using Pipe.Put().
Out chan *msg.Msg
// UNIX timestamp (seconds) of the last valid OPEN message
LastOpen atomic.Int64
// UNIX timestamp (seconds) of the last KEEPALIVE message
LastAlive atomic.Int64
// UNIX timestamp (seconds) of the last UPDATE message
LastUpdate atomic.Int64
// the last valid OPEN message that updated LastOpen
Open atomic.Pointer[msg.Open]
// contains filtered or unexported fields
}
Direction represents a particular direction of messages in a Pipe
func (*Direction) CloseInput ¶
func (d *Direction) CloseInput()
CloseInput safely closes the Input channel. The Output channel will eventually be closed too, after all queued messages have been processed.
func (*Direction) CloseOutput ¶
func (d *Direction) CloseOutput()
CloseOutput safely closes the Output channel. Input handlers will keep running until Input is closed.
func (*Direction) Process ¶
Process reads input, runs all callbacks on incoming messages, and forwards the result to d.Out.
The d.Out channel *can* be closed anytime, which will cause the resultant messages to be dropped on the floor (and possibly re-used).
Exits when input closes and is emptied. wg may be nil.
func (*Direction) ProcessMsg ¶
ProcessMsg runs all callbacks on message m and forwards the result to d.Out.
func (*Direction) Read ¶
Read reads d.Out and writes raw BGP data to dst. Must not be used concurrently. TODO: stats
func (*Direction) Write ¶
Write implements io.Writer and reads all BGP messages from src into d.In. Copies bytes from src. Consumes what it can, buffers the remainder if needed. Returns n equal to len(src). May block if d.In is full.
In case of a non-nil err, call Write(nil) to retry using the buffered remainder, until it returns a nil err.
Must not be used concurrently.
type Event ¶
type Event struct {
Pipe *Pipe `json:"-"` // parent pipe
Seq uint64 `json:"seq,omitempty"` // event sequence number
Time time.Time `json:"time,omitempty"` // event timestamp
Type string `json:"type"` // type, usually "lib/pkg.NAME"
Dst msg.Dst `json:"dst"` // optional destination
Msg *msg.Msg `json:"-"` // optional message that caused the event
Error error `json:"err"` // optional error value
Value any `json:"value"` // optional value, type-specific
}
Event represents an arbitrary event for a BGP pipe. Seq and Time will be set by the handler if non-zero.
type Handler ¶
type Handler struct {
Id int // optional handler id number (zero means none)
Name string // optional name
Order int // the lower the order, the sooner handler is run
Enabled *atomic.Bool // if non-nil, disables the handler unless true
Pre bool // run before non-pre handlers?
Post bool // run after non-post handlers?
Dst msg.Dst // if non-zero, limits the direction
Types []string // if non-empty, limits event types
Func HandlerFunc // the function to call
}
Handler represents a function to call for matching pipe events
type HandlerFunc ¶
HandlerFunc handles event ev. If it returns false, the parent Handler is unregistered for all Types.
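Returning false gives a simple way to write run-once handlers. The sketch below uses stand-in event, handler, and dispatcher types (all hypothetical) to show the idea: a handler that returns false is removed and is not called again. How the library stores and removes handlers internally is not documented here.

```go
package main

import "fmt"

// event and handler are simplified stand-ins for pipe.Event and pipe.Handler.
type event struct{ Type string }

type handler struct {
	Types []string          // empty means "every event type"
	Func  func(*event) bool // returning false unregisters the handler
}

// matches reports whether h wants to see ev.
func (h *handler) matches(ev *event) bool {
	if len(h.Types) == 0 {
		return true
	}
	for _, t := range h.Types {
		if t == ev.Type {
			return true
		}
	}
	return false
}

// dispatcher is a hypothetical holder of registered handlers.
type dispatcher struct{ handlers []*handler }

// emit calls each matching handler and removes those that return false.
func (d *dispatcher) emit(ev *event) {
	kept := d.handlers[:0]
	for _, h := range d.handlers {
		if h.matches(ev) && !h.Func(ev) {
			continue // handler asked to be unregistered
		}
		kept = append(kept, h)
	}
	d.handlers = kept
}

func main() {
	calls := 0
	d := &dispatcher{}
	d.handlers = append(d.handlers, &handler{
		Types: []string{"bgpfix/pipe.ESTABLISHED"},
		Func: func(ev *event) bool {
			calls++
			return false // run once, then unregister
		},
	})
	d.emit(&event{Type: "bgpfix/pipe.START"})       // no match, handler kept
	d.emit(&event{Type: "bgpfix/pipe.ESTABLISHED"}) // called, then removed
	d.emit(&event{Type: "bgpfix/pipe.ESTABLISHED"}) // no handler left
	fmt.Println(calls, len(d.handlers))             // 1 0
}
```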
type Options ¶
type Options struct {
Logger *zerolog.Logger // if nil logging is disabled
MsgPool *sync.Pool // optional pool for msg.Msg
Caps bool // overwrite pipe.Caps using OPEN messages?
Rbuf int // R channels buffer length
Rproc int // number of R input processors
Rreverse bool // reverse the order in R?
Lbuf int // L channels buffer length
Lproc int // number of L input processors
Lreverse bool // reverse the order in L?
Callbacks []*Callback // message callbacks
Handlers []*Handler // event handlers
}
BGP pipe options
func (*Options) AddCallback ¶
func (o *Options) AddCallback(cbf CallbackFunc, tpl *Callback) *Callback
AddCallback adds a callback function using tpl as its template (if non-nil). It returns the added Callback, which can be further configured.
func (*Options) AddHandler ¶
func (o *Options) AddHandler(hdf HandlerFunc, tpl *Handler) *Handler
AddHandler adds a handler function using tpl as its template (if non-nil). It returns the added Handler, which can be further configured.
func (*Options) OnEstablished ¶
func (o *Options) OnEstablished(hdf HandlerFunc) *Handler
OnEstablished requests hdf to be called when the BGP session is established.
func (*Options) OnEvent ¶
func (o *Options) OnEvent(hdf HandlerFunc, types ...string) *Handler
OnEvent requests hdf to be called for the given event types. If no types are provided, it requests hdf to be called on *every* event.
func (*Options) OnEventPost ¶
func (o *Options) OnEventPost(hdf HandlerFunc, types ...string) *Handler
OnEventPost is like OnEvent, but requests hdf to run after other handlers.
func (*Options) OnEventPre ¶
func (o *Options) OnEventPre(hdf HandlerFunc, types ...string) *Handler
OnEventPre is like OnEvent, but requests hdf to run before other handlers.
func (*Options) OnMsg ¶
OnMsg adds a callback for all messages of given types (or all types if not specified).
func (*Options) OnParseError ¶
func (o *Options) OnParseError(hdf HandlerFunc) *Handler
OnParseError requests hdf to be called on a BGP message parse error.
func (*Options) OnStart ¶
func (o *Options) OnStart(hdf HandlerFunc) *Handler
OnStart requests hdf to be called after the pipe starts.
func (*Options) OnStop ¶
func (o *Options) OnStop(hdf HandlerFunc) *Handler
OnStop requests hdf to be called when the pipe stops.
type Pipe ¶
type Pipe struct {
*zerolog.Logger
Options Options // pipe options; modify before Start()
Caps caps.Caps // BGP capability context; always thread-safe
L *Direction // messages for L; call Start() before use
R *Direction // messages for R; call Start() before use
// generic Key-Value store, always thread-safe
KV *xsync.MapOf[string, any]
// contains filtered or unexported fields
}
Pipe processes BGP messages exchanged between two BGP peers, L (for "left" or "local") and R (for "right" or "remote"), allowing for building callback-based pipelines in both directions, plus an internal event system. It can also track OPEN messages to find out the negotiated BGP capabilities on a session.
Write messages destined for R to Pipe.R.In, and read the results from Pipe.R.Out. Similarly, write messages destined for L to Pipe.L.In, and read the results from Pipe.L.Out.
Use NewPipe() to get a new object and modify its Pipe.Options. Then call Pipe.Start() to start the message flow.
func NewPipe ¶
NewPipe returns a new pipe, which can be configured through its Options. To start/stop the pipe, call Start() and Stop().
func (*Pipe) Event ¶
Event announces a new event of type et to the pipe, with optional arguments. The first msg.Dst argument is used as ev.Dst. The first *msg.Msg is used as ev.Msg and borrowed (ACTION_BORROW is added). All error arguments are joined together into a single ev.Error. The remaining arguments are used as ev.Value. Returns true iff the event was queued for processing.
func (*Pipe) Start ¶
func (p *Pipe) Start()
Start starts the message processors in the background, as many per direction as set in Options (Rproc and Lproc), by default 1/1 (single-threaded, strictly ordered processing).
func (*Pipe) Started ¶
Started returns true iff Start() has already been called, i.e. the pipe is started or is being started.
func (*Pipe) Stop ¶
func (p *Pipe) Stop()
Stop stops all handlers and blocks till handlers finish. Pipe must not be used again past this point. Closes all input channels, which should eventually close all output channels, possibly after this function returns.
type PipeContext ¶
type PipeContext struct {
Pipe *Pipe // pipe processing the message
Dir *Direction // direction processing the message
// Optional id of message source, by default 0 (disabled).
// Allows for detecting own messages.
SourceId int
// Optional callback Id filter, by default 0 (disabled).
// Allows for injecting messages at arbitrary pipe location.
// Applies only to callbacks with a non-zero Id.
// If >0, skip callback if its Id < SkipId (or Id > SkipId in reverse mode).
// If <0, skip callback if its Id <= -SkipId (or Id >= -SkipId in reverse mode).
SkipId int
Callback *Callback // currently run callback
Action Action // requested message actions
// contains filtered or unexported fields
}
PipeContext tracks message processing progress in a pipe
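The SkipId rules in the struct comments can be written as a single predicate. The function below is a sketch derived only from those comments; skipCallback is a hypothetical name, and the reverse flag stands for the "reverse mode" mentioned there.

```go
package main

import "fmt"

// skipCallback reports whether a callback with the given Id should be
// skipped for a message whose context carries skipId.
func skipCallback(id, skipId int, reverse bool) bool {
	if id == 0 || skipId == 0 {
		return false // filter disabled, or the callback has no Id
	}
	if skipId > 0 {
		// Positive SkipId: resume AT the callback with this Id.
		if reverse {
			return id > skipId
		}
		return id < skipId
	}
	// Negative SkipId: resume AFTER the callback with Id == -skipId.
	limit := -skipId
	if reverse {
		return id >= limit
	}
	return id <= limit
}

func main() {
	for _, id := range []int{1, 2, 3} {
		fmt.Println(id, skipCallback(id, 2, false), skipCallback(id, -2, false))
	}
}
```

For callbacks with Ids 1, 2, 3 in forward mode, SkipId = 2 skips only callback 1, while SkipId = -2 skips callbacks 1 and 2, so a re-injected message continues right after the callback that held it.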
func Context ¶
func Context(m *msg.Msg) *PipeContext
Context returns pipe Context inside message m, updating m.Value if needed.
func (*PipeContext) Clear ¶
func (pc *PipeContext) Clear()
Clear resets pc, but preserves ACTION_BORROW if set.
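Preserving a single flag while resetting everything else is a one-line bit mask. The sketch below shows the idea on a stand-in context type with only three fields; the real PipeContext has more state, including unexported fields, so this is an illustration of the masking technique rather than the library's code.

```go
package main

import "fmt"

// Action mirrors the documented const block; it is a local stand-in.
type Action byte

const (
	ACTION_CONTINUE Action = 0
	ACTION_BORROW   Action = 1 << iota
	ACTION_DROP
	ACTION_ACCEPT
)

// context is a stand-in for PipeContext with only a few fields.
type context struct {
	SourceId int
	SkipId   int
	Action   Action
}

// Clear resets every field, keeping only the ACTION_BORROW bit.
func (c *context) Clear() {
	*c = context{Action: c.Action & ACTION_BORROW}
}

func main() {
	c := &context{SourceId: 7, SkipId: 3, Action: ACTION_DROP | ACTION_BORROW}
	c.Clear()
	fmt.Println(*c) // {0 0 2}
}
```

Keeping the borrow bit matters because other code may still hold a reference to the message, as the ACTION_BORROW comment warns.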
func (*PipeContext) HasKV ¶
func (pc *PipeContext) HasKV() bool
HasKV returns true iff the context already has a Key-Value store.
func (*PipeContext) KV ¶
func (pc *PipeContext) KV() map[string]any
KV returns a generic Key-Value store, creating it first if needed.
func (*PipeContext) NoCallbacks ¶
func (pc *PipeContext) NoCallbacks()
NoCallbacks requests the message to skip running any callbacks.