Skip to content

Commit a38a76c

Browse files
committed
introduce only-decription mode
Signed-off-by: hehechen <awd123456sss@gmail.com>
1 parent ca2f51f commit a38a76c

File tree

4 files changed

+159
-7
lines changed

4 files changed

+159
-7
lines changed

components/raftstore/src/engine_store_ffi/mod.rs

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ impl<T> UnwrapExternCFunc<T> for std::option::Option<T> {
5656
pub struct RaftStoreProxy {
5757
status: AtomicU8,
5858
key_manager: Option<Arc<DataKeyManager>>,
59-
read_index_client: Box<dyn read_index_helper::ReadIndex>,
59+
read_index_client: Option<Box<dyn read_index_helper::ReadIndex>>,
6060
kv_engine: std::sync::RwLock<Option<engine_rocks::RocksEngine>>,
6161
}
6262

@@ -72,7 +72,7 @@ impl RaftStoreProxy {
7272
pub fn new(
7373
status: AtomicU8,
7474
key_manager: Option<Arc<DataKeyManager>>,
75-
read_index_client: Box<dyn read_index_helper::ReadIndex>,
75+
read_index_client: Option<Box<dyn read_index_helper::ReadIndex>>,
7676
kv_engine: std::sync::RwLock<Option<engine_rocks::RocksEngine>>,
7777
) -> Self {
7878
RaftStoreProxy {
@@ -206,6 +206,14 @@ pub extern "C" fn ffi_batch_read_index(
206206
fn_insert_batch_read_index_resp: Option<unsafe extern "C" fn(RawVoidPtr, BaseBuffView, u64)>,
207207
) {
208208
assert!(!proxy_ptr.is_null());
209+
unsafe {
210+
match proxy_ptr.as_ref().read_index_client {
211+
Option::None => {
212+
return;
213+
}
214+
_ => {}
215+
}
216+
}
209217
debug_assert!(fn_insert_batch_read_index_resp.is_some());
210218
if view.len != 0 {
211219
assert_ne!(view.view, std::ptr::null());
@@ -223,6 +231,8 @@ pub extern "C" fn ffi_batch_read_index(
223231
let resp = proxy_ptr
224232
.as_ref()
225233
.read_index_client
234+
.as_ref()
235+
.unwrap()
226236
.batch_read_index(req_vec, time::Duration::from_millis(timeout_ms));
227237
assert_ne!(res, std::ptr::null_mut());
228238
for (r, region_id) in &resp {
@@ -295,12 +305,22 @@ pub extern "C" fn ffi_make_read_index_task(
295305
req_view: BaseBuffView,
296306
) -> RawRustPtr {
297307
assert!(!proxy_ptr.is_null());
308+
unsafe {
309+
match proxy_ptr.as_ref().read_index_client {
310+
Option::None => {
311+
return RawRustPtr::default();
312+
}
313+
_ => {}
314+
}
315+
}
298316
let mut req = kvrpcpb::ReadIndexRequest::default();
299317
req.merge_from_bytes(req_view.to_slice()).unwrap();
300318
let task = unsafe {
301319
proxy_ptr
302320
.as_ref()
303321
.read_index_client
322+
.as_ref()
323+
.unwrap()
304324
.make_read_index_task(req)
305325
};
306326
return match task {
@@ -346,6 +366,14 @@ pub extern "C" fn ffi_poll_read_index_task(
346366
waker: RawVoidPtr,
347367
) -> u8 {
348368
assert!(!proxy_ptr.is_null());
369+
unsafe {
370+
match proxy_ptr.as_ref().read_index_client {
371+
Option::None => {
372+
return 0;
373+
}
374+
_ => {}
375+
}
376+
}
349377
let task = unsafe {
350378
&mut *(task_ptr as *mut crate::engine_store_ffi::read_index_helper::ReadIndexTask)
351379
};
@@ -358,6 +386,8 @@ pub extern "C" fn ffi_poll_read_index_task(
358386
proxy_ptr
359387
.as_ref()
360388
.read_index_client
389+
.as_ref()
390+
.unwrap()
361391
.poll_read_index_task(task, waker)
362392
} {
363393
get_engine_store_server_helper().set_read_index_resp(resp_data, &res);

components/server/src/proxy.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,11 @@ pub unsafe fn run_proxy(
197197
.required(true)
198198
.takes_value(true),
199199
)
200+
.arg(
201+
Arg::with_name("only-decryption")
202+
.long("only-decryption")
203+
.help("Only do decryption in Proxy"),
204+
)
200205
.get_matches_from(args);
201206

202207
if matches.is_present("print-sample-config") {
@@ -241,7 +246,11 @@ pub unsafe fn run_proxy(
241246
}
242247

243248
config.raft_store.engine_store_server_helper = engine_store_server_helper as *const _ as isize;
244-
crate::server::run_tikv(config, engine_store_server_helper);
249+
if matches.is_present("only-decryption") {
250+
crate::server::run_tikv_only_decryption(config, engine_store_server_helper);
251+
} else {
252+
crate::server::run_tikv(config, engine_store_server_helper);
253+
}
245254
}
246255

247256
fn check_engine_label(matches: &clap::ArgMatches<'_>) {

components/server/src/server.rs

Lines changed: 115 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -146,10 +146,10 @@ pub unsafe fn run_tikv(config: TiKvConfig, engine_store_server_helper: &EngineSt
146146
let mut proxy = RaftStoreProxy::new(
147147
AtomicU8::new(RaftProxyStatus::Idle as u8),
148148
tikv.encryption_key_manager.clone(),
149-
Box::new(ReadIndexClient::new(
149+
Some(Box::new(ReadIndexClient::new(
150150
tikv.router.clone(),
151151
SysQuota::cpu_cores_quota() as usize * 2,
152-
)),
152+
))),
153153
std::sync::RwLock::new(None),
154154
);
155155

@@ -239,6 +239,119 @@ pub unsafe fn run_tikv(config: TiKvConfig, engine_store_server_helper: &EngineSt
239239
}
240240
}
241241

242+
/// Run a TiKV server only for decryption. Returns when the server is shutdown by the user, in which
243+
/// case the server will be properly stopped.
244+
pub unsafe fn run_tikv_only_decryption(
245+
config: TiKvConfig,
246+
engine_store_server_helper: &EngineStoreServerHelper,
247+
) {
248+
// Sets the global logger ASAP.
249+
// It is okay to use the config w/o `validate()`,
250+
// because `initial_logger()` handles various conditions.
251+
initial_logger(&config);
252+
253+
// Print version information.
254+
crate::log_proxy_info();
255+
256+
// Print resource quota.
257+
SysQuota::log_quota();
258+
CPU_CORES_QUOTA_GAUGE.set(SysQuota::cpu_cores_quota());
259+
260+
// Do some prepare works before start.
261+
pre_start();
262+
263+
let _m = Monitor::default();
264+
265+
macro_rules! run_impl {
266+
($ER: ty) => {{
267+
let encryption_key_manager =
268+
data_key_manager_from_config(&config.security.encryption, &config.storage.data_dir)
269+
.map_err(|e| {
270+
panic!(
271+
"Encryption failed to initialize: {}. code: {}",
272+
e,
273+
e.error_code()
274+
)
275+
})
276+
.unwrap()
277+
.map(Arc::new);
278+
279+
let mut proxy = RaftStoreProxy::new(
280+
AtomicU8::new(RaftProxyStatus::Idle as u8),
281+
encryption_key_manager.clone(),
282+
Option::None,
283+
std::sync::RwLock::new(None),
284+
);
285+
286+
let proxy_helper = {
287+
let mut proxy_helper = RaftStoreProxyFFIHelper::new(&proxy);
288+
proxy_helper.fn_server_info = Some(ffi_server_info);
289+
proxy_helper
290+
};
291+
292+
info!("set raft-store proxy helper");
293+
294+
engine_store_server_helper.handle_set_proxy(&proxy_helper);
295+
296+
info!("wait for engine-store server to start");
297+
while engine_store_server_helper.handle_get_engine_store_server_status()
298+
== EngineStoreServerStatus::Idle
299+
{
300+
thread::sleep(Duration::from_millis(200));
301+
}
302+
303+
if engine_store_server_helper.handle_get_engine_store_server_status()
304+
!= EngineStoreServerStatus::Running
305+
{
306+
info!("engine-store server is not running, make proxy exit");
307+
return;
308+
}
309+
310+
info!("engine-store server is started");
311+
312+
proxy.set_status(RaftProxyStatus::Running);
313+
314+
{
315+
debug_assert!(
316+
engine_store_server_helper.handle_get_engine_store_server_status()
317+
== EngineStoreServerStatus::Running
318+
);
319+
loop {
320+
if engine_store_server_helper.handle_get_engine_store_server_status()
321+
!= EngineStoreServerStatus::Running
322+
{
323+
break;
324+
}
325+
thread::sleep(Duration::from_millis(200));
326+
}
327+
}
328+
329+
info!(
330+
"found engine-store server status is {:?}, start to stop all services",
331+
engine_store_server_helper.handle_get_engine_store_server_status()
332+
);
333+
334+
proxy.set_status(RaftProxyStatus::Stopped);
335+
336+
info!("all services in raft-store proxy are stopped");
337+
338+
info!("wait for engine-store server to stop");
339+
while engine_store_server_helper.handle_get_engine_store_server_status()
340+
!= EngineStoreServerStatus::Terminated
341+
{
342+
thread::sleep(Duration::from_millis(200));
343+
}
344+
info!("engine-store server is stopped");
345+
}};
346+
}
347+
348+
if !config.raft_engine.enable {
349+
run_impl!(RocksEngine)
350+
} else {
351+
run_impl!(RaftLogEngine)
352+
}
353+
}
354+
242355
const RESERVED_OPEN_FDS: u64 = 1000;
243356

244357
const DEFAULT_METRICS_FLUSH_INTERVAL: Duration = Duration::from_millis(10_000);

components/test_raftstore/src/cluster.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -297,10 +297,10 @@ impl<T: Simulator> Cluster<T> {
297297
let proxy = Box::new(raftstore::engine_store_ffi::RaftStoreProxy::new(
298298
AtomicU8::new(raftstore::engine_store_ffi::RaftProxyStatus::Idle as u8),
299299
key_mgr.clone(),
300-
Box::new(raftstore::engine_store_ffi::ReadIndexClient::new(
300+
Some(Box::new(raftstore::engine_store_ffi::ReadIndexClient::new(
301301
router.clone(),
302302
SysQuota::cpu_cores_quota() as usize * 2,
303-
)),
303+
))),
304304
std::sync::RwLock::new(Some(engines.kv.clone())),
305305
));
306306

0 commit comments

Comments
 (0)