Conversation
Implementation is done. I am going to add unit tests/integration tests from now on.
lib/collection/Cargo.toml
Outdated
```toml
# AWS
aws-config = { version = "1.1.7", features = ["behavior-version-latest"] }
aws-sdk-s3 = "1.24.0"
aws-smithy-types = "1.1.8"
aws-smithy-types-convert = { version = "0.60.8", features = ["convert-chrono"] }
```
I used the official AWS SDK for S3
Any particular reason for that?
```rust
#[derive(Clone, Deserialize, Debug, Default)]
pub struct SnapShotsConfig {
    pub snapshots_storage: SnapshotsStorageConfig,
    pub s3_config: Option<S3Config>,
}

#[derive(Clone, Debug, Default)]
pub enum SnapshotsStorageConfig {
    #[default]
    Local,
    S3,
}

impl<'de> Deserialize<'de> for SnapshotsStorageConfig {
    fn deserialize<D>(deserializer: D) -> Result<SnapshotsStorageConfig, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        let s: String = Deserialize::deserialize(deserializer)?;
        match s.as_str() {
            "local" => Ok(SnapshotsStorageConfig::Local),
            "s3" => Ok(SnapshotsStorageConfig::S3),
            _ => Err(serde::de::Error::custom(
                "Invalid snapshots_storage. Use 'local' or 's3'",
            )),
        }
    }
}
```
These are for deserialization of the config YAML.
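For reference, a config fragment that this deserializer would accept might look like the following. Only `snapshots_storage` (values `local` or `s3`) and the `s3_config` section name come from the code above; the nesting and the individual S3 field names are assumptions for illustration, not the exact shipped schema (the MinIO endpoint value appears later in the commit history):

```yaml
storage:
  snapshots_config:
    # Accepted values: "local" (default) or "s3"
    snapshots_storage: s3
    s3_config:
      # Hypothetical field names, shown only to illustrate the shape
      bucket: my-snapshots-bucket
      region: us-east-1
      endpoint_url: http://127.0.0.1:9000   # e.g. a local MinIO, as in the CI test
```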
```rust
const CHUNK_SIZE: u64 = 1024 * 1024 * 5;
const MAX_CHUNKS: u64 = 10000;
```
I just followed the official example, as shown below.
These probably should be in the config, but I wasn't sure.
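The PR's commit history mentions a `get_appropriate_chunk_size` function that "adjusts chunk size based on service limits and file size". A minimal stdlib-only sketch of that idea, assuming the two constants above (5 MiB parts, at most 10,000 parts, matching S3's multipart limits) — the function body here is a guess at the intent, not the PR's actual code:

```rust
// S3 multipart limits used by the PR: 5 MiB default part size, 10,000 parts max.
const CHUNK_SIZE: u64 = 1024 * 1024 * 5;
const MAX_CHUNKS: u64 = 10000;

/// Hypothetical sketch: keep the default chunk size unless the file is so
/// large that it would exceed MAX_CHUNKS parts; then grow the chunk size
/// just enough to fit within the part-count limit.
fn get_appropriate_chunk_size(file_size: u64) -> u64 {
    let chunks_needed = file_size.div_ceil(CHUNK_SIZE).max(1);
    if chunks_needed <= MAX_CHUNKS {
        CHUNK_SIZE
    } else {
        // Smallest chunk size that covers the file in MAX_CHUNKS parts.
        file_size.div_ceil(MAX_CHUNKS)
    }
}

fn main() {
    // A 1 GiB file fits comfortably in 5 MiB parts.
    assert_eq!(get_appropriate_chunk_size(1 << 30), CHUNK_SIZE);
    // A 100 GiB file would need > 10,000 default-size parts, so chunks grow.
    let big = 100u64 << 30;
    let chunk = get_appropriate_chunk_size(big);
    assert!(chunk > CHUNK_SIZE);
    // The grown chunk size still covers the whole file within MAX_CHUNKS parts.
    assert!(chunk * MAX_CHUNKS >= big);
}
```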
```rust
    key.map(|k| k.trim_start_matches("./").to_string())
}

pub async fn multi_part_upload(
```
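The key normalization in the fragment above strips the local `./` path prefix before using the path as an object key. A quick self-contained check of what `trim_start_matches` does here (a sketch, not the PR's code):

```rust
fn main() {
    // Object keys shouldn't carry the local "./" path prefix,
    // so it is stripped before the path is used as an S3 key.
    let key = Some("./snapshots/collection1.snapshot");
    let normalized = key.map(|k| k.trim_start_matches("./").to_string());
    assert_eq!(normalized.as_deref(), Some("snapshots/collection1.snapshot"));

    // Keys without the prefix pass through unchanged.
    let plain = Some("snapshots/collection1.snapshot");
    let normalized = plain.map(|k| k.trim_start_matches("./").to_string());
    assert_eq!(normalized.as_deref(), Some("snapshots/collection1.snapshot"));
}
```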
I used this official example code as the reference.
…test-for-s3-snapshots
A few things we would need to finalize this PR:
My suggestion is to stream files from either local storage or S3. In order to do that, we would need to implement something like a dedicated streaming type.
Please note that the main issue bounty should still be paid once this PR is merged.
@generall Nevermind, it was just an internal thing and we just need to call the common …nload_snapshot function.
Fix bugs, adding `flush()` after `write_all()`
Use Streaming instead of Downloading snapshots
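The idea behind these two changes — stream the snapshot in bounded chunks instead of downloading it whole, and flush after the last `write_all()` — can be sketched with stdlib I/O only. All names here are hypothetical, not the PR's actual code:

```rust
use std::io::{self, Read, Write};

/// Hypothetical sketch: forward a snapshot from any reader to any writer in
/// fixed-size chunks, so memory use stays bounded regardless of file size.
fn forward_in_chunks<R: Read, W: Write>(mut src: R, dst: &mut W) -> io::Result<u64> {
    let mut buf = [0u8; 8192];
    let mut total = 0u64;
    loop {
        let n = src.read(&mut buf)?;
        if n == 0 {
            break;
        }
        dst.write_all(&buf[..n])?;
        total += n as u64;
    }
    // Flush once at the end; without this, buffered bytes can be lost and the
    // downloaded file ends up shorter than expected (the bug fixed above).
    dst.flush()?;
    Ok(total)
}

fn main() {
    let data = vec![7u8; 20_000]; // pretend snapshot larger than one chunk
    let mut out = Vec::new();
    let written = forward_in_chunks(&data[..], &mut out).unwrap();
    assert_eq!(written, 20_000);
    assert_eq!(out, data);
}
```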
```makefile
	sleep 10
	./tests/shard-snapshot-api.sh test-all

test-shard-snapshot-api-s3-minio:
```
Add a shard snapshot API integration test for the S3 version.
```rust
pub struct SnapShotStreamLocalFS {
    pub snapshot_path: PathBuf,
    pub req: HttpRequest,
}

pub struct SnapShotStreamCloudStrage {
    pub streamer:
        std::pin::Pin<Box<dyn Stream<Item = Result<bytes::Bytes, object_store::Error>> + Send>>,
}

pub enum SnapshotStream {
    LocalFS(SnapShotStreamLocalFS),
    CloudStorage(SnapShotStreamCloudStrage),
}

impl Responder for SnapshotStream {
    type Body = actix_web::body::BoxBody;

    fn respond_to(self, _: &actix_web::HttpRequest) -> HttpResponse<Self::Body> {
        match self {
            SnapshotStream::LocalFS(stream) => match NamedFile::open(stream.snapshot_path) {
                Ok(file) => file.into_response(&stream.req),
                Err(e) => match e.kind() {
                    std::io::ErrorKind::NotFound => {
                        HttpResponse::NotFound().body(format!("File not found: {}", e))
                    }
                    _ => HttpResponse::InternalServerError()
                        .body(format!("Failed to open file: {}", e)),
                },
            },

            SnapshotStream::CloudStorage(stream) => HttpResponse::Ok()
                .content_type("application/octet-stream")
                .streaming(stream.streamer),
        }
    }
}
```
Add `SnapshotStream` for downloading snapshots with streaming.
@generall
* Add SnapshotsStorageConfig enum(Local or S3) and deserialize implementation
* [refactor] use snapshots_config instead of s3_config
* update config
* add AWS official `aws-sdk-s3`
* implement store_file() WITHOUT error handling
* implement list_snapshots
* implement delete_snapshot
* run `cargo +nightly fmt`
* delete println
* implement get_stored_file
* Add error handlings
* Refactor AWS S3 configuration and error handling
* fix bugs
* create an empty test file
* fix `alias_test.rs` for StorageConfig type
* tempolary delete some test and try s3 test
* Update integration-tests.yml to use snap instead of apt-get for installing yq
* Update integration-tests.yml to use sudo when installing yq
* add sudo
* make (full/non-full) snapshots downloadable
* debug
* small fix
* Add S3 endpoint URL configuration option
* fix
* fix
* debug
* fix endpoint
* update to http://127.0.0.1:9000/
* update
* fix
* fix
* fix `#[get("/collections/{collection}/shards/{shard}/snapshots/{snapshot}")]` for s3
* put original tests back
* refactor
* small fix (delete println & echo)
* use object_store and refactor
* create snapshot_storage_ops and implement
* Refactor get_appropriate_chunk_size function to adjust chunk size based on service limits and file size
* cargo +nightly fmt --all
* make it more abstract
* Refactor SnapshotsStorageConfig deserialization in SnapShotsConfig
* small update
* small fix
* Update dependencies in Cargo.lock
* Update minio image to satantime/minio-server
* Refactor snapshot storage paths in snapshots_manager.rs and snapshot_storage_ops.rs
* Fix issue with downloaded file size not matching expected size in download_snapshot function
* add flush
* Use Streaming instead of donloading once
* apply `cargo +nightly fmt --all`
* Fix issue with opening file in SnapshotStream::LocalFS variant
* Fix error handling in SnapshotStream::LocalFS variant
* Add integration test for Shard Snapshot API with S3 storage (#7)
What I changed
I have added the `aws-sdk-s3` crate to the collection crate for managing operations on S3. I chose this SDK because it is the official one, and version 1.24.0 is well-established and stable. The implementation was based on the official sample code found at https://github.com/awslabs/aws-sdk-rust/tree/main/examples/examples/s3/src/bin. Later, I removed `aws_s3_rust` and replaced it with `object_store`. This change allows the same abstract code to be used with S3, GCS, and Azure Storage. Currently, the implementation is only for S3, but it can be easily extended to support other services.
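The appeal of the `object_store` approach is that snapshot operations are written once against a common interface while the backend is chosen by config. A stdlib-only sketch of that shape — the trait and all names here are hypothetical stand-ins, not the actual `object_store` API:

```rust
use std::collections::HashMap;

/// Hypothetical abstraction: one trait, many backends, mirroring how
/// `object_store` lets the same code target S3, GCS, Azure, or local disk.
trait SnapshotStorage {
    fn put(&mut self, key: &str, data: Vec<u8>);
    fn get(&self, key: &str) -> Option<Vec<u8>>;
    fn delete(&mut self, key: &str) -> bool;
}

/// In-memory stand-in for a real backend, used here only for illustration.
struct MemoryStorage {
    objects: HashMap<String, Vec<u8>>,
}

impl SnapshotStorage for MemoryStorage {
    fn put(&mut self, key: &str, data: Vec<u8>) {
        self.objects.insert(key.to_string(), data);
    }
    fn get(&self, key: &str) -> Option<Vec<u8>> {
        self.objects.get(key).cloned()
    }
    fn delete(&mut self, key: &str) -> bool {
        self.objects.remove(key).is_some()
    }
}

/// Snapshot logic is written once, against the trait, not a concrete backend.
fn store_snapshot(storage: &mut dyn SnapshotStorage, name: &str, bytes: Vec<u8>) {
    storage.put(name, bytes);
}

fn main() {
    let mut storage = MemoryStorage { objects: HashMap::new() };
    store_snapshot(&mut storage, "snapshots/collection1.snapshot", vec![1, 2, 3]);
    assert_eq!(storage.get("snapshots/collection1.snapshot"), Some(vec![1, 2, 3]));
    assert!(storage.delete("snapshots/collection1.snapshot"));
    assert_eq!(storage.get("snapshots/collection1.snapshot"), None);
}
```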
Next, I implemented the S3 functionality in the existing `snapshots_manager.rs`. The necessary functions using `aws-sdk-s3` were implemented in `snapshots_s3_ops.rs` and called from there. Later, I deleted the previously created `snapshot_s3_ops.rs` and introduced a more abstract `snapshot_storage_ops.rs`.
One challenging aspect, different from what was initially expected, involved operations like delete and download of snapshots. The existing flow used `get_snapshot_path` for path verification before the delete and download actions, which invariably led to errors when the snapshots lived on S3 and the paths did not exist locally. To address this, I implemented `get_s3_snapshot_path` and used a match statement to handle the different scenarios. The same change was made in four places: delete and download of snapshots, and delete and download of full snapshots. (Note: additional changes were also necessary for downloading shards.) I later addressed a review comment by removing the S3-specific functions such as `get_s3_snapshot_path` and `get_full_s3_snapshot_path`, replacing them with the more abstract `get_snapshot_path` and `get_full_snapshot_path` located in `snapshot_storage_ops.rs` to improve the abstraction.
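The dispatch described above can be sketched as a match on the storage config. This is a stdlib-only illustration of the idea (local paths must resolve on disk, while S3 "paths" are only object keys); the function name and bodies are hypothetical, not the PR's actual helpers:

```rust
use std::path::PathBuf;

#[derive(Clone, Debug)]
enum SnapshotsStorageConfig {
    Local,
    S3,
}

/// Hypothetical sketch: resolve a snapshot location differently per backend.
/// Local snapshots live under the snapshots directory; S3 snapshots are
/// addressed by key, so no local filesystem check applies.
fn resolve_snapshot_path(
    storage: &SnapshotsStorageConfig,
    snapshots_dir: &str,
    snapshot_name: &str,
) -> PathBuf {
    match storage {
        SnapshotsStorageConfig::Local => PathBuf::from(snapshots_dir).join(snapshot_name),
        SnapshotsStorageConfig::S3 => PathBuf::from(snapshot_name),
    }
}

fn main() {
    let local =
        resolve_snapshot_path(&SnapshotsStorageConfig::Local, "./snapshots", "c1.snapshot");
    assert_eq!(local, PathBuf::from("./snapshots/c1.snapshot"));

    let s3 = resolve_snapshot_path(&SnapshotsStorageConfig::S3, "./snapshots", "c1.snapshot");
    assert_eq!(s3, PathBuf::from("c1.snapshot"));
}
```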
The use of object_store has resulted in a reduced build size.
Test Modifications
`tests/snapshots/snapshots-recovery.sh` was modified to allow switching between `local` and `s3` based on the input arguments. For S3 tests, the config file is automatically modified using the `yq` command before execution. ✅ I have verified that all tests pass on my forked repository.
resolve: #4109
/claim #4109
All Submissions:
- Did you create your branch from the `dev` branch?

New Feature Submissions:

- Have you run the `cargo +nightly fmt --all` command prior to submission?
- Have you run the `cargo clippy --all --all-features` command?

Changes to Core Features: