aboutsummaryrefslogtreecommitdiff
path: root/worker
diff options
context:
space:
mode:
authorTom Smeding <tom.smeding@gmail.com>2020-03-27 22:47:57 +0100
committerTom Smeding <tom.smeding@gmail.com>2020-03-27 22:47:57 +0100
commitfd421e32780cad46782c16cd4e15947f295a08c7 (patch)
tree04632f49f7c8860dee4237a0afe8292a949bdc9e /worker
Initial, untested version of controller and worker
Worker has been tested to a marginal extent, but the controller is litereally untested.
Diffstat (limited to 'worker')
-rw-r--r--worker/Cargo.toml12
-rw-r--r--worker/src/compute_core.rs80
-rw-r--r--worker/src/connection.rs119
-rw-r--r--worker/src/main.rs120
4 files changed, 331 insertions, 0 deletions
diff --git a/worker/Cargo.toml b/worker/Cargo.toml
new file mode 100644
index 0000000..f9888c4
--- /dev/null
+++ b/worker/Cargo.toml
@@ -0,0 +1,12 @@
+[package]
+name = "dd-worker"
+version = "0.1.0"
+authors = ["Tom Smeding <tom.smeding@gmail.com>"]
+edition = "2018"
+
+[dependencies]
+libloading = "0.5"
+static_assertions = "1.1"
+net2 = "0.2"
+tempfile = "3.1"
+dd-utils = { path = "../utils" }
diff --git a/worker/src/compute_core.rs b/worker/src/compute_core.rs
new file mode 100644
index 0000000..aec2a9d
--- /dev/null
+++ b/worker/src/compute_core.rs
@@ -0,0 +1,80 @@
+use std::io;
+use std::path::Path;
+use std::ffi::c_void;
+use libloading::{Library, Symbol};
+use static_assertions::assert_eq_size;
+use dd_utils::error::*;
+
+pub const CORE_API_VERSION: i32 = 1;
+
+pub struct ComputeCore {
+ #[used]
+ library: Library,
+ // init: unsafe extern "C" fn(i32) -> i32,
+ run_job: unsafe extern "C" fn(u64, *mut c_void, *mut u64, *mut *mut c_void) -> i32,
+ free_outdata: unsafe extern "C" fn(u64, *mut c_void),
+}
+
+impl ComputeCore {
+ pub fn load<P: AsRef<Path>>(lib_path: P) -> io::Result<Self> {
+ let lib_path = lib_path.as_ref();
+ let lib_path = lib_path.canonicalize().map_err(|e| {
+ format!("Library '{}' does not exist", lib_path.display()).perror(e)
+ })?;
+
+ let library = libloading::Library::new(&lib_path).map_err(|e| {
+ format!("Cannot load library at '{}'", lib_path.display()).perror(e)
+ })?;
+
+ macro_rules! load_symbol {
+ ($name:expr, $symname:expr, $typ:ty) => {{
+ let sym: io::Result<Symbol<$typ>> = library.get($symname).map_err(|e| {
+ format!("Failed to load symbol '{}' from library '{}'", $name, lib_path.display()).perror(e)
+ });
+ sym.map(|s| std::mem::transmute::<*mut c_void, $typ>(s.into_raw().into_raw()))
+ }}
+ }
+
+ let init = unsafe { load_symbol!("worker_init", b"worker_init\0", unsafe extern "C" fn(i32) -> i32)? };
+ let run_job = unsafe { load_symbol!("worker_run_job",b"worker_run_job\0", unsafe extern "C" fn(u64, *mut c_void, *mut u64, *mut *mut c_void) -> i32)? };
+ let free_outdata = unsafe { load_symbol!("worker_free_outdata", b"worker_free_outdata\0", unsafe extern "C" fn(u64, *mut c_void))? };
+
+ let ret = unsafe {
+ init(CORE_API_VERSION)
+ };
+ if ret != 0 {
+ return Err(format!("Library refused initialisation at version {}: '{}'", CORE_API_VERSION, lib_path.display()).ioerr());
+ }
+
+ Ok(ComputeCore { library, /*init,*/ run_job, free_outdata })
+ }
+
+ /// Will block on this thread, so if you don't want to block, spawn a new thread.
+ /// This does not syntactically need mutability, but since one can only run one job at a time
+ /// on a core, semantic mutability seems warranted.
+ pub fn run_job(&mut self, input_data: &mut [u8]) -> (i32, Vec<u8>) {
+ let mut outsize = 0u64;
+ let mut outdataptr: *mut c_void = std::ptr::null_mut();
+
+ assert_eq_size!(u64, usize);
+
+ let ret = unsafe {
+ (self.run_job)(input_data.len() as u64, input_data.as_mut_ptr() as *mut c_void, &mut outsize, &mut outdataptr)
+ };
+
+ let mut result = (ret, Vec::new());
+
+ if outsize != 0 && outdataptr != std::ptr::null_mut() {
+ result.1.resize(outsize as usize, 0u8);
+ unsafe {
+ std::ptr::copy(outdataptr as *const u8, result.1.as_mut_ptr(), outsize as usize);
+ }
+ }
+
+ unsafe {
+ (self.free_outdata)(outsize, outdataptr);
+ }
+
+ result
+ }
+}
diff --git a/worker/src/connection.rs b/worker/src/connection.rs
new file mode 100644
index 0000000..1222c92
--- /dev/null
+++ b/worker/src/connection.rs
@@ -0,0 +1,119 @@
+use std::io::{self, BufReader, Write};
+use std::net::TcpStream;
+use net2::TcpStreamExt;
+use dd_utils::read_ext::ReadExt;
+use dd_utils::error::*;
+use dd_utils::protocol::*;
+
+fn send_vectored<W: Write, V: AsRef<[u8]>>(
+ writer: &mut W,
+ typ: u8,
+ id: u64,
+ payload: &[V]
+) -> io::Result<()> {
+ let mut header = [0u8; 17];
+ header[0] = typ;
+ header[1..9].copy_from_slice(&id.to_le_bytes());
+ let sumlen: usize = payload.iter().map(|v| v.as_ref().len()).sum();
+ header[9..17].copy_from_slice(&sumlen.to_le_bytes());
+ writer.write_all(&header)?;
+ for part in payload {
+ writer.write_all(part.as_ref())?;
+ }
+ Ok(())
+}
+
+pub struct Message {
+ pub id: u64,
+ pub body: MessageBody,
+}
+
+fn interpret_message(raw: RawMessage) -> io::Result<Message> {
+ let mut reader: &[u8] = &raw.payload;
+
+ macro_rules! too_short { () => { "Message payload too short!".ioerr() } }
+
+ match raw.typ {
+ 1 => {
+ let version = reader.read_le_u32().ok_or(too_short!())?;
+ Ok(Message { id: raw.id, body: MessageBody::Version(version) })
+ }
+
+ 2 => {
+ let name = reader.read_pascal_string()
+ .map(|r| r.iores())
+ .ok_or(too_short!())??;
+ let libfile = reader.read_pascal_blob()
+ .map(|b| b.to_vec())
+ .ok_or(too_short!())?;
+ Ok(Message { id: raw.id, body: MessageBody::NewCore(name, libfile) })
+ }
+
+ 3 => {
+ let jobid = reader.read_le_u64().ok_or(too_short!())?;
+ let jobdata = reader.read_pascal_blob().map(|b| b.to_vec()).ok_or(too_short!())?;
+ Ok(Message { id: raw.id, body: MessageBody::Job(jobid, jobdata) })
+ }
+
+ _ => {
+ Err(format!("Unknown message type {}", raw.typ).ioerr())
+ }
+ }
+}
+
+pub struct Connection {
+ reader: BufReader<TcpStream>,
+}
+
+impl Connection {
+ pub fn new(address: &str) -> io::Result<Self> {
+ let socket = TcpStream::connect(address)?;
+
+ socket.set_keepalive_ms(Some(60000))?;
+
+ Ok(Self {
+ reader: BufReader::new(socket),
+ })
+ }
+
+ pub fn close(self) {}
+
+ pub fn receive(&mut self) -> io::Result<Option<Message>> {
+ RawMessage::receive(&mut self.reader)
+ .and_then(|opt|
+ opt.map(|rawmsg| interpret_message(rawmsg)).transpose()
+ )
+ }
+
+ pub fn reply(&mut self, msgid: u64, rmsg: Result<Reply, String>) -> io::Result<()> {
+ let socket = self.reader.get_mut();
+ let mut payload: Vec<Vec<u8>> = Vec::new();
+
+ let response_type;
+
+ match rmsg {
+ Ok(msg) => {
+ response_type = 1;
+ match msg {
+ Reply::Version(ok) => {
+ payload.push(vec![if ok { 1u8 } else { 0u8 }]);
+ }
+
+ Reply::NewCore => {}
+
+ Reply::Job(jobid, output) => {
+ payload.push(jobid.to_le_bytes().to_vec());
+ payload.push(output);
+ }
+ }
+ }
+
+ Err(err) => {
+ response_type = 0xff;
+ payload.push(err.into_bytes());
+ }
+ }
+
+ send_vectored(socket, response_type, msgid, &payload)
+ }
+}
diff --git a/worker/src/main.rs b/worker/src/main.rs
new file mode 100644
index 0000000..0300278
--- /dev/null
+++ b/worker/src/main.rs
@@ -0,0 +1,120 @@
+use std::env;
+use std::fs::File;
+use std::io::{self, Write};
+use std::process;
+use tempfile;
+use dd_utils::protocol::*;
+use crate::connection::Connection;
+use crate::compute_core::ComputeCore;
+
+mod compute_core;
+mod connection;
+
+#[repr(C)]
+struct mandel_input {
+ ltx: f64,
+ lty: f64,
+ rbx: f64,
+ rby: f64,
+ width: usize,
+ height: usize,
+ maxiter: usize,
+}
+
+fn aligned_buffer(num_bytes: usize, align_bits: usize, backend: &mut Vec<u8>) -> &mut [u8] {
+ assert!(align_bits > 0);
+
+ backend.resize(num_bytes + (1 << align_bits) - 1, 0u8);
+ let begin_ptr = backend.as_ptr() as usize;
+ let aligned_ptr = ((begin_ptr - 1) & !((1 << align_bits) - 1)) + (1 << align_bits);
+ let offset = aligned_ptr - begin_ptr;
+ &mut backend[offset .. offset + num_bytes]
+}
+
+fn main() -> io::Result<()> {
+ let args = env::args().collect::<Vec<_>>();
+ if args.len() != 2 {
+ eprintln!("Usage: {} <controller ip:port>", args[0]);
+ process::exit(1);
+ }
+
+ let controller_addr = &args[1];
+
+ let tempdir = tempfile::tempdir()?;
+
+ let mut conn = Connection::new(controller_addr)?;
+
+ let mut ccore: Option<ComputeCore> = None;
+
+ loop {
+ let opt_msg = conn.receive()?;
+ let msg = match opt_msg { Some(msg) => msg, None => break };
+
+ match msg.body {
+ MessageBody::Version(version) => {
+ if version == 1 {
+ conn.reply(msg.id, Ok(Reply::Version(true)))?;
+ } else {
+ conn.reply(msg.id, Ok(Reply::Version(false)))?;
+ conn.close();
+ break;
+ }
+ }
+
+ MessageBody::NewCore(_name, libfile) => {
+ let path = tempdir.path().join("core.so");
+
+ let res =
+ File::open(&path)
+ .and_then(|mut f| { f.write_all(&libfile)?; Ok(f) })
+ .and_then(|f| {
+ drop(f);
+ ComputeCore::load(&path)
+ });
+
+ match res {
+ Ok(new_ccore) => {
+ ccore = Some(new_ccore);
+ conn.reply(msg.id, Ok(Reply::NewCore))?;
+ }
+ Err(err) => {
+ conn.reply(msg.id, Err(format!("Could not load core: {}", err)))?;
+ }
+ }
+ }
+
+ MessageBody::Job(_jobid, mut input) => {
+ if let Some(ccore) = &mut ccore {
+ let (ret, output) = ccore.run_job(&mut input);
+ conn.reply(msg.id, Ok(Reply::Job(ret, output)))?;
+ } else {
+ conn.reply(msg.id, Err("No core loaded".to_string()))?;
+ }
+ }
+ }
+ }
+
+ // let mut core = compute_core::ComputeCore::load(&lib_fname)?;
+
+ // let mut input = b"kaas\0".to_vec();
+ // println!("Input: {:?} = {}", input, String::from_utf8_lossy(&input));
+ // let (ret, output) = core.run_job(&mut input);
+ // println!("Exit code: {}", ret);
+ // println!("Output: {:?} = {}", output, String::from_utf8_lossy(&output));
+
+ // let input = mandel_input {
+ // ltx: -1.5, lty: 1.0,
+ // rbx: 1.0, rby: 1.0,
+ // width: 2000, height: 1600,
+ // maxiter: 1024
+ // };
+ // let mut input_bytes_backend = Vec::new();
+ // let input_bytes = aligned_buffer(std::mem::size_of::<mandel_input>(), 3, &mut input_bytes_backend);
+ // unsafe {
+ // std::ptr::copy(&input as *const mandel_input as *const u8, input_bytes.as_mut_ptr(), std::mem::size_of::<mandel_input>());
+ // }
+ // let (ret, output) = core.run_job(input_bytes);
+ // println!("ret = {}", ret);
+
+ Ok(())
+}