Skip to content

Commit 6cec7b4

Browse files
authored
feat(adapter/kv): support async iterating on scan results (#5208)
1 parent a2149b2 commit 6cec7b4

26 files changed

Lines changed: 289 additions & 40 deletions

File tree

core/Cargo.lock

Lines changed: 45 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/Cargo.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ services-s3 = [
194194
services-seafile = []
195195
services-sftp = ["dep:openssh", "dep:openssh-sftp-client", "dep:bb8"]
196196
services-sled = ["dep:sled", "internal-tokio-rt"]
197-
services-sqlite = ["dep:sqlx", "sqlx?/sqlite"]
197+
services-sqlite = ["dep:sqlx", "sqlx?/sqlite", "dep:ouroboros"]
198198
services-supabase = []
199199
services-surrealdb = ["dep:surrealdb"]
200200
services-swift = []
@@ -277,6 +277,9 @@ sqlx = { version = "0.8.0", features = [
277277
# For http based services.
278278
reqsign = { version = "0.16.1", default-features = false, optional = true }
279279

280+
# for self-referencing structs
281+
ouroboros = { version = "0.18.4", optional = true }
282+
280283
# for services-atomic-server
281284
atomic_lib = { version = "0.39.0", optional = true }
282285
# for services-cacache

core/src/raw/adapters/kv/api.rs

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
use std::fmt::Debug;
1919
use std::future::ready;
20+
use std::ops::DerefMut;
2021

2122
use futures::Future;
2223

@@ -25,10 +26,76 @@ use crate::Capability;
2526
use crate::Scheme;
2627
use crate::*;
2728

29+
/// Scan is the async iterator returned by `Adapter::scan`.
30+
pub trait Scan: Send + Sync + Unpin {
31+
/// Fetch the next key in the current key prefix
32+
///
33+
/// `Ok(None)` means no further key will be returned
34+
fn next(&mut self) -> impl Future<Output = Result<Option<String>>> + MaybeSend;
35+
}
36+
37+
/// A noop implementation of Scan
38+
impl Scan for () {
39+
async fn next(&mut self) -> Result<Option<String>> {
40+
Ok(None)
41+
}
42+
}
43+
44+
/// A Scan implementation for all trivial non-async iterators
45+
pub struct ScanStdIter<I>(I);
46+
47+
#[cfg(any(
48+
feature = "services-cloudflare-kv",
49+
feature = "services-etcd",
50+
feature = "services-nebula-graph",
51+
feature = "services-rocksdb",
52+
feature = "services-sled"
53+
))]
54+
impl<I> ScanStdIter<I>
55+
where
56+
I: Iterator<Item = Result<String>> + Unpin + Send + Sync,
57+
{
58+
/// Create a new ScanStdIter from an Iterator
59+
pub(crate) fn new(inner: I) -> Self {
60+
Self(inner)
61+
}
62+
}
63+
64+
impl<I> Scan for ScanStdIter<I>
65+
where
66+
I: Iterator<Item = Result<String>> + Unpin + Send + Sync,
67+
{
68+
async fn next(&mut self) -> Result<Option<String>> {
69+
self.0.next().transpose()
70+
}
71+
}
72+
73+
/// A type-erased wrapper of Scan
74+
pub type Scanner = Box<dyn ScanDyn>;
75+
76+
pub trait ScanDyn: Unpin + Send + Sync {
77+
fn next_dyn(&mut self) -> BoxedFuture<Result<Option<String>>>;
78+
}
79+
80+
impl<T: Scan + ?Sized> ScanDyn for T {
81+
fn next_dyn(&mut self) -> BoxedFuture<Result<Option<String>>> {
82+
Box::pin(self.next())
83+
}
84+
}
85+
86+
impl<T: ScanDyn + ?Sized> Scan for Box<T> {
87+
async fn next(&mut self) -> Result<Option<String>> {
88+
self.deref_mut().next_dyn().await
89+
}
90+
}
91+
2892
/// KvAdapter is the adapter to underlying kv services.
2993
///
3094
/// By implement this trait, any kv service can work as an OpenDAL Service.
3195
pub trait Adapter: Send + Sync + Debug + Unpin + 'static {
96+
/// TODO: use default associate type `= ()` after stablized
97+
type Scanner: Scan;
98+
3299
/// Return the metadata of this key value accessor.
33100
fn metadata(&self) -> Metadata;
34101

@@ -81,7 +148,7 @@ pub trait Adapter: Send + Sync + Debug + Unpin + 'static {
81148
}
82149

83150
/// Scan a key prefix to get all keys that start with this key.
84-
fn scan(&self, path: &str) -> impl Future<Output = Result<Vec<String>>> + MaybeSend {
151+
fn scan(&self, path: &str) -> impl Future<Output = Result<Self::Scanner>> + MaybeSend {
85152
let _ = path;
86153

87154
ready(Err(Error::new(

core/src/raw/adapters/kv/backend.rs

Lines changed: 48 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
use std::sync::Arc;
1919
use std::vec::IntoIter;
2020

21-
use super::Adapter;
21+
use super::{Adapter, Scan};
2222
use crate::raw::oio::HierarchyLister;
2323
use crate::raw::oio::QueueBuf;
2424
use crate::raw::*;
@@ -68,8 +68,8 @@ impl<S: Adapter> Access for Backend<S> {
6868
type BlockingReader = Buffer;
6969
type Writer = KvWriter<S>;
7070
type BlockingWriter = KvWriter<S>;
71-
type Lister = HierarchyLister<KvLister>;
72-
type BlockingLister = HierarchyLister<KvLister>;
71+
type Lister = HierarchyLister<KvLister<S::Scanner>>;
72+
type BlockingLister = HierarchyLister<BlockingKvLister>;
7373

7474
fn info(&self) -> Arc<AccessorInfo> {
7575
let mut am: AccessorInfo = self.kv.metadata().into();
@@ -182,19 +182,60 @@ impl<S: Adapter> Access for Backend<S> {
182182
fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
183183
let p = build_abs_path(&self.root, path);
184184
let res = self.kv.blocking_scan(&p)?;
185-
let lister = KvLister::new(&self.root, res);
185+
let lister = BlockingKvLister::new(&self.root, res);
186186
let lister = HierarchyLister::new(lister, path, args.recursive());
187187

188188
Ok((RpList::default(), lister))
189189
}
190190
}
191191

192-
pub struct KvLister {
192+
pub struct KvLister<Iter> {
193+
root: String,
194+
inner: Iter,
195+
}
196+
197+
impl<Iter> KvLister<Iter>
198+
where
199+
Iter: Scan,
200+
{
201+
fn new(root: &str, inner: Iter) -> Self {
202+
Self {
203+
root: root.to_string(),
204+
inner,
205+
}
206+
}
207+
208+
async fn inner_next(&mut self) -> Result<Option<oio::Entry>> {
209+
Ok(self.inner.next().await?.map(|v| {
210+
let mode = if v.ends_with('/') {
211+
EntryMode::DIR
212+
} else {
213+
EntryMode::FILE
214+
};
215+
let mut path = build_rel_path(&self.root, &v);
216+
if path.is_empty() {
217+
path = "/".to_string();
218+
}
219+
oio::Entry::new(&path, Metadata::new(mode))
220+
}))
221+
}
222+
}
223+
224+
impl<Iter> oio::List for KvLister<Iter>
225+
where
226+
Iter: Scan,
227+
{
228+
async fn next(&mut self) -> Result<Option<oio::Entry>> {
229+
self.inner_next().await
230+
}
231+
}
232+
233+
pub struct BlockingKvLister {
193234
root: String,
194235
inner: IntoIter<String>,
195236
}
196237

197-
impl KvLister {
238+
impl BlockingKvLister {
198239
fn new(root: &str, inner: Vec<String>) -> Self {
199240
Self {
200241
root: root.to_string(),
@@ -218,13 +259,7 @@ impl KvLister {
218259
}
219260
}
220261

221-
impl oio::List for KvLister {
222-
async fn next(&mut self) -> Result<Option<oio::Entry>> {
223-
Ok(self.inner_next())
224-
}
225-
}
226-
227-
impl oio::BlockingList for KvLister {
262+
impl oio::BlockingList for BlockingKvLister {
228263
fn next(&mut self) -> Result<Option<oio::Entry>> {
229264
Ok(self.inner_next())
230265
}

core/src/raw/adapters/kv/mod.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,16 @@
2222
mod api;
2323
pub use api::Adapter;
2424
pub use api::Metadata;
25+
pub use api::Scan;
26+
#[cfg(any(
27+
feature = "services-cloudflare-kv",
28+
feature = "services-etcd",
29+
feature = "services-nebula-graph",
30+
feature = "services-rocksdb",
31+
feature = "services-sled"
32+
))]
33+
pub(crate) use api::ScanStdIter;
34+
pub use api::Scanner;
2535

2636
mod backend;
2737
pub use backend::Backend;

core/src/services/atomicserver/backend.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -351,6 +351,8 @@ impl Adapter {
351351
}
352352

353353
impl kv::Adapter for Adapter {
354+
type Scanner = ();
355+
354356
fn metadata(&self) -> kv::Metadata {
355357
kv::Metadata::new(
356358
Scheme::Atomicserver,

core/src/services/cacache/backend.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,8 @@ impl Debug for Adapter {
8585
}
8686

8787
impl kv::Adapter for Adapter {
88+
type Scanner = ();
89+
8890
fn metadata(&self) -> kv::Metadata {
8991
kv::Metadata::new(
9092
Scheme::Cacache,

core/src/services/cloudflare_kv/backend.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,8 @@ impl Adapter {
181181
}
182182

183183
impl kv::Adapter for Adapter {
184+
type Scanner = kv::Scanner;
185+
184186
fn metadata(&self) -> kv::Metadata {
185187
kv::Metadata::new(
186188
Scheme::CloudflareKv,
@@ -240,7 +242,7 @@ impl kv::Adapter for Adapter {
240242
}
241243
}
242244

243-
async fn scan(&self, path: &str) -> Result<Vec<String>> {
245+
async fn scan(&self, path: &str) -> Result<Self::Scanner> {
244246
let mut url = format!("{}/keys", self.url_prefix);
245247
if !path.is_empty() {
246248
url = format!("{}?prefix={}", url, path);
@@ -261,7 +263,9 @@ impl kv::Adapter for Adapter {
261263
format!("failed to parse error response: {}", e),
262264
)
263265
})?;
264-
Ok(response.result.into_iter().map(|r| r.name).collect())
266+
Ok(Box::new(kv::ScanStdIter::new(
267+
response.result.into_iter().map(|r| Ok(r.name)),
268+
)))
265269
}
266270
_ => Err(parse_error(resp)),
267271
}

core/src/services/d1/backend.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,8 @@ impl Adapter {
258258
}
259259

260260
impl kv::Adapter for Adapter {
261+
type Scanner = ();
262+
261263
fn metadata(&self) -> kv::Metadata {
262264
kv::Metadata::new(
263265
Scheme::D1,

core/src/services/etcd/backend.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
use std::fmt::Debug;
1919
use std::fmt::Formatter;
20+
use std::vec;
2021

2122
use bb8::PooledConnection;
2223
use bb8::RunError;
@@ -271,6 +272,8 @@ impl Adapter {
271272
}
272273

273274
impl kv::Adapter for Adapter {
275+
type Scanner = kv::ScanStdIter<vec::IntoIter<Result<String>>>;
276+
274277
fn metadata(&self) -> kv::Metadata {
275278
kv::Metadata::new(
276279
Scheme::Etcd,
@@ -310,7 +313,7 @@ impl kv::Adapter for Adapter {
310313
Ok(())
311314
}
312315

313-
async fn scan(&self, path: &str) -> Result<Vec<String>> {
316+
async fn scan(&self, path: &str) -> Result<Self::Scanner> {
314317
let mut client = self.conn().await?;
315318
let get_options = Some(GetOptions::new().with_prefix().with_keys_only());
316319
let resp = client
@@ -323,10 +326,10 @@ impl kv::Adapter for Adapter {
323326
Error::new(ErrorKind::Unexpected, "store key is not valid utf-8 string")
324327
.set_source(err)
325328
})?;
326-
res.push(v);
329+
res.push(Ok(v));
327330
}
328331

329-
Ok(res)
332+
Ok(kv::ScanStdIter::new(res.into_iter()))
330333
}
331334
}
332335

0 commit comments

Comments
 (0)