Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ thiserror = "1.0"
serde_json = "1.0"
libc = "0.2.132"
lazy_static = "1.4.0"
nix = "0.23"

[dev-dependencies]
tempfile = "3.0"
Expand Down
14 changes: 14 additions & 0 deletions NOTICE
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
Second State Containerd shim of Wasmedge
Copyright 2022 The Second State

This product includes software developed at
The Second State (https://github.com/second-state/runwasi).

The Initial Developer of major parts of the framework, which are
fork from Deis Labs (https://github.com/deislabs).
Copyright 2022 Deis Labs. All Rights Reserved.

The Initial Developer of some parts of the file
crates/containerd-shim-wasm/src/sandbox/shim.rs, which are copied from,
derived from, or inspired by Youki (https://github.com/containers/youki)
Copyright 2021 - 2022 Youki. All Rights Reserved.
86 changes: 73 additions & 13 deletions crates/containerd-shim-wasm/src/sandbox/shim.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use nix::mount::{mount, MsFlags};
use nix::sched::{setns, unshare, CloneFlags};
use nix::sys::stat::Mode;
use nix::unistd::mkdir;
use oci_spec::runtime;
use oci_spec::runtime::{self, Mount};
use ttrpc::context::Context;

use super::instance::{EngineGetter, Instance, InstanceConfig, Nop};
Expand Down Expand Up @@ -815,7 +815,7 @@ where
..Default::default()
});
return Ok(api::CreateTaskResponse {
pid: std::process::id(), // TODO: PID
pid: std::process::id(),
..Default::default()
});
}
Expand Down Expand Up @@ -1013,8 +1013,8 @@ where
container_id: id.clone(),
exit_status: ec.0,
exited_at: SingularPtrField::some(timestamp),
pid: pid,
id: id,
pid,
id,
..Default::default()
};

Expand Down Expand Up @@ -1303,15 +1303,7 @@ where
shim::Error::InvalidArgument(format!("error loading runtime spec: {}", err))
})?;

let default = HashMap::new() as HashMap<String, String>;
let annotations = spec.annotations().as_ref().unwrap_or(&default);

let id = opts.id.clone();

let grouping = annotations
.get("io.kubernetes.cri.sandbox-id")
.unwrap_or(&id)
.to_string();
let grouping = opts.id.clone();

let envs = vec![] as Vec<(&str, &str)>;

Expand All @@ -1336,8 +1328,10 @@ where
setns(f.as_raw_fd(), CloneFlags::CLONE_NEWNET).map_err(|err| {
ShimError::Other(format!("could not set network namespace: {0}", err))
})?;

break;
}

unshare(CloneFlags::CLONE_NEWNET).map_err(|err| {
ShimError::Other(format!("could not unshare network namespace: {0}", err))
})?;
Expand All @@ -1359,6 +1353,21 @@ where
)
.map_err(|err| shim::Error::Other(format!("failed to remount rootfs as slave: {}", err)))?;

if let Some(mounts) = spec.mounts() {
for m in mounts {
if m.typ().as_ref().unwrap() != "bind" {
continue;
}

let os_source = m.source().as_ref().unwrap().clone().into_os_string();
let source = os_source.to_string_lossy();

let flags = parse_mount(m);

mount::<str, Path, str, str>(Some(&source), m.destination(), None, flags, None)?;
}
}

let (_child, address) = shim::spawn(opts, &grouping, envs)?;

write_address(&address)?;
Expand Down Expand Up @@ -1386,6 +1395,57 @@ where
}
}

pub fn parse_mount(m: &Mount) -> MsFlags {
let mut flags = MsFlags::empty();
if let Some(options) = &m.options() {
for s in options {
if let Some((is_clear, flag)) = match s.as_str() {
"defaults" => Some((false, MsFlags::empty())),
"ro" => Some((false, MsFlags::MS_RDONLY)),
"rw" => Some((true, MsFlags::MS_RDONLY)),
"suid" => Some((true, MsFlags::MS_NOSUID)),
"nosuid" => Some((false, MsFlags::MS_NOSUID)),
"dev" => Some((true, MsFlags::MS_NODEV)),
"nodev" => Some((false, MsFlags::MS_NODEV)),
"exec" => Some((true, MsFlags::MS_NOEXEC)),
"noexec" => Some((false, MsFlags::MS_NOEXEC)),
"sync" => Some((false, MsFlags::MS_SYNCHRONOUS)),
"async" => Some((true, MsFlags::MS_SYNCHRONOUS)),
"dirsync" => Some((false, MsFlags::MS_DIRSYNC)),
"remount" => Some((false, MsFlags::MS_REMOUNT)),
"mand" => Some((false, MsFlags::MS_MANDLOCK)),
"nomand" => Some((true, MsFlags::MS_MANDLOCK)),
"atime" => Some((true, MsFlags::MS_NOATIME)),
"noatime" => Some((false, MsFlags::MS_NOATIME)),
"diratime" => Some((true, MsFlags::MS_NODIRATIME)),
"nodiratime" => Some((false, MsFlags::MS_NODIRATIME)),
"bind" => Some((false, MsFlags::MS_BIND)),
"rbind" => Some((false, MsFlags::MS_BIND | MsFlags::MS_REC)),
"unbindable" => Some((false, MsFlags::MS_UNBINDABLE)),
"runbindable" => Some((false, MsFlags::MS_UNBINDABLE | MsFlags::MS_REC)),
"private" => Some((true, MsFlags::MS_PRIVATE)),
"rprivate" => Some((true, MsFlags::MS_PRIVATE | MsFlags::MS_REC)),
"shared" => Some((true, MsFlags::MS_SHARED)),
"rshared" => Some((true, MsFlags::MS_SHARED | MsFlags::MS_REC)),
"slave" => Some((true, MsFlags::MS_SLAVE)),
"rslave" => Some((true, MsFlags::MS_SLAVE | MsFlags::MS_REC)),
"relatime" => Some((true, MsFlags::MS_RELATIME)),
"norelatime" => Some((true, MsFlags::MS_RELATIME)),
"strictatime" => Some((true, MsFlags::MS_STRICTATIME)),
"nostrictatime" => Some((true, MsFlags::MS_STRICTATIME)),
_ => None,
} {
if is_clear {
flags &= !flag;
} else {
flags |= flag;
}
};
}
}
flags
}

fn forward_events(
namespace: String,
publisher: RemotePublisher,
Expand Down
94 changes: 88 additions & 6 deletions src/instance.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,23 @@
use libc::{dup, dup2, STDERR_FILENO, STDIN_FILENO, STDOUT_FILENO};
use std::collections::HashMap;
use std::io::ErrorKind;
use std::io::Write;
use std::fs::OpenOptions;
use std::os::unix::io::{IntoRawFd, RawFd};
use std::os::unix::process::CommandExt;
use std::path::Path;
use std::sync::mpsc::channel;
use std::sync::mpsc::Sender;
use std::sync::{Arc, Condvar, Mutex, RwLock};
use std::thread;
use std::{process, thread};

use anyhow::Context;
use chrono::{DateTime, Utc};
use containerd_shim_wasm::sandbox::error::Error;
use containerd_shim_wasm::sandbox::oci;
use containerd_shim_wasm::sandbox::{EngineGetter, Instance, InstanceConfig};
use log::{debug, error};
use nix::{sys::signal, unistd::Pid};
use wasmedge_sdk::{
config::{CommonConfigOptions, ConfigBuilder, HostRegistrationConfigOptions},
params, Vm,
Expand Down Expand Up @@ -167,6 +172,20 @@ pub fn prepare_module(
Ok(vm)
}

fn parse_env(envs: &[String]) -> HashMap<String, String> {
// make NAME=VALUE to HashMap<NAME, VALUE>.
envs.iter()
.filter_map(|e| {
let mut split = e.split('=');

split.next().map(|key| {
let value = split.collect::<Vec<&str>>().join("=");
(key.into(), value)
})
})
.collect()
}

impl Instance for Wasi {
type E = Vm;
fn new(id: String, cfg: Option<&InstanceConfig<Self::E>>) -> Self {
Expand All @@ -182,11 +201,74 @@ impl Instance for Wasi {
}
}
fn start(&self) -> Result<u32, Error> {
// Call prehook before the start
let bundle = self.bundle.clone();
let spec = oci::load(Path::new(&bundle).join("config.json").to_str().unwrap())?;
match spec.hooks().as_ref() {
None => {
log::debug!("no hooks found")
}
Some(hooks) => {
let prestart_hooks = hooks.prestart().as_ref().unwrap();

for hook in prestart_hooks {
let mut hook_command = process::Command::new(&hook.path());
// Based on OCI spec, the first argument of the args vector is the
// arg0, which can be different from the path. For example, path
// may be "/usr/bin/true" and arg0 is set to "true". However, rust
// command differenciates arg0 from args, where rust command arg
// doesn't include arg0. So we have to make the split arg0 from the
// rest of args.
if let Some((arg0, args)) = hook.args().as_ref().and_then(|a| a.split_first()) {
log::debug!("run_hooks arg0: {:?}, args: {:?}", arg0, args);
hook_command.arg0(arg0).args(args)
} else {
hook_command.arg0(&hook.path().display().to_string())
};

let envs: HashMap<String, String> = if let Some(env) = hook.env() {
parse_env(env)
} else {
HashMap::new()
};
log::debug!("run_hooks envs: {:?}", envs);

let mut hook_process = hook_command
.env_clear()
.envs(envs)
.stdin(process::Stdio::piped())
.spawn()
.with_context(|| "Failed to execute hook")?;
let hook_process_pid = Pid::from_raw(hook_process.id() as i32);

if let Some(stdin) = &mut hook_process.stdin {
// We want to ignore BrokenPipe here. A BrokenPipe indicates
// either the hook is crashed/errored or it ran successfully.
// Either way, this is an indication that the hook command
// finished execution. If the hook command was successful,
// which we will check later in this function, we should not
// fail this step here. We still want to check for all the other
// error, in the case that the hook command is waiting for us to
// write to stdin.
let state = format!("{{ \"pid\": {} }}", std::process::id());
if let Err(e) = stdin.write_all(state.as_bytes()) {
if e.kind() != ErrorKind::BrokenPipe {
// Not a broken pipe. The hook command may be waiting
// for us.
let _ = signal::kill(hook_process_pid, signal::Signal::SIGKILL);
}
}
}

hook_process.wait()?;
}
}
}

let engine = self.engine.clone();

let exit_code = self.exit_code.clone();
let (tx, rx) = channel::<Result<(), Error>>();
let bundle = self.bundle.clone();
let stdin = self.stdin.clone();
let stdout = self.stdout.clone();
let stderr = self.stderr.clone();
Expand Down Expand Up @@ -252,10 +334,10 @@ impl Instance for Wasi {
}
}

Ok(1) // TODO: PID: I wanted to use a thread ID here, but threads use a u64, the API wants a u32
Ok(std::process::id())
}

fn kill(&self, signal: u32) -> Result<(), Error> {
fn kill(&self, _signal: u32) -> Result<(), Error> {
match &*JOB.read().unwrap() {
Some(job) => {
job.cancel();
Expand All @@ -264,7 +346,7 @@ impl Instance for Wasi {
let mut ec = lock.lock().unwrap();
*ec = Some((137, Utc::now()));
cvar.notify_one();
},
}
None => {
// no running wasm task
}
Expand All @@ -286,7 +368,7 @@ impl Instance for Wasi {
}
let ec = (*exit).unwrap();
match channel.send(ec) {
Ok(_) => {},
Ok(_) => {}
Err(error) => {
error!("channel disconnect: {}", error);
}
Expand Down