From f0b6d1b9c46578183b427bcec4bbe99ab10c7b97 Mon Sep 17 00:00:00 2001 From: Tom Smeding Date: Sat, 21 Jul 2018 22:03:31 +0200 Subject: competition: reorganisation and multithread --- competition/Makefile | 2 +- competition/error.cpp | 9 ++ competition/error.h | 12 +++ competition/job.cpp | 51 ++++++++++++ competition/job.h | 39 +++++++++ competition/main.cpp | 213 ++++++++++------------------------------------- competition/multilog.cpp | 57 +++++++++++++ competition/multilog.h | 32 +++++++ competition/process.cpp | 136 ++++++++++++++++++++++++++++++ competition/process.h | 32 +++++++ 10 files changed, 413 insertions(+), 170 deletions(-) create mode 100644 competition/error.cpp create mode 100644 competition/error.h create mode 100644 competition/job.cpp create mode 100644 competition/job.h create mode 100644 competition/multilog.cpp create mode 100644 competition/multilog.h create mode 100644 competition/process.cpp create mode 100644 competition/process.h diff --git a/competition/Makefile b/competition/Makefile index a39c58d..8b2641c 100644 --- a/competition/Makefile +++ b/competition/Makefile @@ -1,5 +1,5 @@ CXX = g++ -CXXFLAGS = -Wall -Wextra -O3 -std=c++17 -fwrapv -flto +CXXFLAGS = -Wall -Wextra -O3 -std=c++17 -fwrapv -flto -pthread TARGET = competition diff --git a/competition/error.cpp b/competition/error.cpp new file mode 100644 index 0000000..4c737c2 --- /dev/null +++ b/competition/error.cpp @@ -0,0 +1,9 @@ +#include "error.h" + + +StopCompetitionError::StopCompetitionError() + : runtime_error("StopCompetitionError") {} +StopCompetitionError::StopCompetitionError(const string &what_arg) + : runtime_error(what_arg) {} +StopCompetitionError::StopCompetitionError(const char *what_arg) + : runtime_error(what_arg) {} diff --git a/competition/error.h b/competition/error.h new file mode 100644 index 0000000..403f81b --- /dev/null +++ b/competition/error.h @@ -0,0 +1,12 @@ +#include +#include + +using namespace std; + + +class StopCompetitionError : public runtime_error { +public: + StopCompetitionError(); + explicit StopCompetitionError(const string &what_arg); + explicit StopCompetitionError(const char *what_arg); +}; diff --git a/competition/job.cpp b/competition/job.cpp new file mode 100644 index 0000000..043074e --- /dev/null +++ b/competition/job.cpp @@ -0,0 +1,51 @@ +#include +#include +#include "job.h" + +void Scheduler::workerEntry() { + while (true) { + Job *job = nullptr; + + { + lock_guard commMutGuard(commMut); + if (terminateFlag) break; + if (jobs.size() > 0) { + job = jobs.front(); + jobs.pop(); + } + } + + if (job) { + job->callback(); + } else { + this_thread::sleep_for(chrono::milliseconds(100)); + } + } +} + +Scheduler::Scheduler(int nthreads) + : nthreads(nthreads) { + + assert(nthreads > 0); + + workers.reserve(nthreads); + for (int i = 0; i < nthreads; i++) { + workers.emplace_back([this]() { workerEntry(); }); + } +} + +Scheduler::~Scheduler() { + finish(); +} + +void Scheduler::submit(const function &func) { + Job *job = new Job(func); + lock_guard commMutGuard(commMut); + jobs.push(job); +} + +void Scheduler::finish() { + for (int i = 0; i < nthreads; i++) { + workers[i].join(); + } +} diff --git a/competition/job.h b/competition/job.h new file mode 100644 index 0000000..9dba159 --- /dev/null +++ b/competition/job.h @@ -0,0 +1,39 @@ +#pragma once + +#include +#include +#include +#include +#include + +using namespace std; + + +class Scheduler { + struct Job { + function callback; + + Job(const function callback) + : callback(callback) {} + }; + + queue jobs; + bool terminateFlag = false; + mutex commMut; + + vector workers; + + void workerEntry(); + +public: + const int nthreads; + + Scheduler(int nthreads); + ~Scheduler(); + + // func is run in child thread + // doneCallback is run in host thread + void submit(const function &func); + + void finish(); +}; diff --git a/competition/main.cpp b/competition/main.cpp index 90579db..604263c 100644 --- a/competition/main.cpp +++ b/competition/main.cpp @@ -4,7 +4,6 @@ #include #include #include -#include #include #include #include @@ -16,12 +15,13 @@ #include #include #include -#include #include #include -#include -#include #include "../board.h" +#include "job.h" +#include "process.h" +#include "error.h" +#include "multilog.h" using namespace std; @@ -33,6 +33,10 @@ static const int num_matches = 5; static const int timeout_msec = 60000; +// TODO no globals +static MultiLog gMultiLog; + + static char hexchar(int n) { return "0123456789ABCDEF"[n]; } @@ -64,6 +68,16 @@ static void mkdirp(const string_view name) { } } +static int64_t fileLastModified(const string_view fname) { + struct stat st; + if (stat(fname.data(), &st) < 0) { + if (errno == EEXIST) return 0; + perror("stat"); + exit(1); + } + return st.st_mtim.tv_sec * 1000000LL + st.st_mtim.tv_nsec; +} + static int64_t gettimestamp() { struct timeval tv; gettimeofday(&tv, nullptr); @@ -89,9 +103,11 @@ struct MatchResult { struct Player { string fname, safename; vector results; + int64_t lastModified; Player(const string &fname) - : fname(fname), safename(makeSafe(fname)) {} + : fname(fname), safename(makeSafe(fname)), + lastModified(fileLastModified(fname)) {} }; int MatchResult::score() const { @@ -127,155 +143,6 @@ MatchResult MatchResult::inverted() const { return r; } -class StopCompetitionError : public runtime_error { -public: - StopCompetitionError() - : runtime_error("StopCompetitionError") {} - explicit StopCompetitionError(const string &what_arg) - : runtime_error(what_arg) {} - explicit StopCompetitionError(const char *what_arg) - : runtime_error(what_arg) {} -}; - -class Process { - string execname; - optional stderrRedirect; - pid_t pid = -1; - int infd = -1, outfd = -1; - - string readBuf; - -public: - Process(const string_view execname) - : execname(execname) {} - - void redirectStderr(const string_view fname) { - stderrRedirect = fname; - } - - void run() { - int stderrfd = -1; - if (stderrRedirect) { - stderrfd = open(stderrRedirect->data(), O_WRONLY|O_CREAT|O_TRUNC, 0644); - if (stderrfd < 0) { - perror("open"); - cout << endl << "ERROR: Cannot open player log file '" << *stderrRedirect << "'" << endl; - throw StopCompetitionError(); - } - } - - int pipefds[2]; - if (pipe(pipefds) < 0) { - perror("pipe"); - exit(1); - } - infd = pipefds[1]; - int child_in = pipefds[0]; - - if (pipe(pipefds) < 0) { - perror("pipe"); - exit(1); - } - outfd = pipefds[0]; - int child_out = pipefds[1]; - - pid = fork(); - if (pid < 0) { - perror("fork"); - exit(1); - } - - if (pid == 0) { - if (stderrRedirect) dup2(stderrfd, STDERR_FILENO); - dup2(child_in, STDIN_FILENO); - dup2(child_out, STDOUT_FILENO); - close(infd); - close(outfd); - - execlp(execname.data(), execname.data(), NULL); - cerr << endl << "ERROR: Error executing player file '" << execname << "'" << endl; - exit(255); - } - - if (stderrfd >= 0) close(stderrfd); - close(child_in); - close(child_out); - } - - void wait() { - while (true) { - int status; - if (waitpid(pid, &status, 0) < 0) { - if (errno == EINTR) continue; - perror("waitpid"); - break; - } - if (WIFEXITED(status)) break; - } - } - - void stop() { - if (pid != -1) kill(pid, SIGSTOP); - } - - void unStop() { - if (pid != -1) kill(pid, SIGCONT); - } - - bool writeLine(const string_view line) { - string str; - str.reserve(line.size() + 1); - str += line; - str += '\n'; - - size_t cursor = 0; - while (cursor < str.size()) { - ssize_t nw = write(infd, str.data() + cursor, str.size() - cursor); - if (nw < 0) { - if (errno == EINTR) continue; - perror("write"); - return false; - } - cursor += nw; - } - - return true; - } - - optional readLine() { - size_t idx = readBuf.find('\n'); - if (idx != string::npos) { - string res = readBuf.substr(0, idx); - readBuf = readBuf.substr(idx + 1); - return res; - } - - while (true) { - string s(1024, '\0'); - ssize_t nr = read(outfd, &s[0], s.size()); - if (nr < 0) { - if (errno == EINTR) continue; - perror("read"); - return nullopt; - } - s.resize(nr); - - idx = s.find('\n'); - if (idx != string::npos) { - string res = readBuf + s.substr(0, idx); - readBuf = s.substr(idx + 1); - return res; - } - - readBuf += s; - } - } - - void terminate() { - if (pid != -1) kill(pid, SIGTERM); - } -}; - static string gameCodeName(const Player &p1, const Player &p2, int index) { return p1.safename + "-" + p2.safename + "-" + to_string(index); } @@ -293,9 +160,13 @@ static string gameLogPath(const Player &p1, const Player &p2, int index) { } static optional readMatchCache(const Player &p1, const Player &p2, int index) { - ifstream f(matchCachePath(p1, p2, index)); + string path = matchCachePath(p1, p2, index); + ifstream f(path); if (!f) return nullopt; + int64_t cacheStamp = fileLastModified(path); + if (p1.lastModified > cacheStamp || p2.lastModified > cacheStamp) return nullopt; + MatchResult mres; string word; f >> word >> mres.ms1 >> mres.ms2; @@ -334,10 +205,11 @@ static void recordResult(Player &p1, Player &p2, const MatchResult &result) { } static void playMatch(Player &p1, Player &p2, int index) { - cout << p1.fname << " - " << p2.fname << ": " << flush; + int logId = gMultiLog.add(p1.fname + " - " + p2.fname + ": "); if (optional optres = readMatchCache(p1, p2, index)) { - cout << optres->describe(p1, p2) << " (cached)" << endl; + gMultiLog.append(logId, optres->describe(p1, p2) + " (cached)"); + gMultiLog.complete(logId); recordResult(p1, p2, *optres); return; } @@ -348,7 +220,7 @@ static void playMatch(Player &p1, Player &p2, int index) { string gamelog_path = gameLogPath(p1, p2, index); ofstream gamelog(gamelog_path); if (!gamelog) { - cout << endl << "ERROR opening game log file " << gamelog_path << endl; + cout << "ERROR opening game log file " << gamelog_path << endl; throw StopCompetitionError(); } @@ -367,7 +239,7 @@ static void playMatch(Player &p1, Player &p2, int index) { while (true) { for (int i = 0; i < 2; i++) { if (!procs[i].writeLine(lastMove)) { - cout << endl << "ERROR writing move to player " << i+1 << endl; + cout << "ERROR writing move to player " << i+1 << endl; throw StopCompetitionError(); } @@ -375,7 +247,7 @@ static void playMatch(Player &p1, Player &p2, int index) { int64_t start = gettimestamp(); optional oline = procs[i].readLine(); if (!oline) { - cout << endl << "ERROR reading move from player " << i+1 << endl; + cout << "ERROR reading move from player " << i+1 << endl; throw StopCompetitionError(); } @@ -392,11 +264,11 @@ static void playMatch(Player &p1, Player &p2, int index) { optional omv = Move::parse(lastMove); if (!omv) { - cout << endl << "ERROR in player " << i+1 << ": unreadable move '" << lastMove << "'" << endl; + cout << "ERROR in player " << i+1 << ": unreadable move '" << lastMove << "'" << endl; throw StopCompetitionError(); } if (!board.isValid(*omv, i == 0 ? -1 : 1)) { - cout << endl << "ERROR in player " << i+1 << ": invalid move " << *omv << endl; + cout << "ERROR in player " << i+1 << ": invalid move " << *omv << endl; throw StopCompetitionError(); } @@ -425,7 +297,8 @@ match_done: procs[i].wait(); } - cout << mres.describe(p1, p2) << endl; + gMultiLog.append(logId, mres.describe(p1, p2)); + gMultiLog.complete(logId); gamelog << "\nResult: " << mres.describe(p1, p2) << "\n" << "P1 took " << mres.ms1 / 1000000.0 << " seconds\n" @@ -435,17 +308,17 @@ match_done: recordResult(p1, p2, mres); } -static void playerPit(Player &p1, Player &p2) { +static void playerPit(Scheduler &scheduler, Player &p1, Player &p2) { for (int i = 0; i < num_matches; i++) { - playMatch(p1, p2, i + 1); + scheduler.submit([&p1, &p2, i]() { playMatch(p1, p2, i + 1); }); } } -static void fullCompetition(vector &players) { +static void fullCompetition(Scheduler &scheduler, vector &players) { for (size_t p1i = 0; p1i < players.size(); p1i++) { for (size_t p2i = p1i + 1; p2i < players.size(); p2i++) { - playerPit(players[p1i], players[p2i]); - playerPit(players[p2i], players[p1i]); + playerPit(scheduler, players[p1i], players[p2i]); + playerPit(scheduler, players[p2i], players[p1i]); } } } @@ -465,7 +338,9 @@ int main(int argc, char **argv) { mkdirp(playerlogdir); mkdirp(gamelogdir); - fullCompetition(players); + Scheduler scheduler(2); + fullCompetition(scheduler, players); + scheduler.finish(); vector> scores; for (const Player &player : players) { diff --git a/competition/multilog.cpp b/competition/multilog.cpp new file mode 100644 index 0000000..bf10ec6 --- /dev/null +++ b/competition/multilog.cpp @@ -0,0 +1,57 @@ +#include +#include +#include +#include "multilog.h" + + +static int uniqid() { + static int i = 0; + return i++; +} + + +MultiLog::Item::Item() + : id(uniqid()) {} + +int MultiLog::add(const string_view prefix) { + lock_guard guard(mut); + + items.emplace_back(); + items.back().line = prefix; + cout << prefix << endl; + return items.back().id; +} + +void MultiLog::append(int id, const string_view text) { + lock_guard guard(mut); + + assert(text.find('\n') == string::npos); + + size_t idx = findId(id); + items[idx].line += text; + redrawLine(idx); +} + +void MultiLog::complete(int id) { + lock_guard guard(mut); + + size_t idx = findId(id); + items[idx].complete = true; + size_t nextIdx; + for (nextIdx = 0; nextIdx < items.size(); nextIdx++) { + if (!items[nextIdx].complete) break; + } + if (nextIdx > 0) items.erase(items.begin(), items.begin() + (nextIdx - 1)); +} + +size_t MultiLog::findId(int id) { + for (int i = 0; i < (int)items.size(); i++) { + if (items[i].id == id) return i; + } + assert(false); +} + +void MultiLog::redrawLine(size_t idx) { + size_t offset = items.size() - idx; + cout << "\r\x1B[" << offset << "A\x1B[K" << items[idx].line << "\r\x1B[" << offset << "B" << flush; +} diff --git a/competition/multilog.h b/competition/multilog.h new file mode 100644 index 0000000..ba0ff4d --- /dev/null +++ b/competition/multilog.h @@ -0,0 +1,32 @@ +#pragma once + +#include +#include +#include + +using namespace std; + + +// Access is thread-safe +class MultiLog { + struct Item { + int id; + string line; + bool complete = false; + + Item(); + }; + + mutex mut; + vector items; + + // Methods assume mutex is taken. + size_t findId(int id); + void redrawLine(size_t idx); + +public: + // Returns id of new item + int add(const string_view prefix); + void append(int id, const string_view text); + void complete(int id); +}; diff --git a/competition/process.cpp b/competition/process.cpp new file mode 100644 index 0000000..a728401 --- /dev/null +++ b/competition/process.cpp @@ -0,0 +1,136 @@ +#include +#include +#include +#include +#include "process.h" +#include "error.h" + + +Process::Process(const string_view execname) + : execname(execname) {} + +void Process::redirectStderr(const string_view fname) { + stderrRedirect = fname; +} + +void Process::run() { + int stderrfd = -1; + if (stderrRedirect) { + stderrfd = open(stderrRedirect->data(), O_WRONLY|O_CREAT|O_TRUNC, 0644); + if (stderrfd < 0) { + perror("open"); + cout << endl << "ERROR: Cannot open player log file '" << *stderrRedirect << "'" << endl; + throw StopCompetitionError(); + } + } + + int pipefds[2]; + if (pipe(pipefds) < 0) { + perror("pipe"); + exit(1); + } + infd = pipefds[1]; + int child_in = pipefds[0]; + + if (pipe(pipefds) < 0) { + perror("pipe"); + exit(1); + } + outfd = pipefds[0]; + int child_out = pipefds[1]; + + pid = fork(); + if (pid < 0) { + perror("fork"); + exit(1); + } + + if (pid == 0) { + if (stderrRedirect) dup2(stderrfd, STDERR_FILENO); + dup2(child_in, STDIN_FILENO); + dup2(child_out, STDOUT_FILENO); + close(infd); + close(outfd); + + execlp(execname.data(), execname.data(), NULL); + cerr << endl << "ERROR: Error executing player file '" << execname << "'" << endl; + exit(255); + } + + if (stderrfd >= 0) close(stderrfd); + close(child_in); + close(child_out); +} + +void Process::wait() { + while (true) { + int status; + if (waitpid(pid, &status, 0) < 0) { + if (errno == EINTR) continue; + perror("waitpid"); + break; + } + if (WIFEXITED(status)) break; + } +} + +void Process::stop() { + if (pid != -1) kill(pid, SIGSTOP); +} + +void Process::unStop() { + if (pid != -1) kill(pid, SIGCONT); +} + +bool Process::writeLine(const string_view line) { + string str; + str.reserve(line.size() + 1); + str += line; + str += '\n'; + + size_t cursor = 0; + while (cursor < str.size()) { + ssize_t nw = write(infd, str.data() + cursor, str.size() - cursor); + if (nw < 0) { + if (errno == EINTR) continue; + perror("write"); + return false; + } + cursor += nw; + } + + return true; +} + +optional Process::readLine() { + size_t idx = readBuf.find('\n'); + if (idx != string::npos) { + string res = readBuf.substr(0, idx); + readBuf = readBuf.substr(idx + 1); + return res; + } + + while (true) { + string s(1024, '\0'); + ssize_t nr = read(outfd, &s[0], s.size()); + if (nr < 0) { + if (errno == EINTR) continue; + perror("read"); + return nullopt; + } + s.resize(nr); + + idx = s.find('\n'); + if (idx != string::npos) { + string res = readBuf + s.substr(0, idx); + readBuf = s.substr(idx + 1); + return res; + } + + readBuf += s; + } +} + +void Process::terminate() { + if (pid != -1) kill(pid, SIGTERM); +} diff --git a/competition/process.h b/competition/process.h new file mode 100644 index 0000000..ddd80b9 --- /dev/null +++ b/competition/process.h @@ -0,0 +1,32 @@ +#pragma once + +#include +#include +#include +#include + +using namespace std; + + +class Process { + string execname; + optional stderrRedirect; + pid_t pid = -1; + int infd = -1, outfd = -1; + + string readBuf; + +public: + Process(const string_view execname); + + void redirectStderr(const string_view fname); + + void run(); + void wait(); + void stop(); + void unStop(); + void terminate(); + + bool writeLine(const string_view line); + optional readLine(); +}; -- cgit v1.2.3-70-g09d2