This repository was archived by the owner on Sep 21, 2023. It is now read-only.

The PublishEvents RPC should block when the queue is full #84

@cmacknz

Description


Implementation issue following the discussion in #81.

Specifically, the RPC should block only until at least one event is accepted into the queue. It should not wait until every event in the batch has been accepted.
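The intended semantics can be sketched as follows. This is only an illustration, not the shipper's implementation: `boundedQueue`, `publish`, `tryPublish`, and `publishBatch` are hypothetical names, and a buffered channel stands in for the real queue. The first event is published with a blocking call, and the remaining events are accepted only if there is room.

```go
package main

import "fmt"

// boundedQueue is a hypothetical stand-in for the shipper queue,
// backed by a buffered channel with a fixed capacity.
type boundedQueue struct{ ch chan string }

// publish blocks until there is room for the event.
func (q *boundedQueue) publish(ev string) { q.ch <- ev }

// tryPublish returns false immediately if the queue is full.
func (q *boundedQueue) tryPublish(ev string) bool {
	select {
	case q.ch <- ev:
		return true
	default:
		return false
	}
}

// publishBatch blocks until the first event is accepted, then accepts
// as many of the remaining events as fit without blocking.
// It returns the number of accepted events.
func publishBatch(q *boundedQueue, events []string) int {
	if len(events) == 0 {
		return 0
	}
	q.publish(events[0]) // block for at least one event
	accepted := 1
	for _, ev := range events[1:] {
		if !q.tryPublish(ev) {
			break // queue is full: stop instead of blocking on the rest
		}
		accepted++
	}
	return accepted
}

func main() {
	q := &boundedQueue{ch: make(chan string, 2)}
	n := publishBatch(q, []string{"a", "b", "c"})
	fmt.Printf("accepted %d of 3\n", n) // prints "accepted 2 of 3"
}
```

Returning the accepted count lets the caller retry the unaccepted tail of the batch in a later call.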

The shipper queue's publish interface already blocks when the queue is full based on the underlying Beats' memory and disk queue implementations:

    producer := eventQueue.Producer(beatsqueue.ProducerConfig{})
    return &Queue{eventQueue: eventQueue, producer: producer}, nil
}

func (queue *Queue) Publish(event *messages.Event) (EntryID, error) {
    if !queue.producer.Publish(event) {
        return EntryID(0), ErrQueueIsFull
    }

  1. Beats' memory queue publish implementation: https://github.com/elastic/beats/blob/e6db9a53f2fff60b350153b03407bc38b21bf0b1/libbeat/publisher/queue/memqueue/produce.go#L128-L136
  2. Beats' disk queue publish implementation: https://github.com/elastic/beats/blob/e6db9a53f2fff60b350153b03407bc38b21bf0b1/libbeat/publisher/queue/diskqueue/producer.go#L52-L62

One missing piece is that the existing Publish method does not accept a context.Context as input. As a result, when a PublishEvents RPC call is made with a timeout, that timeout is ignored while the RPC is blocked in the queue's Publish method. We will need to modify the queue interface to accept a context.Context and propagate the RPC context to it from the PublishEvents method.
