diff options
author | Tom Smeding <tom.smeding@gmail.com> | 2020-03-27 22:47:57 +0100 |
---|---|---|
committer | Tom Smeding <tom.smeding@gmail.com> | 2020-03-27 22:47:57 +0100 |
commit | fd421e32780cad46782c16cd4e15947f295a08c7 (patch) | |
tree | 04632f49f7c8860dee4237a0afe8292a949bdc9e |
Initial, untested version of controller and worker
Worker has been tested to a marginal extent, but the controller is
litereally untested.
-rw-r--r-- | .gitignore | 1 | ||||
-rw-r--r-- | Cargo.lock | 479 | ||||
-rw-r--r-- | Cargo.toml | 3 | ||||
-rw-r--r-- | README.md | 92 | ||||
-rw-r--r-- | controller/Cargo.toml | 9 | ||||
-rw-r--r-- | controller/src/lib.rs | 492 | ||||
-rw-r--r-- | examples/.gitignore | 1 | ||||
-rw-r--r-- | examples/kaas/Makefile | 9 | ||||
-rw-r--r-- | examples/kaas/kaas.c | 32 | ||||
-rw-r--r-- | examples/mandel/Makefile | 9 | ||||
-rw-r--r-- | examples/mandel/mandel.c | 57 | ||||
-rw-r--r-- | utils/Cargo.toml | 7 | ||||
-rw-r--r-- | utils/src/enums.rs | 11 | ||||
-rw-r--r-- | utils/src/error.rs | 27 | ||||
-rw-r--r-- | utils/src/idgen.rs | 16 | ||||
-rw-r--r-- | utils/src/lib.rs | 4 | ||||
-rw-r--r-- | utils/src/protocol.rs | 44 | ||||
-rw-r--r-- | utils/src/read_ext.rs | 38 | ||||
-rw-r--r-- | worker/Cargo.toml | 12 | ||||
-rw-r--r-- | worker/src/compute_core.rs | 80 | ||||
-rw-r--r-- | worker/src/connection.rs | 119 | ||||
-rw-r--r-- | worker/src/main.rs | 120 |
22 files changed, 1662 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ea8c4bf --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/target diff --git a/Cargo.lock b/Cargo.lock new file mode 100644 index 0000000..e34ba49 --- /dev/null +++ b/Cargo.lock @@ -0,0 +1,479 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +[[package]] +name = "arc-swap" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d663a8e9a99154b5fb793032533f6328da35e23aac63d5c152279aa8ba356825" + +[[package]] +name = "bitflags" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693" + +[[package]] +name = "bytes" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "130aac562c0dd69c56b3b1cc8ffd2e17be31d0b6c25b61c96b76231aa23e39e1" + +[[package]] +name = "cc" +version = "1.0.50" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95e28fa049fda1c330bcf9d723be7663a899c4679724b34c81e9f5a326aab8cd" + +[[package]] +name = "cfg-if" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" + +[[package]] +name = "dd-controller" +version = "0.1.0" +dependencies = [ + "dd-utils", + "tokio", +] + +[[package]] +name = "dd-utils" +version = "0.1.0" + +[[package]] +name = "dd-worker" +version = "0.1.0" +dependencies = [ + "dd-utils", + "libloading", + "net2", + "static_assertions", + "tempfile", +] + +[[package]] +name = "fnv" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2fad85553e09a6f881f739c29f0b00b0f01357c743266d478b68951ce23285f3" + +[[package]] +name = "fuchsia-zircon" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e9763c69ebaae630ba35f74888db465e49e259ba1bc0eda7d06f4a067615d82" +dependencies = [ + "bitflags", + "fuchsia-zircon-sys", +] + +[[package]] +name = "fuchsia-zircon-sys" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7" + +[[package]] +name = "futures-core" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f25592f769825e89b92358db00d26f965761e094951ac44d3663ef25b7ac464a" + +[[package]] +name = "getrandom" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7abc8dd8451921606d809ba32e95b6111925cd2906060d2dcc29c070220503eb" +dependencies = [ + "cfg-if", + "libc", + "wasi", +] + +[[package]] +name = "hermit-abi" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1010591b26bbfe835e9faeabeb11866061cc7dcebffd56ad7d0942d0e61aefd8" +dependencies = [ + "libc", +] + +[[package]] +name = "iovec" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2b3ea6ff95e175473f8ffe6a7eb7c00d054240321b84c57051175fe3c1e075e" +dependencies = [ + "libc", +] + +[[package]] +name = "kernel32-sys" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d" +dependencies = [ + "winapi 0.2.8", + "winapi-build", +] + +[[package]] +name = "lazy_static" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" + +[[package]] +name = "libc" +version = "0.2.68" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dea0c0405123bba743ee3f91f49b1c7cfb684eef0da0a50110f758ccf24cdff0" + +[[package]] +name = "libloading" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2b111a074963af1d37a139918ac6d49ad1d0d5e47f72fd55388619691a7d753" +dependencies = [ + "cc", + "winapi 0.3.8", +] + +[[package]] +name = "log" +version = "0.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14b6052be84e6b71ab17edffc2eeabf5c2c3ae1fdb464aae35ac50c67a44e1f7" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "memchr" +version = "2.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3728d817d99e5ac407411fa471ff9800a778d88a24685968b36824eaf4bee400" + +[[package]] +name = "mio" +version = "0.6.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "302dec22bcf6bae6dfb69c647187f4b4d0fb6f535521f7bc022430ce8e12008f" +dependencies = [ + "cfg-if", + "fuchsia-zircon", + "fuchsia-zircon-sys", + "iovec", + "kernel32-sys", + "libc", + "log", + "miow 0.2.1", + "net2", + "slab", + "winapi 0.2.8", +] + +[[package]] +name = "mio-named-pipes" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f5e374eff525ce1c5b7687c4cef63943e7686524a387933ad27ca7ec43779cb3" +dependencies = [ + "log", + "mio", + "miow 0.3.3", + "winapi 0.3.8", +] + +[[package]] +name = "mio-uds" +version = "0.6.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "966257a94e196b11bb43aca423754d87429960a768de9414f3691d6957abf125" +dependencies = [ + "iovec", + "libc", + "mio", +] + +[[package]] +name = "miow" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c1f2f3b1cf331de6896aabf6e9d55dca90356cc9960cca7eaaf408a355ae919" +dependencies = [ + "kernel32-sys", + "net2", + "winapi 0.2.8", + "ws2_32-sys", +] + +[[package]] +name = "miow" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "396aa0f2003d7df8395cb93e09871561ccc3e785f0acb369170e8cc74ddf9226" +dependencies = [ + "socket2", + "winapi 0.3.8", +] + +[[package]] +name = "net2" +version = "0.2.33" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42550d9fb7b6684a6d404d9fa7250c2eb2646df731d1c06afc06dcee9e1bcf88" +dependencies = [ + "cfg-if", + "libc", + "winapi 0.3.8", +] + +[[package]] +name = "num_cpus" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "46203554f085ff89c235cd12f7075f3233af9b11ed7c9e16dfe2560d03313ce6" +dependencies = [ + "hermit-abi", + "libc", +] + +[[package]] +name = "pin-project-lite" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "237844750cfbb86f67afe27eee600dfbbcb6188d734139b534cbfbf4f96792ae" + +[[package]] +name = "ppv-lite86" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74490b50b9fbe561ac330df47c08f3f33073d2d00c150f719147d7c54522fa1b" + +[[package]] +name = "proc-macro2" +version = "1.0.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c09721c6781493a2a492a96b5a5bf19b65917fe6728884e7c44dd0c60ca3435" +dependencies = [ + "unicode-xid", +] + +[[package]] +name = "quote" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2bdc6c187c65bca4260c9011c9e3132efe4909da44726bad24cf7572ae338d7f" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "rand" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a6b1679d49b24bbfe0c803429aa1874472f50d9b363131f0e89fc356b544d03" +dependencies = [ + "getrandom", + "libc", + "rand_chacha", + "rand_core", + "rand_hc", +] + +[[package]] +name = "rand_chacha" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4c8ed856279c9737206bf725bf36935d8666ead7aa69b52be55af369d193402" +dependencies = [ + "ppv-lite86", + "rand_core", +] + +[[package]] +name = "rand_core" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19" +dependencies = [ + "getrandom", +] + +[[package]] +name = "rand_hc" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca3129af7b92a17112d59ad498c6f81eaf463253766b90396d39ea7a39d6613c" +dependencies = [ + "rand_core", +] + +[[package]] +name = "redox_syscall" +version = "0.1.56" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2439c63f3f6139d1b57529d16bc3b8bb855230c8efcc5d3a896c8bea7c3b1e84" + +[[package]] +name = "remove_dir_all" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a83fa3702a688b9359eccba92d153ac33fd2e8462f9e0e3fdf155239ea7792e" +dependencies = [ + "winapi 0.3.8", +] + +[[package]] +name = "signal-hook-registry" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94f478ede9f64724c5d173d7bb56099ec3e2d9fc2774aac65d34b8b890405f41" +dependencies = [ + "arc-swap", + "libc", +] + +[[package]] +name = "slab" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8" + +[[package]] +name = "socket2" +version = "0.3.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8b74de517221a2cb01a53349cf54182acdc31a074727d3079068448c0676d85" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "winapi 0.3.8", +] + +[[package]] +name = "static_assertions" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" + +[[package]] +name = "syn" +version = "1.0.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0df0eb663f387145cab623dea85b09c2c5b4b0aef44e945d928e682fce71bb03" +dependencies = [ + "proc-macro2", + "quote", + "unicode-xid", +] + +[[package]] +name = "tempfile" +version = "3.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a6e24d9338a0a5be79593e2fa15a648add6138caa803e2d5bc782c371732ca9" +dependencies = [ + "cfg-if", + "libc", + "rand", + "redox_syscall", + "remove_dir_all", + "winapi 0.3.8", +] + +[[package]] +name = "tokio" +version = "0.2.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fa5e81d6bc4e67fe889d5783bd2a128ab2e0cfa487e0be16b6a8d177b101616" +dependencies = [ + "bytes", + "fnv", + "futures-core", + "iovec", + "lazy_static", + "libc", + "memchr", + "mio", + "mio-named-pipes", + "mio-uds", + "num_cpus", + "pin-project-lite", + "signal-hook-registry", + "slab", + "tokio-macros", + "winapi 0.3.8", +] + +[[package]] +name = "tokio-macros" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0c3acc6aa564495a0f2e1d59fab677cd7f81a19994cfc7f3ad0e64301560389" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "unicode-xid" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "826e7639553986605ec5979c7dd957c7895e93eabed50ab2ffa7f6128a75097c" + +[[package]] +name = "wasi" +version = "0.9.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519" + +[[package]] +name = "winapi" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a" + +[[package]] +name = "winapi" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8093091eeb260906a183e6ae1abdba2ef5ef2257a21801128899c3fc699229c6" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-build" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d315eee3b34aca4797b2da6b13ed88266e6d612562a0c46390af8299fc699bc" + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + +[[package]] +name = "ws2_32-sys" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d59cefebd0c892fa2dd6de581e937301d8552cb44489cdff035c6187cb63fa5e" +dependencies = [ + "winapi 0.2.8", + "winapi-build", +] diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..f6a5141 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,3 @@ +[workspace] + +members = ["worker", "controller", "utils"] diff --git a/README.md b/README.md new file mode 100644 index 0000000..256ecd2 --- /dev/null +++ b/README.md @@ -0,0 +1,92 @@ +# Distributed computing + +A framework for distributed computing -- that is, running jobs across multiple +machines. + +## Compute core API version 1 + +Functions that should be exposed, without mangling, by the compute core `.so` +library: + +- `int32_t worker_init(int32_t version)` + - Will be called when initialising the library. The integer argument is the + version of this API specification; the library should check that it is + equal to the expected value. + + Should return 0 on successful initialisation, or nonzero if an error + occurred. + +- `int32_t worker_run_job(uint64_t size, void *data, uint64_t *outsize, void **outdata)` + - Run a job. The job data is specified in the data blob pointed to by `data`, + which is `size` bytes in size. The worker is allowed to modify the memory + behind `data` during execution of this function, but the memory will be + deallocated as soon as `worker_run_job` returns. + + A pointer to memory containing the computed results should be stored in + `outdata`, and its size in `outsize`. When the memory allocated for the + output data may be freed, `worker_free_outdata` will be called. If there + is no data to return, for example because an error occurred, store 0 in + `outsize` and a null pointer in `outdata`. + + Should return 0 on successful execution, or nonzero if an error occurred. + +- `void worker_free_outdata(uint64_t size, void *outdata)` + - Free memory allocated for the output data of a job. This is called when the + memory for the output data of the last job is no longer needed, and will + always be called before the next job is started. + +Note that there is no function called before unloading the library. If you need +such a thing, please use [destructors][1], or unload necessary things in +`worker_free_outdata`. + +## Worker socket protocol version 1 + +All integers in the below description are little-endian. + +Common data types used in the message descriptions below: +- **String**/**Blob**: 8-byte unsigned integer indicating the length of + the data, then that many bytes making up the string or blob. A string is + valid UTF-8, while a blob can contain arbitrary data. + +A **message from controller to worker** has the following format: +- Message type [1 byte] +- ID [8 bytes] +- Payload length [8-byte unsigned integer] +- Payload [variable length and contents] + +A **response from worker to controller** has the following format: +- Response type [1 byte] +- ID of message replied to [8 bytes] +- Payload length [8-byte unsigned integer] +- Payload [variable length and contents] + +The possible **response types** are the following: +- `0x01`: Successful response to a message, as described in the table of + message types. +- `0xff`: An error response; something went wrong. The entire payload is an + UTF-8 error message. + +The possible **message types** are the following: + +- `0x01`: Version exchange + - Payload: 4-byte unsigned integer, the protocol version of the + server. In this version, this is 1. + - Successful response: 1 byte, 1 if the version is accepted by the worker, 0 + if not. If the version is not accepted, the connection is closed by both + sides. + +- `0x02`: New compute core + - Payload: A string giving the name of the compute core, then a blob giving + the contents of a dynamic library file that can be loaded at runtime, e.g. + a `.so` file. This library will be loaded as the compute core for the + worker. + - Successful response: Empty. + +- `0x03`: New job + - Payload: An 8-byte unsigned integer giving the ID of the job, then a blob + giving the input data for the compute core. + - Successful response: A 4-byte signed integer giving the exit code of the + job as returned by the compute core, then a blob giving the output data. + + +[1]: https://gcc.gnu.org/onlinedocs/gcc/Common-Function-Attributes.html#index-destructor-function-attribute diff --git a/controller/Cargo.toml b/controller/Cargo.toml new file mode 100644 index 0000000..319e1b7 --- /dev/null +++ b/controller/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "dd-controller" +version = "0.1.0" +authors = ["Tom Smeding <tom.smeding@gmail.com>"] +edition = "2018" + +[dependencies] +tokio = { version = "0.2", features = ["full"] } +dd-utils = { path = "../utils" } diff --git a/controller/src/lib.rs b/controller/src/lib.rs new file mode 100644 index 0000000..ee02b3b --- /dev/null +++ b/controller/src/lib.rs @@ -0,0 +1,492 @@ +use std::collections::{HashMap, VecDeque}; +use std::convert::TryInto; +use std::io::{self, ErrorKind}; +use std::net::Shutdown; +use std::sync::Arc; +use std::thread; +use tokio::io::{AsyncReadExt, AsyncWriteExt, WriteHalf}; +use tokio::net::{TcpListener, TcpStream}; +use tokio::runtime; +use tokio::task; +use tokio::sync::{mpsc, oneshot, Mutex}; +use dd_utils::error::*; +use dd_utils::idgen::IdGen; +use dd_utils::protocol::*; +use dd_utils::read_ext::ReadExt; + +async fn send_vectored( + writer: &mut (impl AsyncWriteExt + Unpin), + typ: u8, + id: u64, + payload: &[impl AsRef<[u8]>] +) -> io::Result<()> { + let mut header = [0u8; 17]; + header[0] = typ; + header[1..9].copy_from_slice(&id.to_le_bytes()); + let sumlen: usize = payload.iter().map(|v| v.as_ref().len()).sum(); + header[9..17].copy_from_slice(&sumlen.to_le_bytes()); + writer.write_all(&header).await?; + for part in payload { + writer.write_all(part.as_ref()).await?; + } + Ok(()) +} + +async fn receive_message(reader: &mut (impl AsyncReadExt + Unpin)) + -> io::Result<Option<RawMessage>> { + let mut header = [0u8; 17]; + if let Err(e) = reader.read(&mut header).await { + if e.kind() == ErrorKind::UnexpectedEof { return Ok(None); } + else { return Err(e); } + } + + let typ = header[0]; + let id = u64::from_le_bytes(header[1..9].try_into().unwrap()); + let length = usize::from_le_bytes(header[9..17].try_into().unwrap()); + + let mut payload = Vec::new(); + payload.resize(length, 0u8); + if let Err(e) = reader.read(&mut payload).await { + if e.kind() == ErrorKind::UnexpectedEof { return Ok(None); } + else { return Err(e); } + } + + Ok(Some(RawMessage { typ, id, payload })) +} + +fn encode_message(msg: MessageBody) -> Vec<Vec<u8>> { + let mut payload = Vec::new(); + + match msg { + MessageBody::Version(version) => { + payload.push(version.to_le_bytes().to_vec()); + } + + MessageBody::NewCore(name, libfile) => { + payload.push(name.len().to_le_bytes().to_vec()); + payload.push(name.into_bytes()); + payload.push(libfile.len().to_le_bytes().to_vec()); + payload.push(libfile); + } + + MessageBody::Job(jobid, input) => { + payload.push(jobid.to_le_bytes().to_vec()); + payload.push(input.len().to_le_bytes().to_vec()); + payload.push(input); + } + } + + payload +} + +#[derive(Debug)] +pub struct CompletionEvent { + pub jobid: u64, + pub result: Result<(i32, Vec<u8>), String>, +} + +#[derive(Debug)] +enum Inbound { + Completion(CompletionEvent), +} + +#[derive(Debug)] +enum Outbound { + NewCore(String, Vec<u8>), + NewJob(u64, Vec<u8>), + NumWorkers(oneshot::Sender<u64>), + Quit, +} + +pub struct ComputePool { + runtime: runtime::Runtime, + + iothread: Option<thread::JoinHandle<()>>, + inbound: mpsc::UnboundedReceiver<Inbound>, + outbound: mpsc::Sender<Outbound>, + + // The number of jobs for which the completion event has not yet been consumed + num_running: u64, +} + +#[derive(Debug)] +enum ThreadCollect { + NewWorker(TcpStream, u64), // Worker socket, and next unused message id + WorkerReady(u64), // Worker id (indicates that this worker has its core initialised) + Query(Outbound), + Completion(u64, CompletionEvent), // Worker id, and event +} + +async fn thread_handshake_handler(mut listener: TcpListener, sink: mpsc::Sender<ThreadCollect>) { + loop { + let (mut sock, _) = listener.accept().await.expect("Accept failed on TCP server socket"); + let mut sink = sink.clone(); + task::spawn(async move { + let payload = encode_message(MessageBody::Version(1)); + if send_vectored(&mut sock, 1, 1, &payload).await.is_err() { + match sock.shutdown(Shutdown::Both) { + Ok(()) => {} + Err(_) => {} // explicitly ignore errors here, we're closing anyway + } + return; + } + + match receive_message(&mut sock).await { + Ok(Some(rawmsg)) if rawmsg.typ == 1 && rawmsg.id == 1 && + rawmsg.payload.len() == 1 && + rawmsg.payload[0] == 1 + => { + sink.send(ThreadCollect::NewWorker(sock, 2)).await.unwrap(); + } + + _ => { + match sock.shutdown(Shutdown::Both) { + Ok(()) => {} + Err(_) => {} // explicitly ignore errors here, we're closing anyway + } + } + } + }); + } +} + +async fn thread_query_handler( + mut chan: mpsc::Receiver<Outbound>, + mut sink: mpsc::Sender<ThreadCollect> +) { + loop { + if let Some(msg) = chan.recv().await { + sink.send(ThreadCollect::Query(msg)).await.unwrap(); + } else { + return; + } + } +} + +struct Worker { + socket: WriteHalf<TcpStream>, + msg_idgen: IdGen, + loaded_core: u64, + handler_map: Arc<Mutex<HashMap<u64, Box<dyn FnOnce(u64, RawMessage) + Send>>>>, +} + +impl Worker { + fn new(worker_id: u64, next_msg_id: u64, socket: TcpStream) -> Self { + let (mut read_half, write_half) = tokio::io::split(socket); + let worker = Worker { + socket: write_half, + msg_idgen: IdGen::new(next_msg_id), + loaded_core: 0, + handler_map: Arc::new(Mutex::new(HashMap::new())), + }; + + { + let handler_map = worker.handler_map.clone(); + task::spawn(async move { + loop { + let rawmsg = match receive_message(&mut read_half).await { + Ok(Some(rawmsg)) => rawmsg, + _ => break, + }; + + let mut handler_map = handler_map.lock().await; + match handler_map.remove(&rawmsg.id) { + Some(handler) => handler(worker_id, rawmsg), + None => { + eprintln!("Warning: no handler found for worker reply id {}", rawmsg.id); + } + } + } + }); + } + + worker + } + + /// Returns whether the send succeeded. In case of failure, the handler is not registered. + async fn send( + &mut self, + typ: u8, + payload: &[impl AsRef<[u8]>], + handler: impl FnOnce(u64, RawMessage) + Send + 'static + ) -> bool { + let msgid = self.msg_idgen.gen(); + self.handler_map.lock().await.insert(msgid, Box::new(handler)); + match send_vectored(&mut self.socket, typ, msgid, &payload).await { + Ok(()) => true, + Err(_) => { + self.handler_map.lock().await.remove(&msgid); + false + } + } + } +} + +#[derive(Debug, Clone)] +struct Job { + id: u64, + input: Vec<u8>, +} + +#[derive(Debug, Clone)] +struct ComputeCore { + name: String, + libfile: Vec<u8>, +} + +/// Returns whether job was successfully sent to the worker +async fn worker_run_job( + worker: &mut Worker, + job: Job, + mut result_chan: mpsc::Sender<ThreadCollect> +) -> bool { + let Job { id: jobid, input } = job; + let payload = encode_message(MessageBody::Job(jobid, input)); + let handler = move |wid: u64, rawmsg: RawMessage| { + task::spawn(async move { + let result = if rawmsg.typ == 1 { + let mut reader: &[u8] = &rawmsg.payload; + if let Some(retval) = reader.read_le_i32() { + if let Some(output) = reader.read_pascal_blob() { + Ok((retval, output.to_vec())) + } else { + Err("<Invalid reply format!>".to_string()) + } + } else { + Err("<Invalid reply format!>".to_string()) + } + } else { + Err(String::from_utf8_lossy(&rawmsg.payload).to_string()) + }; + + let event = ThreadCollect::Completion( + wid, + CompletionEvent { jobid, result } + ); + result_chan.send(event).await.unwrap(); + }); + }; + + worker.send(3, &payload, handler).await +} + +/// Returns whether the message was successfully sent to the worker. +async fn worker_set_new_core( + worker: &mut Worker, + new_core_id: u64, + core: ComputeCore, + workers_map: Arc<Mutex<HashMap<u64, Worker>>>, + result_chan: mpsc::Sender<ThreadCollect> +) -> bool { + worker_set_new_core_payload( + worker, + new_core_id, + &encode_message(MessageBody::NewCore(core.name, core.libfile)), + workers_map, + result_chan + ).await +} + +/// Returns whether the message was successfully sent to the worker. +async fn worker_set_new_core_payload( + worker: &mut Worker, + new_core_id: u64, + payload: &[impl AsRef<[u8]>], + workers_map: Arc<Mutex<HashMap<u64, Worker>>>, + mut result_chan: mpsc::Sender<ThreadCollect> +) -> bool { + let handler = move |wid: u64, rawmsg: RawMessage| { + task::spawn(async move { + if rawmsg.typ == 1 { + if let Some(worker) = workers_map.lock().await.get_mut(&wid) { + worker.loaded_core = new_core_id; + result_chan.send(ThreadCollect::WorkerReady(wid)).await.unwrap(); + } + } else { + eprintln!("Worker {} could not load new core:\n{}", + wid, String::from_utf8_lossy(&rawmsg.payload)); + } + }); + }; + worker.send(2, payload, handler).await +} + +// IO thread: +// - waits for worker connections +// - has a channel open to the parent thread on which it receives commands to send to workers +// - receives responses from workers and puts those on the channel +fn thread_entry( + listen_port: u16, + query_chan: mpsc::Receiver<Outbound>, + event_chan: mpsc::UnboundedSender<Inbound> +) { + runtime::Runtime::new().unwrap().block_on(async { + let (mut collector_sink, mut collector_source) = mpsc::channel(10); + let listener = TcpListener::bind(("0.0.0.0", listen_port)).await.unwrap(); + + task::spawn(thread_handshake_handler(listener, collector_sink.clone())); + task::spawn(thread_query_handler(query_chan, collector_sink.clone())); + + let mut worker_idgen = IdGen::new(1); + + let workers: Arc<Mutex<HashMap<u64, Worker>>> = Arc::new(Mutex::new(HashMap::new())); + let mut free_workers: VecDeque<u64> = VecDeque::new(); + let mut current_core: Option<ComputeCore> = None; + let mut current_core_id: u64 = 0; + let mut core_idgen = IdGen::new(1); + let mut job_queue: VecDeque<Job> = VecDeque::new(); + + loop { + let message = match collector_source.recv().await { + Some(message) => message, + None => break, + }; + + match message { + ThreadCollect::Query(Outbound::NewCore(name, libfile)) => { + let new_core_id = core_idgen.gen(); + current_core = Some(ComputeCore { name: name.clone(), libfile: libfile.clone() }); + current_core_id = new_core_id; + + let payload = encode_message(MessageBody::NewCore(name, libfile)); + + let mut workers_locked = workers.lock().await; + + while let Some(worker_id) = free_workers.pop_front() { + if let Some(worker) = workers_locked.get_mut(&worker_id) { + if !worker_set_new_core_payload( + worker, new_core_id, &payload, workers.clone(), + collector_sink.clone()).await { + workers_locked.remove(&worker_id); + } + } + } + } + + ThreadCollect::Query(Outbound::NewJob(jobid, input)) => { + job_queue.push_back(Job { id: jobid, input }); + } + + ThreadCollect::Query(Outbound::NumWorkers(chan)) => { + chan.send(workers.lock().await.len() as u64).unwrap(); + } + + ThreadCollect::Query(Outbound::Quit) => { + break; + } + + ThreadCollect::Completion(worker_id, event) => { + event_chan.send(Inbound::Completion(event)).unwrap(); + collector_sink.send(ThreadCollect::WorkerReady(worker_id)).await.unwrap(); + } + + ThreadCollect::NewWorker(socket, next_msg_id) => { + let wid = worker_idgen.gen(); + let worker = Worker::new(wid, next_msg_id, socket); + + workers.lock().await.insert(wid, worker); + + // Delegate the core setup to the WorkerReady event + collector_sink.send(ThreadCollect::WorkerReady(wid)).await.unwrap(); + } + + ThreadCollect::WorkerReady(worker_id) => { + let mut workers_locked = workers.lock().await; + + if let Some(worker) = workers_locked.get_mut(&worker_id) { + if worker.loaded_core != current_core_id { + let current_core = current_core.unwrap().clone(); + worker_set_new_core( + worker, current_core_id, current_core, workers.clone(), + collector_sink.clone() + ).await; + return; + } + + free_workers.push_back(worker_id); + } + } + } + + if job_queue.len() > 0 && free_workers.len() > 0 { + let job = job_queue.pop_front().unwrap(); + let mut workers_locked = workers.lock().await; + + // Loop until a worker exists and is functional + loop { + let worker_id = free_workers.pop_front().unwrap(); + if let Some(worker) = workers_locked.get_mut(&worker_id) { + let result_chan = collector_sink.clone(); + if !worker_run_job(worker, job.clone(), result_chan).await { + workers_locked.remove(&worker_id); + } else { + break; + } + } + } + } + } + }); +} + +/// Will call ComputePool::close(), `unwrap`-ing the result. +impl Drop for ComputePool { + fn drop(&mut self) { + self.close().unwrap(); + } +} + +impl ComputePool { + pub fn new(port: u16) -> io::Result<Self> { + // Spawn the IO thread + let (inbound_sender, inbound_receiver) = mpsc::unbounded_channel(); + let (outbound_sender, outbound_receiver) = mpsc::channel(1); + let jh = thread::spawn(move || { thread_entry(port, outbound_receiver, inbound_sender) }); + Ok(ComputePool { + runtime: runtime::Runtime::new()?, + iothread: Some(jh), + inbound: inbound_receiver, + outbound: outbound_sender, + num_running: 0, + }) + } + + pub fn close(&mut self) -> io::Result<()> { + self.runtime.block_on(self.outbound.send(Outbound::Quit)).iores()?; + self.iothread.take().map(|jh| jh.join().unwrap()); + Ok(()) + } + + pub fn set_core(&mut self, name: String, libfile: Vec<u8>) -> io::Result<()> { + // Instruct the IO thread to send the core to all workers, and also set it on every new + // worker that arrives + self.runtime.block_on(self.outbound.send(Outbound::NewCore(name, libfile))).iores() + } + + pub fn current_job_parallelism(&mut self) -> io::Result<u64> { + // Query the IO thread for the number of workers currently registered + let mut outbound = self.outbound.clone(); + self.runtime.block_on(async { + let (sender, receiver) = oneshot::channel(); + outbound.send(Outbound::NumWorkers(sender)).await.iores()?; + receiver.await.iores() + }) + } + + pub fn submit_job(&mut self, jobid: u64, input: Vec<u8>) -> io::Result<()> { + // Send the job to the IO thread, which will send it to a round-robin worker + self.runtime.block_on(self.outbound.send(Outbound::NewJob(jobid, input))).iores() + } + + pub fn next_completion_event(&mut self) -> io::Result<Option<CompletionEvent>> { + // If the counter is still positive, wait for events from the IO thread + if self.num_running > 0 { + match self.runtime.block_on(self.inbound.recv()) { + Some(Inbound::Completion(event)) => Ok(Some(event)), + None => Err("IO thread unexpectedly quit".ioerr()), + } + } else { + Ok(None) + } + } +} diff --git a/examples/.gitignore b/examples/.gitignore new file mode 100644 index 0000000..140f8cf --- /dev/null +++ b/examples/.gitignore @@ -0,0 +1 @@ +*.so diff --git a/examples/kaas/Makefile b/examples/kaas/Makefile new file mode 100644 index 0000000..e7d5154 --- /dev/null +++ b/examples/kaas/Makefile @@ -0,0 +1,9 @@ +.PHONY: all clean + +all: kaas.so + +clean: + rm -f kaas.so + +kaas.so: kaas.c + gcc -Wall -Wextra -fPIC -shared $< -o $@ diff --git a/examples/kaas/kaas.c b/examples/kaas/kaas.c new file mode 100644 index 0000000..f823442 --- /dev/null +++ b/examples/kaas/kaas.c @@ -0,0 +1,32 @@ +#include <stdio.h> +#include <stdlib.h> +#include <ctype.h> + +__attribute__((constructor)) +void constr() { + printf("kaas: constructor!\n"); +} + +__attribute__((destructor)) +void destr() { + printf("kaas: destructor!\n"); +} + +int worker_init(int version) { + printf("kaas: initialised with version %d\n", version); + return 0; +} + +int worker_run_job(size_t size, void *input_, size_t *outsize, void **outputp) { + const char *input = input_; + char *output = malloc(size); + for (size_t i = 0; i < size; i++) output[i] = toupper(input[i]); + *outsize = size; + *outputp = output; + return 0; +} + +void worker_free_outdata(size_t size, void *data) { + (void)size; + free(data); +} diff --git a/examples/mandel/Makefile b/examples/mandel/Makefile new file mode 100644 index 0000000..4311ee9 --- /dev/null +++ b/examples/mandel/Makefile @@ -0,0 +1,9 @@ +.PHONY: all clean + +all: mandel.so + +clean: + rm -f mandel.so + +mandel.so: mandel.c + gcc -Wall -Wextra -fPIC -shared -O3 -fopenmp $< -o $@ diff --git a/examples/mandel/mandel.c b/examples/mandel/mandel.c new file mode 100644 index 0000000..f7517ba --- /dev/null +++ b/examples/mandel/mandel.c @@ -0,0 +1,57 @@ +#include <stdio.h> +#include <stdlib.h> +#include <stdint.h> + +struct input { + double ltx, lty; + double rbx, rby; + size_t width, height; + size_t maxiter; +}; + +int worker_init(int version) { + fprintf(stderr, "mandel: init(%d)\n", version); + return version == 1 ? 0 : 1; +} + +int worker_run_job(size_t inputsize, const void *input_, size_t *outputsize, void **outputp) { + if (inputsize != sizeof(struct input)) { + fprintf(stderr, "mandel: Input has invalid size %zu (expected %zu)\n", inputsize, sizeof(struct input)); + return -1; + } + + fprintf(stderr, "mandel: run_job()\n"); + + const struct input *const input = input_; + + *outputsize = 4 * input->width * input->height; + uint32_t *const output = *outputp = malloc(*outputsize); + +#pragma omp parallel for + for (size_t yi = 0; yi < input->height; yi++) { + for (size_t xi = 0; xi < input->width; xi++) { + const double y = (input->rby - input->lty) / (input->height - 1) * yi; + const double x = (input->rbx - input->ltx) / (input->width - 1) * xi; + + double a = x, b = y, a2 = a * a, b2 = b * b; + size_t n; + for (n = 0; n < input->maxiter && a2 + b2 < 4; n++) { + b = 2 * a * b + y; + a = a2 - b2 + x; + a2 = a * a; b2 = b * b; + } + + output[input->width * yi + xi] = n; + } + } + + fprintf(stderr, "mandel: run_job() finished\n"); + + return 0; +} + +void worker_free_outdata(size_t size, void *output) { + (void)size; + fprintf(stderr, "mandel: free_outdata()\n"); + free(output); +} diff --git a/utils/Cargo.toml b/utils/Cargo.toml new file mode 100644 index 0000000..53767d0 --- /dev/null +++ b/utils/Cargo.toml @@ -0,0 +1,7 @@ +[package] +name = "dd-utils" +version = "0.1.0" +authors = ["Tom Smeding <tom.smeding@gmail.com>"] +edition = "2018" + +[dependencies] diff --git a/utils/src/enums.rs b/utils/src/enums.rs new file mode 100644 index 0000000..d08c7ea --- /dev/null +++ b/utils/src/enums.rs @@ -0,0 +1,11 @@ +pub enum MessageBody { + Version(u32), + NewCore(String, Vec<u8>), + Job(u64, Vec<u8>), +} + +pub enum Reply { + Version(bool), + NewCore, + Job(i32, Vec<u8>), +} diff --git a/utils/src/error.rs b/utils/src/error.rs new file mode 100644 index 0000000..f25bc5d --- /dev/null +++ b/utils/src/error.rs @@ -0,0 +1,27 @@ +use std::io; + +pub trait IntoIOError { + fn ioerr(self) -> io::Error; + fn perror(self, parent: io::Error) -> io::Error; +} + +// This impl bound is taken directly from the io::Error::new function. +impl<E: Into<Box<dyn std::error::Error + Send + Sync>>> IntoIOError for E { + fn ioerr(self) -> io::Error { + io::Error::new(io::ErrorKind::Other, self) + } + + fn perror(self, parent: io::Error) -> io::Error { + io::Error::new(parent.kind(), format!("{}: {}", self.into(), parent)) + } +} + +pub trait IntoIOResult<T> { + fn iores(self) -> io::Result<T>; +} + +impl<T, E: IntoIOError> IntoIOResult<T> for Result<T, E> { + fn iores(self) -> io::Result<T> { + self.map_err(|e| e.ioerr()) + } +} diff --git a/utils/src/idgen.rs b/utils/src/idgen.rs new file mode 100644 index 0000000..20f13ff --- /dev/null +++ b/utils/src/idgen.rs @@ -0,0 +1,16 @@ +#[derive(Debug)] +pub struct IdGen { + next: u64, +} + +impl IdGen { + pub fn new(start: u64) -> Self { + IdGen { next: start } + } + + pub fn gen(&mut self) -> u64 { + let res = self.next; + self.next += 1; + res + } +} diff --git a/utils/src/lib.rs b/utils/src/lib.rs new file mode 100644 index 0000000..867fd97 --- /dev/null +++ b/utils/src/lib.rs @@ -0,0 +1,4 @@ +pub mod error; +pub mod idgen; +pub mod protocol; +pub mod read_ext; diff --git a/utils/src/protocol.rs b/utils/src/protocol.rs new file mode 100644 index 0000000..4820f79 --- /dev/null +++ b/utils/src/protocol.rs @@ -0,0 +1,44 @@ +use std::convert::TryInto; +use std::io::{self, BufReader, ErrorKind, Read}; +use std::net::TcpStream; + +pub enum MessageBody { + Version(u32), + NewCore(String, Vec<u8>), + Job(u64, Vec<u8>), +} + +pub enum Reply { + Version(bool), + NewCore, + Job(i32, Vec<u8>), +} + +pub struct RawMessage { + pub typ: u8, + pub id: u64, + pub payload: Vec<u8>, +} + +impl RawMessage { + pub fn receive(reader: &mut BufReader<TcpStream>) -> io::Result<Option<Self>> { + let mut header = [0u8; 17]; + if let Err(e) = reader.read(&mut header) { + if e.kind() == ErrorKind::UnexpectedEof { return Ok(None); } + else { return Err(e); } + } + + let typ = header[0]; + let id = u64::from_le_bytes(header[1..9].try_into().unwrap()); + let length = usize::from_le_bytes(header[9..17].try_into().unwrap()); + + let mut payload = Vec::new(); + payload.resize(length, 0u8); + if let Err(e) = reader.read(&mut payload) { + if e.kind() == ErrorKind::UnexpectedEof { return Ok(None); } + else { return Err(e); } + } + + Ok(Some(Self { typ, id, payload })) + } +} diff --git a/utils/src/read_ext.rs b/utils/src/read_ext.rs new file mode 100644 index 0000000..7695ada --- /dev/null +++ b/utils/src/read_ext.rs @@ -0,0 +1,38 @@ +use std::convert::TryInto; + +pub trait ReadExt { + fn read_exact(&mut self, n: usize) -> Option<&[u8]>; + + fn read_le_i32(&mut self) -> Option<i32> { + self.read_exact(4).map(|bytes| i32::from_le_bytes(bytes.try_into().unwrap())) + } + + fn read_le_u32(&mut self) -> Option<u32> { + self.read_exact(4).map(|bytes| u32::from_le_bytes(bytes.try_into().unwrap())) + } + + fn read_le_u64(&mut self) -> Option<u64> { + self.read_exact(8).map(|bytes| u64::from_le_bytes(bytes.try_into().unwrap())) + } + + fn read_pascal_blob(&mut self) -> Option<&[u8]> { + let len = self.read_le_u64()?; + self.read_exact(len as usize) + } + + fn read_pascal_string(&mut self) -> Option<Result<String, std::string::FromUtf8Error>> { + self.read_pascal_blob().map(|b| String::from_utf8(b.to_vec())) + } +} + +impl ReadExt for &[u8] { + fn read_exact(&mut self, n: usize) -> Option<&[u8]> { + if self.len() >= n { + let (res, newlist) = self.split_at(n); + *self = newlist; + Some(res) + } else { + None + } + } +} diff --git a/worker/Cargo.toml b/worker/Cargo.toml new file mode 100644 index 0000000..f9888c4 --- /dev/null +++ b/worker/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "dd-worker" +version = "0.1.0" +authors = ["Tom Smeding <tom.smeding@gmail.com>"] +edition = "2018" + +[dependencies] +libloading = "0.5" +static_assertions = "1.1" +net2 = "0.2" +tempfile = "3.1" +dd-utils = { path = "../utils" } diff --git a/worker/src/compute_core.rs b/worker/src/compute_core.rs new file mode 100644 index 0000000..aec2a9d --- /dev/null +++ b/worker/src/compute_core.rs @@ -0,0 +1,80 @@ +use std::io; +use std::path::Path; +use std::ffi::c_void; +use libloading::{Library, Symbol}; +use static_assertions::assert_eq_size; +use dd_utils::error::*; + +pub const CORE_API_VERSION: i32 = 1; + +pub struct ComputeCore { + #[used] + library: Library, + // init: unsafe extern "C" fn(i32) -> i32, + run_job: unsafe extern "C" fn(u64, *mut c_void, *mut u64, *mut *mut c_void) -> i32, + free_outdata: unsafe extern "C" fn(u64, *mut c_void), +} + +impl ComputeCore { + pub fn load<P: AsRef<Path>>(lib_path: P) -> io::Result<Self> { + let lib_path = lib_path.as_ref(); + let lib_path = lib_path.canonicalize().map_err(|e| { + format!("Library '{}' does not exist", lib_path.display()).perror(e) + })?; + + let library = libloading::Library::new(&lib_path).map_err(|e| { + format!("Cannot load library at '{}'", lib_path.display()).perror(e) + })?; + + macro_rules! load_symbol { + ($name:expr, $symname:expr, $typ:ty) => {{ + let sym: io::Result<Symbol<$typ>> = library.get($symname).map_err(|e| { + format!("Failed to load symbol '{}' from library '{}'", $name, lib_path.display()).perror(e) + }); + sym.map(|s| std::mem::transmute::<*mut c_void, $typ>(s.into_raw().into_raw())) + }} + } + + let init = unsafe { load_symbol!("worker_init", b"worker_init\0", unsafe extern "C" fn(i32) -> i32)? }; + let run_job = unsafe { load_symbol!("worker_run_job",b"worker_run_job\0", unsafe extern "C" fn(u64, *mut c_void, *mut u64, *mut *mut c_void) -> i32)? }; + let free_outdata = unsafe { load_symbol!("worker_free_outdata", b"worker_free_outdata\0", unsafe extern "C" fn(u64, *mut c_void))? }; + + let ret = unsafe { + init(CORE_API_VERSION) + }; + if ret != 0 { + return Err(format!("Library refused initialisation at version {}: '{}'", CORE_API_VERSION, lib_path.display()).ioerr()); + } + + Ok(ComputeCore { library, /*init,*/ run_job, free_outdata }) + } + + /// Will block on this thread, so if you don't want to block, spawn a new thread. + /// This does not syntactically need mutability, but since one can only run one job at a time + /// on a core, semantic mutability seems warranted. + pub fn run_job(&mut self, input_data: &mut [u8]) -> (i32, Vec<u8>) { + let mut outsize = 0u64; + let mut outdataptr: *mut c_void = std::ptr::null_mut(); + + assert_eq_size!(u64, usize); + + let ret = unsafe { + (self.run_job)(input_data.len() as u64, input_data.as_mut_ptr() as *mut c_void, &mut outsize, &mut outdataptr) + }; + + let mut result = (ret, Vec::new()); + + if outsize != 0 && outdataptr != std::ptr::null_mut() { + result.1.resize(outsize as usize, 0u8); + unsafe { + std::ptr::copy(outdataptr as *const u8, result.1.as_mut_ptr(), outsize as usize); + } + } + + unsafe { + (self.free_outdata)(outsize, outdataptr); + } + + result + } +} diff --git a/worker/src/connection.rs b/worker/src/connection.rs new file mode 100644 index 0000000..1222c92 --- /dev/null +++ b/worker/src/connection.rs @@ -0,0 +1,119 @@ +use std::io::{self, BufReader, Write}; +use std::net::TcpStream; +use net2::TcpStreamExt; +use dd_utils::read_ext::ReadExt; +use dd_utils::error::*; +use dd_utils::protocol::*; + +fn send_vectored<W: Write, V: AsRef<[u8]>>( + writer: &mut W, + typ: u8, + id: u64, + payload: &[V] +) -> io::Result<()> { + let mut header = [0u8; 17]; + header[0] = typ; + header[1..9].copy_from_slice(&id.to_le_bytes()); + let sumlen: usize = payload.iter().map(|v| v.as_ref().len()).sum(); + header[9..17].copy_from_slice(&sumlen.to_le_bytes()); + writer.write_all(&header)?; + for part in payload { + writer.write_all(part.as_ref())?; + } + Ok(()) +} + +pub struct Message { + pub id: u64, + pub body: MessageBody, +} + +fn interpret_message(raw: RawMessage) -> io::Result<Message> { + let mut reader: &[u8] = &raw.payload; + + macro_rules! too_short { () => { "Message payload too short!".ioerr() } } + + match raw.typ { + 1 => { + let version = reader.read_le_u32().ok_or(too_short!())?; + Ok(Message { id: raw.id, body: MessageBody::Version(version) }) + } + + 2 => { + let name = reader.read_pascal_string() + .map(|r| r.iores()) + .ok_or(too_short!())??; + let libfile = reader.read_pascal_blob() + .map(|b| b.to_vec()) + .ok_or(too_short!())?; + Ok(Message { id: raw.id, body: MessageBody::NewCore(name, libfile) }) + } + + 3 => { + let jobid = reader.read_le_u64().ok_or(too_short!())?; + let jobdata = reader.read_pascal_blob().map(|b| b.to_vec()).ok_or(too_short!())?; + Ok(Message { id: raw.id, body: MessageBody::Job(jobid, jobdata) }) + } + + _ => { + Err(format!("Unknown message type {}", raw.typ).ioerr()) + } + } +} + +pub struct Connection { + reader: BufReader<TcpStream>, +} + +impl Connection { + pub fn new(address: &str) -> io::Result<Self> { + let socket = TcpStream::connect(address)?; + + socket.set_keepalive_ms(Some(60000))?; + + Ok(Self { + reader: BufReader::new(socket), + }) + } + + pub fn close(self) {} + + pub fn receive(&mut self) -> io::Result<Option<Message>> { + RawMessage::receive(&mut self.reader) + .and_then(|opt| + opt.map(|rawmsg| interpret_message(rawmsg)).transpose() + ) + } + + pub fn reply(&mut self, msgid: u64, rmsg: Result<Reply, String>) -> io::Result<()> { + let socket = self.reader.get_mut(); + let mut payload: Vec<Vec<u8>> = Vec::new(); + + let response_type; + + match rmsg { + Ok(msg) => { + response_type = 1; + match msg { + Reply::Version(ok) => { + payload.push(vec![if ok { 1u8 } else { 0u8 }]); + } + + Reply::NewCore => {} + + Reply::Job(jobid, output) => { + payload.push(jobid.to_le_bytes().to_vec()); + payload.push(output); + } + } + } + + Err(err) => { + response_type = 0xff; + payload.push(err.into_bytes()); + } + } + + send_vectored(socket, response_type, msgid, &payload) + } +} diff --git a/worker/src/main.rs b/worker/src/main.rs new file mode 100644 index 0000000..0300278 --- /dev/null +++ b/worker/src/main.rs @@ -0,0 +1,120 @@ +use std::env; +use std::fs::File; +use std::io::{self, Write}; +use std::process; +use tempfile; +use dd_utils::protocol::*; +use crate::connection::Connection; +use crate::compute_core::ComputeCore; + +mod compute_core; +mod connection; + +#[repr(C)] +struct mandel_input { + ltx: f64, + lty: f64, + rbx: f64, + rby: f64, + width: usize, + height: usize, + maxiter: usize, +} + +fn aligned_buffer(num_bytes: usize, align_bits: usize, backend: &mut Vec<u8>) -> &mut [u8] { + assert!(align_bits > 0); + + backend.resize(num_bytes + (1 << align_bits) - 1, 0u8); + let begin_ptr = backend.as_ptr() as usize; + let aligned_ptr = ((begin_ptr - 1) & !((1 << align_bits) - 1)) + (1 << align_bits); + let offset = aligned_ptr - begin_ptr; + &mut backend[offset .. offset + num_bytes] +} + +fn main() -> io::Result<()> { + let args = env::args().collect::<Vec<_>>(); + if args.len() != 2 { + eprintln!("Usage: {} <controller ip:port>", args[0]); + process::exit(1); + } + + let controller_addr = &args[1]; + + let tempdir = tempfile::tempdir()?; + + let mut conn = Connection::new(controller_addr)?; + + let mut ccore: Option<ComputeCore> = None; + + loop { + let opt_msg = conn.receive()?; + let msg = match opt_msg { Some(msg) => msg, None => break }; + + match msg.body { + MessageBody::Version(version) => { + if version == 1 { + conn.reply(msg.id, Ok(Reply::Version(true)))?; + } else { + conn.reply(msg.id, Ok(Reply::Version(false)))?; + conn.close(); + break; + } + } + + MessageBody::NewCore(_name, libfile) => { + let path = tempdir.path().join("core.so"); + + let res = + File::open(&path) + .and_then(|mut f| { f.write_all(&libfile)?; Ok(f) }) + .and_then(|f| { + drop(f); + ComputeCore::load(&path) + }); + + match res { + Ok(new_ccore) => { + ccore = Some(new_ccore); + conn.reply(msg.id, Ok(Reply::NewCore))?; + } + Err(err) => { + conn.reply(msg.id, Err(format!("Could not load core: {}", err)))?; + } + } + } + + MessageBody::Job(_jobid, mut input) => { + if let Some(ccore) = &mut ccore { + let (ret, output) = ccore.run_job(&mut input); + conn.reply(msg.id, Ok(Reply::Job(ret, output)))?; + } else { + conn.reply(msg.id, Err("No core loaded".to_string()))?; + } + } + } + } + + // let mut core = compute_core::ComputeCore::load(&lib_fname)?; + + // let mut input = b"kaas\0".to_vec(); + // println!("Input: {:?} = {}", input, String::from_utf8_lossy(&input)); + // let (ret, output) = core.run_job(&mut input); + // println!("Exit code: {}", ret); + // println!("Output: {:?} = {}", output, String::from_utf8_lossy(&output)); + + // let input = mandel_input { + // ltx: -1.5, lty: 1.0, + // rbx: 1.0, rby: 1.0, + // width: 2000, height: 1600, + // maxiter: 1024 + // }; + // let mut input_bytes_backend = Vec::new(); + // let input_bytes = aligned_buffer(std::mem::size_of::<mandel_input>(), 3, &mut input_bytes_backend); + // unsafe { + // std::ptr::copy(&input as *const mandel_input as *const u8, input_bytes.as_mut_ptr(), std::mem::size_of::<mandel_input>()); + // } + // let (ret, output) = core.run_job(input_bytes); + // println!("ret = {}", ret); + + Ok(()) +} |