aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTom Smeding <tom.smeding@gmail.com>2020-03-28 21:29:50 +0100
committerTom Smeding <tom.smeding@gmail.com>2020-03-28 21:30:41 +0100
commitbe5ba0b1c3cb993543e50a4325e40ceaeb8082fb (patch)
tree768976ab41954e864c8fc98d425389fc8b6591d7
parent768804117c01d3b7a80b8899cb8fae0347dfb1fc (diff)
controller-cli: Proof of concept CLIHEADmaster
-rw-r--r--Cargo.lock8
-rw-r--r--Cargo.toml2
-rw-r--r--controller-cli/.gitignore1
-rw-r--r--controller-cli/Cargo.toml9
-rw-r--r--controller-cli/src/main.rs120
-rw-r--r--controller/.gitignore1
-rw-r--r--examples/kaas/kaas.c1
-rw-r--r--worker/.gitignore1
8 files changed, 142 insertions, 1 deletions
diff --git a/Cargo.lock b/Cargo.lock
index e34ba49..ea44bf9 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -39,6 +39,14 @@ dependencies = [
]
[[package]]
+name = "dd-controller-cli"
+version = "0.1.0"
+dependencies = [
+ "dd-controller",
+ "dd-utils",
+]
+
+[[package]]
name = "dd-utils"
version = "0.1.0"
diff --git a/Cargo.toml b/Cargo.toml
index f6a5141..339f427 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,3 +1,3 @@
[workspace]
-members = ["worker", "controller", "utils"]
+members = ["worker", "controller", "controller-cli", "utils"]
diff --git a/controller-cli/.gitignore b/controller-cli/.gitignore
new file mode 100644
index 0000000..ea8c4bf
--- /dev/null
+++ b/controller-cli/.gitignore
@@ -0,0 +1 @@
+/target
diff --git a/controller-cli/Cargo.toml b/controller-cli/Cargo.toml
new file mode 100644
index 0000000..4e9cfaa
--- /dev/null
+++ b/controller-cli/Cargo.toml
@@ -0,0 +1,9 @@
+[package]
+name = "dd-controller-cli"
+version = "0.1.0"
+authors = ["Tom Smeding <tom.smeding@gmail.com>"]
+edition = "2018"
+
+[dependencies]
+dd-controller = { path = "../controller" }
+dd-utils = { path = "../utils" }
diff --git a/controller-cli/src/main.rs b/controller-cli/src/main.rs
new file mode 100644
index 0000000..bf058b7
--- /dev/null
+++ b/controller-cli/src/main.rs
@@ -0,0 +1,120 @@
+use std::io::{self, Write};
+use dd_controller::ComputePool;
+use dd_utils::idgen::IdGen;
+
+fn prompt(prefix: &str) -> io::Result<Option<String>> {
+ print!("{}> ", prefix);
+ io::stdout().flush()?;
+ let mut line = String::new();
+ let n = io::stdin().read_line(&mut line)?;
+ if n == 0 {
+ Ok(None)
+ } else {
+ line.pop(); // remove the newline
+ Ok(Some(line))
+ }
+}
+
+fn main() -> io::Result<()> {
+ let mut pool = ComputePool::new(12345)?;
+
+ // for _ in 0..10 {
+ // println!("Current parallelism: {}", pool.current_job_parallelism()?);
+ // std::thread::sleep(std::time::Duration::from_secs(2));
+ // }
+
+ let mut job_idgen = IdGen::new(1);
+
+ loop {
+ let num_workers = match pool.current_job_parallelism() {
+ Ok(num) => num,
+ Err(e) => {
+ eprintln!("ERROR getting number of workers: {}", e);
+ break;
+ }
+ };
+
+ let line = match prompt(&format!("[{}]", num_workers))? {
+ Some(line) => line,
+ None => break
+ };
+
+ let words = line.split_whitespace().collect::<Vec<_>>();
+ if words.len() == 0 {
+ continue;
+ }
+
+ match words[0] {
+ "quit" | "exit" => {
+ break;
+ }
+
+ "core" => {
+ let filename = match prompt("file name")? { Some(s) => s, None => break };
+
+ match std::fs::read(&filename) {
+ Ok(contents) => {
+ match pool.set_core("user-core".to_string(), contents) {
+ Ok(()) => println!("Core set."),
+ Err(e) => println!("Could not set core: {}", e),
+ }
+ }
+
+ Err(e) => {
+ println!("Cannot open file '{}': {}", filename, e);
+ }
+ }
+ }
+
+ "job" => {
+ let filename = match prompt("job input file name")? { Some(s) => s, None => break };
+
+ match std::fs::read(&filename) {
+ Ok(contents) => {
+ let jobid = job_idgen.gen();
+ match pool.submit_job(jobid, contents) {
+ Ok(()) => println!("Submitted with id {}.", jobid),
+ Err(e) => println!("Could not submit: {}", e),
+ }
+ }
+
+ Err(e) => {
+ println!("Cannot open file '{}': {}", filename, e);
+ }
+ }
+ }
+
+ "wait" => {
+ match pool.next_completion_event() {
+ Ok(Some(event)) => {
+ println!("Result from job {}:", event.jobid);
+ match event.result {
+ Ok((retcode, output)) => {
+ println!("Return code {}, output = {}", retcode, String::from_utf8_lossy(&output));
+ }
+ Err(err) => {
+ println!("Worker returned error: {}", err);
+ }
+ }
+ }
+
+ Ok(None) => {
+ println!("No jobs running");
+ }
+
+ Err(err) => {
+ println!("ERROR getting events: {}", err);
+ break;
+ }
+ }
+ }
+
+ cmd => {
+ println!("Unknown command '{}'", cmd);
+ }
+ }
+ }
+
+ pool.close()?;
+ Ok(())
+}
diff --git a/controller/.gitignore b/controller/.gitignore
new file mode 100644
index 0000000..ea8c4bf
--- /dev/null
+++ b/controller/.gitignore
@@ -0,0 +1 @@
+/target
diff --git a/examples/kaas/kaas.c b/examples/kaas/kaas.c
index f823442..4469474 100644
--- a/examples/kaas/kaas.c
+++ b/examples/kaas/kaas.c
@@ -18,6 +18,7 @@ int worker_init(int version) {
}
int worker_run_job(size_t size, void *input_, size_t *outsize, void **outputp) {
+ printf("kaas: running job (size = %zu)\n", size);
const char *input = input_;
char *output = malloc(size);
for (size_t i = 0; i < size; i++) output[i] = toupper(input[i]);
diff --git a/worker/.gitignore b/worker/.gitignore
new file mode 100644
index 0000000..ea8c4bf
--- /dev/null
+++ b/worker/.gitignore
@@ -0,0 +1 @@
+/target