diff options
-rw-r--r-- | Cargo.lock | 8 | ||||
-rw-r--r-- | Cargo.toml | 2 | ||||
-rw-r--r-- | controller-cli/.gitignore | 1 | ||||
-rw-r--r-- | controller-cli/Cargo.toml | 9 | ||||
-rw-r--r-- | controller-cli/src/main.rs | 120 | ||||
-rw-r--r-- | controller/.gitignore | 1 | ||||
-rw-r--r-- | examples/kaas/kaas.c | 1 | ||||
-rw-r--r-- | worker/.gitignore | 1 |
8 files changed, 142 insertions, 1 deletions
@@ -39,6 +39,14 @@ dependencies = [ ] [[package]] +name = "dd-controller-cli" +version = "0.1.0" +dependencies = [ + "dd-controller", + "dd-utils", +] + +[[package]] name = "dd-utils" version = "0.1.0" @@ -1,3 +1,3 @@ [workspace] -members = ["worker", "controller", "utils"] +members = ["worker", "controller", "controller-cli", "utils"] diff --git a/controller-cli/.gitignore b/controller-cli/.gitignore new file mode 100644 index 0000000..ea8c4bf --- /dev/null +++ b/controller-cli/.gitignore @@ -0,0 +1 @@ +/target diff --git a/controller-cli/Cargo.toml b/controller-cli/Cargo.toml new file mode 100644 index 0000000..4e9cfaa --- /dev/null +++ b/controller-cli/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "dd-controller-cli" +version = "0.1.0" +authors = ["Tom Smeding <tom.smeding@gmail.com>"] +edition = "2018" + +[dependencies] +dd-controller = { path = "../controller" } +dd-utils = { path = "../utils" } diff --git a/controller-cli/src/main.rs b/controller-cli/src/main.rs new file mode 100644 index 0000000..bf058b7 --- /dev/null +++ b/controller-cli/src/main.rs @@ -0,0 +1,120 @@ +use std::io::{self, Write}; +use dd_controller::ComputePool; +use dd_utils::idgen::IdGen; + +fn prompt(prefix: &str) -> io::Result<Option<String>> { + print!("{}> ", prefix); + io::stdout().flush()?; + let mut line = String::new(); + let n = io::stdin().read_line(&mut line)?; + if n == 0 { + Ok(None) + } else { + line.pop(); // remove the newline + Ok(Some(line)) + } +} + +fn main() -> io::Result<()> { + let mut pool = ComputePool::new(12345)?; + + // for _ in 0..10 { + // println!("Current parallelism: {}", pool.current_job_parallelism()?); + // std::thread::sleep(std::time::Duration::from_secs(2)); + // } + + let mut job_idgen = IdGen::new(1); + + loop { + let num_workers = match pool.current_job_parallelism() { + Ok(num) => num, + Err(e) => { + eprintln!("ERROR getting number of workers: {}", e); + break; + } + }; + + let line = match prompt(&format!("[{}]", num_workers))? { + Some(line) => line, + None => break + }; + + let words = line.split_whitespace().collect::<Vec<_>>(); + if words.len() == 0 { + continue; + } + + match words[0] { + "quit" | "exit" => { + break; + } + + "core" => { + let filename = match prompt("file name")? { Some(s) => s, None => break }; + + match std::fs::read(&filename) { + Ok(contents) => { + match pool.set_core("user-core".to_string(), contents) { + Ok(()) => println!("Core set."), + Err(e) => println!("Could not set core: {}", e), + } + } + + Err(e) => { + println!("Cannot open file '{}': {}", filename, e); + } + } + } + + "job" => { + let filename = match prompt("job input file name")? { Some(s) => s, None => break }; + + match std::fs::read(&filename) { + Ok(contents) => { + let jobid = job_idgen.gen(); + match pool.submit_job(jobid, contents) { + Ok(()) => println!("Submitted with id {}.", jobid), + Err(e) => println!("Could not submit: {}", e), + } + } + + Err(e) => { + println!("Cannot open file '{}': {}", filename, e); + } + } + } + + "wait" => { + match pool.next_completion_event() { + Ok(Some(event)) => { + println!("Result from job {}:", event.jobid); + match event.result { + Ok((retcode, output)) => { + println!("Return code {}, output = {}", retcode, String::from_utf8_lossy(&output)); + } + Err(err) => { + println!("Worker returned error: {}", err); + } + } + } + + Ok(None) => { + println!("No jobs running"); + } + + Err(err) => { + println!("ERROR getting events: {}", err); + break; + } + } + } + + cmd => { + println!("Unknown command '{}'", cmd); + } + } + } + + pool.close()?; + Ok(()) +} diff --git a/controller/.gitignore b/controller/.gitignore new file mode 100644 index 0000000..ea8c4bf --- /dev/null +++ b/controller/.gitignore @@ -0,0 +1 @@ +/target diff --git a/examples/kaas/kaas.c b/examples/kaas/kaas.c index f823442..4469474 100644 --- a/examples/kaas/kaas.c +++ b/examples/kaas/kaas.c @@ -18,6 +18,7 @@ int worker_init(int version) { } int worker_run_job(size_t size, void *input_, size_t *outsize, void **outputp) { + printf("kaas: running job (size = %zu)\n", size); const char *input = input_; char *output = malloc(size); for (size_t i = 0; i < size; i++) output[i] = toupper(input[i]); diff --git a/worker/.gitignore b/worker/.gitignore new file mode 100644 index 0000000..ea8c4bf --- /dev/null +++ b/worker/.gitignore @@ -0,0 +1 @@ +/target |