aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTom Smeding <tom.smeding@gmail.com>2020-03-28 21:25:31 +0100
committerTom Smeding <tom.smeding@gmail.com>2020-03-28 21:30:41 +0100
commit20d80818a3588aea6f5b8d258958da402a41f6bd (patch)
treea3f76645c9a7f5fcb3b9c89bf5b1204684f86254
parent3de8521e37c7d27439e7b599a32958920a31d176 (diff)
controller: Properly handle close() and related stuff
-rw-r--r--controller/src/lib.rs40
1 files changed, 35 insertions, 5 deletions
diff --git a/controller/src/lib.rs b/controller/src/lib.rs
index 0918542..e5d19ca 100644
--- a/controller/src/lib.rs
+++ b/controller/src/lib.rs
@@ -440,7 +440,7 @@ fn thread_entry(
/// Will call ComputePool::close(), `unwrap`-ing the result.
impl Drop for ComputePool {
fn drop(&mut self) {
- self.close().unwrap();
+ self.close_non_consume().unwrap();
}
}
@@ -459,19 +459,38 @@ impl ComputePool {
})
}
- pub fn close(&mut self) -> io::Result<()> {
+ fn close_non_consume(&mut self) -> io::Result<()> {
+ // This allows the user to call close() manually, to catch errors.
+ if self.iothread.is_none() {
+ return Ok(())
+ }
+
self.runtime.block_on(self.outbound.send(Outbound::Quit)).iores()?;
- self.iothread.take().map(|jh| jh.join().unwrap());
+ self.iothread.take().unwrap().join().unwrap();
Ok(())
}
+ /// Also called by the `Drop` implementation; use this if you want to catch any errors thrown
+ /// instead of panicing on them.
+ pub fn close(mut self) -> io::Result<()> {
+ self.close_non_consume()
+ }
+
pub fn set_core(&mut self, name: String, libfile: Vec<u8>) -> io::Result<()> {
+ if self.iothread.is_none() {
+ return Err("ComputePool is closed".ioerr());
+ }
+
// Instruct the IO thread to send the core to all workers, and also set it on every new
// worker that arrives
self.runtime.block_on(self.outbound.send(Outbound::NewCore(name, libfile))).iores()
}
pub fn current_job_parallelism(&mut self) -> io::Result<u64> {
+ if self.iothread.is_none() {
+ return Err("ComputePool is closed".ioerr());
+ }
+
// Query the IO thread for the number of workers currently registered
let mut outbound = self.outbound.clone();
self.runtime.block_on(async {
@@ -482,15 +501,26 @@ impl ComputePool {
}
pub fn submit_job(&mut self, jobid: u64, input: Vec<u8>) -> io::Result<()> {
+ if self.iothread.is_none() {
+ return Err("ComputePool is closed".ioerr());
+ }
+
// Send the job to the IO thread, which will send it to a round-robin worker
- self.runtime.block_on(self.outbound.send(Outbound::NewJob(jobid, input))).iores()
+ self.runtime.block_on(self.outbound.send(Outbound::NewJob(jobid, input))).iores()?;
+ Ok(())
}
pub fn next_completion_event(&mut self) -> io::Result<Option<CompletionEvent>> {
+ if self.iothread.is_none() {
+ return Err("ComputePool is closed".ioerr());
+ }
+
// If the counter is still positive, wait for events from the IO thread
if self.num_running > 0 {
match self.runtime.block_on(self.inbound.recv()) {
- Some(Inbound::Completion(event)) => Ok(Some(event)),
+ Some(Inbound::Completion(event)) => {
+ Ok(Some(event))
+ },
None => Err("IO thread unexpectedly quit".ioerr()),
}
} else {