summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTom Smeding <tom.smeding@gmail.com>2018-08-02 18:19:21 +0200
committerTom Smeding <tom.smeding@gmail.com>2018-08-02 18:21:15 +0200
commit134bec17a10030fc7d1b2060b57bf19799f6c780 (patch)
tree287deb14de65bd1676e04aa6bd40b72ef785c6af
parent8ff7ed58020b46d0bcb3b6dcbc0c5b02e85275a8 (diff)
Proper event-driven icmpd recv
-rw-r--r--.gitignore2
-rw-r--r--clientd.c29
-rw-r--r--icmpd.c59
-rw-r--r--icmpd.h8
-rw-r--r--serverd.c62
-rw-r--r--util.c4
-rw-r--r--util.h2
7 files changed, 117 insertions, 49 deletions
diff --git a/.gitignore b/.gitignore
index 042f948..21c78dc 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,3 +3,5 @@ client
server
clientd
serverd
+
+bak/
diff --git a/clientd.c b/clientd.c
index a5e4d75..2462669 100644
--- a/clientd.c
+++ b/clientd.c
@@ -15,35 +15,42 @@
int main(void) {
struct icmpd *d = icmpd_create_client(inet_addr("127.0.0.1"));
+ int d_fd = icmpd_get_select_fd(d);
while (true) {
- if (icmpd_peek(d)) {
- struct icmpd_received msg = icmpd_recv(d);
- printf("Recv: %zu\n", msg.length);
- xxd(msg.data, msg.length);
- }
-
fd_set inset;
FD_ZERO(&inset);
FD_SET(0, &inset);
- struct timeval tv;
- tv.tv_sec = 1;
- tv.tv_usec = 0;
- int ret = select(1, &inset, NULL, NULL, &tv);
+ FD_SET(d_fd, &inset);
+
+ int ret = select(d_fd + 1, &inset, NULL, NULL, NULL);
if (ret < 0) {
if (errno == EINTR) continue;
perror("select");
return 1;
}
+
if (ret == 0) continue; // timeout
+
if (FD_ISSET(0, &inset)) {
char *line = NULL;
size_t linelen = 0;
+ errno = 0;
ssize_t nr = getline(&line, &linelen, stdin);
- assert(nr >= 0);
+ if (nr < 0) {
+ if (errno == 0) break; // EOF
+ perror("getline");
+ exit(1);
+ }
if (nr > 0) nr--;
icmpd_send(d, line, nr);
free(line);
}
+
+ if (FD_ISSET(d_fd, &inset) && icmpd_peek(d)) {
+ struct icmpd_received msg = icmpd_recv(d);
+ printf("Recv: %zu\n", msg.length);
+ xxd(msg.data, msg.length);
+ }
}
}
diff --git a/icmpd.c b/icmpd.c
index e3127c7..1cfc051 100644
--- a/icmpd.c
+++ b/icmpd.c
@@ -14,20 +14,21 @@
struct icmpd {
- struct mt_mutex mut; // for this struct
+ // CONSTANTS
- bool isserver; // constant
+ bool isserver;
+ uint32_t other_addr; // IPv4 address of other party
+ int id; // id for messages, -1 if not set
+ int signal_out, signal_in; // 1 byte can be read from signal_out for every message received
- // IPv4 address of other party
- uint32_t other_addr; // constant
+ // VARIABLES
- bool outstanding; // server: whether an echo request hasn't been matched yet
+ struct mt_mutex mut; // for the variables in this struct
- // id for messages, -1 if not set
- int id; // constant
+ bool outstanding; // server: whether an echo request hasn't been matched yet
- // Client: seqnum can change before every send
- // Server: seqnum is the sequence number of the last-received ECHO
+ // client: seqnum can change before every send
+ // server: seqnum is the sequence number of the last-received ECHO
// value is -1 if not set
int seqnum;
};
@@ -338,6 +339,9 @@ static void* thread_entry(void *arg) {
recvqu[recvqu_len].msg.seqnum = msg.seqnum;
recvqu_len++;
+ char c = 42;
+ assert(writeall(d->signal_in, &c, 1) == 1);
+
fprintf(stderr, "server recv: recvqu_len = %zu\n", recvqu_len);
}
@@ -391,6 +395,9 @@ static void* thread_entry(void *arg) {
recvqu[recvqu_len].msg.seqnum = msg.seqnum;
recvqu_len++;
+ char c = 42;
+ assert(writeall(d->signal_in, &c, 1) == 1);
+
fprintf(stderr, "client recv: recvqu_len = %zu\n", recvqu_len);
mt_mutex_lock(&d->mut);
@@ -437,12 +444,17 @@ static struct icmpd* icmpd_create_base(int id, bool isserver, uint32_t other_add
struct icmpd *d = malloc(sizeof(struct icmpd));
mt_mutex_init(&d->mut);
- d->id = id;
- d->seqnum = -1;
d->isserver = isserver;
d->other_addr = other_addr;
+ d->id = id;
+ d->seqnum = -1;
d->outstanding = false;
+ int pp[2];
+ assert(pipe(pp) == 0);
+ d->signal_in = pp[1];
+ d->signal_out = pp[0];
+
send_message(host_out, MSG_NEWCONN, &d, sizeof d);
return d;
@@ -471,6 +483,7 @@ void icmpd_server_set_outstanding(struct icmpd *d, int seqnum) {
}
bool icmpd_peek(struct icmpd *d) {
+#if 0
send_message(host_out, MSG_PEEK, &d, sizeof d);
struct msg_in msg = recv_message(host_in);
assert(msg.hdr.type == MSG_PEEK_ANS);
@@ -478,10 +491,30 @@ bool icmpd_peek(struct icmpd *d) {
bool ret = ((uint8_t*)msg.data)[0];
free(msg.data);
return ret;
+#else
+ fd_set inset;
+ FD_ZERO(&inset);
+ FD_SET(d->signal_out, &inset);
+ while (true) {
+ struct timeval tv;
+ tv.tv_sec = tv.tv_usec = 0;
+ int ret = select(d->signal_out + 1, &inset, NULL, NULL, &tv);
+ if (ret < 0) {
+ if (errno == EINTR) continue;
+ perror("select");
+ assert(false);
+ }
+ return ret > 0;
+ }
+#endif
}
struct icmpd_received icmpd_recv(struct icmpd *d) {
send_message(host_out, MSG_RECV, &d, sizeof d);
+
+ char c;
+ assert(readall(d->signal_out, &c, 1) == 1);
+
struct msg_in msg = recv_message(host_in);
assert(msg.hdr.type == MSG_RECV_ANS);
@@ -500,3 +533,7 @@ void icmpd_send(struct icmpd *d, const void *data, size_t length) {
send_message(host_out, MSG_SEND, &form, sizeof form);
}
+
+int icmpd_get_select_fd(struct icmpd *d) {
+ return d->signal_out;
+}
diff --git a/icmpd.h b/icmpd.h
index 7a87ec6..cb574ee 100644
--- a/icmpd.h
+++ b/icmpd.h
@@ -16,7 +16,8 @@ struct icmpd_received {
struct icmpd;
-// Pass 0 as client_addr to listen for messages from any IP (this channel will then only receive unassigned messages)
+// Pass 0 as client_addr to listen for messages from any IP (this channel
+// will then only receive unassigned messages)
// Pass -1 as id to listen for messages with any id
struct icmpd* icmpd_create_server(int id, uint32_t client_addr);
@@ -28,4 +29,9 @@ bool icmpd_peek(struct icmpd *d);
struct icmpd_received icmpd_recv(struct icmpd *d);
void icmpd_send(struct icmpd *d, const void *data, size_t length);
+// File descriptor that select(2) reports readable when a message might be ready.
+// Do not read from this file descriptor.
+// Use icmpd_peek() to verify that a message is actually ready.
+int icmpd_get_select_fd(struct icmpd *d);
+
void icmpd_destroy(struct icmpd *d);
diff --git a/serverd.c b/serverd.c
index 6f21530..b99e4ea 100644
--- a/serverd.c
+++ b/serverd.c
@@ -15,10 +15,42 @@ int main(void) {
struct icmpd *server = icmpd_create_server(-1, 0);
struct icmpd *conn = NULL;
+ int server_fd = icmpd_get_select_fd(server);
+ int conn_fd = -1;
+
while (true) {
- // printf("Iter\n");
+ fd_set inset;
+ FD_ZERO(&inset);
+ FD_SET(0, &inset);
+ FD_SET(server_fd, &inset);
+ if (conn) FD_SET(conn_fd, &inset);
+ int nfds = maxi(0, maxi(server_fd, conn_fd)) + 1;
+
+ int ret = select(nfds, &inset, NULL, NULL, NULL);
+ if (ret < 0) {
+ if (errno == EINTR) continue;
+ perror("select");
+ return 1;
+ }
+
+ if (ret == 0) continue; // timeout
+
+ if (FD_ISSET(0, &inset)) {
+ char *line = NULL;
+ size_t linelen = 0;
+ errno = 0;
+ ssize_t nr = getline(&line, &linelen, stdin);
+ if (nr < 0) {
+ if (errno == 0) break; // EOF
+ perror("getline");
+ exit(1);
+ }
+ if (nr > 0) nr--;
+ icmpd_send(conn, line, nr);
+ free(line);
+ }
- if (icmpd_peek(server)) {
+ if (FD_ISSET(server_fd, &inset) && icmpd_peek(server)) {
struct icmpd_received msg = icmpd_recv(server);
printf("Server recv: %zu\n", msg.length);
xxd(msg.data, msg.length);
@@ -32,6 +64,7 @@ int main(void) {
} else {
conn = icmpd_create_server(msg.id, msg.source_addr);
icmpd_server_set_outstanding(conn, msg.seqnum);
+ conn_fd = icmpd_get_select_fd(conn);
// icmpd_send(conn, "dankje", 6);
// icmpd_destroy(conn);
@@ -40,34 +73,11 @@ int main(void) {
free(msg.data);
}
- if (icmpd_peek(conn)) {
+ if (conn && FD_ISSET(conn_fd, &inset) && icmpd_peek(conn)) {
struct icmpd_received msg = icmpd_recv(conn);
printf("Connection recv: %zu\n", msg.length);
xxd(msg.data, msg.length);
free(msg.data);
}
-
- fd_set inset;
- FD_ZERO(&inset);
- FD_SET(0, &inset);
- struct timeval tv;
- tv.tv_sec = 1;
- tv.tv_usec = 0;
- int ret = select(1, &inset, NULL, NULL, &tv);
- if (ret < 0) {
- if (errno == EINTR) continue;
- perror("select");
- return 1;
- }
- if (ret == 0) continue; // timeout
- if (FD_ISSET(0, &inset)) {
- char *line = NULL;
- size_t linelen = 0;
- ssize_t nr = getline(&line, &linelen, stdin);
- assert(nr >= 0);
- if (nr > 0) nr--;
- icmpd_send(conn, line, nr);
- free(line);
- }
}
}
diff --git a/util.c b/util.c
index d4bd0ec..0f3f6bc 100644
--- a/util.c
+++ b/util.c
@@ -65,3 +65,7 @@ ssize_t writeall(int fd, const void *data, size_t length) {
}
return length;
}
+
+int maxi(int a, int b) {
+ return a > b ? a : b;
+}
diff --git a/util.h b/util.h
index 4b2b21c..3e228f9 100644
--- a/util.h
+++ b/util.h
@@ -8,3 +8,5 @@ int uniqid(void);
void xxd(const void *buf, size_t length);
ssize_t readall(int fd, void *data, size_t length);
ssize_t writeall(int fd, const void *data, size_t length);
+
+int maxi(int a, int b);