Skip to content

Concurrent hnsw graph and builder, take two#12421

Closed
jbellis wants to merge 21 commits intoapache:mainfrom
jbellis:concurrent4
Closed

Concurrent hnsw graph and builder, take two#12421
jbellis wants to merge 21 commits intoapache:mainfrom
jbellis:concurrent4

Conversation

@jbellis
Copy link

@jbellis jbellis commented Jul 6, 2023

Motivation

I need to support concurrent reads and writes to an HNSW index for Cassandra. One option would be to partition the document range and assign each range to a single thread with the existing implementation. However,

  • It is much faster to query a single on-heap hnsw index, than to query multiple such indexes and combine the result. (log(N) vs M log(N/M) where N is the number of documents and M is the number of partitions).
  • I need to ultimately write the combined index to disk, so even with some contention necessarily occurring during building of the index, we still come out way ahead in terms of total efficiency vs creating per-thread indexes and combining them, since combining the per-thread indexes boils down to "pick the largest and then add all the other nodes normally," you don't really benefit from having computed the others previously and you end up doing close to 2x the work.

Performance

Numbers are from my SIFT test harness at https://github.com/jbellis/hnswdemo/tree/lucene-bench. Numbers are averaged across 5 test runs on my i9-12900 (8 performance cores and 8 efficiency)

Serial construction of 1M nodes: 292.3s
Parallel construction, 1 thread: 379.0s = 129.7% of serial
Parallel construction, 2 threads: 191.3s = 50.5% of parallel/1
Parallel construction, 4 threads: 96.1s = 25.4% of parallel/1
Parallel construction, 8 threads: 52.6s = 13.8% of parallel/1

Serial queries of 100k vectors / top 100: 38.3s
Parallel queries, 1 thread: 41.6s = 1.09% of serial
Parallel queries, 2 threads: 21.0s = 50.5% of parallel/1
Parallel queries, 4 threads: 10.3s = 24.7% of parallel/1
Parallel queries, 8 threads: 5.3s = 12.7% of parallel/1

To summarize, there is about a 30% overhead during construction and 10% overhead at query time from using the concurrent class. The concurrent class parallelizes construction nearly perfectly through 4 threads, with some additional inefficiency becoming visible at 8. (Probably this is the effect of having to do more vector comparisons across the concurrently added nodes – I would expect this to remain relatively small and not exploding as thread counts increase.) Uncontended queries scale close to perfectly to at least 8 threads.

Design and implementation

ConcurrentOnHeapHnswGraph is very similar to OnHeapHnswGraph with concurrent backing Collections. The main addition is a getView operation to provide a threadsafe snapshot of the graph for searches. The View uses a CompletionTracker class to provide a kind of snapshot isolation – otherwise it is impossible to prevent partially added nodes from causing very difficult to debug race conditions. This is used during construction as well as for user-invoked searches.

(The initial CompletionTracker implementation was just an AtomicInteger clock and a Map<node Integer, clock Integer>, but the constant box/unbox introduced significant CPU and GC overhead. The current implementation is a result of optimizing that.)

ConcurrentHnswGraphBuilder adds an incremental ram used estimate as a return value to addGraphNode, and a buildAsync method that takes an ExecutorService for fine-grained control over parallelism. Otherwise, it follows the API of HnswGraphBuilder closely. (Closely enough that my original PR extracted a common interface and added factories so that they could be plugged in interchangeably, but this is now removed after Michael’s feedback.)

The key to achieving good concurrency while maintaining correctness without synchronization is, we track in-progress node additions across all threads in a ConcurrentSkipListSet. After adding ourselves in addGraphNode, we take a snapshot of this set (this is O(1) for CSLS), and consider all other in-progress updates as neighbor candidates (subject to normal level constraints).

In general, the main concern with the Concurrent Builder compared to the serial is to make sure that partially complete operations never “leak” to other threads. In particular,

  • Neighbor manipulation has been encapsulated in ConcurrentNeighborSet. CNS implements a copy-on-write NeighborArray – my initial implementation used a ConcurrentSkipListSet but this was significantly slower since even during construction, there are many more “iterate through the neighbors” operations than “change the neighbors.” We have to subclass NeighborArray here to be able to efficiently handle the case of concurrently inserting the same node (as a forward-link and a back-link).
  • Entry point updating is not done until the new node has been fully added.

One more point is a little subtle –

  • When a new node is added, it considers both existing nodes in the graph as candidates, as well as other nodes concurrently in the process of being added. It does this by taking a snapshot of the "insertionsInProgress" set when the node begins adding itself, and using both the snapshot and the current graph state as candidate sources. This allows the graph to remain connected even when many nodes are added concurrently.
  • The subtle part is that we compute diversity separately for the fully-added and the in-progress candidates. This is because there is an asymmetry between initial links (you need to be diverse or you’re discarded) and backlinks added later (diversity is not enforced again until you bump up against max connections). Treating them separately allows us to get very similar results to what you would get adding each node serially. See the discussion in addGraphNode about over-pruning for an example.
  • I think we could simplify this linking of new nodes to consider fully-added and in-progress candidates as a group instead of separately, if we implemented the paper’s “keep pruned connections” suggestion. I have experimented with this, and I think the recall:memory used benefits are good, but it deserves a followup ticket.

The main graph test suites are identical except for changes to perform concurrent graph builds instead of serial, and the addition of testing the incremental ram usage estimate from ConcurrentHnswGraphBuilder::addGraphNode. I also added new tests for ConcurrentNeighborSet.

Minor changes to existing code:

  • HnswGraphSearcher gets a new searchConcurrent method that uses a GrowableBitSet, since the size of the graph may change during the search. For the same reason, removed "assert friendOrd < size" from the search methods.
  • Moved JavaUtilBitSet out of BaseBitSetTestCase and renamed to GrowableBitSet.
  • Several NeighborArray fields move to protected since we're subclassing it now.
  • Updates OnHeapHnswGraph ramBytesUsed for the TreeMap -> HashMap change made in 3c16374, although apparently it’s close enough either way to mostly not matter.
  • Added --add-opens in build.gradle for tests to allow RamUsageEstimator to compute exact values when checking correctness of my estimates.
  • Added HnswGraph.addNode (with default unsupportedoperation) to document the shared expectations in one place.

@mayya-sharipova
Copy link
Contributor

Great work!
Have you compared the recall of the parallel graph with the serially built graph (for example using ann-benchmarks)?

Copy link
Member

@benwtrent benwtrent left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very high level so far. But it seems very very complicated to me.

We should always expect an executor from the caller.

We shouldn't do one node a thread but probably batch no? Especially since random node access isn't thread safe.

I understand how this could be useful. Especially if we ever do multi-threaded merges.

It would also be good to have recall numbers to ensure correctness.

@jbellis
Copy link
Author

jbellis commented Jul 22, 2023 via email

@jbellis
Copy link
Author

jbellis commented Jul 25, 2023

While making that last commit (everything still passes, after the preceeding fixes) I noticed that AbstractMockVectorValues returns null for out-of-bounds ordinals while most of the other implementations throw exceptions. (Different exceptions, depending on the implementation.) Should we standardize this?

@JeremiahDJordan
Copy link

Any progress on reviewing this? We want to be able to use a concurrent implementation in Apache Cassandra and would prefer not to have to run code off a fork to do it.

Copy link
Member

@benwtrent benwtrent left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't even really get to touch on the ConcurrentOnHeapHnswGraph class yet. The views & completion tracker seem trappy and need some careful reading.

Comment on lines +106 to 109
graphUpperLevels.add(
new HashMap<>(
16, levelLoadFactor)); // these are the default parameters, made explicit
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't specify the loadfactor. That doesn't make sense to me.

Also, with 16 really this is only pre-allocating up to 12 values and then it will grow again, I am not sure if that is your purpose or not.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We specify the loadFactor because it's necessary to compute ram usage, and you can't retrieve the loadFactor once the Map is constructed.

The 16 is there because before it was just saying new HashMap() and I can't specify the loadFactor w/o also specifying the initialCapacity. As the comment indicates, 16 is the default initialCapacity.

Comment on lines +371 to +390
// Consider the following graph with "circular" test vectors:
//
// 0 -> 1
// 1 <- 0
// At this point we insert nodes 2 and 3 concurrently, denoted T1 and T2 for threads 1 and 2
// T1 T2
// insert 2 to L1 [2 is marked "in progress"]
// insert 3 to L1
// 3 considers as neighbors 0, 1, 2; 0 and 1 are not diverse wrt 2
// 3 -> 2 is added to graph
// 3 is marked entry node
// 2 follows 3 to L0, where 3 only has 2 as a neighbor
// 2 -> 3 is added to graph
// all further nodes will only be added to the 2/3 subgraph; 0/1 are partitioned forever
//
// Considering concurrent inserts separately from natural candidates solves this problem;
// both 1 and 2 will be added as neighbors to 3, avoiding the partition, and 2 will then
// pick up the connection to 1 that it's supposed to have as well.
addForwardLinks(level, node, candidates); // natural candidates
addForwardLinks(level, node, inProgressBefore, progressMarker); // concurrent candidates
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The "overpruning" seems like it is controlled by beam-width and the furtherest candidate. Why do we ignore the furthest candidate here? It seems like we shouldn't even bother with forward links with the in progress candidates if none of them are closer than the currently available one. Because, if they were already part of the graph, they would likely be ignored.

Why aren't beam width and ignoring candidates further than the current furthest one not adequate?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to clarify -- this is about graph connectedness. all graph ANN algorithms will break (in the sense of returning terrible results) if the graph isn't connected, because there's literally no way to get from partition A of the graph to partition B. (I believe that hnsw construction doesn't guarantee connectedness but it makes it a very highly likely outcome.)

the problem is that when you consider in-progress candidates together with the natural candidates you end up with a much lower level of connectivity, because you potentially get to see high-diversity neighbors "from the future" that stop the natural neighbors from being added. in other words the diversity check will remove neighbors that would have been present in the graph if the nodes had been added serially, which in the worst case as illustrated in this example can actually result in a partitioned graph.

(beam width is about "are we searching far enough to find good candidates," so it has no special relevance to the concurrency issue here.)

private void insertMultiple(NeighborArray others, BitSet selected) {
neighborsRef.getAndUpdate(
current -> {
ConcurrentNeighborArray next = current.copy();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we are doing so many copies. The other neighbor array is sorted. Why can't we do a merge over the selected to reduce our copies?

Or is "others" always only 1 or 2 values?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that would be a reasonable optimization, but this hasn't shown up as a hot spot in my profiling

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looked at another profile this morning. 99.75% of insertMultiple is score comparisons, for vectors of dimension 256.

@benwtrent
Copy link
Member

Haven't forgotten about this. Just been bogged down with other things. Hope to revisit again soon!

@jbellis jbellis closed this Oct 2, 2023
@jbellis
Copy link
Author

jbellis commented Oct 2, 2023

Thanks for the feedback. I've switched my efforts to a DiskANN implementation in JVector, so closing this out.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants