From 20d80818a3588aea6f5b8d258958da402a41f6bd Mon Sep 17 00:00:00 2001 From: Tom Smeding Date: Sat, 28 Mar 2020 21:25:31 +0100 Subject: controller: Properly handle close() and related stuff --- controller/src/lib.rs | 40 +++++++++++++++++++++++++++++++++++----- 1 file changed, 35 insertions(+), 5 deletions(-) diff --git a/controller/src/lib.rs b/controller/src/lib.rs index 0918542..e5d19ca 100644 --- a/controller/src/lib.rs +++ b/controller/src/lib.rs @@ -440,7 +440,7 @@ fn thread_entry( /// Will call ComputePool::close(), `unwrap`-ing the result. impl Drop for ComputePool { fn drop(&mut self) { - self.close().unwrap(); + self.close_non_consume().unwrap(); } } @@ -459,19 +459,38 @@ impl ComputePool { }) } - pub fn close(&mut self) -> io::Result<()> { + fn close_non_consume(&mut self) -> io::Result<()> { + // This allows the user to call close() manually, to catch errors. + if self.iothread.is_none() { + return Ok(()) + } + self.runtime.block_on(self.outbound.send(Outbound::Quit)).iores()?; - self.iothread.take().map(|jh| jh.join().unwrap()); + self.iothread.take().unwrap().join().unwrap(); Ok(()) } + /// Also called by the `Drop` implementation; use this if you want to catch any errors thrown + /// instead of panicing on them. + pub fn close(mut self) -> io::Result<()> { + self.close_non_consume() + } + pub fn set_core(&mut self, name: String, libfile: Vec) -> io::Result<()> { + if self.iothread.is_none() { + return Err("ComputePool is closed".ioerr()); + } + // Instruct the IO thread to send the core to all workers, and also set it on every new // worker that arrives self.runtime.block_on(self.outbound.send(Outbound::NewCore(name, libfile))).iores() } pub fn current_job_parallelism(&mut self) -> io::Result { + if self.iothread.is_none() { + return Err("ComputePool is closed".ioerr()); + } + // Query the IO thread for the number of workers currently registered let mut outbound = self.outbound.clone(); self.runtime.block_on(async { @@ -482,15 +501,26 @@ impl ComputePool { } pub fn submit_job(&mut self, jobid: u64, input: Vec) -> io::Result<()> { + if self.iothread.is_none() { + return Err("ComputePool is closed".ioerr()); + } + // Send the job to the IO thread, which will send it to a round-robin worker - self.runtime.block_on(self.outbound.send(Outbound::NewJob(jobid, input))).iores() + self.runtime.block_on(self.outbound.send(Outbound::NewJob(jobid, input))).iores()?; + Ok(()) } pub fn next_completion_event(&mut self) -> io::Result> { + if self.iothread.is_none() { + return Err("ComputePool is closed".ioerr()); + } + // If the counter is still positive, wait for events from the IO thread if self.num_running > 0 { match self.runtime.block_on(self.inbound.recv()) { - Some(Inbound::Completion(event)) => Ok(Some(event)), + Some(Inbound::Completion(event)) => { + Ok(Some(event)) + }, None => Err("IO thread unexpectedly quit".ioerr()), } } else { -- cgit v1.2.3