aboutsummaryrefslogtreecommitdiff
path: root/controller-cli/src/main.rs
blob: bf058b7ecf0d5d0cdb0ad320b3d5874f55d22321 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
use std::io::{self, Write};
use dd_controller::ComputePool;
use dd_utils::idgen::IdGen;

fn prompt(prefix: &str) -> io::Result<Option<String>> {
    print!("{}> ", prefix);
    io::stdout().flush()?;
    let mut line = String::new();
    let n = io::stdin().read_line(&mut line)?;
    if n == 0 {
        Ok(None)
    } else {
        line.pop();  // remove the newline
        Ok(Some(line))
    }
}

fn main() -> io::Result<()> {
    let mut pool = ComputePool::new(12345)?;

    // for _ in 0..10 {
    //     println!("Current parallelism: {}", pool.current_job_parallelism()?);
    //     std::thread::sleep(std::time::Duration::from_secs(2));
    // }

    let mut job_idgen = IdGen::new(1);

    loop {
        let num_workers = match pool.current_job_parallelism() {
            Ok(num) => num,
            Err(e) => {
                eprintln!("ERROR getting number of workers: {}", e);
                break;
            }
        };

        let line = match prompt(&format!("[{}]", num_workers))? {
            Some(line) => line,
            None => break
        };

        let words = line.split_whitespace().collect::<Vec<_>>();
        if words.len() == 0 {
            continue;
        }

        match words[0] {
            "quit" | "exit" => {
                break;
            }

            "core" => {
                let filename = match prompt("file name")? { Some(s) => s, None => break };

                match std::fs::read(&filename) {
                    Ok(contents) => {
                        match pool.set_core("user-core".to_string(), contents) {
                            Ok(()) => println!("Core set."),
                            Err(e) => println!("Could not set core: {}", e),
                        }
                    }

                    Err(e) => {
                        println!("Cannot open file '{}': {}", filename, e);
                    }
                }
            }

            "job" => {
                let filename = match prompt("job input file name")? { Some(s) => s, None => break };

                match std::fs::read(&filename) {
                    Ok(contents) => {
                        let jobid = job_idgen.gen();
                        match pool.submit_job(jobid, contents) {
                            Ok(()) => println!("Submitted with id {}.", jobid),
                            Err(e) => println!("Could not submit: {}", e),
                        }
                    }

                    Err(e) => {
                        println!("Cannot open file '{}': {}", filename, e);
                    }
                }
            }

            "wait" => {
                match pool.next_completion_event() {
                    Ok(Some(event)) => {
                        println!("Result from job {}:", event.jobid);
                        match event.result {
                            Ok((retcode, output)) => {
                                println!("Return code {}, output = {}", retcode, String::from_utf8_lossy(&output));
                            }
                            Err(err) => {
                                println!("Worker returned error: {}", err);
                            }
                        }
                    }

                    Ok(None) => {
                        println!("No jobs running");
                    }

                    Err(err) => {
                        println!("ERROR getting events: {}", err);
                        break;
                    }
                }
            }

            cmd => {
                println!("Unknown command '{}'", cmd);
            }
        }
    }

    pool.close()?;
    Ok(())
}