#include #include #include #include "scheduler.h" void Scheduler::workerEntry(bool verbose) { while (true) { Job *job = nullptr; { lock_guard commMutGuard(commMut); if (jobs.size() > 0) { job = jobs.front(); jobs.pop(); } } if (job) { if (verbose) { cout << "SCHED(" << this_thread::get_id() << ") running job" << endl; } job->callback(); delete job; if (verbose) { cout << "SCHED(" << this_thread::get_id() << ") finished job" << endl; } } else if (finishFlag) { if (verbose) { cout << "SCHED(" << this_thread::get_id() << ") spotted finishFlag" << endl; } break; } else { if (verbose) { cout << "SCHED(" << this_thread::get_id() << ") no job, waiting" << endl; } this_thread::sleep_for(chrono::milliseconds(100)); } } } Scheduler::Scheduler(bool verbose, int nthreads) : verbose(verbose), nthreads(nthreads) { assert(nthreads > 0); workers.reserve(nthreads); for (int i = 0; i < nthreads; i++) { workers.emplace_back([this]() { workerEntry(this->verbose); }); } if (verbose) { cout << "SCHED " << nthreads << " workers spawned" << endl; } } Scheduler::~Scheduler() { finish(); } void Scheduler::submit(const function &func) { Job *job = new Job(func); lock_guard commMutGuard(commMut); jobs.push(job); } void Scheduler::finish() { if (hasJoined) return; if (verbose) { cout << "SCHED finish()" << endl; } { lock_guard commMutGuard(commMut); finishFlag = true; } for (int i = 0; i < nthreads; i++) { if (verbose) { cout << "SCHED Joining worker " << i << "..." << endl; } workers[i].join(); } if (verbose) { cout << "SCHED All workers joined" << endl; } hasJoined = true; }