From cf6fbbec9e2a04217a135924fd2bf209be488223 Mon Sep 17 00:00:00 2001 From: Tom Smeding Date: Mon, 13 Aug 2018 18:01:05 +0200 Subject: Protocol implementation WIP --- prot.c | 368 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ prot.h | 57 +++++++++ protm.c | 25 ++++ protm.h | 42 +++++++ protocol.txt | 62 ++++++++-- 5 files changed, 543 insertions(+), 11 deletions(-) create mode 100644 prot.c create mode 100644 prot.h create mode 100644 protm.c create mode 100644 protm.h diff --git a/prot.c b/prot.c new file mode 100644 index 0000000..3b580b8 --- /dev/null +++ b/prot.c @@ -0,0 +1,368 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include "prot.h" +#include "protm.h" +#include "icmpd.h" +#include "mt.h" +#include "util.h" + + +#define SEQ_SIZE 256 +#define WIN_SIZE 16 + + +struct prot { + // CONSTANTS + bool is_accept; + uint32_t other_addr; + struct icmpd *d; + int d_fd; + int event_fd_in, event_fd_out; + + // VARIABLES + struct mt_mutex mut; + int my_win_start, other_win_start; + bool recv_success[WIN_SIZE]; + int my_next_seq; + bool peer_closed; +}; + + +// r/w host until spawn, thereafter constant +static bool thread_spawned = false; +static struct mt_thread thread; +static int host_in, host_out, thread_in, thread_out; + +// host-only +static bool have_accept_channel = false; + + +// Arguments to messages are listed in comments. +enum { + MSG_THREAD_UP, // - + MSG_NEWCONN, // struct prot* + MSG_ENDCONN, // struct prot*; cleanup and free() done by thread + MSG_ACCEPT, // - + MSG_ACCEPT_ANS, // struct prot*; allocated by thread + MSG_RECV, // struct prot* + MSG_RECV_ANS, // struct prot_msg + MSG_SEND, // struct msg_form_send +}; + +struct msg_header { + int type; + size_t size; // size in bytes of argument +}; + +struct msg_in { + struct msg_header hdr; + void *data; // should be free()'d +}; + +struct msg_form_send { + struct prot *ch; + void *data; // to be free()'d by thread when sent + size_t length; +}; + + +static void send_message(int sock, int type, const void *data, size_t size) { + struct msg_header head = {type, size}; + + assert(writeall(sock, &head, sizeof head) == sizeof head); + assert(writeall(sock, data, size) == (ssize_t)size); +} + +// On error, {.type=-1}, errno is set +static struct msg_in recv_message(int sock) { + struct msg_in msg; + + int ret = readall(sock, &msg.hdr, sizeof msg.hdr); + if (ret < 0) return (struct msg_in){.hdr.type = -1, .hdr.size = 0, .data = NULL}; + assert(ret == sizeof msg.hdr); + + msg.data = malloc(msg.hdr.size); + assert(readall(sock, msg.data, msg.hdr.size) == (ssize_t)msg.hdr.size); + + return msg; +} + +static void populate_prot(struct prot *ch, bool is_accept, struct icmpd *d, uint32_t other_addr) { + ch->is_accept = is_accept; + ch->other_addr = other_addr; + ch->d = d; + ch->d_fd = d != NULL ? icmpd_get_select_fd(d) : -1; + int pp[2]; + assert(pipe(pp) == 0); + ch->event_fd_in = pp[1]; + ch->event_fd_out = pp[0]; + + mt_mutex_init(&ch->mut); + ch->my_win_start = 0; + ch->other_win_start = 0; + memset(ch->recv_success, 0, WIN_SIZE * sizeof(bool)); + ch->my_next_seq = 0; + ch->peer_closed = false; +} + + +static void* thread_entry(void *arg_) { + (void)arg_; + send_message(thread_out, MSG_THREAD_UP, NULL, 0); + + struct prot *accept_ch = NULL; + + size_t chans_cap = 8, chans_len = 0; + struct prot **chans = malloc(chans_cap * sizeof(struct prot*)); + + while (true) { + fd_set inset; + FD_ZERO(&inset); + FD_SET(thread_in, &inset); + int nfds = thread_in; + for (size_t i = 0; i < chans_len; i++) { + if (chans[i]->peer_closed) continue; + FD_SET(chans[i]->d_fd, &inset); + if (chans[i]->d_fd > nfds) nfds = chans[i]->d_fd; + } + + int ret = select(nfds + 1, &inset, NULL, NULL, NULL); + if (ret < 0) { + if (errno == EINTR) continue; + perror("select"); + assert(false); + } + assert(ret > 0); + + if (FD_ISSET(thread_in, &inset)) { + struct msg_in msg = recv_message(thread_in); + + switch (msg.hdr.type) { + case MSG_NEWCONN: { + struct prot *ch = *(struct prot**)msg.data; + free(msg.data); + + if (ch->is_accept) { + assert(accept_ch == NULL); + accept_ch = ch; + break; + } + + if (chans_len == chans_cap) { + chans_cap *= 2; + chans = realloc(chans, chans_cap * sizeof(struct prot*)); + } + + chans[chans_len++] = ch; + break; + } + + case MSG_ENDCONN: { + struct prot *ch = *(struct prot**)msg.data; + free(msg.data); + + if (ch == accept_ch) { + accept_ch = NULL; + } else { + for (size_t i = 0; i < chans_len; i++) { + if (chans[i] == ch) { + memmove(chans + i, chans + i + 1, (chans_len - i - 1) * sizeof(struct prot*)); + chans_len--; + break; + } + } + } + + icmpd_destroy(ch->d); + close(ch->event_fd_in); + close(ch->event_fd_out); + mt_mutex_destroy(&ch->mut); + + free(ch); + break; + } + + case MSG_ACCEPT: { + free(msg.data); + assert(accept_ch != NULL); + + if (!icmpd_peek(accept_ch->d)) { + struct prot *ch = NULL; + send_message(thread_out, MSG_ACCEPT_ANS, &ch, sizeof ch); + break; + } + + struct icmpd_received re = icmpd_recv(accept_ch->d); + assert(re.length <= PROTM_MAX_SIZE); + + struct protm *m = (struct protm*)re.data; + uint8_t type = m->type; + uint8_t ack = m->estab.ack; + + free(re.data); + + if (type != PROTM_TYPE_ESTAB || ack != 0) { + struct prot *ch = NULL; + send_message(thread_out, MSG_ACCEPT_ANS, &ch, sizeof ch); + break; + } + + ssize_t found = -1; + for (size_t i = 0; i < chans_len; i++) { + if (chans[i]->other_addr == re.source_addr) { + found = i; + break; + } + } + + if (found != -1) { + mt_mutex_lock(&chans[found]->mut); + chans[found]->peer_closed = true; + mt_mutex_unlock(&chans[found]->mut); + + struct protm pm; + pm.type = PROTM_TYPE_TERM; + pm.term.ack = 0; + icmpd_send(chans[found]->d, &pm, protm_size(&pm)); + break; + } + + struct prot *ch = malloc(sizeof(struct prot)); + populate_prot(ch, false, icmpd_create_server(re.id, re.source_addr), re.source_addr); + icmpd_server_set_outstanding(ch->d, re.seqnum); + + send_message(thread_out, MSG_ACCEPT_ANS, &ch, sizeof ch); + break; + } + + case MSG_RECV: { + struct prot ch = *(struct prot*)msg.data; + free(msg.data); + + ; + break; + } + } + } + } + + return NULL; +} + + +static void spawn_thread(void) { + int pp[2]; + + assert(pipe(pp) == 0); + thread_out = pp[1]; + host_in = pp[0]; + + assert(pipe(pp) == 0); + host_out = pp[1]; + thread_in = pp[0]; + + mt_thread_create(&thread, thread_entry, NULL); + + struct msg_in msg = recv_message(host_in); + assert(msg.hdr.type == MSG_THREAD_UP); + free(msg.data); + + thread_spawned = true; +} + +int prot_get_select_fd(struct prot *ch) { + return ch->event_fd_out; +} + +void prot_terminate(struct prot *ch) { + send_message(host_out, MSG_ENDCONN, &ch, sizeof ch); + + if (ch->is_accept) { + have_accept_channel = false; + } +} + +struct prot* prot_create_accept_channel() { + if (have_accept_channel) return NULL; + + if (!thread_spawned) spawn_thread(); + + have_accept_channel = true; + + struct prot *ach = malloc(sizeof(struct prot)); + populate_prot(ach, true, icmpd_create_server(-1, 0), 0); + return ach; +} + +struct prot* prot_accept(struct prot *ach) { + assert(ach->is_accept); + + send_message(host_out, MSG_ACCEPT, NULL, 0); + + struct msg_in msg = recv_message(host_in); + assert(msg.hdr.type == MSG_ACCEPT_ANS); + + struct prot *ch = *(struct prot**)msg.data; + free(msg.data); + return ch; +} + +struct prot* prot_connect(uint32_t server_addr) { + if (!thread_spawned) spawn_thread(); + + struct prot *ch = malloc(sizeof(struct prot)); + populate_prot(ch, false, icmpd_create_client(server_addr), server_addr); + return ch; +} + +struct prot_msg prot_recv(struct prot *ch) { + mt_mutex_lock(&ch->mut); + bool closed = ch->peer_closed; + mt_mutex_unlock(&ch->mut); + + if (closed) { + errno = ECONNRESET; + return (struct prot_msg){.data = NULL}; + } + + send_message(host_out, MSG_RECV, &ch, sizeof ch); + + struct msg_in msg = recv_message(host_in); + assert(msg.hdr.type == MSG_RECV_ANS); + + struct prot_msg res; + assert(msg.hdr.size == sizeof res); + memcpy(&res, msg.data, sizeof res); + free(msg.data); + + if (res.data == NULL) errno = EAGAIN; + + return res; +} + +int prot_send(struct prot *ch, const void *data, size_t length) { + mt_mutex_lock(&ch->mut); + bool closed = ch->peer_closed; + mt_mutex_unlock(&ch->mut); + + if (closed) { + errno = ECONNRESET; + return -1; + } + + struct msg_form_send form; + form.ch = ch; + form.data = malloc(length); + memcpy(form.data, data, length); + form.length = length; + + send_message(host_out, MSG_SEND, &form, sizeof form); + + return 0; +} diff --git a/prot.h b/prot.h new file mode 100644 index 0000000..e6eeb05 --- /dev/null +++ b/prot.h @@ -0,0 +1,57 @@ +#pragma once + +#include + + +struct prot_msg { + uint8_t *data; // malloc'd buffer, should be free'd by recipient + size_t length; // length of data +}; + + +// The interface exposed here is NOT thread-safe. Having multiple +// threads in in the same process working with prot is unsafe, since +// internal communication assumes one calling thread. + + +// This is a channel, a "connection", to another party. +struct prot; + +// Returns a file descriptor that becomes readable in select(2) if +// there may be a new event on the channel. Do not actually read data +// from this file descriptor. +int prot_get_select_fd(struct prot *ch); + +// Terminates the connection, blocks until acknowledged. Then channel +// is closed and freed. +void prot_terminate(struct prot *ch); + + +// Create channel on which the server can accept new connections using +// prot_accept(). Only one accept channel can be made in one process. +struct prot* prot_create_accept_channel(); + +// Returns a new connection with a client if such an establishment +// request is ready; an acknowledgement is replied to the request. If +// no new connection is ready at the moment, the function returns +// NULL. (This is a non-blocking function.) +struct prot* prot_accept(struct prot *ach); + + +// Creates a client connection to a server with the specified address. +// Blocks until the establishment request is acknowledged. +struct prot* prot_connect(uint32_t server_addr); + + +// Receives a waiting message; returns {.data=NULL} on error, in which +// case errno is set: EAGAIN = no message ready; ECONNRESET = peer +// closed connection; other values: internal errors. +// Channel should not be an accept channel. This is a non-blocking +// function. +struct prot_msg prot_recv(struct prot *ch); + +// Sends the given data on the given channel. Channel should not be an +// accept channel. Returns 0 on success and -1 on error, in which case +// errno is set: ECONNRESET = peer closed connection; other values: +// internal errors. +int prot_send(struct prot *ch, const void *data, size_t length); diff --git a/protm.c b/protm.c new file mode 100644 index 0000000..932efca --- /dev/null +++ b/protm.c @@ -0,0 +1,25 @@ +#include +#include +#include "protm.h" + +size_t protm_size(const struct protm *pm) { + switch (pm->type) { + case PROTM_TYPE_ESTAB: + return 2; + + case PROTM_TYPE_TERM: + return 2; + + case PROTM_TYPE_DATA: + return 7 + pm->data.size; + + case PROTM_TYPE_RR: + return 3 + pm->rr.nrej; + + case PROTM_TYPE_RR_POLL: + return 1; + + default: + assert(false); + } +} diff --git a/protm.h b/protm.h new file mode 100644 index 0000000..4939df2 --- /dev/null +++ b/protm.h @@ -0,0 +1,42 @@ +#pragma once + +#include +#include + + +#define PROTM_MAX_SIZE 65536 + +#define PROTM_TYPE_ESTAB 1 +#define PROTM_TYPE_TERM 2 +#define PROTM_TYPE_DATA 3 +#define PROTM_TYPE_RR 4 +#define PROTM_TYPE_RR_POLL 5 + +struct __attribute__((packed)) protm { + uint8_t type; + union { + struct { + uint8_t ack; + } estab; + struct { + uint8_t ack; + } term; + struct { + uint8_t seqnum, rr; + uint32_t size; + // An IPv4 packet is at most 65536 bytes, so the data portion + // left after allowing for the IPv4 and ICMP headers as well + // as those for this protocol, is surely at most 65536 bytes. + uint8_t data[65536]; + } data; + struct { + uint8_t rr, nrej; + uint8_t rej[256]; + } rr; + struct { + // empty + } rr_poll; + }; +}; + +size_t protm_size(const struct protm *pm); diff --git a/protocol.txt b/protocol.txt index fd07ced..1ba36ce 100644 --- a/protocol.txt +++ b/protocol.txt @@ -8,31 +8,71 @@ field. Each message containing data has a sequence number. This sequence number is in the seqnum range [0, SEQ_SIZE = 256). Windowing ARQ is applied for efficiency, with a window size of WIN_SIZE = 16. Each party has an independent seqnum -counter. Only messages in the current window range may be sent. +counter and an independent window. Only messages in the current window range may +be sent. The first message byte is the message type. The message types are described below. All structs are packed. -struct prot_data { +struct protm_estab { uint8_t type = 1; + uint8_t ack; +} + +Requests that a new connection is to be established; this message should be sent +by the client. +If ack=0, then this is an establishment request: the receiving part is asked to +respond with a protm_estab(ack=1) message to finish the connection +establishment. +If ack=1, then this is an establishment acknowledgement, which makes the +connection ready to use for both parties. +If an ack=0 message is received by a client, the message can be ignored. +If an ack=0 message is received by a server that already has a connection with +that client, the connection should be terminated and the request replied with a +protm_term(ack=0). +If an ack=1 message is received by a server, the message can be ignored. +If an ack=1 message is received by a client while no unanswered ack=0 message +had been sent to that server, the client should reply with a protm_term(ack=0) +message. + + +struct protm_term { + uint8_t type = 2; + uint8_t ack; +} + +Communicates termination requests and acknowledgements. +If ack=0, then this is a termination request: the receiving party is asked to +respond with a protm_term(ack=1) message to acknowledge the termination. +If ack=1, then this is a termination acknowledgement, which terminates the +connection for both parties. +If an ack=0 message is received while no connection is active, the program +should respond with an ack=1 message (an earlier ack=1 message could have been +lost). +If an ack=1 message is received while no connection is active, the message can +be ignored. + + +struct protm_data { + uint8_t type = 3; uint8_t seqnum, rr; uint32_t size; uint8_t data[size]; } -Transmits application data, with piggy-back of a prot_rr message with nrej=0. +Transmits application data, with piggy-back of a protm_rr message with nrej=0. - 'seqnum' is the sequence number of this message. -- 'rr' is the expected next seqnum, as in the prot_rr message. This can shift +- 'rr' is the expected next seqnum, as in the protm_rr message. This can shift the window. - 'size' is the number of bytes of data sent in this packet. - 'data' contains the next-protocol-level data sent in this message. -struct prot_rr { - uint8_t type = 2; +struct protm_rr { + uint8_t type = 4; uint8_t rr, nrej; uint8_t rej[nrej]; } @@ -49,10 +89,10 @@ recipient is asked to resend its messages with seqnums in 'rej'. -struct prot_rr_poll { - uint8_t type = 3; +struct protm_rr_poll { + uint8_t type = 5; } -Asks for a prot_rr message, because the sender has new data to send but thinks -the window is full. The receiver of a prot_rr_poll should send its current -prot_rr message. +Asks for a protm_rr message, because the sender has new data to send but thinks +the window is full. The receiver of a protm_rr_poll should send its current +protm_rr message. -- cgit v1.2.3