summaryrefslogtreecommitdiff
path: root/scheduler.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'scheduler.cpp')
-rw-r--r--scheduler.cpp62
1 files changed, 62 insertions, 0 deletions
diff --git a/scheduler.cpp b/scheduler.cpp
new file mode 100644
index 0000000..452b0bb
--- /dev/null
+++ b/scheduler.cpp
@@ -0,0 +1,62 @@
+#include <iostream>
+#include <chrono>
+#include <cassert>
+#include "scheduler.h"
+
+void Scheduler::workerEntry() {
+ while (true) {
+ Job *job = nullptr;
+
+ {
+ lock_guard commMutGuard(commMut);
+ if (jobs.size() > 0) {
+ job = jobs.front();
+ jobs.pop();
+ }
+ }
+
+ if (job) {
+ job->callback();
+ } else if (finishFlag) {
+ break;
+ } else {
+ this_thread::sleep_for(chrono::milliseconds(100));
+ }
+ }
+}
+
+Scheduler::Scheduler(int nthreads)
+ : nthreads(nthreads) {
+
+ assert(nthreads > 0);
+
+ workers.reserve(nthreads);
+ for (int i = 0; i < nthreads; i++) {
+ workers.emplace_back([this]() { workerEntry(); });
+ }
+}
+
+Scheduler::~Scheduler() {
+ finish();
+}
+
+void Scheduler::submit(const function<void()> &func) {
+ Job *job = new Job(func);
+ lock_guard commMutGuard(commMut);
+ jobs.push(job);
+}
+
+void Scheduler::finish() {
+ if (hasJoined) return;
+
+ {
+ lock_guard commMutGuard(commMut);
+ finishFlag = true;
+ }
+
+ for (int i = 0; i < nthreads; i++) {
+ workers[i].join();
+ }
+
+ hasJoined = true;
+}