
Add OakIncrementalIndex to Druid #7676

Closed

sanastas wants to merge 15 commits into apache:master from yonigottesman:oak_druid

Conversation


@sanastas sanastas commented May 16, 2019

Introduction
This PR suggests an implementation of IncrementalIndex for Druid that is based on OakMap -- an off-heap (direct-allocation) concurrent navigable (ordered) KV-map. OakMap is part of the Oak open source project.

OakMap is a thread-safe concurrent data structure (that is, Druid does not need to manage concurrency on its own). It offers two API versions - a standard JDK ConcurrentNavigableMap and its zero-copy variant that provides direct access to off-heap storage for selective retrieval of the stored data. It also features zero-copy update-in-place (lambda) function execution. OakMap exploits JDK Unsafe mechanisms to speed up copy to/from off-heap buffers. Its data layout is cache-friendly. It supports ascending and descending iterators (scans) at the same speed. It is backed by an internal memory allocator, which relieves Java GC from most of its work.
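OakMap itself is not reproduced here; since its standard view exposes the JDK ConcurrentNavigableMap interface, the sketch below uses the JDK's on-heap ConcurrentSkipListMap purely to illustrate that API surface (puts, range scans, descending views). The zero-copy variant and the off-heap storage are Oak-specific and not shown.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

public class NavigableMapSketch {
  public static void main(String[] args) {
    // ConcurrentSkipListMap is the JDK's on-heap implementation of the
    // same ConcurrentNavigableMap interface that OakMap's standard view offers.
    ConcurrentNavigableMap<Long, String> index = new ConcurrentSkipListMap<>();
    index.put(3L, "row-3");
    index.put(1L, "row-1");
    index.put(2L, "row-2");

    // Ascending range scan, analogous to scanning a time range in a segment.
    for (Map.Entry<Long, String> e : index.subMap(1L, true, 2L, true).entrySet()) {
      System.out.println(e.getKey() + " -> " + e.getValue());
    }

    // Descending view; OakMap's claim is that descending scans run at the
    // same speed as ascending ones, which skip-list maps do not guarantee.
    System.out.println(index.descendingMap().firstKey());
  }
}
```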

OakMap is ideal for building very large segment indexes (16+ GB). Our initial benchmarks show excellent scaling on multicore, near-optimal use of JVM memory, and up to 50% lift in ingestion speed. See the OakMap microbenchmark results:
OAK PERFORMANCE.pdf
Soon we will publish OakMap-based Incremental Index results (OakIncrementalIndex).

Using OakMap opens new opportunities for real-time indexing in Druid, including:

  • Faster data ingestion rates.

  • Building fewer, larger segments, thereby reducing query latencies and merging/compaction overhead.

  • Real-time queries of segments currently being built.

  • Much faster, zero-overhead reverse scans.

It might also suggest future re-architecting of parts of Druid - for example, running each indexing task in a standalone JVM (Peon) - for better CPU and RAM utilization.

Activating Oak
The PR provides a mechanism to configure instantiation of the OakMap-based concurrent index in Druid.
To enable Oak, add the following to the MiddleManager configuration file, which resides at quickstart/tutorial/conf/druid/middleManager/runtime.properties:
druid.indexer.useOak=true
and update Task launch parameters (in the same file) to allow enough direct memory:
druid.indexer.runner.javaOpts=-server -Xms1g -Xmx1g -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+ExitOnOutOfMemoryError -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager -XX:MaxDirectMemorySize=16g

Some Design Points
All the changes Oak imposes on Druid are encapsulated in the org.apache.druid.segment.incremental package. The Oak integration introduces a new IncrementalIndex variant that works with buffer aggregators:
public class OakIncrementalIndex extends IncrementalIndex<BufferAggregator>

To work more efficiently with the underlying ByteBuffer (index-row serialization), we also introduce OakIncrementalIndexRow:
public class OakIncrementalIndexRow extends IncrementalIndexRow

Oak requires keys and values to be serialized, deserialized, and compared, so the OakKeySerializer, OakValueSerializer, and OakKeysComparator classes were added (serialization and deserialization are both methods of the serializer classes):

public class OakKeySerializer implements OakSerializer<IncrementalIndexRow>
public class OakValueSerializer implements OakSerializer<Row>
public class OakKeysComparator implements OakComparator<IncrementalIndexRow>
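For illustration, here is a rough sketch of what such a serializer looks like. The Serializer interface and Key record below are hypothetical stand-ins written for this example (the real OakSerializer interface and IncrementalIndexRow differ); only the general shape, a fixed-size key serialized into a ByteBuffer with an up-front size calculation, is intended to carry over.

```java
import java.nio.ByteBuffer;

public class SerializerSketch {
  // Hypothetical stand-in for Oak's serializer interface: serialization,
  // deserialization, and a size calculation so the map can allocate the
  // exact number of off-heap bytes before writing.
  public interface Serializer<T> {
    void serialize(T object, ByteBuffer target);
    T deserialize(ByteBuffer source);
    int calculateSize(T object);
  }

  // Toy key (a timestamp plus one int dimension) standing in for the far
  // richer IncrementalIndexRow; illustrative only.
  public record Key(long timestamp, int dim) {}

  public static final Serializer<Key> KEY_SERIALIZER = new Serializer<>() {
    @Override public void serialize(Key k, ByteBuffer target) {
      target.putLong(k.timestamp()).putInt(k.dim());
    }
    @Override public Key deserialize(ByteBuffer source) {
      return new Key(source.getLong(), source.getInt());
    }
    @Override public int calculateSize(Key k) {
      return Long.BYTES + Integer.BYTES;
    }
  };

  public static void main(String[] args) {
    Key k = new Key(1558000000000L, 42);
    ByteBuffer buf = ByteBuffer.allocateDirect(KEY_SERIALIZER.calculateSize(k));
    KEY_SERIALIZER.serialize(k, buf);
    buf.flip(); // rewind so the reader sees what was just written
    System.out.println(KEY_SERIALIZER.deserialize(buf));
  }
}
```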

@b-slim
Contributor

b-slim commented May 17, 2019

Just one point: not sure what lock-free means here, but when I grep for synchronized I see the following, and some of those locks are used as part of memory allocation, which I guess is part of the hot path.

core/src/main/java/com/oath/oak/InternalOakMap.java:            synchronized (childToAdd = iterChildren.next()) {
core/src/main/java/com/oath/oak/NativeAllocator/BlocksPool.java:            synchronized (BlocksPool.class) { // can be easily changed to lock-free
core/src/main/java/com/oath/oak/NativeAllocator/BlocksPool.java:                synchronized (BlocksPool.class) { // can be easily changed to lock-free
core/src/main/java/com/oath/oak/NativeAllocator/BlocksPool.java:            synchronized (BlocksPool.class) { // can be easily changed to lock-free
core/src/main/java/com/oath/oak/NativeAllocator/OakNativeMemoryAllocator.java:                    synchronized (this) {
core/src/main/java/com/oath/oak/NativeAllocator/OakNativeMemoryAllocator.java:            synchronized (this) {
core/src/main/java/com/oath/oak/NativeAllocator/OakNativeMemoryAllocator.java:            synchronized (this) {

@sanastas
Author

Hi @b-slim ,

First, it is great you are taking a look :) , thanks!

Second, we are currently trying different memory allocators and memory managers. The implementation is not final, so we are keeping it simple for now just to check the performance. The locks you see are for singleton initialization upon startup only. Most likely, the memory allocator/pool will not remain a singleton.
In any case, there are currently no locks on the hot path, and none are going to be added.
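The claim above, a lock taken only around one-time singleton initialization and never on the steady-state path, corresponds to the classic double-checked locking idiom. A generic sketch (not Oak's actual BlocksPool code):

```java
public class LazyPool {
  // volatile is essential for safe double-checked locking on the JVM.
  private static volatile LazyPool instance;

  private LazyPool() {
    // expensive one-time setup would go here, e.g. pre-allocating blocks
  }

  public static LazyPool getInstance() {
    LazyPool local = instance;
    if (local == null) {                // fast path: lock-free once initialized
      synchronized (LazyPool.class) {   // slow path: entered only during startup
        local = instance;
        if (local == null) {
          instance = local = new LazyPool();
        }
      }
    }
    return local;
  }
}
```

After the first call completes, every subsequent getInstance() returns on the volatile read alone, so the synchronized block never executes again.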

@fjy
Contributor

fjy commented May 19, 2019

@sanastas Are there any Druid benchmarks?

@b-slim
Contributor

b-slim commented May 20, 2019

@sanastas that is simply not true; some of those locks are part of allocate calls,
and some are part of the iterator loop, doing work that I doubt is static ...

Looking at the allocator briefly, I see a bunch of red flags, like possible internal fragmentation up to a factor of 2 and no clear way to deal with external fragmentation ... Memory management is a very hard thing in general, and making it lock-free is not going to be a simple task .... Writing one from scratch is a common source of bugs; in fact, most big data projects avoid this task and use the Netty allocator.
The takeaway is that the current state is not lock-free, so let's avoid confusion (FYI, even on the Oak GitHub page I do not see it called lock-free, so I am not sure why you are calling it lock-free here).
But the most important thing to add here is how Oak memory management will fit with how Druid manages direct memory.

@sanastas
Author

@fjy , thanks for your interest; we are going to publish some Druid benchmarks soon.

@sanastas
Author

Hi Slim (@b-slim ),

I am impressed with such a thorough review! Thanks! It looks like the issue of lock-free programming and lock-free memory management is as close to your heart as it is to mine! This is the reason I did a PhD in this field :) I am glad to find an ally! What was your PhD thesis about? I have some answers below. If I have misunderstood something, I would be happy to hear and learn.

that is simply not true; some of those locks are part of allocate calls, and some are part of the iterator loop, doing work that I doubt is static ...

I would be happy to speak with you about Oak's internal implementation. As it is time-consuming to discuss via chat, I suggest we arrange an offline discussion, via Hangouts for example. Let me know which time slots suit you.

Looking at the allocator briefly, I see a bunch of red flags, like possible internal fragmentation up to a factor of 2 and no clear way to deal with external fragmentation … Memory management is a very hard thing in general, and making it lock-free is not going to be a simple task ....

We are well aware that fully lock-free memory management is a hard thing and in fact not yet available. However, Druid's usage of IncrementalIndex is a restricted one, IMHO. For example, there are no concurrent deletions, so dealing with internal fragmentation should not be an issue. Dealing with external fragmentation might be a matter of tuning the block size for the common workloads. Bottom line: we did not intend to write a perfect general-purpose fully lock-free memory manager, but a specialized one. Not to mention that it is not final and can be fixed or replaced if objective issues are found.

Writing one from scratch is a common source of bugs; in fact, most big data projects avoid this task and use the Netty allocator.

As you have seen, our memory manager and memory allocator are quite restricted in scope. So we believe they can perform faster, and bugs should be easier to find. Thanks for referring to the Netty allocator. I took a brief look at it: it appears to be based on reference counting, and its underlying data structure is Netty's own ByteBuf (not ByteBuffer). Epoch-based reclamation is generally assumed to outperform reference counting. But most importantly, I am not sure we can convert a ByteBuf into a ByteBuffer without complications or performance issues. Nor can we require Druid to start working with Netty ByteBufs. However, we will continue investigating, and if the Netty allocator can be used and gives better performance, we have no problem switching to it.

The takeaway is that the current state is not lock-free, so let's avoid confusion (FYI, even on the Oak GitHub page I do not see it called lock-free, so I am not sure why you are calling it lock-free here).

Generally speaking, lock-freedom is a property of the data structure, not of the underlying memory management. Since fully lock-free memory management (GC + allocation) is not yet available, none of the currently existing lock-free data structures can strictly be called lock-free, and neither are their performance results valid… A huge number of scientific papers wasted :) . But I will remove the "lock-free" term from anywhere it appears here.

As I said, I am a big fan of lock-free programming, and I will be the first to fight to eliminate any lock anywhere. But in reality what matters is the final performance, and to see gains from lock-free programming we need heavy multi-threaded contention. Is that the case in Druid's IncrementalIndex? All the benchmarks are single-threaded…

I am OK with a lock that is not on the hot path and does not impact performance at all.

But the most important thing to add here is how Oak memory management will fit with how Druid manages direct memory.

Exactly! This is the most important point! And here we need your input: how does Druid manage direct memory? Is there a policy? Documentation to read? Pointers to code?

@sanastas
Author

Here are some results for org.apache.druid.benchmark.indexing.IndexIngestionBenchmark:

We insert 3M rows, which amounts to about 4GB of data. We try to give the same amount of memory to the Oak case and to the native Druid IncrementalIndex. Note that, as IndexIngestionBenchmark is written, the rows are generated before the benchmark runs and hold almost 4GB of on-heap memory anyway. So the native Druid IncrementalIndex has an advantage: it just references data already on-heap, whereas Oak needs to copy it and therefore needs additional memory. A lot of on-heap space also goes to the StringIndexer and other structures.

IngestionOakvsIncrIdx.pdf

Finally, this benchmark is single-threaded. We expect to show a much larger advantage in the multi-threaded case, which we will describe shortly.

The command lines for reference:

java -Xmx15g -XX:MaxDirectMemorySize=0g -jar benchmarks/target/benchmarks.jar IndexIngestionBenchmark -p rowsPerSegment=3000000 -p indexType=onheap
java -Xmx9g -XX:MaxDirectMemorySize=6g -jar benchmarks/target/benchmarks.jar IndexIngestionBenchmark -p rowsPerSegment=3000000 -p indexType=oak

@b-slim
Contributor

b-slim commented May 22, 2019

Exactly! This is the most important point! And here we need your input: how does Druid manage direct memory? Is there a policy? Documentation to read? Pointers to code?

I don't think there are many docs, but in a nutshell Druid uses three kinds of memory resources:

  1. Mapped memory for all the immutable data; it is managed by the OS.
  2. Direct ByteBuffers used for decompression and query execution (aggregation/merging), managed by a common resource pool.
  3. On-heap memory used by everything else, including the real-time in-memory data structures you are working on, managed by the JVM.

I recommend you take a look at how we use those buffers for query processing; it all comes from io.druid.guice.DruidProcessingModule, and you can track io.druid.guice.DruidProcessingModule#getMergeBufferPool.

Not sure how you want to hook into this, but maybe the Druid processing module can be one of your block providers.
This doc might help as well: http://druid.io/docs/latest/operations/performance-faq.html
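The "common resource pool" of direct buffers mentioned in point 2 can be sketched generically as a blocking queue of pre-allocated direct ByteBuffers. This is an illustration of the pattern only, not Druid's actual DruidProcessingModule code:

```java
import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class DirectBufferPool {
  private final BlockingQueue<ByteBuffer> pool;

  public DirectBufferPool(int buffers, int bufferSize) {
    pool = new ArrayBlockingQueue<>(buffers);
    // All direct memory is allocated up front, which is what makes
    // -XX:MaxDirectMemorySize sizing predictable.
    for (int i = 0; i < buffers; i++) {
      pool.add(ByteBuffer.allocateDirect(bufferSize));
    }
  }

  public ByteBuffer take() throws InterruptedException {
    return pool.take(); // blocks when every buffer is checked out
  }

  public void giveBack(ByteBuffer buf) {
    buf.clear();        // reset position/limit before the next user
    pool.offer(buf);
  }

  public static void main(String[] args) throws InterruptedException {
    DirectBufferPool pool = new DirectBufferPool(2, 1 << 20); // 2 x 1 MiB
    ByteBuffer buf = pool.take();
    System.out.println(buf.isDirect() + " " + buf.capacity());
    pool.giveBack(buf);
  }
}
```

Bounding the pool size caps total direct-memory consumption, and blocking on take() provides natural backpressure when all buffers are in use.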
As a side note:

  • Netty buffers are backed by ByteBuffers; they just add some wrappers around the buffer, but you can still get a reference to the underlying buffer.
  • You might want a flag to turn off unsafe reads; most Druid users I know would rather run 10% slower than deal with JVM segfault issues.

Sorry for the short answer! Will try to touch base.

@sanastas
Author

Thank you for your input!

@sanastas
Author

Hi Slim (@b-slim ),

We have taken your comment about the Netty allocator seriously, and we implemented and measured OakIncrementalIndex performance based on Netty. The comparison ran IndexIngestionBenchmark, inserting 3 million rows, which is about 3.8GB of off-heap data. We tried giving different off-heap size limits; the results can be seen in the following table.

Oak Native Allocator vs Netty.pdf

To summarize, the Netty allocator requires up to twice the memory, and even when its memory request is satisfied, the latency is a bit higher. Looking closer at the Netty implementation, this is understandable: the Netty allocator's pools hold buffers of different (exponentially increasing) sizes. This is reasonable for network protocol buffers but less reasonable for data storage.
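The up-to-2x memory overhead is consistent with size-class rounding: a pool that holds buffers in power-of-two sizes must round each request up to the next class, so a request just past a boundary wastes almost half its buffer. A minimal illustration of the rounding (not Netty's actual size-class logic, which is finer-grained):

```java
public class SizeClassSketch {
  // Round a request up to the next power of two: the simplest size-class
  // scheme. Worst-case internal fragmentation approaches a factor of 2.
  public static int roundUpToPowerOfTwo(int request) {
    int highest = Integer.highestOneBit(request);
    return (highest == request) ? request : highest << 1;
  }

  public static void main(String[] args) {
    int request = 4097;                        // just past a 4 KiB boundary
    int allocated = roundUpToPowerOfTwo(request);
    // 8192 bytes allocated for a 4097-byte request: almost half is wasted.
    System.out.println(request + " -> " + allocated);
  }
}
```

By contrast, an allocator tuned to a small set of fixed row sizes can keep this waste near zero, which is one plausible explanation for the measured gap.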

Bottom line: based on the empirical results, we would like to continue with the primitive OakNativeAllocator, which satisfies Druid's needs and gives better performance. The experiments with the Netty allocator will continue. Thank you for your valuable input!

@b-slim
Contributor

b-slim commented Jun 20, 2019

@sanastas thanks for this update. It would be nice, if you have the time/resources, to post more information about how those experiments were conducted and what kind of workload was used.

@sanastas
Author

@b-slim , sure I can provide more details.

As I mentioned, we ran Druid's IndexIngestionBenchmark (incubator-druid/benchmarks/src/main/java/org/apache/druid/benchmark/indexing/IndexIngestionBenchmark.java) with the data/key generator used there. To be more sensitive to data allocation, we increased the number of rows to 3 million (rowsPerSegment=3000000).

The experiments compare OakIncrementalIndex implemented with two different memory allocators: OakNativeAllocator and NettyAllocator. That is the only difference between the two Oak implementations. Both allocators allocate off-heap. The results are presented in the two columns compared in the table I pasted above. The rows of the table show how much off-heap memory was given to each experiment (for example, -XX:MaxDirectMemorySize=4g). In each experiment, 3.8GB are written off-heap.

What other details would you like to know?

@stale

stale bot commented Aug 23, 2019

This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 4 weeks if no further activity occurs. If you think that's incorrect or this pull request should instead be reviewed, please simply write any comment. Even if closed, you can still revive the PR at any time or discuss it on the dev@druid.apache.org list. Thank you for your contributions.

@stale stale bot added the stale label Aug 23, 2019
@sanastas
Author

Hey, we are going to update this PR soon

@stale

stale bot commented Aug 26, 2019

This issue is no longer marked as stale.

@stale stale bot removed the stale label Aug 26, 2019
@stale

stale bot commented Oct 25, 2019

This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 4 weeks if no further activity occurs. If you think that's incorrect or this pull request should instead be reviewed, please simply write any comment. Even if closed, you can still revive the PR at any time or discuss it on the dev@druid.apache.org list. Thank you for your contributions.

@stale stale bot added the stale label Oct 25, 2019
@stale

stale bot commented Nov 22, 2019

This pull request/issue has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.
