
Implement S3 snapshot manager#4150

Merged
generall merged 59 commits into qdrant:dev from kemkemG0:feature/make-s3-available-for-snapshots
May 10, 2024

Conversation


@kemkemG0 kemkemG0 commented May 1, 2024

What I changed

  • I added the aws-sdk-s3 crate to the collection crate for managing operations on S3. I chose this SDK because it is the official one, and version 1.24.0 is well-established and stable. The implementation was based on the official sample code at https://github.com/awslabs/aws-sdk-rust/tree/main/examples/examples/s3/src/bin.

  • Removed aws_s3_rust and replaced it with object_store. This change allows the same abstract code to be used with S3, GCS, and Azure Storage. Currently, the implementation is only for S3, but it can be easily extended to support other services.

  • Next, I implemented the S3 functionalities in the existing snapshots_manager.rs. The necessary functions using aws-sdk-s3 were implemented in snapshots_s3_ops.rs and called from there.

  • Deleted the previously created snapshot_s3_ops.rs and introduced a more abstract snapshot_storage_ops.rs.

  • One challenging aspect, different from what was initially expected, involved operations like deleting and downloading snapshots. The process used get_snapshot_path for path verification before the delete and download actions, which invariably failed when a snapshot was stored on S3 and no corresponding path existed locally.

  • To address this, I implemented get_s3_snapshot_path and used a match statement to handle different scenarios.

  • The same changes have been made in four places: delete and download of snapshots, and delete and download of full snapshots. (Note: Additional changes were also necessary for downloading shards.)

  • Addressed a review comment by removing S3-specific functions such as get_s3_snapshot_path and get_full_s3_snapshot_path. Replaced them with more abstract functions get_snapshot_path and get_full_snapshot_path located in snapshot_storage_ops.rs to enhance abstraction.

  • The use of object_store has resulted in a reduced build size.
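The local/S3 dispatch described in the bullets above can be sketched with a plain enum. This is a minimal, std-only illustration of the idea; the names SnapshotStorage and snapshot_path are illustrative, not the PR's exact API:

```rust
// Sketch of dispatching snapshot operations over two storage backends,
// in the spirit of the abstract snapshot_storage_ops described above.
// All names here are illustrative, not the PR's actual types.
enum SnapshotStorage {
    Local { base_dir: String },
    S3 { bucket: String },
}

impl SnapshotStorage {
    /// Resolve where a named snapshot lives for this backend.
    fn snapshot_path(&self, name: &str) -> String {
        match self {
            SnapshotStorage::Local { base_dir } => format!("{base_dir}/{name}"),
            SnapshotStorage::S3 { bucket } => format!("s3://{bucket}/{name}"),
        }
    }
}

fn main() {
    let local = SnapshotStorage::Local { base_dir: "./snapshots".to_string() };
    let s3 = SnapshotStorage::S3 { bucket: "qdrant-snapshots".to_string() };
    assert_eq!(local.snapshot_path("col.snapshot"), "./snapshots/col.snapshot");
    assert_eq!(s3.snapshot_path("col.snapshot"), "s3://qdrant-snapshots/col.snapshot");
    println!("ok");
}
```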

Test Modifications

  • The test script tests/snapshots/snapshots-recovery.sh was modified to allow switching between local and s3 based on the input arguments. For S3 tests, the config file is automatically modified using the yq command before execution.

✅ I have verified that all tests pass on my forked repository.

resolve: #4109

/claim #4109

All Submissions:

  • Contributions should target the dev branch. Did you create your branch from dev?
  • Have you followed the guidelines in our Contributing document?
  • Have you checked to ensure there aren't other open Pull Requests for the same update/change?

New Feature Submissions:

  1. Does your submission pass tests?
  2. Have you formatted your code locally using cargo +nightly fmt --all command prior to submission?
  3. Have you checked your code using cargo clippy --all --all-features command?

Changes to Core Features:

  • Have you added an explanation of what your changes do and why you'd like us to include them?
  • Have you written new tests for your core changes, as applicable?
  • Have you successfully run tests with your changes locally?

@kemkemG0 kemkemG0 marked this pull request as ready for review May 1, 2024 06:12
@kemkemG0 kemkemG0 changed the title from Implement S3 snapshot manager to [WIP] Implement S3 snapshot manager May 1, 2024

kemkemG0 commented May 1, 2024

Implementation is done.

I am going to add unit tests and integration tests now.

Comment on lines +78 to +82
# AWS
aws-config = { version = "1.1.7", features = ["behavior-version-latest"] }
aws-sdk-s3 = "1.24.0"
aws-smithy-types = "1.1.8"
aws-smithy-types-convert = { version = "0.60.8", features = ["convert-chrono"] }
Contributor Author

I used the official AWS SDK for S3

Member

Any particular reason for that?

Comment on lines +16 to +44
#[derive(Clone, Deserialize, Debug, Default)]
pub struct SnapShotsConfig {
    pub snapshots_storage: SnapshotsStorageConfig,
    pub s3_config: Option<S3Config>,
}

#[derive(Clone, Debug, Default)]
pub enum SnapshotsStorageConfig {
    #[default]
    Local,
    S3,
}

impl<'de> Deserialize<'de> for SnapshotsStorageConfig {
    fn deserialize<D>(deserializer: D) -> Result<SnapshotsStorageConfig, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        let s: String = Deserialize::deserialize(deserializer)?;
        match s.as_str() {
            "local" => Ok(SnapshotsStorageConfig::Local),
            "s3" => Ok(SnapshotsStorageConfig::S3),
            _ => Err(serde::de::Error::custom(
                "Invalid snapshots_storage. Use 'local' or 's3'",
            )),
        }
    }
}

Contributor Author

These are for deserializing the config YAML.
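The custom deserializer above boils down to validating a string and mapping it onto the enum. As a standalone, std-only sketch of the same mapping (using FromStr in place of the serde wrapper, so it runs without external crates):

```rust
use std::str::FromStr;

// Mirror of the enum above; PartialEq derived only so the sketch can assert.
#[derive(Clone, Debug, Default, PartialEq)]
enum SnapshotsStorageConfig {
    #[default]
    Local,
    S3,
}

impl FromStr for SnapshotsStorageConfig {
    type Err = String;

    // Same accept/reject logic as the serde Deserialize impl above.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "local" => Ok(SnapshotsStorageConfig::Local),
            "s3" => Ok(SnapshotsStorageConfig::S3),
            other => Err(format!(
                "Invalid snapshots_storage '{other}'. Use 'local' or 's3'"
            )),
        }
    }
}

fn main() {
    assert_eq!("s3".parse(), Ok(SnapshotsStorageConfig::S3));
    assert_eq!("local".parse(), Ok(SnapshotsStorageConfig::Local));
    assert!("gcs".parse::<SnapshotsStorageConfig>().is_err());
    println!("ok");
}
```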

Comment on lines +32 to +33
const CHUNK_SIZE: u64 = 1024 * 1024 * 5;
const MAX_CHUNKS: u64 = 10000;
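These two constants match S3's documented multipart-upload limits: a 5 MiB minimum part size and at most 10,000 parts per upload. The commit log later mentions a get_appropriate_chunk_size refactor; a sketch of how a chunk size could be derived from these limits (the function name and signature here are illustrative, not the PR's exact code):

```rust
const CHUNK_SIZE: u64 = 1024 * 1024 * 5; // S3's 5 MiB minimum part size
const MAX_CHUNKS: u64 = 10_000; // S3's maximum number of parts per upload

// Illustrative sketch: pick the smallest part size that keeps the upload
// under MAX_CHUNKS parts (ceiling division), never going below the minimum.
fn appropriate_chunk_size(file_size: u64) -> u64 {
    let min_for_limit = (file_size + MAX_CHUNKS - 1) / MAX_CHUNKS;
    CHUNK_SIZE.max(min_for_limit)
}

fn main() {
    // 1 GiB fits comfortably in 5 MiB parts.
    assert_eq!(appropriate_chunk_size(1u64 << 30), CHUNK_SIZE);
    // 100 GiB would need more than 10,000 parts at 5 MiB, so parts grow.
    assert!(appropriate_chunk_size(100u64 << 30) > CHUNK_SIZE);
    println!("ok");
}
```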
Contributor Author

key.map(|k| k.trim_start_matches("./").to_string())
}

pub async fn multi_part_upload(
Contributor Author


generall commented May 7, 2024

A few things we would need to finalize this PR:

  • In do_get_snapshot and similar functions we download the snapshot from S3 and only then respond to the user. This might not work so well with large files, where the user would have to wait indefinitely without receiving a single response byte. Additionally, it leaves temporary files behind that are never cleaned up.

My suggestion is to stream files from either local or S3 as an

HttpResponse::Ok().content_type("application/octet-stream").streaming(stream)

In order to do that, we would need to implement something like

pub struct SnapshotStreamer {
    snapshot_manager: SnapshotStorageManager,
    snapshot_path: PathBuf,
}

impl Stream for SnapshotStreamer {
    type Item = Result<bytes::Bytes, CollectionError>;

NamedFile from actix does a lot of nice things, maybe we could either keep it for local files, or re-implement in the octet-stream.

  • make sure tests work

Please note that the main issue bounty should still be paid once this PR is merged
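The streaming suggestion above can be sketched std-only: the real code would implement futures' Stream and yield bytes::Bytes items from poll_next, but an Iterator producing fixed-size chunks shows the same shape without external crates. ChunkedReader and its chunk size are illustrative names/values, not the PR's code:

```rust
use std::io::{self, Read};

/// Yields a reader's contents as fixed-size chunks, one per `next()` call,
/// the same shape a Stream's poll_next would produce one Bytes item at a time.
struct ChunkedReader<R: Read> {
    reader: R,
    chunk_size: usize,
}

impl<R: Read> Iterator for ChunkedReader<R> {
    type Item = io::Result<Vec<u8>>;

    fn next(&mut self) -> Option<Self::Item> {
        let mut buf = vec![0u8; self.chunk_size];
        match self.reader.read(&mut buf) {
            Ok(0) => None, // EOF: end of the stream
            Ok(n) => {
                buf.truncate(n);
                Some(Ok(buf))
            }
            Err(e) => Some(Err(e)),
        }
    }
}

fn main() {
    // &[u8] implements Read, so it stands in for a snapshot file here.
    let data: &[u8] = b"0123456789";
    let chunks: Vec<Vec<u8>> = ChunkedReader { reader: data, chunk_size: 4 }
        .map(|c| c.unwrap())
        .collect();
    assert_eq!(chunks, vec![b"0123".to_vec(), b"4567".to_vec(), b"89".to_vec()]);
    println!("ok");
}
```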

@kemkemG0 kemkemG0 changed the title from Implement S3 snapshot manager to [WIP] Implement S3 snapshot manager May 9, 2024

kemkemG0 commented May 10, 2024

@generall
I also noticed that shard snapshots had separate implementations, and I would need to add create_shard_snapshot, recover_shard_snapshot, and delete_shard_snapshot under snapshot_storage_manager.
These look complicated and may take some time.

Never mind, it was just an internal thing; we just need to call the common create_store after these are done.


kemkemG0 commented May 10, 2024

  • Fixed bugs; tests (should) pass
  • Implemented downloading with Stream
  • Added an integration test for the Shard Snapshot API with S3 storage

@kemkemG0 kemkemG0 changed the title from [WIP] Implement S3 snapshot manager to Implement S3 snapshot manager May 10, 2024
sleep 10
./tests/shard-snapshot-api.sh test-all

test-shard-snapshot-api-s3-minio:
Contributor Author

@kemkemG0 kemkemG0 May 10, 2024


Add a shard snapshot API integration test for the S3 version.

Comment on lines +6 to +42

pub struct SnapShotStreamLocalFS {
    pub snapshot_path: PathBuf,
    pub req: HttpRequest,
}

pub struct SnapShotStreamCloudStrage {
    pub streamer:
        std::pin::Pin<Box<dyn Stream<Item = Result<bytes::Bytes, object_store::Error>> + Send>>,
}

pub enum SnapshotStream {
    LocalFS(SnapShotStreamLocalFS),
    CloudStorage(SnapShotStreamCloudStrage),
}

impl Responder for SnapshotStream {
    type Body = actix_web::body::BoxBody;

    fn respond_to(self, _: &actix_web::HttpRequest) -> HttpResponse<Self::Body> {
        match self {
            SnapshotStream::LocalFS(stream) => match NamedFile::open(stream.snapshot_path) {
                Ok(file) => file.into_response(&stream.req),
                Err(e) => match e.kind() {
                    std::io::ErrorKind::NotFound => {
                        HttpResponse::NotFound().body(format!("File not found: {}", e))
                    }
                    _ => HttpResponse::InternalServerError()
                        .body(format!("Failed to open file: {}", e)),
                },
            },

            SnapshotStream::CloudStorage(stream) => HttpResponse::Ok()
                .content_type("application/octet-stream")
                .streaming(stream.streamer),
        }
    }
}
Contributor Author

Add SnapshotStream for downloading snapshots with streaming.

@kemkemG0

@generall
I fixed them up and added an extra integration test!

@generall generall merged commit 0d46aeb into qdrant:dev May 10, 2024
generall pushed a commit that referenced this pull request May 26, 2024
* Add SnapshotsStorageConfig enum(Local or S3) and deserialize implementation

* [refactor]  use snapshots_config instead of s3_config

* update config

* add AWS official `aws-sdk-s3`

* implement store_file() WITHOUT error handling

* implement list_snapshots

* implement delete_snapshot

* run `cargo +nightly fmt`

* delete println

* implement get_stored_file

* Add error handlings

* Refactor AWS S3 configuration and error handling

* fix bugs

* create an empty test file

* fix `alias_test.rs` for StorageConfig type

* temporarily delete some tests and try s3 test

* Update integration-tests.yml to use snap instead of apt-get for installing yq

* Update integration-tests.yml to use sudo when installing yq

* add sudo

* make (full/non-full) snapshots downloadable

* debug

* small fix

* Add S3 endpoint URL configuration option

* fix

* fix

* debug

* fix endpoint

* update to http://127.0.0.1:9000/

* update

* fix

* fix `#[get("/collections/{collection}/shards/{shard}/snapshots/{snapshot}")]` for s3

* put original tests back

* refactor

* small fix (delete println & echo)

* use object_store and refactor

* create snapshot_storage_ops and implement

* Refactor get_appropriate_chunk_size function to adjust chunk size based on service limits and file size

* cargo +nightly fmt --all

* make it more abstract

* Refactor SnapshotsStorageConfig deserialization in SnapShotsConfig

* small update

* small fix

* Update dependencies in Cargo.lock

* Update minio image to satantime/minio-server

* Refactor snapshot storage paths in snapshots_manager.rs and snapshot_storage_ops.rs

* Fix issue with downloaded file size not matching expected size in download_snapshot function

* add flush

* Use streaming instead of downloading all at once

* apply `cargo +nightly fmt --all`

* Fix issue with opening file in SnapshotStream::LocalFS variant

* Fix error handling in SnapshotStream::LocalFS variant

* Add integration test for Shard Snapshot API with S3 storage (#7)
@ghost ghost mentioned this pull request Nov 7, 2024
HighBestCoder pushed a commit to HighBestCoder/qdrant that referenced this pull request Dec 14, 2025
Summary:
Pull Request resolved: facebookresearch/faiss#4150

Creates a sharding convenience function for IVF indexes.
- The __**centroids on the quantizer**__ are sharded based on the given sharding function. (Not the data, as data sharding by ids is already implemented by copy_subset_to, https://github.com/facebookresearch/faiss/blob/main/faiss/IndexIVF.h#L408)
- The output is written to files based on the template filename generator param.
- The default sharding function is simply the ith vector mod the total shard count.

This would be called by Laser here: https://www.internalfb.com/code/fbsource/[ce1f2e028e79]/fbcode/fblearner/flow/projects/laser/laser_sim_search/knn_trainer.py?lines=295-296. This convenience function will do the file writing and return the created file names.

There are a few key required changes in FAISS:
1. Allow `std::vector<std::string>` to be used. Updates swigfaiss.swig and array_conversions.py to accommodate. These have to be numpy dtype of `object` instead of the more correct `unicode`, because the unicode dtype is fixed length. I couldn't figure out how to create a numpy array holding output file names of different lengths. (Say the file names are file1, file11, file111. The dtype would need to be U5, U6, U7 respectively, as the unicode dtype encodes the length.) I tried structured arrays: this does not work either, as numpy turns it into a matrix instead; the `file1 file11 file111` example with explicit U5, U6, U7 becomes `[[file1 file1 file1], [file1 file11 file11], [file1 file11 file111]]`, which we do not want. If someone knows the right syntax, please yell at me.
2. Create Python callbacks for sharding and template filename: `PyCallbackFilenameTemplateGenerator` and `PyCallbackShardingFunction`. Users of this function would inherit from the FilenameTemplateGenerator or ShardingFunction in C++ to pass to `shard_ivf_index_centroids`. See the other examples in python_callbacks.cpp. This is required because Python functions cannot be passed through SWIG to C++ (i.e. no std::function or function pointers), so we have to use this approach. This approach allows it to be called from both C++ and Python. test_sharding.py shows the Python calling, test_utils.cpp shows the C++ calling.

Reviewed By: asadoughi

Differential Revision: D68534991

fbshipit-source-id: b857e20c6cc4249a2ab7792db4c93dd4fb8403fd

2 participants