From a239f9feadd015fa91d391df01365dcade8ce503 Mon Sep 17 00:00:00 2001 From: Tom Smeding Date: Wed, 1 Aug 2018 23:12:41 +0200 Subject: Threaded communication (icmpd) --- .gitignore | 2 + Makefile | 12 +- client.c | 10 +- clientd.c | 49 ++++++ icmp.c | 184 ++++++++++++--------- icmp.h | 9 ++ icmp_client.h | 23 +-- icmp_server.h | 19 +-- icmpd.c | 502 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ icmpd.h | 31 ++++ mt.c | 29 ++++ mt.h | 23 +++ run_server.sh | 2 +- server.c | 14 +- serverd.c | 73 +++++++++ util.c | 37 +++++ util.h | 4 + 17 files changed, 904 insertions(+), 119 deletions(-) create mode 100644 clientd.c create mode 100644 icmpd.c create mode 100644 icmpd.h create mode 100644 mt.c create mode 100644 mt.h create mode 100644 serverd.c diff --git a/.gitignore b/.gitignore index 1c102fa..042f948 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ *.o client server +clientd +serverd diff --git a/Makefile b/Makefile index b1de31f..eadce97 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,8 @@ CC = gcc -CFLAGS = -Wall -Wextra -O2 -g -std=c11 -fwrapv +CFLAGS = -Wall -Wextra -O2 -g -std=c11 -fwrapv -pthread +LDFLAGS = -pthread -TARGETS = server client +TARGETS = server client serverd clientd SOURCES = $(filter-out $(patsubst %,%.c,$(TARGETS)),$(wildcard *.c)) OBJECTS = $(patsubst %.c,%.o,$(SOURCES)) @@ -14,11 +15,8 @@ clean: rm -f $(TARGETS) *.o -server: server.o $(OBJECTS) - $(CC) $(CFLAGS) $^ -o $@ - -client: client.o $(OBJECTS) - $(CC) $(CFLAGS) $^ -o $@ +$(TARGETS): %: %.o $(OBJECTS) + $(CC) $^ -o $@ $(LDFLAGS) %.o: %.c $(wildcard *.h) $(CC) $(CFLAGS) -c -o $@ $< diff --git a/client.c b/client.c index 4c1a32f..aa9b13c 100644 --- a/client.c +++ b/client.c @@ -20,10 +20,14 @@ int main(void) { // const char *ip_address = "192.168.43.220"; // const char *ip_address = "198.211.118.67"; // tomsmeding.com - struct icmp_client_incoming reply = - icmp_client_communicate(sock, ip_address, 42, 1234, "kaashandel", 10); + if (icmp_client_send(sock, inet_addr(ip_address), 1234, "kaashandel", 10) < 0) { + perror("icmp_client_send"); + return 1; + } + + struct icmp_incoming reply = icmp_client_receive(sock); if (reply.data == NULL) { - perror("icmp_client_communicate"); + perror("icmp_client_receive"); return 1; } diff --git a/clientd.c b/clientd.c new file mode 100644 index 0000000..a5e4d75 --- /dev/null +++ b/clientd.c @@ -0,0 +1,49 @@ +#define _GNU_SOURCE // getline +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "icmpd.h" +#include "util.h" + +int main(void) { + struct icmpd *d = icmpd_create_client(inet_addr("127.0.0.1")); + + while (true) { + if (icmpd_peek(d)) { + struct icmpd_received msg = icmpd_recv(d); + printf("Recv: %zu\n", msg.length); + xxd(msg.data, msg.length); + } + + fd_set inset; + FD_ZERO(&inset); + FD_SET(0, &inset); + struct timeval tv; + tv.tv_sec = 1; + tv.tv_usec = 0; + int ret = select(1, &inset, NULL, NULL, &tv); + if (ret < 0) { + if (errno == EINTR) continue; + perror("select"); + return 1; + } + if (ret == 0) continue; // timeout + if (FD_ISSET(0, &inset)) { + char *line = NULL; + size_t linelen = 0; + ssize_t nr = getline(&line, &linelen, stdin); + assert(nr >= 0); + if (nr > 0) nr--; + icmpd_send(d, line, nr); + free(line); + } + } +} diff --git a/icmp.c b/icmp.c index bef1fd6..7951b91 100644 --- a/icmp.c +++ b/icmp.c @@ -21,6 +21,7 @@ static void make_sockaddr_u32(struct sockaddr_in *dst, uint32_t addr) { dst->sin_addr.s_addr = addr; } +__attribute__((unused)) static void make_sockaddr(struct sockaddr_in *dst, const char *ip_address) { make_sockaddr_u32(dst, inet_addr(ip_address)); } @@ -43,6 +44,8 @@ static uint16_t compute_checksum(const void *buf_, size_t buflen) { return res; } +// TODO: Maybe SOCK_{DGRAM,RAW} | SOCK_NONBLOCK to make non-blocking sockets; +// this allows non-blocking reads in the icmpd thread. int icmp_client_open_socket(void) { return socket(PF_INET, SOCK_DGRAM, IPPROTO_ICMP); } @@ -51,72 +54,23 @@ int icmp_server_open_socket(void) { return socket(AF_INET, SOCK_RAW, IPPROTO_ICMP); } -struct icmp_client_incoming icmp_client_communicate( - int sock, const char *ip_address, - int id, int seqnum, - const void *data_, size_t length) { - - static uint8_t buffer[ICMP_MAX_PAYLOAD_LENGTH]; - - const uint8_t *data = (const uint8_t*)data_; - - assert(length <= ICMP_MAX_PAYLOAD_LENGTH); - - struct icmp_client_incoming errret = - (struct icmp_client_incoming){.data = NULL, .length = 0, .id = 0, .seqnum = 0}; +struct icmp_incoming icmp_client_communicate( + int sock, uint32_t addr, + int seqnum, + const void *data, size_t length) { - struct sockaddr_in addr; - make_sockaddr(&addr, ip_address); - - struct icmp_echo msg; - memset(&msg, 0, sizeof msg); - msg.type = ICMP_ECHO; - msg.code = 0; - msg.id = htons(id); - msg.seqnum = htons(seqnum); - - memcpy(msg.payload, data, length); - - if (sendto(sock, &msg, ICMP_PAYLOAD_OFFSET + length, 0, (struct sockaddr*)&addr, sizeof addr) < 0) { - return errret; + if (icmp_client_send(sock, addr, seqnum, data, length) < 0) { + return (struct icmp_incoming){.data = NULL, .length = 0, .id = 0, .seqnum = 0, .source_addr = 0}; } - struct msghdr replyhdr; - memset(&replyhdr, 0, sizeof replyhdr); - - struct iovec iov1; - memset(&iov1, 0, sizeof iov1); - iov1.iov_base = &msg; - iov1.iov_len = sizeof msg; - - replyhdr.msg_name = &addr; - replyhdr.msg_namelen = sizeof addr; - replyhdr.msg_iov = &iov1; - replyhdr.msg_iovlen = 1; - - ssize_t nr = recvmsg(sock, &replyhdr, 0); - if (nr < 0) { - return errret; - } - - size_t payloadlen = nr - ICMP_PAYLOAD_OFFSET; - assert(payloadlen <= MAX_DATAGRAM_SIZE); - - memcpy(buffer, msg.payload, payloadlen); - - return (struct icmp_client_incoming){ - .data = buffer, - .length = payloadlen, - .id = ntohs(msg.id), - .seqnum = ntohs(msg.seqnum) - }; + return icmp_client_receive(sock); } -struct icmp_server_incoming icmp_server_receive(int sock) { +struct icmp_incoming icmp_server_receive(int sock) { static uint8_t buffer[ICMP_MAX_PAYLOAD_LENGTH]; - struct icmp_server_incoming errret = - (struct icmp_server_incoming){.data = NULL, .length = 0, .id = 0, .seqnum = 0, .source_addr = 0}; + struct icmp_incoming errret = + (struct icmp_incoming){.data = NULL, .length = 0, .id = 0, .seqnum = 0, .source_addr = 0}; char buf[MAX_IP_PACKET_SIZE]; @@ -140,9 +94,6 @@ struct icmp_server_incoming icmp_server_receive(int sock) { // buf now contains received data starting at the IP header - // printf("Full packet received:\n"); - // xxd(buf, nr); - struct iphdr *hdr = (struct iphdr*)buf; int hdr_len = hdr->ihl * 4; uint32_t saddr = hdr->saddr; @@ -150,20 +101,13 @@ struct icmp_server_incoming icmp_server_receive(int sock) { struct icmp_echo *msg = (struct icmp_echo*)(buf + hdr_len); int msg_len = nr - hdr_len; int payloadlen = msg_len - offsetof(struct icmp_echo, payload); - // printf("Received: type %u code %u id %hu seqnum %hu payload:\n", - // (unsigned)msg->type, (unsigned)msg->code, msg->id, msg->seqnum); - // xxd(msg->payload, payloadlen); - - if (msg->type != ICMP_ECHO && msg->type != ICMP_ECHOREPLY) { - // printf("Not an ICMP_ECHO or ICMP_ECHOREPLY, ignoring\n"); - continue; - } else { + + if (msg->type == ICMP_ECHO) { memcpy(buffer, msg->payload, payloadlen); - return (struct icmp_server_incoming){ + return (struct icmp_incoming){ .data = buffer, .length = payloadlen, - .type = msg->type, .id = ntohs(msg->id), .seqnum = ntohs(msg->seqnum), .source_addr = saddr @@ -172,19 +116,64 @@ struct icmp_server_incoming icmp_server_receive(int sock) { } } -int icmp_server_send_reply( +struct icmp_incoming icmp_client_receive(int sock) { + static uint8_t buffer[ICMP_MAX_PAYLOAD_LENGTH]; + + struct icmp_incoming errret = + (struct icmp_incoming){.data = NULL, .length = 0, .id = 0, .seqnum = 0, .source_addr = 0}; + + struct msghdr replyhdr; + memset(&replyhdr, 0, sizeof replyhdr); + + struct icmp_echo msg; + + struct iovec iov1; + memset(&iov1, 0, sizeof iov1); + iov1.iov_base = &msg; + iov1.iov_len = sizeof msg; + + struct sockaddr_in addr; + + replyhdr.msg_name = &addr; + replyhdr.msg_namelen = sizeof addr; + replyhdr.msg_iov = &iov1; + replyhdr.msg_iovlen = 1; + + ssize_t nr = recvmsg(sock, &replyhdr, 0); + if (nr < 0) { + return errret; + } + + size_t payloadlen = nr - ICMP_PAYLOAD_OFFSET; + assert(payloadlen <= MAX_DATAGRAM_SIZE); + + memcpy(buffer, msg.payload, payloadlen); + + return (struct icmp_incoming){ + .data = buffer, + .length = payloadlen, + .id = ntohs(msg.id), + .seqnum = ntohs(msg.seqnum), + .source_addr = addr.sin_addr.s_addr + }; +} + +static int icmp_server_send( int sock, uint32_t addr_u32, int id, int seqnum, - const void *data_, size_t length) { + const void *data_, size_t length, + int type) { const uint8_t *data = (const uint8_t*)data_; + assert(length <= ICMP_MAX_PAYLOAD_LENGTH); + struct sockaddr_in addr; make_sockaddr_u32(&addr, addr_u32); struct icmp_echo msg; memset(&msg, 0, sizeof msg); - msg.type = ICMP_ECHOREPLY; + msg.type = type; msg.code = 0; msg.id = htons(id); msg.seqnum = htons(seqnum); @@ -201,3 +190,48 @@ int icmp_server_send_reply( return 0; } + +int icmp_server_send_reply( + int sock, uint32_t addr_u32, + int id, int seqnum, + const void *data, size_t length) { + + return icmp_server_send(sock, addr_u32, id, seqnum, data, length, ICMP_ECHOREPLY); +} + +int icmp_server_send_echo( + int sock, uint32_t addr_u32, + int id, int seqnum, + const void *data, size_t length) { + + return icmp_server_send(sock, addr_u32, id, seqnum, data, length, ICMP_ECHO); +} + +int icmp_client_send( + int sock, uint32_t addr_u32, + int seqnum, + const void *data_, size_t length) { + + const uint8_t *data = (const uint8_t*)data_; + + assert(length <= ICMP_MAX_PAYLOAD_LENGTH); + + struct sockaddr_in addr; + make_sockaddr_u32(&addr, addr_u32); + + struct icmp_echo msg; + memset(&msg, 0, sizeof msg); + msg.type = ICMP_ECHO; + msg.code = 0; + msg.id = 0; // overwritten by kernel + msg.seqnum = htons(seqnum); + + if (data == NULL) assert(length == 0); + else memcpy(msg.payload, data, length); + + if (sendto(sock, &msg, ICMP_PAYLOAD_OFFSET + length, 0, (struct sockaddr*)&addr, sizeof addr) < 0) { + return -1; + } + + return 0; +} diff --git a/icmp.h b/icmp.h index 4cccb95..eed1714 100644 --- a/icmp.h +++ b/icmp.h @@ -22,3 +22,12 @@ struct __attribute__((packed)) icmp_echo { #define ICMP_MAX_PAYLOAD_LENGTH (MAX_DATAGRAM_SIZE - ICMP_PAYLOAD_OFFSET) #define ICMP_SAFE_PAYLOAD_LENGTH (MIN_MTU - IP_HEADER_SIZE - ICMP_PAYLOAD_OFFSET) + + +struct icmp_incoming { + const uint8_t *data; // points to internal buffer + size_t length; // length of 'data' + + int id, seqnum; + uint32_t source_addr; +}; diff --git a/icmp_client.h b/icmp_client.h index 78cfe7a..3474f5d 100644 --- a/icmp_client.h +++ b/icmp_client.h @@ -3,21 +3,24 @@ #include "icmp.h" -struct icmp_client_incoming { - const uint8_t *data; // points to internal buffer - size_t length; // length of 'data' - - int id, seqnum; -}; - // Returns -1 on error with errno. int icmp_client_open_socket(void); +// Returns {.data=NULL} on error with errno. +// Receives only ECHOREPLY. +struct icmp_incoming icmp_client_receive(int sock); + +// Only actual IPv4 addresses allowed. Sends data in 'data' with length 'length'. +int icmp_client_send( + int sock, uint32_t addr, + int seqnum, + const void *data, size_t length); + // Only actual IPv4 addresses allowed. Sends data in 'data' with length 'length', and // returns pointer to internal buffer with reply data. Buffer is invalidated on next // call to the function. // Returns {.data=NULL} on error with errno. -struct icmp_client_incoming icmp_client_communicate( - int sock, const char *ip_address, - int id, int seqnum, +struct icmp_incoming icmp_client_communicate( + int sock, uint32_t addr, + int seqnum, const void *data, size_t length); diff --git a/icmp_server.h b/icmp_server.h index fbf4158..f1f4e9f 100644 --- a/icmp_server.h +++ b/icmp_server.h @@ -2,28 +2,13 @@ #include "icmp.h" -#ifndef ICMP_ECHO -#define ICMP_ECHO 8 -#endif -#ifndef ICMP_ECHOREPLY -#define ICMP_ECHOREPLY 0 -#endif - - -struct icmp_server_incoming { - const uint8_t *data; // points to internal buffer - size_t length; // length of 'data' - - int type; // ICMP_ECHO or ICMP_ECHOREPLY - int id, seqnum; - uint32_t source_addr; -}; // Returns -1 on error with errno. int icmp_server_open_socket(void); // Returns {.data=NULL} on error with errno. -struct icmp_server_incoming icmp_server_receive(int sock); +// Receives only ECHO. +struct icmp_incoming icmp_server_receive(int sock); // Returns -1 on error with errno. int icmp_server_send_reply( diff --git a/icmpd.c b/icmpd.c new file mode 100644 index 0000000..e3127c7 --- /dev/null +++ b/icmpd.c @@ -0,0 +1,502 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "icmpd.h" +#include "util.h" +#include "icmp_server.h" +#include "icmp_client.h" + + +struct icmpd { + struct mt_mutex mut; // for this struct + + bool isserver; // constant + + // IPv4 address of other party + uint32_t other_addr; // constant + + bool outstanding; // server: whether an echo request hasn't been matched yet + + // id for messages, -1 if not set + int id; // constant + + // Client: seqnum can change before every send + // Server: seqnum is the sequence number of the last-received ECHO + // value is -1 if not set + int seqnum; +}; + +// Global state +bool thread_running = false; +struct mt_mutex thread_mutex; // protecting these global values +struct mt_thread thread; +int thread_in, thread_out; +int host_in, host_out; + + +// Arguments to messages are listed in comments. +enum { + MSG_THREAD_UP, // - + MSG_NEWCONN, // struct icmpd* + MSG_ENDCONN, // struct icmpd*; free() done by thread + MSG_PEEK, // struct icmpd* + MSG_PEEK_ANS, // uint8_t (bool) + MSG_RECV, // struct icmpd* + MSG_RECV_ANS, // struct icmpd_received + MSG_SEND, // struct msg_form_send +}; + +struct msg_header { + int type; + size_t size; // size in bytes of argument +}; + +struct msg_in { + struct msg_header hdr; + void *data; // should be free()'d +}; + +struct msg_form_send { + struct icmpd *d; + void *data; // to be free()'d by thread when sent + size_t length; +}; + + +static struct icmpd* find_conn(int id, uint32_t addr, struct icmpd **conns, size_t conns_len) { + struct icmpd *ret = NULL; + for (size_t i = 0; i < conns_len; i++) { + struct icmpd *d = conns[i]; + if ((d->id == -1 || d->id == id) && + (d->other_addr == 0 || d->other_addr == addr)) { + if (ret == NULL || (ret->other_addr == 0 && d->other_addr != 0)) { + ret = d; + } else { + fprintf(stderr, "icmpd thread: warning: multiple connections match id=%d addr=%x\n", + id, addr); + break; + } + } + } + return ret; +} + +static void send_message(int sock, int type, const void *data, size_t size) { + struct msg_header head = {type, size}; + + assert(writeall(sock, &head, sizeof head) == sizeof head); + assert(writeall(sock, data, size) == (ssize_t)size); +} + +// On error, {.type=-1}, errno is set +static struct msg_in recv_message(int sock) { + struct msg_in msg; + + int ret = readall(sock, &msg.hdr, sizeof msg.hdr); + if (ret < 0) return (struct msg_in){.hdr.type = -1, .hdr.size = 0, .data = NULL}; + assert(ret == sizeof msg.hdr); + + msg.data = malloc(msg.hdr.size); + assert(readall(sock, msg.data, msg.hdr.size) == (ssize_t)msg.hdr.size); + + return msg; +} + +static void client_increment_seqnum(struct icmpd *d) { + d->seqnum = (d->seqnum + 1) & 0xffff; +} + +static void* thread_entry(void *arg) { + (void)arg; + + send_message(thread_out, MSG_THREAD_UP, NULL, 0); + + struct recvqu_item { + struct icmpd *d; + struct icmpd_received msg; + }; + + struct sendqu_item { + struct icmpd *d; + void *data; + size_t length; + }; + + size_t conns_cap = 8, conns_len = 0; + struct icmpd **conns = malloc(conns_cap * sizeof(struct icmpd*)); + + int sock_server = -1, sock_client = -1; + + size_t recvqu_cap = 8, recvqu_len = 0; + struct recvqu_item *recvqu = malloc(recvqu_cap * sizeof(struct recvqu_item)); + + // Server messages submitted for transmission while no non-ponged client + // ping is available. + size_t sendqu_cap = 8, sendqu_len = 0; + struct sendqu_item *sendqu = malloc(sendqu_cap * sizeof(struct sendqu_item)); + + while (true) { + fd_set inset; + FD_ZERO(&inset); + FD_SET(thread_in, &inset); + if (sock_server >= 0) FD_SET(sock_server, &inset); + if (sock_client >= 0) FD_SET(sock_client, &inset); + + int nfds = thread_in; + if (sock_server > nfds) nfds = sock_server; + if (sock_client > nfds) nfds = sock_client; + + int ret = select(nfds + 1, &inset, NULL, NULL, NULL); + if (ret < 0) { + if (errno == EINTR) continue; + perror("select"); + assert(false); + } + assert(ret > 0); + + if (FD_ISSET(thread_in, &inset)) { + struct msg_in msg = recv_message(thread_in); + if (msg.hdr.type == -1) { + if (errno == EAGAIN || errno == EWOULDBLOCK) continue; + perror("recv_message"); + assert(false); + } + + switch (msg.hdr.type) { + case MSG_NEWCONN: { + struct icmpd *d = *(struct icmpd**)msg.data; + free(msg.data); + if (conns_len == conns_cap) { + conns_cap *= 2; + conns = realloc(conns, conns_cap * sizeof(struct icmpd*)); + } + conns[conns_len++] = d; + + if (d->isserver) { + if (sock_server < 0) { + sock_server = icmp_server_open_socket(); + if (sock_server < 0) { + perror("icmp_server_open_socket"); + assert(false); + } + } + } else { + if (sock_client < 0) { + sock_client = icmp_client_open_socket(); + if (sock_client < 0) { + perror("icmp_client_open_socket"); + assert(false); + } + } + } + break; + } + + case MSG_ENDCONN: { + struct icmpd *d = *(struct icmpd**)msg.data; + free(msg.data); + + bool found = false; + for (size_t i = 0; i < conns_len; i++) { + if (conns[i] == d) { + memmove(conns + i, conns + i + 1, conns_len - i - 1); + conns_len--; + found = true; + break; + } + } + assert(found); + + mt_mutex_destroy(&d->mut); + free(d); + break; + } + + case MSG_PEEK: { + struct icmpd *d = *(struct icmpd**)msg.data; + free(msg.data); + + uint8_t ret = 0; + for (size_t i = 0; i < recvqu_len; i++) { + if (recvqu[i].d == d) { + ret = 1; + break; + } + } + + send_message(thread_out, MSG_PEEK_ANS, &ret, 1); + break; + } + + case MSG_RECV: { + struct icmpd *d = *(struct icmpd**)msg.data; + free(msg.data); + + ssize_t index = -1; + for (size_t i = 0; i < recvqu_len; i++) { + if (recvqu[i].d == d) { + index = i; + break; + } + } + + assert(index != -1); // TODO: make host wait for something + + send_message(thread_out, MSG_RECV_ANS, + &recvqu[index].msg, sizeof recvqu[index].msg); + + memcpy(recvqu + index, recvqu + index + 1, + (recvqu_len - index - 1) * sizeof recvqu[0]); + + recvqu_len--; + break; + } + + case MSG_SEND: { + struct msg_form_send form = *(struct msg_form_send*)msg.data; + free(msg.data); + + struct icmpd *d = form.d; + + if (d->isserver) { + mt_mutex_lock(&d->mut); + bool outstanding = d->outstanding; + int id = d->id, seqnum = d->seqnum; + mt_mutex_unlock(&d->mut); + + if (outstanding) { + int ret = icmp_server_send_reply( + sock_server, d->other_addr, id, seqnum, + form.data, form.length); + if (ret < 0) { + perror("icmp_server_send_reply"); + } + } else { + if (sendqu_len == sendqu_cap) { + sendqu_cap *= 2; + sendqu = realloc(sendqu, sendqu_cap * sizeof sendqu[0]); + } + + sendqu[sendqu_len].d = d; + sendqu[sendqu_len].data = form.data; + sendqu[sendqu_len].length = form.length; + sendqu_len++; + } + } else { + mt_mutex_lock(&d->mut); + client_increment_seqnum(d); + int seqnum = d->seqnum; + mt_mutex_unlock(&d->mut); + + int ret = icmp_client_send( + sock_client, d->other_addr, seqnum, + form.data, form.length); + if (ret < 0) { + perror("icmp_client_send"); + } + } + + break; + } + } + } + + if (FD_ISSET(sock_server, &inset)) { + struct icmp_incoming msg = icmp_server_receive(sock_server); + struct icmpd *d = find_conn(msg.id, msg.source_addr, conns, conns_len); + + if (d == NULL) { + fprintf(stderr, "icmpd thread: ping received with unknown id=%d addr=%x\n", + msg.id, msg.source_addr); + int ret = icmp_server_send_reply( + sock_server, msg.source_addr, msg.id, msg.seqnum, + msg.data, msg.length); + if (ret < 0) { + perror("icmpd thread: unknown ping reply: icmp_server_send_reply"); + } + } else { + if (msg.length == 0) { + fprintf(stderr, "server recv: empty\n"); + } else { + if (recvqu_len == recvqu_cap) { + recvqu_cap *= 2; + recvqu = realloc(recvqu, recvqu_cap * sizeof(struct recvqu_item)); + } + + recvqu[recvqu_len].d = d; + recvqu[recvqu_len].msg.data = malloc(msg.length); + memcpy(recvqu[recvqu_len].msg.data, msg.data, msg.length); + recvqu[recvqu_len].msg.length = msg.length; + recvqu[recvqu_len].msg.source_addr = msg.source_addr; + recvqu[recvqu_len].msg.id = msg.id; + recvqu[recvqu_len].msg.seqnum = msg.seqnum; + recvqu_len++; + + fprintf(stderr, "server recv: recvqu_len = %zu\n", recvqu_len); + } + + ssize_t index = -1; + for (size_t i = 0; i < sendqu_len; i++) { + if (sendqu[i].d == d) { + index = i; + break; + } + } + + if (index != -1) { + int ret = icmp_server_send_reply( + sock_server, d->other_addr, d->id, d->seqnum, + sendqu[index].data, sendqu[index].length); + if (ret < 0) { + perror("icmp_server_send_reply"); + } + + memmove(sendqu + index, sendqu + index + 1, + (sendqu_len - index - 1) * sizeof sendqu[0]); + sendqu_len--; + } else { + mt_mutex_lock(&d->mut); + d->outstanding = true; + d->seqnum = msg.seqnum; + mt_mutex_unlock(&d->mut); + } + } + } + + if (FD_ISSET(sock_client, &inset)) { + struct icmp_incoming msg = icmp_client_receive(sock_client); + struct icmpd *d = find_conn(msg.id, msg.source_addr, conns, conns_len); + + if (d == NULL) { + fprintf(stderr, "icmpd thread: pong received with unknown id=%d addr=%x\n", + msg.id, msg.source_addr); + } else { + if (recvqu_len == recvqu_cap) { + recvqu_cap *= 2; + recvqu = realloc(recvqu, recvqu_cap * sizeof(struct recvqu_item)); + } + + recvqu[recvqu_len].d = d; + recvqu[recvqu_len].msg.data = malloc(msg.length); + memcpy(recvqu[recvqu_len].msg.data, msg.data, msg.length); + recvqu[recvqu_len].msg.length = msg.length; + recvqu[recvqu_len].msg.source_addr = msg.source_addr; + recvqu[recvqu_len].msg.id = msg.id; + recvqu[recvqu_len].msg.seqnum = msg.seqnum; + recvqu_len++; + + fprintf(stderr, "client recv: recvqu_len = %zu\n", recvqu_len); + + mt_mutex_lock(&d->mut); + client_increment_seqnum(d); + int seqnum = d->seqnum; + mt_mutex_unlock(&d->mut); + + int ret = icmp_client_send(sock_client, d->other_addr, seqnum, NULL, 0); + if (ret < 0) { + perror("icmp_client_send"); + } + } + } + } + + return NULL; +} + +static void spawn_icmpd_thread(void) { + int pp[2]; + + assert(pipe(pp) == 0); + thread_out = pp[1]; + host_in = pp[0]; + + assert(pipe(pp) == 0); + host_out = pp[1]; + thread_in = pp[0]; + + mt_mutex_init(&thread_mutex); + + mt_thread_create(&thread, thread_entry, NULL); + + struct msg_in msg = recv_message(host_in); + assert(msg.hdr.type == MSG_THREAD_UP); + free(msg.data); +} + +static struct icmpd* icmpd_create_base(int id, bool isserver, uint32_t other_addr) { + if (!thread_running) { + spawn_icmpd_thread(); + thread_running = true; + } + + struct icmpd *d = malloc(sizeof(struct icmpd)); + mt_mutex_init(&d->mut); + d->id = id; + d->seqnum = -1; + d->isserver = isserver; + d->other_addr = other_addr; + d->outstanding = false; + + send_message(host_out, MSG_NEWCONN, &d, sizeof d); + + return d; +} + +struct icmpd* icmpd_create_server(int id, uint32_t client_addr) { + return icmpd_create_base(id, true, client_addr); +} + +struct icmpd* icmpd_create_client(uint32_t server_addr) { + assert(server_addr != 0); + return icmpd_create_base(-1, false, server_addr); +} + +void icmpd_destroy(struct icmpd *d) { + send_message(host_out, MSG_ENDCONN, &d, sizeof d); + // free() is done by thread +} + +void icmpd_server_set_outstanding(struct icmpd *d, int seqnum) { + assert(d->isserver); + mt_mutex_lock(&d->mut); + d->outstanding = true; + d->seqnum = seqnum; + mt_mutex_unlock(&d->mut); +} + +bool icmpd_peek(struct icmpd *d) { + send_message(host_out, MSG_PEEK, &d, sizeof d); + struct msg_in msg = recv_message(host_in); + assert(msg.hdr.type == MSG_PEEK_ANS); + + bool ret = ((uint8_t*)msg.data)[0]; + free(msg.data); + return ret; +} + +struct icmpd_received icmpd_recv(struct icmpd *d) { + send_message(host_out, MSG_RECV, &d, sizeof d); + struct msg_in msg = recv_message(host_in); + assert(msg.hdr.type == MSG_RECV_ANS); + + struct icmpd_received r; + assert(msg.hdr.size == sizeof r); + memcpy(&r, msg.data, sizeof r); + return r; +} + +void icmpd_send(struct icmpd *d, const void *data, size_t length) { + struct msg_form_send form; + form.d = d; + form.data = malloc(length); + memcpy(form.data, data, length); + form.length = length; + + send_message(host_out, MSG_SEND, &form, sizeof form); +} diff --git a/icmpd.h b/icmpd.h new file mode 100644 index 0000000..7a87ec6 --- /dev/null +++ b/icmpd.h @@ -0,0 +1,31 @@ +#pragma once + +#include +#include +#include "mt.h" + + +struct icmpd_received { + uint8_t *data; // points to malloc'd buffer; should be freed by recipient + size_t length; // length of 'data' + + uint32_t source_addr; + int id, seqnum; +}; + + +struct icmpd; + +// Pass 0 as client_addr to listen for messages from any IP (this channel will then only receive unassigned messages) +// Pass -1 as id to listen for messages with any id +struct icmpd* icmpd_create_server(int id, uint32_t client_addr); + +struct icmpd* icmpd_create_client(uint32_t server_addr); + +void icmpd_server_set_outstanding(struct icmpd *d, int seqnum); + +bool icmpd_peek(struct icmpd *d); +struct icmpd_received icmpd_recv(struct icmpd *d); +void icmpd_send(struct icmpd *d, const void *data, size_t length); + +void icmpd_destroy(struct icmpd *d); diff --git a/mt.c b/mt.c new file mode 100644 index 0000000..e5b241e --- /dev/null +++ b/mt.c @@ -0,0 +1,29 @@ +#include +#include "mt.h" + + +void mt_mutex_init(struct mt_mutex *mut) { + assert(pthread_mutex_init(&mut->m, NULL) == 0); +} + +void mt_mutex_destroy(struct mt_mutex *mut) { + assert(pthread_mutex_destroy(&mut->m) == 0); +} + +void mt_mutex_lock(struct mt_mutex *mut) { + assert(pthread_mutex_lock(&mut->m) == 0); +} + +void mt_mutex_unlock(struct mt_mutex *mut) { + assert(pthread_mutex_unlock(&mut->m) == 0); +} + +void mt_thread_create(struct mt_thread *th, void* (*callback)(void*), void *arg) { + assert(pthread_create(&th->t, NULL, callback, arg) == 0); +} + +void* mt_thread_join(struct mt_thread *th) { + void *ret; + assert(pthread_join(th->t, &ret)); + return ret; +} diff --git a/mt.h b/mt.h new file mode 100644 index 0000000..57afd5c --- /dev/null +++ b/mt.h @@ -0,0 +1,23 @@ +#pragma once + +#include + + +// The structs in this file should be considered opaque. + + +struct mt_mutex { + pthread_mutex_t m; +}; + +void mt_mutex_init(struct mt_mutex *mut); +void mt_mutex_destroy(struct mt_mutex *mut); +void mt_mutex_lock(struct mt_mutex *mut); +void mt_mutex_unlock(struct mt_mutex *mut); + +struct mt_thread { + pthread_t t; +}; + +void mt_thread_create(struct mt_thread *th, void* (*callback)(void*), void *arg); +void* mt_thread_join(struct mt_thread *th); diff --git a/run_server.sh b/run_server.sh index 42b6aef..e6beee9 100755 --- a/run_server.sh +++ b/run_server.sh @@ -6,4 +6,4 @@ fi echo 1 >/proc/sys/net/ipv4/icmp_echo_ignore_all trap "echo 0 >/proc/sys/net/ipv4/icmp_echo_ignore_all" EXIT -./server +./serverd diff --git a/server.c b/server.c index 9b16173..df51032 100644 --- a/server.c +++ b/server.c @@ -1,6 +1,7 @@ #include #include #include +#include #include #include "icmp_server.h" #include "util.h" @@ -14,21 +15,22 @@ int main(void) { } while (true) { - struct icmp_server_incoming msg = icmp_server_receive(sock); + struct icmp_incoming msg = icmp_server_receive(sock); if (msg.data == NULL) { perror("icmp_server_communicate"); return 1; } - printf("Received: type %d id %d seqnum %d data:\n", msg.type, msg.id, msg.seqnum); + printf("Received: id %d seqnum %d data:\n", msg.id, msg.seqnum); xxd(msg.data, msg.length); - if (msg.type != ICMP_ECHO) { - printf("Not an ICMP_ECHO, ignoring\n"); - continue; + char data[msg.length + 1]; + for (size_t i = 0; i < msg.length; i++) { + data[i] = toupper(msg.data[i]); } + data[msg.length] = '!'; - if (icmp_server_send_reply(sock, msg.source_addr, msg.id, msg.seqnum, "dank je wel", 11) < 0) { + if (icmp_server_send_reply(sock, msg.source_addr, msg.id, msg.seqnum, data, msg.length + 1) < 0) { perror("icmp_server_send_reply"); } } diff --git a/serverd.c b/serverd.c new file mode 100644 index 0000000..6f21530 --- /dev/null +++ b/serverd.c @@ -0,0 +1,73 @@ +#define _GNU_SOURCE // usleep, getline +#include +#include +#include +#include +#include +#include +#include +#include +#include "icmpd.h" +#include "util.h" + + +int main(void) { + struct icmpd *server = icmpd_create_server(-1, 0); + struct icmpd *conn = NULL; + + while (true) { + // printf("Iter\n"); + + if (icmpd_peek(server)) { + struct icmpd_received msg = icmpd_recv(server); + printf("Server recv: %zu\n", msg.length); + xxd(msg.data, msg.length); + + if (conn) { + printf("Message received while already connected\n"); + struct icmpd *d = icmpd_create_server(msg.id, msg.source_addr); + icmpd_server_set_outstanding(d, msg.seqnum); + icmpd_send(d, msg.data, msg.length); + icmpd_destroy(d); + } else { + conn = icmpd_create_server(msg.id, msg.source_addr); + icmpd_server_set_outstanding(conn, msg.seqnum); + + // icmpd_send(conn, "dankje", 6); + // icmpd_destroy(conn); + } + + free(msg.data); + } + + if (icmpd_peek(conn)) { + struct icmpd_received msg = icmpd_recv(conn); + printf("Connection recv: %zu\n", msg.length); + xxd(msg.data, msg.length); + free(msg.data); + } + + fd_set inset; + FD_ZERO(&inset); + FD_SET(0, &inset); + struct timeval tv; + tv.tv_sec = 1; + tv.tv_usec = 0; + int ret = select(1, &inset, NULL, NULL, &tv); + if (ret < 0) { + if (errno == EINTR) continue; + perror("select"); + return 1; + } + if (ret == 0) continue; // timeout + if (FD_ISSET(0, &inset)) { + char *line = NULL; + size_t linelen = 0; + ssize_t nr = getline(&line, &linelen, stdin); + assert(nr >= 0); + if (nr > 0) nr--; + icmpd_send(conn, line, nr); + free(line); + } + } +} diff --git a/util.c b/util.c index be7083c..d4bd0ec 100644 --- a/util.c +++ b/util.c @@ -1,8 +1,17 @@ #include +#include #include +#include +#include +#include #include "util.h" +int uniqid(void) { + static int i = 0; + return i++; +} + void xxd(const void *buf_, size_t length) { unsigned char *buf = (unsigned char*)buf_; @@ -28,3 +37,31 @@ void xxd(const void *buf_, size_t length) { cursor += 16; } } + +ssize_t readall(int fd, void *data, size_t length) { + size_t cursor = 0; + while (cursor < length) { + ssize_t nr = read(fd, data + cursor, length - cursor); + if (nr < 0) { + if (errno == EINTR) continue; + return -1; + } + assert(nr > 0); + cursor += nr; + } + return length; +} + +ssize_t writeall(int fd, const void *data, size_t length) { + size_t cursor = 0; + while (cursor < length) { + ssize_t nw = write(fd, data + cursor, length - cursor); + if (nw < 0) { + if (errno == EINTR) continue; + return -1; + } + assert(nw > 0); + cursor += nw; + } + return length; +} diff --git a/util.h b/util.h index ea48028..4b2b21c 100644 --- a/util.h +++ b/util.h @@ -1,6 +1,10 @@ #pragma once #include +#include +int uniqid(void); void xxd(const void *buf, size_t length); +ssize_t readall(int fd, void *data, size_t length); +ssize_t writeall(int fd, const void *data, size_t length); -- cgit v1.2.3