Add the `lantern_hnsw.external_index_host` and `lantern_hnsw.external_index_port` GUCs. When the `external` parameter is passed, we connect to a remote Lantern daemon over a TCP socket using the `external_index_host` and `external_index_port` GUC variables. First we send the index header to the socket, followed by the codebook if needed. After receiving an ack response, we start streaming tuples over the socket to the daemon, which builds a multithreaded usearch index. The daemon then sends back `num_tuples_added` and `index_size` (the size of the index file) and starts streaming the index file. We collect the index file into `result_buf` and use it to write the index to disk. For now we assume that the data sent is in little-endian byte order.
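The little-endian assumption above could be lifted by encoding integers byte by byte instead of copying host memory. The following is only a sketch of that idea: `encode_u32_le` and `decode_u32_le` are hypothetical names, and the actual wire format of the header is not shown in this PR.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical helper: store value in out[0..3], least significant byte first.
 * The result is the same on little-endian and big-endian hosts. */
void encode_u32_le(uint32_t value, unsigned char out[4])
{
    for(size_t i = 0; i < 4; i++) {
        out[i] = (unsigned char)((value >> (8 * i)) & 0xFF);
    }
}

/* Hypothetical helper: inverse of encode_u32_le. */
uint32_t decode_u32_le(const unsigned char in[4])
{
    uint32_t value = 0;
    for(size_t i = 0; i < 4; i++) {
        value |= (uint32_t)in[i] << (8 * i);
    }
    return value;
}
```

With helpers like these on both sides of the socket, the host byte order would no longer matter for integer fields; vector data would need the same treatment.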
PR Summary
- Added new fixtures and tests for external indexing via TCP socket in `/scripts/integration_tests.py`
- Introduced GUC variables `lantern_hnsw.external_index_host` and `lantern_hnsw.external_index_port` in `/src/hnsw/options.c`
- Added `external` parameter for index creation in `/src/hnsw/options.c`
- Updated `ldb_HnswOptions` struct with `external` parameter and added `ldb_HnswGetExternal` function in `/src/hnsw/options.h`
- Enhanced scalability and performance by enabling connection to remote Lantern daemon for multithreaded index creation
3 file(s) reviewed, 13 comment(s)
| import socket | ||
| import subprocess | ||
| import time |
Info: Added imports for socket, subprocess, and time to support external indexing setup.
| # fixture to handle external index server setup | ||
| @pytest.fixture | ||
| def external_index(request): | ||
| cli_path = os.getenv("LANTERN_CLI_PATH") | ||
| if not cli_path: | ||
| pytest.skip("pass 'LANTERN_CLI_PATH' environment variable to run external indexing tests") | ||
| return | ||
|
|
||
| with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: | ||
| if s.connect_ex(('127.0.0.1', 8998)) != 0: | ||
| subprocess.Popen([cli_path, "start-indexing-server", "--host", "127.0.0.1", "--port", "8998"], shell=False, | ||
| stdin=None, stdout=None, stderr=None, close_fds=True) | ||
| tries = 1 | ||
| time.sleep(5) | ||
|
|
Info: Added fixture external_index to handle the setup of the external index server.
scripts/integration_tests.py
Outdated
| tries = 1 | ||
| time.sleep(5) | ||
|
|
||
| @pytest.mark.parametrize("distance_metric", ["l2sq", "cos"]) |
Info: Added test test_external_index to create and validate an external index using the new external parameter.
| primary.execute("testdb", f"SELECT _lantern_internal.validate_index('{index_name}')") | ||
|
|
||
|
|
||
| @pytest.mark.external_index |
Info: Added test test_external_index_pq to create and validate an external index with product quantization (pq) enabled.
| int ldb_external_index_port; | ||
| char *ldb_external_index_host; |
Info: Added new GUC variables ldb_external_index_port and ldb_external_index_host for external indexing configuration.
| DefineCustomStringVariable("lantern_hnsw.external_index_host", | ||
| "Host for external indexing", | ||
| "Change this value if you run lantern daemon on remote host", | ||
| &ldb_external_index_host, | ||
| "127.0.0.1", | ||
| PGC_USERSET, | ||
| 0, | ||
| NULL, | ||
| NULL, | ||
| NULL); |
Info: Defined lantern_hnsw.external_index_host GUC variable for specifying the host used for external indexing.
| int ef_construction; | ||
| int ef; | ||
| bool pq; | ||
| bool external; |
Info: Added external parameter to support external indexing via TCP socket.
| int ldb_HnswGetEf(Relation index); | ||
| char* ldb_HnswGetIndexFilePath(Relation index); | ||
| bool ldb_HnswGetPq(Relation index); | ||
| bool ldb_HnswGetExternal(Relation index); |
Info: Added getter function for the new external parameter.
| extern int ldb_external_index_port; | ||
| extern char* ldb_external_index_host; |
Info: Introduced new GUC variables for external index host and port.
fix header size issue in external indexing
ci/scripts/build-linux.sh
Outdated
| function setup_rust() { | ||
| curl -k -o /tmp/rustup.sh https://sh.rustup.rs | ||
| chmod +x /tmp/rustup.sh | ||
| /tmp/rustup.sh -y |
Do we need a specific version of Rust, or is this fine?
| # note: when M is too low, the streaming search below might return fewer results since the bottom layer of hnsw is less connected | ||
| primary.execute( | ||
| "testdb", | ||
| f"CREATE INDEX {index_name} ON {table_name} USING lantern_hnsw (v {ops}) WITH (dim=128, M=10, quant_bits = {quant_bits}, external = true)", |
Is the test set up so that this gets executed for various quant bits, or does it only run once?
Yes, this gets executed for 8, 16, and 32 quant bits and for the cos and l2sq distances.
src/hnsw/build.c
Outdated
| { | ||
| if(bytes_written > 0) return; | ||
|
|
||
| shutdown(client_fd, SHUT_RDWR); |
Why shutdown if you are closing right after?
Is this necessary?
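One case where the two calls differ: `close` only releases this process's descriptor, while `shutdown` ends the connection itself, so the peer sees end-of-stream even if another copy of the descriptor is still open. A minimal sketch, assuming a hypothetical helper named `close_connection`; whether the extension ever holds a duplicated descriptor is not something the diff shows.

```c
#include <assert.h>
#include <sys/socket.h>
#include <unistd.h>

/* Hypothetical helper: tell the peer the connection is over, then release
 * the descriptor. The shutdown() result is ignored on purpose, because it
 * fails with ENOTCONN when the peer has already gone away. */
int close_connection(int fd)
{
    (void)shutdown(fd, SHUT_RDWR);
    return close(fd);
}
```

If the descriptor is never duplicated or inherited, `close` alone should have the same visible effect on the daemon.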
src/hnsw/build.c
Outdated
| client_fd, buildstate->pq_codebook, params->dimensions, params->num_centroids, params->num_subvectors); | ||
| } | ||
|
|
||
| uint32 buf_size = read(client_fd, &init_response, 1024); |
Make 1024 a named constant here and at the char array definition.
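A sketch of what the named constant could look like. `EXTERNAL_INIT_RESPONSE_MAX` and `read_init_response` are hypothetical names, not ones from the PR. The sketch also keeps the result in `ssize_t`, since `read` returns -1 on error and an unsigned variable would hide that.

```c
#include <assert.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical name for the size shared by the buffer and the read call. */
#define EXTERNAL_INIT_RESPONSE_MAX 1024

/* Reads at most EXTERNAL_INIT_RESPONSE_MAX bytes of the daemon's reply.
 * Returns the byte count, 0 on end-of-stream, or -1 on error. */
ssize_t read_init_response(int fd, char response[EXTERNAL_INIT_RESPONSE_MAX])
{
    return read(fd, response, EXTERNAL_INIT_RESPONSE_MAX);
}
```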
src/hnsw/build.c
Outdated
| usearch_reserve(buildstate->usearch_index, estimated_row_count, &error); | ||
|
|
||
| if(buildstate->external) { | ||
| assert(is_little_endian()); |
Make this an `if` with an elog error on failure, so the check also happens in release builds.
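The point of the request is that `assert` compiles to nothing when `NDEBUG` is defined, so a release build would skip the check. Below is a standalone sketch of a runtime check; the function names are hypothetical, and the `-1` return stands in for the place where the extension would raise an error through elog.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <string.h>

/* True when the least significant byte of a 32-bit value is stored first. */
bool host_is_little_endian(void)
{
    const uint32_t probe = 1;
    unsigned char first_byte;
    memcpy(&first_byte, &probe, 1);
    return first_byte == 1;
}

/* Hypothetical guard: returns 0 when external indexing may proceed and -1
 * when it may not. Unlike assert(), this branch survives NDEBUG builds. */
int check_external_indexing_supported(bool external)
{
    if(external && !host_is_little_endian()) {
        return -1; /* the extension would report an error here */
    }
    return 0;
}
```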
src/hnsw/build.c
Outdated
| if(!buildstate->external) { | ||
| fstat(index_file_fd, &index_file_stat); | ||
| result_buf = mmap(NULL, index_file_stat.st_size, PROT_READ, MAP_PRIVATE, index_file_fd, 0); | ||
| assert(result_buf != MAP_FAILED); |
Make this an elog error.
Q: Is it safe to elog error from this context?
src/hnsw/build.c
Outdated
| bytes_read = read(buildstate->external_client_fd, result_buf + total_received, BUFFER_SIZE); | ||
|
|
||
| // Check for CTRL-C interrupts | ||
| CHECK_FOR_INTERRUPTS(); |
…socket, close socket on interrupt
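A standalone sketch of a receive loop for this step, under the assumption that the expected size (`index_size`) is known before streaming starts. `receive_exact` and `RECV_CHUNK_SIZE` are hypothetical names; `RECV_CHUNK_SIZE` stands in for `BUFFER_SIZE`. Each read is capped at the bytes still expected, so the daemon cannot write past the end of the buffer.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>
#include <sys/types.h>
#include <unistd.h>

#define RECV_CHUNK_SIZE 4096 /* hypothetical stand-in for BUFFER_SIZE */

/* Reads exactly `expected` bytes from fd into buf.
 * Returns 0 on success, -1 on error or if the stream ends early.
 * The caller stays responsible for closing fd in both cases. */
int receive_exact(int fd, char *buf, size_t expected)
{
    size_t total = 0;
    while(total < expected) {
        size_t remaining = expected - total;
        size_t want = remaining < RECV_CHUNK_SIZE ? remaining : RECV_CHUNK_SIZE;
        ssize_t n = read(fd, buf + total, want);
        if(n < 0 && errno == EINTR) continue; /* interrupted by a signal: retry */
        if(n <= 0) return -1;                 /* error or premature end-of-stream */
        total += (size_t)n;
        /* In the extension, the interrupt check would sit here, with the
         * socket closed before the error propagates. */
    }
    return 0;
}
```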
| fstat(index_file_fd, &index_file_stat); | ||
| result_buf = mmap(NULL, index_file_stat.st_size, PROT_READ, MAP_PRIVATE, index_file_fd, 0); | ||
| assert(result_buf != MAP_FAILED); | ||
| if(result_buf == MAP_FAILED) { |
Is this the only failure condition?
Does the mmap API guarantee that mmap succeeded if this branch is not taken?
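For reference, POSIX specifies that `mmap` returns `MAP_FAILED` and sets `errno` on failure, and a valid address otherwise, so that comparison is the complete check for the call itself. The surrounding `fstat` can fail too, and a zero-length mapping is rejected with `EINVAL`. A minimal sketch that covers all three cases; `map_file_readonly` is a hypothetical helper, and it reports to stderr where the extension would use elog.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>
#include <stdio.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/stat.h>

/* Hypothetical helper: maps the whole file read-only.
 * Returns the mapping and stores its length in *size_out, or returns NULL. */
void *map_file_readonly(int fd, size_t *size_out)
{
    struct stat st;
    if(fstat(fd, &st) != 0) {
        fprintf(stderr, "fstat failed: %s\n", strerror(errno));
        return NULL;
    }
    if(st.st_size == 0) {
        /* mmap rejects a zero length, so report it before calling */
        errno = EINVAL;
        fprintf(stderr, "refusing to map an empty file\n");
        return NULL;
    }
    void *addr = mmap(NULL, (size_t)st.st_size, PROT_READ, MAP_PRIVATE, fd, 0);
    if(addr == MAP_FAILED) {
        fprintf(stderr, "mmap failed: %s\n", strerror(errno));
        return NULL;
    }
    *size_out = (size_t)st.st_size;
    return addr;
}
```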
| } | ||
| } | ||
|
|
||
| static int connect_with_timeout(int sockfd, const struct sockaddr *addr, socklen_t addrlen, int timeout) |
Write a comment here so we do not forget what this is about. Something like:
This function ensures we do not get stuck connecting to an unavailable endpoint.
It creates a non-blocking socket, then converts the socket to a blocking one after the connection is established.
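The technique described in that comment can be sketched as follows. This is not the PR's implementation: the function names are hypothetical, and the timeout is assumed to be in milliseconds because it is passed to `poll`; the unit of the PR's `timeout` argument is not visible in the diff.

```c
#include <arpa/inet.h>
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <poll.h>
#include <stdint.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

/* Hypothetical sketch: connect without blocking forever.
 * Returns 0 on success, -1 on failure or timeout (errno is set). */
int connect_with_timeout_sketch(int sockfd, const struct sockaddr *addr, socklen_t addrlen, int timeout_ms)
{
    int flags = fcntl(sockfd, F_GETFL, 0);
    if(flags < 0) return -1;
    if(fcntl(sockfd, F_SETFL, flags | O_NONBLOCK) < 0) return -1;

    int rc = connect(sockfd, addr, addrlen);
    if(rc < 0 && errno == EINPROGRESS) {
        /* The handshake is still running: wait until the socket is writable. */
        struct pollfd pfd = {.fd = sockfd, .events = POLLOUT};
        int ready = poll(&pfd, 1, timeout_ms);
        if(ready == 0) {
            errno = ETIMEDOUT;
            rc = -1;
        } else if(ready < 0) {
            rc = -1;
        } else {
            /* Writable does not mean connected: read the final status. */
            int so_error = 0;
            socklen_t len = sizeof(so_error);
            if(getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &so_error, &len) < 0) {
                rc = -1;
            } else if(so_error != 0) {
                errno = so_error;
                rc = -1;
            } else {
                rc = 0;
            }
        }
    }

    /* Restore the original (blocking) mode without clobbering errno. */
    int saved_errno = errno;
    if(fcntl(sockfd, F_SETFL, flags) < 0) return -1;
    errno = saved_errno;
    return rc;
}

/* Usage example: open a TCP connection to 127.0.0.1:port.
 * Returns the connected descriptor, or -1 on failure. */
int connect_loopback_with_timeout(uint16_t port, int timeout_ms)
{
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if(fd < 0) return -1;

    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(port);
    addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);

    if(connect_with_timeout_sketch(fd, (const struct sockaddr *)&addr, sizeof(addr), timeout_ms) != 0) {
        close(fd);
        return -1;
    }
    return fd;
}
```

Restoring the saved flags at the end means the rest of the build code can keep using plain blocking `read` and `write` calls on the socket.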
…result buffer in case of external indexing
- add `external` param to index creation
- add `lantern_hnsw.external_index_host` GUC.
- add `lantern_hnsw.external_index_port` GUC.

When the `external` parameter is passed, we connect to a remote Lantern daemon over a TCP socket using the `external_index_host` and `external_index_port` GUC variables. First we send the index header to the socket, followed by the codebook if needed. After receiving an ack response, we start streaming tuples over the socket to the daemon, which builds a multithreaded usearch index. The daemon then sends back `num_tuples_added` and `index_size` (the size of the index file) and starts streaming the index file. We collect the index file into `result_buf` and use it to write the index to disk. For now we assume that the data sent is in little-endian byte order.

Corresponding PR in Lantern Extras PR
TODO