External indexing via tcp socket #324

Merged: var77 merged 8 commits into main from varik/streaming-external-indexing on Jul 25, 2024

var77 (Collaborator) commented Jul 16, 2024

  • Add `external` param to index creation.
  • Add `lantern_hnsw.external_index_host` GUC.
  • Add `lantern_hnsw.external_index_port` GUC.

When the `external` parameter is passed, we connect to a remote Lantern daemon over a TCP socket, using the `external_index_host` and `external_index_port` GUC variables. First we send the index header to the socket, followed by the codebook if needed. After receiving an ack response, we start streaming tuples over the socket to the daemon, which builds a multithreaded usearch index. The daemon then sends back `num_tuples_added` and `index_size` (the size of the index file) and starts streaming the index file. We collect the index file into `result_buf` and use that to write the index to disk. For now we assume that the data sent is in little-endian byte order.
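
The exchange described above can be sketched roughly as follows. This is only an illustration of the message order (header, ack, tuples, `num_tuples_added` and `index_size`, then the index bytes) with little-endian packing. The length-prefixed framing, the one-byte `ACK`, the zero-length end-of-stream marker, and the names `build_externally`, `fake_daemon`, and `recv_exact` are all assumptions made for the example, not the wire format used by the PR or by the Lantern daemon.

```python
import socket
import struct
import threading

ACK = b"\x01"  # hypothetical one-byte ack; the real protocol may differ


def recv_exact(sock, n):
    """Read exactly n bytes, looping because recv may return fewer."""
    buf = bytearray()
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            raise ConnectionError("peer closed the connection early")
        buf += chunk
    return bytes(buf)


def build_externally(sock, header, tuples):
    """Client side: header -> ack -> tuples -> (count, size) -> index bytes."""
    sock.sendall(struct.pack("<I", len(header)) + header)
    if recv_exact(sock, 1) != ACK:
        raise RuntimeError("daemon did not acknowledge the header")
    for label, vector in tuples:
        # "<" forces little-endian regardless of the host byte order
        payload = struct.pack(f"<Q{len(vector)}f", label, *vector)
        sock.sendall(struct.pack("<I", len(payload)) + payload)
    sock.sendall(struct.pack("<I", 0))  # hypothetical end-of-stream marker
    num_tuples_added, index_size = struct.unpack("<QQ", recv_exact(sock, 16))
    result_buf = recv_exact(sock, index_size)
    return num_tuples_added, result_buf


def fake_daemon(sock):
    """Stand-in for the daemon: counts tuples, returns a dummy index file."""
    (header_len,) = struct.unpack("<I", recv_exact(sock, 4))
    recv_exact(sock, header_len)
    sock.sendall(ACK)
    count = 0
    while True:
        (payload_len,) = struct.unpack("<I", recv_exact(sock, 4))
        if payload_len == 0:
            break
        recv_exact(sock, payload_len)
        count += 1
    index_file = b"IDX" * count
    sock.sendall(struct.pack("<QQ", count, len(index_file)) + index_file)


if __name__ == "__main__":
    client, server = socket.socketpair()
    worker = threading.Thread(target=fake_daemon, args=(server,))
    worker.start()
    added, result_buf = build_externally(
        client, b"hdr", [(1, [0.0, 1.0]), (2, [2.0, 3.0])]
    )
    worker.join()
    client.close()
    server.close()
    print(added, len(result_buf))
```

Fixing the byte order in the format string keeps both ends in agreement even if one of them later runs on a big-endian host, which is the situation the last sentence of the description sets aside for now.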

Corresponding PR in Lantern Extras PR

TODO

  • add tests

github-actions bot commented Jul 16, 2024

Benchmarks

| metric | old | new | pct change |
| --- | --- | --- | --- |
| recall (after create) | 0.954 | 0.952 | -0.21% |
| recall (after insert) | 0.000 | 0.000 | - |
| select tps | 12226.184 | 12513.687 | +2.35% |
| select bulk(100) tps | 32.354 | 36.207 | +11.91% |
| select latency (ms) | 0.756 ± 2.277𝜎 | 0.763 ± 1.962𝜎 | +0.93% |
| select bulk(100) latency (ms) | 922.465 ± 149.265𝜎 | 857.631 ± 113.802𝜎 | -7.03% |
| create latency (ms) | 396700.063 | 394004.141 | -0.68% |
| insert tps | 455.562 | 450.699 | -1.07% |
| insert bulk(100) tps | 4.873 | 4.688 | -3.79% |
| insert latency (ms) | 69.561 ± 16.001𝜎 | 70.363 ± 16.073𝜎 | +1.15% |
| insert bulk(100) latency (ms) | 6443.522 ± 116.431𝜎 | 6682.611 ± 187.341𝜎 | +3.71% |
| disk usage (bytes) | 8192008192.000 | 8192008192.000 | - |

var77 force-pushed the varik/streaming-external-indexing branch from 8f64a61 to a6b6dca on July 16, 2024 09:23
codecov bot commented Jul 16, 2024

Codecov Report

Attention: Patch coverage is 11.53846% with 161 lines in your changes missing coverage. Please review.

| Files | Patch % | Lines |
| --- | --- | --- |
| src/hnsw/external_index_socket.c | 0.00% | 144 Missing ⚠️ |
| src/hnsw/build.c | 39.28% | 5 Missing and 12 partials ⚠️ |

var77 force-pushed the varik/streaming-external-indexing branch from eb58fbc to a1efe8a on July 17, 2024 14:49
var77 marked this pull request as ready for review July 17, 2024 14:51
var77 requested a review from Ngalstyan4 July 17, 2024 14:51
@greptile-apps greptile-apps bot left a comment


PR Summary

  • Added new fixtures and tests for external indexing via TCP socket in /scripts/integration_tests.py
  • Introduced GUC variables lantern_hnsw.external_index_host and lantern_hnsw.external_index_port in /src/hnsw/options.c
  • Added external parameter for index creation in /src/hnsw/options.c
  • Updated ldb_HnswOptions struct with external parameter and added ldb_HnswGetExternal function in /src/hnsw/options.h
  • Enhanced scalability and performance by enabling connection to remote Lantern daemon for multithreaded index creation

3 file(s) reviewed, 13 comment(s)

Comment on lines +7 to +9
import socket
import subprocess
import time

Info: Added imports for socket, subprocess, and time to support external indexing setup.

Comment on lines +651 to +665
# fixture to handle external index server setup
@pytest.fixture
def external_index(request):
    cli_path = os.getenv("LANTERN_CLI_PATH")
    if not cli_path:
        pytest.skip("pass 'LANTERN_CLI_PATH' environment variable to run external indexing tests")
        return

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        if s.connect_ex(('127.0.0.1', 8998)) != 0:
            subprocess.Popen([cli_path, "start-indexing-server", "--host", "127.0.0.1", "--port", "8998"], shell=False,
                             stdin=None, stdout=None, stderr=None, close_fds=True)
            tries = 1
            time.sleep(5)


Info: Added fixture external_index to handle the setup of the external index server.
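
The fixture above waits a fixed five seconds after starting the server. One possible alternative, shown here only as a sketch and not as what the PR does, is to poll the port until it accepts connections. The helper name `wait_for_port` and its default timings are hypothetical.

```python
import socket
import time


def wait_for_port(host, port, timeout=10.0, interval=0.2):
    """Return True once host:port accepts TCP connections, False after timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(interval)
            # connect_ex returns 0 on success and an errno value otherwise
            if s.connect_ex((host, port)) == 0:
                return True
        time.sleep(interval)
    return False
```

A fixture could call `wait_for_port("127.0.0.1", 8998)` after `subprocess.Popen(...)` and skip or fail the test when it returns `False`, which avoids both sleeping longer than needed and racing a slow server start.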

tries = 1
time.sleep(5)

@pytest.mark.parametrize("distance_metric", ["l2sq", "cos"])

Info: Added test test_external_index to create and validate an external index using the new external parameter.

primary.execute("testdb", f"SELECT _lantern_internal.validate_index('{index_name}')")


@pytest.mark.external_index

Info: Added test test_external_index_pq to create and validate an external index with product quantization (pq) enabled.

Comment on lines +53 to +54
int ldb_external_index_port;
char *ldb_external_index_host;

Info: Added new GUC variables ldb_external_index_port and ldb_external_index_host for external indexing configuration.

Comment on lines +392 to +401
DefineCustomStringVariable("lantern_hnsw.external_index_host",
"Host for external indexing",
"Change this value if you run lantern daemon on remote host",
&ldb_external_index_host,
"127.0.0.1",
PGC_USERSET,
0,
NULL,
NULL,
NULL);

Info: Defined lantern_hnsw.external_index_host GUC variable for specifying the host used for external indexing.

int ef_construction;
int ef;
bool pq;
bool external;

Info: Added external parameter to support external indexing via TCP socket.

int ldb_HnswGetEf(Relation index);
char* ldb_HnswGetIndexFilePath(Relation index);
bool ldb_HnswGetPq(Relation index);
bool ldb_HnswGetExternal(Relation index);

Info: Added getter function for the new external parameter.

Comment on lines +82 to +83
extern int ldb_external_index_port;
extern char* ldb_external_index_host;

Info: Introduced new GUC variables for external index host and port.

var77 force-pushed the varik/streaming-external-indexing branch from a1efe8a to e2c459d on July 17, 2024 14:57
fix header size issue in external indexing
var77 force-pushed the varik/streaming-external-indexing branch from e2c459d to c506a37 on July 17, 2024 15:07
function setup_rust() {
curl -k -o /tmp/rustup.sh https://sh.rustup.rs
chmod +x /tmp/rustup.sh
/tmp/rustup.sh -y
Contributor

Do we need a specific version of Rust, or is this fine?

# note: when M is too low, the streaming search below might return fewer results since the bottom layer of hnsw is less connected
primary.execute(
"testdb",
f"CREATE INDEX {index_name} ON {table_name} USING lantern_hnsw (v {ops}) WITH (dim=128, M=10, quant_bits = {quant_bits}, external = true)",
Contributor

Is the test set up such that this gets executed for various quant bits, or does this only run once?

Collaborator Author

Yes, this gets executed for 8, 16, and 32 quant bits and for the cos and l2sq distances.
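
For readers unfamiliar with how pytest expands parameters: stacking two `@pytest.mark.parametrize` decorators runs the test once per combination. The snippet below only enumerates those combinations with `itertools.product`. It assumes the test stacks one decorator for `distance_metric` and one for `quant_bits`, which the visible diff context suggests but does not fully show; the helper name `parametrized_cases` is made up for the example.

```python
import itertools

# Values taken from this thread; the real test file may spell them differently.
DISTANCE_METRICS = ["l2sq", "cos"]
QUANT_BITS = [8, 16, 32]


def parametrized_cases():
    """Enumerate the combinations two stacked parametrize decorators expand to."""
    return list(itertools.product(DISTANCE_METRICS, QUANT_BITS))


if __name__ == "__main__":
    for metric, bits in parametrized_cases():
        print(f"distance_metric={metric} quant_bits={bits}")
```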

src/hnsw/build.c Outdated
{
if(bytes_written > 0) return;

shutdown(client_fd, SHUT_RDWR);
Contributor

Why call shutdown if you are closing right after?
Is this necessary?
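
As general background to this question (not a statement about why the PR calls it): `close` only releases one descriptor, while `shutdown` acts on the connection itself, so the two differ when the descriptor has been duplicated, for example across a fork. A small sketch using a local socket pair; the helper `peer_sees_eof` is hypothetical.

```python
import socket


def peer_sees_eof(sock):
    """Return True if a non-blocking recv on sock reports end-of-stream."""
    sock.setblocking(False)
    try:
        return sock.recv(1) == b""
    except BlockingIOError:
        return False  # connection still open, just no data yet


if __name__ == "__main__":
    a, b = socket.socketpair()
    a_dup = a.dup()  # second descriptor referring to the same connection
    a.close()  # drops only this descriptor; a_dup keeps the connection alive
    print("EOF after close:", peer_sees_eof(b))
    a_dup.shutdown(socket.SHUT_RDWR)  # tears down the connection for all descriptors
    print("EOF after shutdown:", peer_sees_eof(b))
    a_dup.close()
    b.close()
```

When only one descriptor refers to the socket, closing it ends the connection as well, so whether the extra `shutdown` call is needed depends on whether the descriptor can be shared.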

src/hnsw/build.c Outdated
client_fd, buildstate->pq_codebook, params->dimensions, params->num_centroids, params->num_subvectors);
}

uint32 buf_size = read(client_fd, &init_response, 1024);
Contributor

Make 1024 a named constant here and at the char array definition.

src/hnsw/build.c Outdated
usearch_reserve(buildstate->usearch_index, estimated_row_count, &error);

if(buildstate->external) {
assert(is_little_endian());
Contributor

Make this an "if" and elog an error on failure, so the check also happens in release builds.
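
The same concern exists in Python, where `assert` statements are stripped when the interpreter runs with `-O`. A minimal sketch of the suggested pattern, using an explicit check that raises; the function name and the error message are invented for illustration and are not taken from the PR.

```python
import sys


def require_little_endian(byteorder=sys.byteorder):
    """Raise instead of asserting, so the check survives optimized runs."""
    if byteorder != "little":
        raise RuntimeError(
            "external indexing is supported only on little-endian hosts"
        )


if __name__ == "__main__":
    require_little_endian()  # passes silently on a little-endian machine
    try:
        require_little_endian("big")  # simulate a big-endian host
    except RuntimeError as exc:
        print("rejected:", exc)
```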

src/hnsw/build.c Outdated
if(!buildstate->external) {
fstat(index_file_fd, &index_file_stat);
result_buf = mmap(NULL, index_file_stat.st_size, PROT_READ, MAP_PRIVATE, index_file_fd, 0);
assert(result_buf != MAP_FAILED);
Contributor

Make this an elog error.
Q: Is it safe to elog an error from this context?

src/hnsw/build.c Outdated
bytes_read = read(buildstate->external_client_fd, result_buf + total_received, BUFFER_SIZE);

// Check for CTRL-C interrupts
CHECK_FOR_INTERRUPTS();
Contributor

try or test this
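
One way to exercise this kind of loop, sketched in Python rather than against the actual C code: an interrupt check placed after a blocking read only runs when the read returns, so the sketch gives the socket a short timeout and checks a cancel flag on every iteration. `read_index`, `Cancelled`, the `cancel` event (a stand-in for `CHECK_FOR_INTERRUPTS()`), and the `BUFFER_SIZE` value are assumptions for the example.

```python
import socket
import threading

BUFFER_SIZE = 4096  # hypothetical chunk size; the PR's value is not shown here


class Cancelled(Exception):
    """Raised when the cancel flag is set while the index is being received."""


def read_index(sock, index_size, cancel, poll_seconds=0.05):
    """Read index_size bytes into a buffer, checking cancel between reads."""
    result_buf = bytearray(index_size)
    view = memoryview(result_buf)
    total_received = 0
    # A short timeout keeps a stalled peer from blocking the cancel check.
    sock.settimeout(poll_seconds)
    while total_received < index_size:
        if cancel.is_set():  # stand-in for CHECK_FOR_INTERRUPTS()
            raise Cancelled("interrupted while receiving the index")
        want = min(BUFFER_SIZE, index_size - total_received)
        try:
            n = sock.recv_into(view[total_received:total_received + want])
        except socket.timeout:
            continue  # nothing arrived yet; loop back to the cancel check
        if n == 0:
            raise ConnectionError("peer closed before sending the whole index")
        total_received += n
    return bytes(result_buf)
```

A test can then start a read against a peer that never sends anything, set the flag from a timer, and assert that `Cancelled` is raised within a bounded time.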

var77 requested a review from Ngalstyan4 July 23, 2024 15:09
fstat(index_file_fd, &index_file_stat);
result_buf = mmap(NULL, index_file_stat.st_size, PROT_READ, MAP_PRIVATE, index_file_fd, 0);
assert(result_buf != MAP_FAILED);
if(result_buf == MAP_FAILED) {
Contributor

Is this the only failure condition?
Does the mmap API guarantee that mmap succeeded if this branch is not taken?

}
}

static int connect_with_timeout(int sockfd, const struct sockaddr *addr, socklen_t addrlen, int timeout)
Contributor

Write a comment here so we do not forget what this is about. Something like:

This function ensures we do not get stuck connecting to an unavailable endpoint.
It creates a non-blocking socket, then converts the socket to a blocking one after the connection is established.
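
The mechanism described in the suggested comment can be sketched in Python as follows. This is an illustration of the general non-blocking connect pattern on POSIX systems, not a port of the C function in the PR; in everyday Python code `socket.create_connection(address, timeout=...)` covers the same need.

```python
import errno
import os
import select
import socket


def connect_with_timeout(sock, address, timeout):
    """Connect without hanging on an unavailable endpoint, then go back to blocking."""
    sock.setblocking(False)
    err = sock.connect_ex(address)
    if err not in (0, errno.EINPROGRESS, errno.EWOULDBLOCK):
        raise OSError(err, os.strerror(err))
    if err != 0:
        # The socket becomes writable once the connect attempt finishes.
        _, writable, _ = select.select([], [sock], [], timeout)
        if not writable:
            raise TimeoutError(f"connect to {address} timed out after {timeout}s")
        # SO_ERROR tells us whether the finished attempt succeeded.
        err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
        if err != 0:
            raise OSError(err, os.strerror(err))
    sock.setblocking(True)  # later reads and writes use ordinary blocking I/O
```

The `SO_ERROR` check matters because a socket also becomes writable when the connection attempt fails, so writability alone does not mean the connection was established.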

var77 merged commit 84cb0f6 into main Jul 25, 2024
var77 deleted the varik/streaming-external-indexing branch July 25, 2024 11:42