From 134bec17a10030fc7d1b2060b57bf19799f6c780 Mon Sep 17 00:00:00 2001 From: Tom Smeding Date: Thu, 2 Aug 2018 18:19:21 +0200 Subject: Proper event-driven icmpd recv --- .gitignore | 2 ++ clientd.c | 29 ++++++++++++++++++----------- icmpd.c | 59 ++++++++++++++++++++++++++++++++++++++++++++++++----------- icmpd.h | 8 +++++++- serverd.c | 62 ++++++++++++++++++++++++++++++++++++-------------------------- util.c | 4 ++++ util.h | 2 ++ 7 files changed, 117 insertions(+), 49 deletions(-) diff --git a/.gitignore b/.gitignore index 042f948..21c78dc 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,5 @@ client server clientd serverd + +bak/ diff --git a/clientd.c b/clientd.c index a5e4d75..2462669 100644 --- a/clientd.c +++ b/clientd.c @@ -15,35 +15,42 @@ int main(void) { struct icmpd *d = icmpd_create_client(inet_addr("127.0.0.1")); + int d_fd = icmpd_get_select_fd(d); while (true) { - if (icmpd_peek(d)) { - struct icmpd_received msg = icmpd_recv(d); - printf("Recv: %zu\n", msg.length); - xxd(msg.data, msg.length); - } - fd_set inset; FD_ZERO(&inset); FD_SET(0, &inset); - struct timeval tv; - tv.tv_sec = 1; - tv.tv_usec = 0; - int ret = select(1, &inset, NULL, NULL, &tv); + FD_SET(d_fd, &inset); + + int ret = select(d_fd + 1, &inset, NULL, NULL, NULL); if (ret < 0) { if (errno == EINTR) continue; perror("select"); return 1; } + if (ret == 0) continue; // timeout + if (FD_ISSET(0, &inset)) { char *line = NULL; size_t linelen = 0; + errno = 0; ssize_t nr = getline(&line, &linelen, stdin); - assert(nr >= 0); + if (nr < 0) { + if (errno == 0) break; // EOF + perror("getline"); + exit(1); + } if (nr > 0) nr--; icmpd_send(d, line, nr); free(line); } + + if (FD_ISSET(d_fd, &inset) && icmpd_peek(d)) { + struct icmpd_received msg = icmpd_recv(d); + printf("Recv: %zu\n", msg.length); + xxd(msg.data, msg.length); + } } } diff --git a/icmpd.c b/icmpd.c index e3127c7..1cfc051 100644 --- a/icmpd.c +++ b/icmpd.c @@ -14,20 +14,21 @@ struct icmpd { - struct mt_mutex mut; // for this struct + // CONSTANTS - bool isserver; // constant + bool isserver; + uint32_t other_addr; // IPv4 address of other party + int id; // id for messages, -1 if not set + int signal_out, signal_in; // 1 byte can be read from signal_out for every message received - // IPv4 address of other party - uint32_t other_addr; // constant + // VARIABLES - bool outstanding; // server: whether an echo request hasn't been matched yet + struct mt_mutex mut; // for the variables in this struct - // id for messages, -1 if not set - int id; // constant + bool outstanding; // server: whether an echo request hasn't been matched yet - // Client: seqnum can change before every send - // Server: seqnum is the sequence number of the last-received ECHO + // client: seqnum can change before every send + // server: seqnum is the sequence number of the last-received ECHO // value is -1 if not set int seqnum; }; @@ -338,6 +339,9 @@ static void* thread_entry(void *arg) { recvqu[recvqu_len].msg.seqnum = msg.seqnum; recvqu_len++; + char c = 42; + assert(writeall(d->signal_in, &c, 1) == 1); + fprintf(stderr, "server recv: recvqu_len = %zu\n", recvqu_len); } @@ -391,6 +395,9 @@ static void* thread_entry(void *arg) { recvqu[recvqu_len].msg.seqnum = msg.seqnum; recvqu_len++; + char c = 42; + assert(writeall(d->signal_in, &c, 1) == 1); + fprintf(stderr, "client recv: recvqu_len = %zu\n", recvqu_len); mt_mutex_lock(&d->mut); @@ -437,12 +444,17 @@ static struct icmpd* icmpd_create_base(int id, bool isserver, uint32_t other_add struct icmpd *d = malloc(sizeof(struct icmpd)); mt_mutex_init(&d->mut); - d->id = id; - d->seqnum = -1; d->isserver = isserver; d->other_addr = other_addr; + d->id = id; + d->seqnum = -1; d->outstanding = false; + int pp[2]; + assert(pipe(pp) == 0); + d->signal_in = pp[1]; + d->signal_out = pp[0]; + send_message(host_out, MSG_NEWCONN, &d, sizeof d); return d; @@ -471,6 +483,7 @@ void icmpd_server_set_outstanding(struct icmpd *d, int seqnum) { } bool icmpd_peek(struct icmpd *d) { +#if 0 send_message(host_out, MSG_PEEK, &d, sizeof d); struct msg_in msg = recv_message(host_in); assert(msg.hdr.type == MSG_PEEK_ANS); @@ -478,10 +491,30 @@ bool icmpd_peek(struct icmpd *d) { bool ret = ((uint8_t*)msg.data)[0]; free(msg.data); return ret; +#else + fd_set inset; + FD_ZERO(&inset); + FD_SET(d->signal_out, &inset); + while (true) { + struct timeval tv; + tv.tv_sec = tv.tv_usec = 0; + int ret = select(d->signal_out + 1, &inset, NULL, NULL, &tv); + if (ret < 0) { + if (errno == EINTR) continue; + perror("select"); + assert(false); + } + return ret > 0; + } +#endif } struct icmpd_received icmpd_recv(struct icmpd *d) { send_message(host_out, MSG_RECV, &d, sizeof d); + + char c; + assert(readall(d->signal_out, &c, 1) == 1); + struct msg_in msg = recv_message(host_in); assert(msg.hdr.type == MSG_RECV_ANS); @@ -500,3 +533,7 @@ void icmpd_send(struct icmpd *d, const void *data, size_t length) { send_message(host_out, MSG_SEND, &form, sizeof form); } + +int icmpd_get_select_fd(struct icmpd *d) { + return d->signal_out; +} diff --git a/icmpd.h b/icmpd.h index 7a87ec6..cb574ee 100644 --- a/icmpd.h +++ b/icmpd.h @@ -16,7 +16,8 @@ struct icmpd_received { struct icmpd; -// Pass 0 as client_addr to listen for messages from any IP (this channel will then only receive unassigned messages) +// Pass 0 as client_addr to listen for messages from any IP (this channel +// will then only receive unassigned messages) // Pass -1 as id to listen for messages with any id struct icmpd* icmpd_create_server(int id, uint32_t client_addr); @@ -28,4 +29,9 @@ bool icmpd_peek(struct icmpd *d); struct icmpd_received icmpd_recv(struct icmpd *d); void icmpd_send(struct icmpd *d, const void *data, size_t length); +// File descriptor that select(2) reports readable when a message might be ready. +// Do not read from this file descriptor. +// Use icmpd_peek() to verify that a message is actually ready. +int icmpd_get_select_fd(struct icmpd *d); + void icmpd_destroy(struct icmpd *d); diff --git a/serverd.c b/serverd.c index 6f21530..b99e4ea 100644 --- a/serverd.c +++ b/serverd.c @@ -15,10 +15,42 @@ int main(void) { struct icmpd *server = icmpd_create_server(-1, 0); struct icmpd *conn = NULL; + int server_fd = icmpd_get_select_fd(server); + int conn_fd = -1; + while (true) { - // printf("Iter\n"); + fd_set inset; + FD_ZERO(&inset); + FD_SET(0, &inset); + FD_SET(server_fd, &inset); + if (conn) FD_SET(conn_fd, &inset); + int nfds = maxi(0, maxi(server_fd, conn_fd)) + 1; + + int ret = select(nfds, &inset, NULL, NULL, NULL); + if (ret < 0) { + if (errno == EINTR) continue; + perror("select"); + return 1; + } + + if (ret == 0) continue; // timeout + + if (FD_ISSET(0, &inset)) { + char *line = NULL; + size_t linelen = 0; + errno = 0; + ssize_t nr = getline(&line, &linelen, stdin); + if (nr < 0) { + if (errno == 0) break; // EOF + perror("getline"); + exit(1); + } + if (nr > 0) nr--; + icmpd_send(conn, line, nr); + free(line); + } - if (icmpd_peek(server)) { + if (FD_ISSET(server_fd, &inset) && icmpd_peek(server)) { struct icmpd_received msg = icmpd_recv(server); printf("Server recv: %zu\n", msg.length); xxd(msg.data, msg.length); @@ -32,6 +64,7 @@ int main(void) { } else { conn = icmpd_create_server(msg.id, msg.source_addr); icmpd_server_set_outstanding(conn, msg.seqnum); + conn_fd = icmpd_get_select_fd(conn); // icmpd_send(conn, "dankje", 6); // icmpd_destroy(conn); @@ -40,34 +73,11 @@ int main(void) { free(msg.data); } - if (icmpd_peek(conn)) { + if (conn && FD_ISSET(conn_fd, &inset) && icmpd_peek(conn)) { struct icmpd_received msg = icmpd_recv(conn); printf("Connection recv: %zu\n", msg.length); xxd(msg.data, msg.length); free(msg.data); } - - fd_set inset; - FD_ZERO(&inset); - FD_SET(0, &inset); - struct timeval tv; - tv.tv_sec = 1; - tv.tv_usec = 0; - int ret = select(1, &inset, NULL, NULL, &tv); - if (ret < 0) { - if (errno == EINTR) continue; - perror("select"); - return 1; - } - if (ret == 0) continue; // timeout - if (FD_ISSET(0, &inset)) { - char *line = NULL; - size_t linelen = 0; - ssize_t nr = getline(&line, &linelen, stdin); - assert(nr >= 0); - if (nr > 0) nr--; - icmpd_send(conn, line, nr); - free(line); - } } } diff --git a/util.c b/util.c index d4bd0ec..0f3f6bc 100644 --- a/util.c +++ b/util.c @@ -65,3 +65,7 @@ ssize_t writeall(int fd, const void *data, size_t length) { } return length; } + +int maxi(int a, int b) { + return a > b ? a : b; +} diff --git a/util.h b/util.h index 4b2b21c..3e228f9 100644 --- a/util.h +++ b/util.h @@ -8,3 +8,5 @@ int uniqid(void); void xxd(const void *buf, size_t length); ssize_t readall(int fd, void *data, size_t length); ssize_t writeall(int fd, const void *data, size_t length); + +int maxi(int a, int b); -- cgit v1.2.3