aboutsummaryrefslogtreecommitdiff
path: root/worker/src/main.rs
diff options
context:
space:
mode:
Diffstat (limited to 'worker/src/main.rs')
-rw-r--r--worker/src/main.rs120
1 files changed, 120 insertions, 0 deletions
diff --git a/worker/src/main.rs b/worker/src/main.rs
new file mode 100644
index 0000000..0300278
--- /dev/null
+++ b/worker/src/main.rs
@@ -0,0 +1,120 @@
+use std::env;
+use std::fs::File;
+use std::io::{self, Write};
+use std::process;
+use tempfile;
+use dd_utils::protocol::*;
+use crate::connection::Connection;
+use crate::compute_core::ComputeCore;
+
+mod compute_core;
+mod connection;
+
+#[repr(C)]
+struct mandel_input {
+ ltx: f64,
+ lty: f64,
+ rbx: f64,
+ rby: f64,
+ width: usize,
+ height: usize,
+ maxiter: usize,
+}
+
+fn aligned_buffer(num_bytes: usize, align_bits: usize, backend: &mut Vec<u8>) -> &mut [u8] {
+ assert!(align_bits > 0);
+
+ backend.resize(num_bytes + (1 << align_bits) - 1, 0u8);
+ let begin_ptr = backend.as_ptr() as usize;
+ let aligned_ptr = ((begin_ptr - 1) & !((1 << align_bits) - 1)) + (1 << align_bits);
+ let offset = aligned_ptr - begin_ptr;
+ &mut backend[offset .. offset + num_bytes]
+}
+
+fn main() -> io::Result<()> {
+ let args = env::args().collect::<Vec<_>>();
+ if args.len() != 2 {
+ eprintln!("Usage: {} <controller ip:port>", args[0]);
+ process::exit(1);
+ }
+
+ let controller_addr = &args[1];
+
+ let tempdir = tempfile::tempdir()?;
+
+ let mut conn = Connection::new(controller_addr)?;
+
+ let mut ccore: Option<ComputeCore> = None;
+
+ loop {
+ let opt_msg = conn.receive()?;
+ let msg = match opt_msg { Some(msg) => msg, None => break };
+
+ match msg.body {
+ MessageBody::Version(version) => {
+ if version == 1 {
+ conn.reply(msg.id, Ok(Reply::Version(true)))?;
+ } else {
+ conn.reply(msg.id, Ok(Reply::Version(false)))?;
+ conn.close();
+ break;
+ }
+ }
+
+ MessageBody::NewCore(_name, libfile) => {
+ let path = tempdir.path().join("core.so");
+
+ let res =
+ File::open(&path)
+ .and_then(|mut f| { f.write_all(&libfile)?; Ok(f) })
+ .and_then(|f| {
+ drop(f);
+ ComputeCore::load(&path)
+ });
+
+ match res {
+ Ok(new_ccore) => {
+ ccore = Some(new_ccore);
+ conn.reply(msg.id, Ok(Reply::NewCore))?;
+ }
+ Err(err) => {
+ conn.reply(msg.id, Err(format!("Could not load core: {}", err)))?;
+ }
+ }
+ }
+
+ MessageBody::Job(_jobid, mut input) => {
+ if let Some(ccore) = &mut ccore {
+ let (ret, output) = ccore.run_job(&mut input);
+ conn.reply(msg.id, Ok(Reply::Job(ret, output)))?;
+ } else {
+ conn.reply(msg.id, Err("No core loaded".to_string()))?;
+ }
+ }
+ }
+ }
+
+ // let mut core = compute_core::ComputeCore::load(&lib_fname)?;
+
+ // let mut input = b"kaas\0".to_vec();
+ // println!("Input: {:?} = {}", input, String::from_utf8_lossy(&input));
+ // let (ret, output) = core.run_job(&mut input);
+ // println!("Exit code: {}", ret);
+ // println!("Output: {:?} = {}", output, String::from_utf8_lossy(&output));
+
+ // let input = mandel_input {
+ // ltx: -1.5, lty: 1.0,
+ // rbx: 1.0, rby: 1.0,
+ // width: 2000, height: 1600,
+ // maxiter: 1024
+ // };
+ // let mut input_bytes_backend = Vec::new();
+ // let input_bytes = aligned_buffer(std::mem::size_of::<mandel_input>(), 3, &mut input_bytes_backend);
+ // unsafe {
+ // std::ptr::copy(&input as *const mandel_input as *const u8, input_bytes.as_mut_ptr(), std::mem::size_of::<mandel_input>());
+ // }
+ // let (ret, output) = core.run_job(input_bytes);
+ // println!("ret = {}", ret);
+
+ Ok(())
+}