diff options
author | Tom Smeding <tom.smeding@gmail.com> | 2020-03-28 21:25:31 +0100 |
---|---|---|
committer | Tom Smeding <tom.smeding@gmail.com> | 2020-03-28 21:30:41 +0100 |
commit | 20d80818a3588aea6f5b8d258958da402a41f6bd (patch) | |
tree | a3f76645c9a7f5fcb3b9c89bf5b1204684f86254 /controller/src | |
parent | 3de8521e37c7d27439e7b599a32958920a31d176 (diff) |
controller: Properly handle close() and related stuff
Diffstat (limited to 'controller/src')
-rw-r--r-- | controller/src/lib.rs | 40 |
1 files changed, 35 insertions, 5 deletions
diff --git a/controller/src/lib.rs b/controller/src/lib.rs index 0918542..e5d19ca 100644 --- a/controller/src/lib.rs +++ b/controller/src/lib.rs @@ -440,7 +440,7 @@ fn thread_entry( /// Will call ComputePool::close(), `unwrap`-ing the result. impl Drop for ComputePool { fn drop(&mut self) { - self.close().unwrap(); + self.close_non_consume().unwrap(); } } @@ -459,19 +459,38 @@ impl ComputePool { }) } - pub fn close(&mut self) -> io::Result<()> { + fn close_non_consume(&mut self) -> io::Result<()> { + // This allows the user to call close() manually, to catch errors. + if self.iothread.is_none() { + return Ok(()) + } + self.runtime.block_on(self.outbound.send(Outbound::Quit)).iores()?; - self.iothread.take().map(|jh| jh.join().unwrap()); + self.iothread.take().unwrap().join().unwrap(); Ok(()) } + /// Also called by the `Drop` implementation; use this if you want to catch any errors thrown + /// instead of panicing on them. + pub fn close(mut self) -> io::Result<()> { + self.close_non_consume() + } + pub fn set_core(&mut self, name: String, libfile: Vec<u8>) -> io::Result<()> { + if self.iothread.is_none() { + return Err("ComputePool is closed".ioerr()); + } + // Instruct the IO thread to send the core to all workers, and also set it on every new // worker that arrives self.runtime.block_on(self.outbound.send(Outbound::NewCore(name, libfile))).iores() } pub fn current_job_parallelism(&mut self) -> io::Result<u64> { + if self.iothread.is_none() { + return Err("ComputePool is closed".ioerr()); + } + // Query the IO thread for the number of workers currently registered let mut outbound = self.outbound.clone(); self.runtime.block_on(async { @@ -482,15 +501,26 @@ impl ComputePool { } pub fn submit_job(&mut self, jobid: u64, input: Vec<u8>) -> io::Result<()> { + if self.iothread.is_none() { + return Err("ComputePool is closed".ioerr()); + } + // Send the job to the IO thread, which will send it to a round-robin worker - self.runtime.block_on(self.outbound.send(Outbound::NewJob(jobid, input))).iores() + self.runtime.block_on(self.outbound.send(Outbound::NewJob(jobid, input))).iores()?; + Ok(()) } pub fn next_completion_event(&mut self) -> io::Result<Option<CompletionEvent>> { + if self.iothread.is_none() { + return Err("ComputePool is closed".ioerr()); + } + // If the counter is still positive, wait for events from the IO thread if self.num_running > 0 { match self.runtime.block_on(self.inbound.recv()) { - Some(Inbound::Completion(event)) => Ok(Some(event)), + Some(Inbound::Completion(event)) => { + Ok(Some(event)) + }, None => Err("IO thread unexpectedly quit".ioerr()), } } else { |