Skip to content

Commit 403a645

Browse files
committed
Fix: do start and stop immediately instead of scheduling
Instead of handling start and stop action via the scheduler it is done immediately. This helps to reflect the actual status to the client instead of doing it in the background. Although the request may potentially take longer it prevents race conditions as it should ensure that the change is known to the database on return.
1 parent 97fb0cd commit 403a645

File tree

3 files changed

+23
-88
lines changed

3 files changed

+23
-88
lines changed

rust/src/openvasd/container_image_scanner/endpoints/scans.rs

Lines changed: 16 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -7,20 +7,15 @@ use greenbone_scanner_framework::{
77
models::{PreferenceValue, ScanPreferenceInformation},
88
prelude::*,
99
};
10-
use tokio::sync::mpsc::Sender;
1110
use tracing::instrument;
1211

1312
use crate::{
14-
container_image_scanner::scheduling::{
15-
self,
16-
db::{DBResults, DataBase, scan::DBScan},
17-
},
18-
database::dao::{DAOError, DBViolation, Execute, Fetch, StreamFetch},
13+
container_image_scanner::scheduling::db::{DBResults, DataBase, scan::DBScan},
14+
database::dao::{DAOError, DBViolation, Execute, Fetch, RetryExec, StreamFetch},
1915
};
2016

2117
pub struct Scans {
2218
pub pool: DataBase,
23-
pub scheduling: Sender<scheduling::Message>,
2419
}
2520

2621
impl Prefixed for Scans {
@@ -180,10 +175,9 @@ impl PostScansId for Scans {
180175
id: String,
181176
action: models::Action,
182177
) -> Pin<Box<dyn Future<Output = Result<(), PostScansIDError>> + Send + '_>> {
183-
let sender = self.scheduling.clone();
184178
Box::pin(async move {
185-
sender
186-
.send(scheduling::Message::new(id, action))
179+
DBScan::new(&self.pool, (id, action))
180+
.retry_exec()
187181
.await
188182
.map_err(PostScansIDError::from_external)
189183
})
@@ -204,7 +198,9 @@ impl DeleteScansId for Scans {
204198
if phase.is_running() {
205199
return Err(DeleteScansIDError::Running);
206200
}
207-
db.exec().await.map_err(DeleteScansIDError::from_external)
201+
db.retry_exec()
202+
.await
203+
.map_err(DeleteScansIDError::from_external)
208204
})
209205
}
210206
}
@@ -218,20 +214,14 @@ pub mod scans_utils {
218214
use tokio::sync::Mutex;
219215

220216
use super::Scans;
221-
use crate::{
222-
container_image_scanner::{
223-
Config, MIGRATOR,
224-
config::DBLocation,
225-
image::{
226-
DockerRegistryV2, DockerRegistryV2Mock, RegistrySetting, extractor::filtered_image,
227-
packages::AllTypes,
228-
},
229-
scheduling::{
230-
Scheduler,
231-
db::{DataBase, scan::DBScan},
232-
},
217+
use crate::container_image_scanner::{
218+
Config, MIGRATOR,
219+
config::DBLocation,
220+
image::{
221+
DockerRegistryV2, DockerRegistryV2Mock, RegistrySetting, extractor::filtered_image,
222+
packages::AllTypes,
233223
},
234-
database::dao::Execute,
224+
scheduling::{Scheduler, db::DataBase},
235225
};
236226
use scannerlib::notus::path_to_products;
237227

@@ -257,15 +247,12 @@ pub mod scans_utils {
257247
let products_path: &str =
258248
concat!(env!("CARGO_MANIFEST_DIR"), "/examples/feed/notus/products");
259249

260-
let (sender, scheduler) = Scheduler::<R, E>::init(
250+
let scheduler = Scheduler::<R, E>::init(
261251
config.into(),
262252
pool.clone(),
263253
path_to_products(products_path, false),
264254
);
265-
let scans = super::Scans {
266-
pool: pool.clone(),
267-
scheduling: sender,
268-
};
255+
let scans = super::Scans { pool: pool.clone() };
269256
(scheduler, scans)
270257
}
271258

@@ -276,16 +263,6 @@ pub mod scans_utils {
276263
}
277264

278265
impl Fakes {
279-
async fn recv(&mut self) {
280-
let msg = self.scheduler.receiver().recv().await;
281-
if let Some(msg) = msg {
282-
DBScan::new(&self.scheduler.pool(), (msg.id, msg.action))
283-
.exec()
284-
.await
285-
.unwrap();
286-
}
287-
}
288-
289266
pub async fn internal_id(&self, client_id: &str, scan_id: &str) -> String {
290267
self.entry
291268
.contains_scan_id(client_id, scan_id)
@@ -305,8 +282,6 @@ pub mod scans_utils {
305282
.await
306283
.unwrap();
307284

308-
self.recv().await;
309-
310285
self.entry.get_scans_id_status(id).await.unwrap()
311286
}
312287

@@ -328,8 +303,6 @@ pub mod scans_utils {
328303
.await
329304
.unwrap();
330305

331-
self.recv().await;
332-
333306
(scan_id, self.entry.get_scans_id_status(id).await.unwrap())
334307
}
335308

rust/src/openvasd/container_image_scanner/mod.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ pub async fn init(
9797
.await?;
9898
MIGRATOR.run(&pool).await?;
9999

100-
let (sender, scheduler) = Scheduler::<DockerRegistryV2, filtered_image::Extractor>::init(
100+
let scheduler = Scheduler::<DockerRegistryV2, filtered_image::Extractor>::init(
101101
config.into(),
102102
pool.clone(),
103103
products,
@@ -106,10 +106,7 @@ pub async fn init(
106106
scheduler.run::<AllTypes>().await;
107107
});
108108

109-
let scan = Scans {
110-
pool,
111-
scheduling: sender,
112-
};
109+
let scan = Scans { pool };
113110
let vts = VTEndpoints::new(
114111
SqlPluginStorage::from(vt_pool),
115112
feed_state,

rust/src/openvasd/container_image_scanner/scheduling/mod.rs

Lines changed: 5 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,7 @@ use container_image_scanner::{
99
use futures::StreamExt;
1010
use greenbone_scanner_framework::models;
1111
use tokio::{
12-
sync::{
13-
Mutex, RwLock,
14-
mpsc::{Receiver, Sender},
15-
},
12+
sync::{Mutex, RwLock},
1613
task::JoinSet,
1714
time,
1815
};
@@ -60,25 +57,13 @@ pub struct ProcessingImage {
6057
pub image: Vec<Result<Image, ImageParseError>>,
6158
pub credentials: Option<Credential>,
6259
}
63-
#[derive(Debug)]
64-
pub struct Message {
65-
pub id: String,
66-
pub action: models::Action,
67-
}
68-
69-
impl Message {
70-
pub fn new(id: String, action: models::Action) -> Self {
71-
Self { id, action }
72-
}
73-
}
7460

7561
/// Scheduler is responsible to start, stop and storing results of scans.
7662
///
7763
/// It retrieves commands usually by the endpoint handler to either start or stop a scan.
7864
/// It then sets the status of that scan to queued and regularly verifies if a scan can be started
7965
/// when the scan is finished it also sets the status to either succeed or failed.
8066
pub struct Scheduler<Registry, Extractor> {
81-
receiver: Receiver<Message>,
8267
pool: DataBase,
8368
config: Arc<Config>,
8469
registry: PhantomData<Registry>,
@@ -89,12 +74,10 @@ pub struct Scheduler<Registry, Extractor> {
8974
impl<Registry, Extractor> Scheduler<Registry, Extractor> {
9075
fn new(
9176
config: Arc<Config>,
92-
receiver: Receiver<Message>,
9377
pool: DataBase,
9478
products: Arc<RwLock<Notus<HashsumProductLoader>>>,
9579
) -> Self {
9680
Scheduler {
97-
receiver,
9881
pool,
9982
config,
10083
registry: PhantomData,
@@ -107,9 +90,8 @@ impl<Registry, Extractor> Scheduler<Registry, Extractor> {
10790
config: Arc<Config>,
10891
pool: DataBase,
10992
products: Arc<RwLock<Notus<HashsumProductLoader>>>,
110-
) -> (Sender<Message>, Scheduler<Registry, Extractor>) {
111-
let (sender, receiver) = tokio::sync::mpsc::channel(10);
112-
(sender, Self::new(config, receiver, pool, products))
93+
) -> Scheduler<Registry, Extractor> {
94+
Self::new(config, pool, products)
11395
}
11496
}
11597

@@ -131,11 +113,6 @@ where
131113
self.pool.clone()
132114
}
133115

134-
#[cfg(test)]
135-
pub fn receiver(&mut self) -> &mut Receiver<Message> {
136-
&mut self.receiver
137-
}
138-
139116
#[cfg(test)]
140117
pub fn config(&self) -> Arc<Config> {
141118
self.config.clone()
@@ -215,7 +192,7 @@ where
215192
) where
216193
T: ToNotus,
217194
{
218-
tracing::debug!("checking for requested and scanning");
195+
tracing::trace!("checking for requested and scanning");
219196
let pool = conn.lock().await;
220197
let requested = match DBImages::new(&pool, config.max_scans).fetch().await {
221198
Ok(r) => r,
@@ -343,7 +320,7 @@ where
343320
js.join_all().await;
344321
}
345322

346-
pub async fn run<T>(mut self)
323+
pub async fn run<T>(self)
347324
where
348325
T: ToNotus,
349326
{
@@ -367,19 +344,7 @@ where
367344
let conn = Arc::new(Mutex::new(conn));
368345
loop {
369346
tokio::select! {
370-
Some(msg) = self.receiver.recv() => {
371-
let pool = self.pool.clone();
372-
373-
tokio::spawn(async move {
374-
let id = msg.id.clone();
375347

376-
if let Err(e) = DBScan::new(&pool, (msg.id, msg.action))
377-
.retry_exec()
378-
.await {
379-
warn!(error=%e, id, "Unable to handle message");
380-
}
381-
});
382-
}
383348

384349
_ = interval.tick() => {
385350
let products = self.products.clone();

0 commit comments

Comments
 (0)