aboutsummaryrefslogtreecommitdiff
path: root/controller-cli/src/main.rs
diff options
context:
space:
mode:
Diffstat (limited to 'controller-cli/src/main.rs')
-rw-r--r--controller-cli/src/main.rs120
1 files changed, 120 insertions, 0 deletions
diff --git a/controller-cli/src/main.rs b/controller-cli/src/main.rs
new file mode 100644
index 0000000..bf058b7
--- /dev/null
+++ b/controller-cli/src/main.rs
@@ -0,0 +1,120 @@
+use std::io::{self, Write};
+use dd_controller::ComputePool;
+use dd_utils::idgen::IdGen;
+
+fn prompt(prefix: &str) -> io::Result<Option<String>> {
+ print!("{}> ", prefix);
+ io::stdout().flush()?;
+ let mut line = String::new();
+ let n = io::stdin().read_line(&mut line)?;
+ if n == 0 {
+ Ok(None)
+ } else {
+ line.pop(); // remove the newline
+ Ok(Some(line))
+ }
+}
+
+fn main() -> io::Result<()> {
+ let mut pool = ComputePool::new(12345)?;
+
+ // for _ in 0..10 {
+ // println!("Current parallelism: {}", pool.current_job_parallelism()?);
+ // std::thread::sleep(std::time::Duration::from_secs(2));
+ // }
+
+ let mut job_idgen = IdGen::new(1);
+
+ loop {
+ let num_workers = match pool.current_job_parallelism() {
+ Ok(num) => num,
+ Err(e) => {
+ eprintln!("ERROR getting number of workers: {}", e);
+ break;
+ }
+ };
+
+ let line = match prompt(&format!("[{}]", num_workers))? {
+ Some(line) => line,
+ None => break
+ };
+
+ let words = line.split_whitespace().collect::<Vec<_>>();
+ if words.len() == 0 {
+ continue;
+ }
+
+ match words[0] {
+ "quit" | "exit" => {
+ break;
+ }
+
+ "core" => {
+ let filename = match prompt("file name")? { Some(s) => s, None => break };
+
+ match std::fs::read(&filename) {
+ Ok(contents) => {
+ match pool.set_core("user-core".to_string(), contents) {
+ Ok(()) => println!("Core set."),
+ Err(e) => println!("Could not set core: {}", e),
+ }
+ }
+
+ Err(e) => {
+ println!("Cannot open file '{}': {}", filename, e);
+ }
+ }
+ }
+
+ "job" => {
+ let filename = match prompt("job input file name")? { Some(s) => s, None => break };
+
+ match std::fs::read(&filename) {
+ Ok(contents) => {
+ let jobid = job_idgen.gen();
+ match pool.submit_job(jobid, contents) {
+ Ok(()) => println!("Submitted with id {}.", jobid),
+ Err(e) => println!("Could not submit: {}", e),
+ }
+ }
+
+ Err(e) => {
+ println!("Cannot open file '{}': {}", filename, e);
+ }
+ }
+ }
+
+ "wait" => {
+ match pool.next_completion_event() {
+ Ok(Some(event)) => {
+ println!("Result from job {}:", event.jobid);
+ match event.result {
+ Ok((retcode, output)) => {
+ println!("Return code {}, output = {}", retcode, String::from_utf8_lossy(&output));
+ }
+ Err(err) => {
+ println!("Worker returned error: {}", err);
+ }
+ }
+ }
+
+ Ok(None) => {
+ println!("No jobs running");
+ }
+
+ Err(err) => {
+ println!("ERROR getting events: {}", err);
+ break;
+ }
+ }
+ }
+
+ cmd => {
+ println!("Unknown command '{}'", cmd);
+ }
+ }
+ }
+
+ pool.close()?;
+ Ok(())
+}