diff options
Diffstat (limited to 'worker/src/main.rs')
-rw-r--r-- | worker/src/main.rs | 120 |
1 files changed, 120 insertions, 0 deletions
diff --git a/worker/src/main.rs b/worker/src/main.rs new file mode 100644 index 0000000..0300278 --- /dev/null +++ b/worker/src/main.rs @@ -0,0 +1,120 @@ +use std::env; +use std::fs::File; +use std::io::{self, Write}; +use std::process; +use tempfile; +use dd_utils::protocol::*; +use crate::connection::Connection; +use crate::compute_core::ComputeCore; + +mod compute_core; +mod connection; + +#[repr(C)] +struct mandel_input { + ltx: f64, + lty: f64, + rbx: f64, + rby: f64, + width: usize, + height: usize, + maxiter: usize, +} + +fn aligned_buffer(num_bytes: usize, align_bits: usize, backend: &mut Vec<u8>) -> &mut [u8] { + assert!(align_bits > 0); + + backend.resize(num_bytes + (1 << align_bits) - 1, 0u8); + let begin_ptr = backend.as_ptr() as usize; + let aligned_ptr = ((begin_ptr - 1) & !((1 << align_bits) - 1)) + (1 << align_bits); + let offset = aligned_ptr - begin_ptr; + &mut backend[offset .. offset + num_bytes] +} + +fn main() -> io::Result<()> { + let args = env::args().collect::<Vec<_>>(); + if args.len() != 2 { + eprintln!("Usage: {} <controller ip:port>", args[0]); + process::exit(1); + } + + let controller_addr = &args[1]; + + let tempdir = tempfile::tempdir()?; + + let mut conn = Connection::new(controller_addr)?; + + let mut ccore: Option<ComputeCore> = None; + + loop { + let opt_msg = conn.receive()?; + let msg = match opt_msg { Some(msg) => msg, None => break }; + + match msg.body { + MessageBody::Version(version) => { + if version == 1 { + conn.reply(msg.id, Ok(Reply::Version(true)))?; + } else { + conn.reply(msg.id, Ok(Reply::Version(false)))?; + conn.close(); + break; + } + } + + MessageBody::NewCore(_name, libfile) => { + let path = tempdir.path().join("core.so"); + + let res = + File::open(&path) + .and_then(|mut f| { f.write_all(&libfile)?; Ok(f) }) + .and_then(|f| { + drop(f); + ComputeCore::load(&path) + }); + + match res { + Ok(new_ccore) => { + ccore = Some(new_ccore); + conn.reply(msg.id, Ok(Reply::NewCore))?; + } + Err(err) => { + conn.reply(msg.id, Err(format!("Could not load core: {}", err)))?; + } + } + } + + MessageBody::Job(_jobid, mut input) => { + if let Some(ccore) = &mut ccore { + let (ret, output) = ccore.run_job(&mut input); + conn.reply(msg.id, Ok(Reply::Job(ret, output)))?; + } else { + conn.reply(msg.id, Err("No core loaded".to_string()))?; + } + } + } + } + + // let mut core = compute_core::ComputeCore::load(&lib_fname)?; + + // let mut input = b"kaas\0".to_vec(); + // println!("Input: {:?} = {}", input, String::from_utf8_lossy(&input)); + // let (ret, output) = core.run_job(&mut input); + // println!("Exit code: {}", ret); + // println!("Output: {:?} = {}", output, String::from_utf8_lossy(&output)); + + // let input = mandel_input { + // ltx: -1.5, lty: 1.0, + // rbx: 1.0, rby: 1.0, + // width: 2000, height: 1600, + // maxiter: 1024 + // }; + // let mut input_bytes_backend = Vec::new(); + // let input_bytes = aligned_buffer(std::mem::size_of::<mandel_input>(), 3, &mut input_bytes_backend); + // unsafe { + // std::ptr::copy(&input as *const mandel_input as *const u8, input_bytes.as_mut_ptr(), std::mem::size_of::<mandel_input>()); + // } + // let (ret, output) = core.run_job(input_bytes); + // println!("ret = {}", ret); + + Ok(()) +} |