This repository was archived by the owner on Sep 21, 2023. It is now read-only.

Investigate changing the pipeline order to improve performance #44

@cmacknz

Description

Let's consider the case where the shipper is configured to use a disk queue with the Elasticsearch output. Let's also assume we use the default protobuf encoding over gRPC. If we reuse the existing structure of the beats publishing pipeline, the data flow will look like:

```mermaid
flowchart LR
    A[Input] -->|Protobuf| B[Server]
    B --> C[Processors]
    C -->|CBOR| D[Disk Queue]
    D -->|JSON| E[Elasticsearch]
```

The diagram shows that the data must be serialized multiple times:

  1. To the protobuf wire format when the input sends events to the shipper using gRPC. This could optionally be replaced with JSON, but we would likely still need to deserialize it regardless.
  2. To CBOR when writing to the disk queue.
  3. To JSON when writing to Elasticsearch.

It seems extremely worthwhile to restructure the pipeline to reduce the number of times the data must be serialized:

```mermaid
flowchart LR
    A[Input] -->|Protobuf| B[Server]
    B -->|Protobuf| C[Disk Queue]
    C --> D[Processors]
    D -->|JSON| E[Elasticsearch]
```

In this case we would change the disk queue's serialization format to protobuf, deferring deserialization until after data has been read from the queue. This leaves us with a single transformation: from protobuf to the shipper's internal data format, and then to JSON (or whatever encoding the output requires).

If the memory queue were used instead of the disk queue, we could use the same strategy of storing serialized events in the memory queue and only decoding them when they are read from the queue. This would give us a way to deterministically calculate the number of bytes stored in the memory queue. Currently the memory queue size must be specified in events.

The output of this issue should be a proof of concept demonstrating that this reordering of the pipeline is possible and has the expected benefits. At minimum the work will need to include:

  1. Modify the gRPC server in the shipper so it stops deserializing messages, allowing them to be passed directly to the queue. The ideal option is to keep the existing RPC definitions but implement a no-op codec; see the gRPC encoding documentation. We may need to write a custom set of RPC handlers instead of generating them:

     ```go
     func _Producer_PublishEvents_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
     ```

     A fallback option is to use messages that simply wrap a bytes payload, with the required message type and serialization documented in the RPC call.
  2. Modify the disk queue to use protobuf serialization. At minimum this depends on Libbeat disk queue should support custom serialization #41 and possibly some of the work for [Meta][Feature] Implement encrypted disk queue #33 to use the new disk queue headers.
  3. Ensure we can still return errors back to clients (after deserialization or processing, for example). [Meta][Feature] Implement end to end acknowledgement #9 should provide a mechanism for this.
  4. Benchmark the performance of the modified pipeline and compare it to the original configuration. We do not have a set of repeatable performance tests yet, so we may choose to defer this work until we do.
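The no-op codec idea from step 1 can be sketched as below. To keep the example self-contained it defines the `Name`/`Marshal`/`Unmarshal` method set of gRPC's `encoding.Codec` interface locally instead of importing `google.golang.org/grpc/encoding`; in a real server the codec would be registered with `encoding.RegisterCodec`, and `rawFrame` is a hypothetical carrier type, not part of the shipper.

```go
package main

import "fmt"

// rawFrame carries a message's wire bytes without decoding them.
type rawFrame struct{ data []byte }

// passthroughCodec mirrors the method set of gRPC's encoding.Codec
// (Name/Marshal/Unmarshal). Instead of running protobuf, it hands the
// payload through untouched so the server can write it straight to the
// queue. This is a sketch of the idea, not a drop-in implementation.
type passthroughCodec struct{}

func (passthroughCodec) Name() string { return "raw" }

func (passthroughCodec) Marshal(v interface{}) ([]byte, error) {
	f, ok := v.(*rawFrame)
	if !ok {
		return nil, fmt.Errorf("passthrough codec: unexpected type %T", v)
	}
	return f.data, nil
}

func (passthroughCodec) Unmarshal(data []byte, v interface{}) error {
	f, ok := v.(*rawFrame)
	if !ok {
		return fmt.Errorf("passthrough codec: unexpected type %T", v)
	}
	f.data = append([]byte(nil), data...) // copy: the caller may reuse the buffer
	return nil
}

func main() {
	var c passthroughCodec
	var f rawFrame
	// Bytes arrive from the wire and leave for the queue unchanged.
	if err := c.Unmarshal([]byte{0x0a, 0x03, 'f', 'o', 'o'}, &f); err != nil {
		panic(err)
	}
	out, err := c.Marshal(&f)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%d bytes passed through\n", len(out))
}
```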
