diff options
author | Tom Smeding <tom.smeding@gmail.com> | 2020-03-27 22:47:57 +0100 |
---|---|---|
committer | Tom Smeding <tom.smeding@gmail.com> | 2020-03-27 22:47:57 +0100 |
commit | fd421e32780cad46782c16cd4e15947f295a08c7 (patch) | |
tree | 04632f49f7c8860dee4237a0afe8292a949bdc9e /worker |
Initial, untested version of controller and worker
Worker has been tested to a marginal extent, but the controller is
litereally untested.
Diffstat (limited to 'worker')
-rw-r--r-- | worker/Cargo.toml | 12 | ||||
-rw-r--r-- | worker/src/compute_core.rs | 80 | ||||
-rw-r--r-- | worker/src/connection.rs | 119 | ||||
-rw-r--r-- | worker/src/main.rs | 120 |
4 files changed, 331 insertions, 0 deletions
diff --git a/worker/Cargo.toml b/worker/Cargo.toml new file mode 100644 index 0000000..f9888c4 --- /dev/null +++ b/worker/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "dd-worker" +version = "0.1.0" +authors = ["Tom Smeding <tom.smeding@gmail.com>"] +edition = "2018" + +[dependencies] +libloading = "0.5" +static_assertions = "1.1" +net2 = "0.2" +tempfile = "3.1" +dd-utils = { path = "../utils" } diff --git a/worker/src/compute_core.rs b/worker/src/compute_core.rs new file mode 100644 index 0000000..aec2a9d --- /dev/null +++ b/worker/src/compute_core.rs @@ -0,0 +1,80 @@ +use std::io; +use std::path::Path; +use std::ffi::c_void; +use libloading::{Library, Symbol}; +use static_assertions::assert_eq_size; +use dd_utils::error::*; + +pub const CORE_API_VERSION: i32 = 1; + +pub struct ComputeCore { + #[used] + library: Library, + // init: unsafe extern "C" fn(i32) -> i32, + run_job: unsafe extern "C" fn(u64, *mut c_void, *mut u64, *mut *mut c_void) -> i32, + free_outdata: unsafe extern "C" fn(u64, *mut c_void), +} + +impl ComputeCore { + pub fn load<P: AsRef<Path>>(lib_path: P) -> io::Result<Self> { + let lib_path = lib_path.as_ref(); + let lib_path = lib_path.canonicalize().map_err(|e| { + format!("Library '{}' does not exist", lib_path.display()).perror(e) + })?; + + let library = libloading::Library::new(&lib_path).map_err(|e| { + format!("Cannot load library at '{}'", lib_path.display()).perror(e) + })?; + + macro_rules! load_symbol { + ($name:expr, $symname:expr, $typ:ty) => {{ + let sym: io::Result<Symbol<$typ>> = library.get($symname).map_err(|e| { + format!("Failed to load symbol '{}' from library '{}'", $name, lib_path.display()).perror(e) + }); + sym.map(|s| std::mem::transmute::<*mut c_void, $typ>(s.into_raw().into_raw())) + }} + } + + let init = unsafe { load_symbol!("worker_init", b"worker_init\0", unsafe extern "C" fn(i32) -> i32)? }; + let run_job = unsafe { load_symbol!("worker_run_job",b"worker_run_job\0", unsafe extern "C" fn(u64, *mut c_void, *mut u64, *mut *mut c_void) -> i32)? }; + let free_outdata = unsafe { load_symbol!("worker_free_outdata", b"worker_free_outdata\0", unsafe extern "C" fn(u64, *mut c_void))? }; + + let ret = unsafe { + init(CORE_API_VERSION) + }; + if ret != 0 { + return Err(format!("Library refused initialisation at version {}: '{}'", CORE_API_VERSION, lib_path.display()).ioerr()); + } + + Ok(ComputeCore { library, /*init,*/ run_job, free_outdata }) + } + + /// Will block on this thread, so if you don't want to block, spawn a new thread. + /// This does not syntactically need mutability, but since one can only run one job at a time + /// on a core, semantic mutability seems warranted. + pub fn run_job(&mut self, input_data: &mut [u8]) -> (i32, Vec<u8>) { + let mut outsize = 0u64; + let mut outdataptr: *mut c_void = std::ptr::null_mut(); + + assert_eq_size!(u64, usize); + + let ret = unsafe { + (self.run_job)(input_data.len() as u64, input_data.as_mut_ptr() as *mut c_void, &mut outsize, &mut outdataptr) + }; + + let mut result = (ret, Vec::new()); + + if outsize != 0 && outdataptr != std::ptr::null_mut() { + result.1.resize(outsize as usize, 0u8); + unsafe { + std::ptr::copy(outdataptr as *const u8, result.1.as_mut_ptr(), outsize as usize); + } + } + + unsafe { + (self.free_outdata)(outsize, outdataptr); + } + + result + } +} diff --git a/worker/src/connection.rs b/worker/src/connection.rs new file mode 100644 index 0000000..1222c92 --- /dev/null +++ b/worker/src/connection.rs @@ -0,0 +1,119 @@ +use std::io::{self, BufReader, Write}; +use std::net::TcpStream; +use net2::TcpStreamExt; +use dd_utils::read_ext::ReadExt; +use dd_utils::error::*; +use dd_utils::protocol::*; + +fn send_vectored<W: Write, V: AsRef<[u8]>>( + writer: &mut W, + typ: u8, + id: u64, + payload: &[V] +) -> io::Result<()> { + let mut header = [0u8; 17]; + header[0] = typ; + header[1..9].copy_from_slice(&id.to_le_bytes()); + let sumlen: usize = payload.iter().map(|v| v.as_ref().len()).sum(); + header[9..17].copy_from_slice(&sumlen.to_le_bytes()); + writer.write_all(&header)?; + for part in payload { + writer.write_all(part.as_ref())?; + } + Ok(()) +} + +pub struct Message { + pub id: u64, + pub body: MessageBody, +} + +fn interpret_message(raw: RawMessage) -> io::Result<Message> { + let mut reader: &[u8] = &raw.payload; + + macro_rules! too_short { () => { "Message payload too short!".ioerr() } } + + match raw.typ { + 1 => { + let version = reader.read_le_u32().ok_or(too_short!())?; + Ok(Message { id: raw.id, body: MessageBody::Version(version) }) + } + + 2 => { + let name = reader.read_pascal_string() + .map(|r| r.iores()) + .ok_or(too_short!())??; + let libfile = reader.read_pascal_blob() + .map(|b| b.to_vec()) + .ok_or(too_short!())?; + Ok(Message { id: raw.id, body: MessageBody::NewCore(name, libfile) }) + } + + 3 => { + let jobid = reader.read_le_u64().ok_or(too_short!())?; + let jobdata = reader.read_pascal_blob().map(|b| b.to_vec()).ok_or(too_short!())?; + Ok(Message { id: raw.id, body: MessageBody::Job(jobid, jobdata) }) + } + + _ => { + Err(format!("Unknown message type {}", raw.typ).ioerr()) + } + } +} + +pub struct Connection { + reader: BufReader<TcpStream>, +} + +impl Connection { + pub fn new(address: &str) -> io::Result<Self> { + let socket = TcpStream::connect(address)?; + + socket.set_keepalive_ms(Some(60000))?; + + Ok(Self { + reader: BufReader::new(socket), + }) + } + + pub fn close(self) {} + + pub fn receive(&mut self) -> io::Result<Option<Message>> { + RawMessage::receive(&mut self.reader) + .and_then(|opt| + opt.map(|rawmsg| interpret_message(rawmsg)).transpose() + ) + } + + pub fn reply(&mut self, msgid: u64, rmsg: Result<Reply, String>) -> io::Result<()> { + let socket = self.reader.get_mut(); + let mut payload: Vec<Vec<u8>> = Vec::new(); + + let response_type; + + match rmsg { + Ok(msg) => { + response_type = 1; + match msg { + Reply::Version(ok) => { + payload.push(vec![if ok { 1u8 } else { 0u8 }]); + } + + Reply::NewCore => {} + + Reply::Job(jobid, output) => { + payload.push(jobid.to_le_bytes().to_vec()); + payload.push(output); + } + } + } + + Err(err) => { + response_type = 0xff; + payload.push(err.into_bytes()); + } + } + + send_vectored(socket, response_type, msgid, &payload) + } +} diff --git a/worker/src/main.rs b/worker/src/main.rs new file mode 100644 index 0000000..0300278 --- /dev/null +++ b/worker/src/main.rs @@ -0,0 +1,120 @@ +use std::env; +use std::fs::File; +use std::io::{self, Write}; +use std::process; +use tempfile; +use dd_utils::protocol::*; +use crate::connection::Connection; +use crate::compute_core::ComputeCore; + +mod compute_core; +mod connection; + +#[repr(C)] +struct mandel_input { + ltx: f64, + lty: f64, + rbx: f64, + rby: f64, + width: usize, + height: usize, + maxiter: usize, +} + +fn aligned_buffer(num_bytes: usize, align_bits: usize, backend: &mut Vec<u8>) -> &mut [u8] { + assert!(align_bits > 0); + + backend.resize(num_bytes + (1 << align_bits) - 1, 0u8); + let begin_ptr = backend.as_ptr() as usize; + let aligned_ptr = ((begin_ptr - 1) & !((1 << align_bits) - 1)) + (1 << align_bits); + let offset = aligned_ptr - begin_ptr; + &mut backend[offset .. offset + num_bytes] +} + +fn main() -> io::Result<()> { + let args = env::args().collect::<Vec<_>>(); + if args.len() != 2 { + eprintln!("Usage: {} <controller ip:port>", args[0]); + process::exit(1); + } + + let controller_addr = &args[1]; + + let tempdir = tempfile::tempdir()?; + + let mut conn = Connection::new(controller_addr)?; + + let mut ccore: Option<ComputeCore> = None; + + loop { + let opt_msg = conn.receive()?; + let msg = match opt_msg { Some(msg) => msg, None => break }; + + match msg.body { + MessageBody::Version(version) => { + if version == 1 { + conn.reply(msg.id, Ok(Reply::Version(true)))?; + } else { + conn.reply(msg.id, Ok(Reply::Version(false)))?; + conn.close(); + break; + } + } + + MessageBody::NewCore(_name, libfile) => { + let path = tempdir.path().join("core.so"); + + let res = + File::open(&path) + .and_then(|mut f| { f.write_all(&libfile)?; Ok(f) }) + .and_then(|f| { + drop(f); + ComputeCore::load(&path) + }); + + match res { + Ok(new_ccore) => { + ccore = Some(new_ccore); + conn.reply(msg.id, Ok(Reply::NewCore))?; + } + Err(err) => { + conn.reply(msg.id, Err(format!("Could not load core: {}", err)))?; + } + } + } + + MessageBody::Job(_jobid, mut input) => { + if let Some(ccore) = &mut ccore { + let (ret, output) = ccore.run_job(&mut input); + conn.reply(msg.id, Ok(Reply::Job(ret, output)))?; + } else { + conn.reply(msg.id, Err("No core loaded".to_string()))?; + } + } + } + } + + // let mut core = compute_core::ComputeCore::load(&lib_fname)?; + + // let mut input = b"kaas\0".to_vec(); + // println!("Input: {:?} = {}", input, String::from_utf8_lossy(&input)); + // let (ret, output) = core.run_job(&mut input); + // println!("Exit code: {}", ret); + // println!("Output: {:?} = {}", output, String::from_utf8_lossy(&output)); + + // let input = mandel_input { + // ltx: -1.5, lty: 1.0, + // rbx: 1.0, rby: 1.0, + // width: 2000, height: 1600, + // maxiter: 1024 + // }; + // let mut input_bytes_backend = Vec::new(); + // let input_bytes = aligned_buffer(std::mem::size_of::<mandel_input>(), 3, &mut input_bytes_backend); + // unsafe { + // std::ptr::copy(&input as *const mandel_input as *const u8, input_bytes.as_mut_ptr(), std::mem::size_of::<mandel_input>()); + // } + // let (ret, output) = core.run_job(input_bytes); + // println!("ret = {}", ret); + + Ok(()) +} |