1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
|
use std::env;
use std::fs::File;
use std::io::{self, Write};
use std::process;
use tempfile;
use dd_utils::protocol::*;
use crate::connection::Connection;
use crate::compute_core::ComputeCore;
mod compute_core;
mod connection;
/// Parameter record for a Mandelbrot-style compute job.
///
/// `#[repr(C)]` pins the field order and padding to the C layout so the value
/// can be handed to a compute core as raw bytes (the disabled example at the
/// end of `main` copies it byte-for-byte into an aligned buffer).
///
/// NOTE(review): the field meanings below are inferred from their names only;
/// confirm against the compute core that consumes this structure.
#[repr(C)]
struct mandel_input {
    /// Presumably the x coordinate of the left-top corner of the viewport.
    ltx: f64,
    /// Presumably the y coordinate of the left-top corner of the viewport.
    lty: f64,
    /// Presumably the x coordinate of the right-bottom corner of the viewport.
    rbx: f64,
    /// Presumably the y coordinate of the right-bottom corner of the viewport.
    rby: f64,
    /// Presumably the output width in pixels.
    width: usize,
    /// Presumably the output height in pixels.
    height: usize,
    /// Presumably the iteration cap per point.
    maxiter: usize,
}
/// Returns a `num_bytes`-long mutable slice of `backend` whose start address
/// is a multiple of `2^align_bits`.
///
/// `backend` is resized to `num_bytes + 2^align_bits - 1` bytes (any newly
/// added bytes are zero), which guarantees that an aligned window of
/// `num_bytes` bytes fits no matter where the allocation begins. The returned
/// slice starts at the first suitably aligned byte of that storage.
///
/// # Panics
///
/// Panics if `align_bits` is zero.
fn aligned_buffer(num_bytes: usize, align_bits: usize, backend: &mut Vec<u8>) -> &mut [u8] {
    assert!(align_bits > 0);

    let alignment = 1usize << align_bits;
    backend.resize(num_bytes + alignment - 1, 0u8);

    // Distance from the start of the allocation to the next aligned address
    // (zero when the allocation is already aligned).
    let base_addr = backend.as_ptr() as usize;
    let misalignment = base_addr & (alignment - 1);
    let start = if misalignment == 0 {
        0
    } else {
        alignment - misalignment
    };

    &mut backend[start..start + num_bytes]
}
/// Entry point of the worker process.
///
/// Expects exactly one command-line argument, the controller's `ip:port`
/// address; otherwise a usage line is printed to stderr and the process exits
/// with status 1. After connecting, messages from the controller are served
/// until `receive` yields `None` or the version handshake fails:
///
/// * `Version(v)`: replies `Version(true)` when `v == 1`; for any other
///   version replies `Version(false)`, closes the connection and stops.
/// * `NewCore(_, libfile)`: writes the received bytes to `core.so` inside a
///   temporary directory, loads that file as the active compute core and
///   replies `NewCore`, or replies with an error string if writing or loading
///   failed (a previously loaded core stays active in that case).
/// * `Job(_, input)`: runs the job on the active core and replies with its
///   return code and output, or replies `"No core loaded"` if there is none.
///
/// # Errors
///
/// Propagates any error from creating the temporary directory, connecting to
/// the controller, receiving a message, or sending a reply.
fn main() -> io::Result<()> {
    let args = env::args().collect::<Vec<_>>();
    if args.len() != 2 {
        // argv may legitimately be empty, so do not index `args[0]` blindly.
        let program = args.first().map(String::as_str).unwrap_or("worker");
        eprintln!("Usage: {} <controller ip:port>", program);
        process::exit(1);
    }
    let controller_addr = &args[1];

    // Holds the shared object received from the controller; the directory is
    // removed when `tempdir` is dropped.
    let tempdir = tempfile::tempdir()?;
    let mut conn = Connection::new(controller_addr)?;
    let mut ccore: Option<ComputeCore> = None;

    while let Some(msg) = conn.receive()? {
        match msg.body {
            MessageBody::Version(version) => {
                let supported = version == 1;
                conn.reply(msg.id, Ok(Reply::Version(supported)))?;
                if !supported {
                    conn.close();
                    break;
                }
            }
            MessageBody::NewCore(_name, libfile) => {
                let path = tempdir.path().join("core.so");
                // `File::create` (not `File::open`, which is read-only and
                // requires the file to exist) so the library can actually be
                // written. The handle is dropped when the first closure
                // returns, i.e. before the library is loaded.
                //
                // NOTE(review): if a previously loaded core still has this
                // path mapped, truncating it in place may be unsafe and the
                // loader may hand back the old image; confirm how
                // `ComputeCore::load` behaves on reload.
                let res = File::create(&path)
                    .and_then(|mut f| f.write_all(&libfile))
                    .and_then(|()| ComputeCore::load(&path));
                match res {
                    Ok(new_ccore) => {
                        ccore = Some(new_ccore);
                        conn.reply(msg.id, Ok(Reply::NewCore))?;
                    }
                    Err(err) => {
                        conn.reply(msg.id, Err(format!("Could not load core: {}", err)))?;
                    }
                }
            }
            MessageBody::Job(_jobid, mut input) => match ccore.as_mut() {
                Some(core) => {
                    let (ret, output) = core.run_job(&mut input);
                    conn.reply(msg.id, Ok(Reply::Job(ret, output)))?;
                }
                None => {
                    conn.reply(msg.id, Err("No core loaded".to_string()))?;
                }
            },
        }
    }

    // Disabled manual smoke test, kept for reference: loads a core directly
    // and feeds it a string job and a `mandel_input` job.
    //
    // let mut core = compute_core::ComputeCore::load(&lib_fname)?;
    // let mut input = b"kaas\0".to_vec();
    // println!("Input: {:?} = {}", input, String::from_utf8_lossy(&input));
    // let (ret, output) = core.run_job(&mut input);
    // println!("Exit code: {}", ret);
    // println!("Output: {:?} = {}", output, String::from_utf8_lossy(&output));
    // let input = mandel_input {
    //     ltx: -1.5, lty: 1.0,
    //     rbx: 1.0, rby: 1.0,
    //     width: 2000, height: 1600,
    //     maxiter: 1024
    // };
    // let mut input_bytes_backend = Vec::new();
    // let input_bytes = aligned_buffer(std::mem::size_of::<mandel_input>(), 3, &mut input_bytes_backend);
    // unsafe {
    //     std::ptr::copy(&input as *const mandel_input as *const u8, input_bytes.as_mut_ptr(), std::mem::size_of::<mandel_input>());
    // }
    // let (ret, output) = core.run_job(input_bytes);
    // println!("ret = {}", ret);

    Ok(())
}
|