summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.gitignore1
-rw-r--r--Makefile21
-rw-r--r--controller/.gitignore1
-rw-r--r--controller/Makefile28
-rw-r--r--controller/main.c245
-rw-r--r--controller/unzbuffer.c95
-rw-r--r--controller/unzbuffer.h14
-rw-r--r--global.h18
-rw-r--r--line_reader.c54
-rw-r--r--line_reader.h11
-rw-r--r--memory.c33
-rw-r--r--memory.h19
-rw-r--r--protocol.txt16
-rw-r--r--server/.gitignore1
-rw-r--r--server/Makefile28
-rw-r--r--server/data_stream.c156
-rw-r--r--server/data_stream.h13
-rw-r--r--server/main.c200
-rwxr-xr-xserver/pstat.py14
-rw-r--r--server/zbuffer.c79
-rw-r--r--server/zbuffer.h13
21 files changed, 1060 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..b672fde
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1 @@
+obj
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..7d82ff5
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,21 @@
+CC = gcc
+CFLAGS = -Wall -Wextra -std=c11 -g -O2 -fwrapv
+LDFLAGS = -lz
+
+OBJDIR = obj
+
+.PHONY: all clean
+
+.SUFFIXES:
+
+all: $(patsubst %.c,$(OBJDIR)/%.o,$(wildcard *.c))
+
+clean:
+ @echo "Cleaning"
+ @rm -rf $(OBJDIR)
+
+
+$(OBJDIR)/%.o: %.c $(wildcard *.h)
+ @mkdir -p $(OBJDIR)
+ @echo "CC $<"
+ @$(CC) $(CFLAGS) -c -o $@ $<
diff --git a/controller/.gitignore b/controller/.gitignore
new file mode 100644
index 0000000..97f90d8
--- /dev/null
+++ b/controller/.gitignore
@@ -0,0 +1 @@
+controller
diff --git a/controller/Makefile b/controller/Makefile
new file mode 100644
index 0000000..7282260
--- /dev/null
+++ b/controller/Makefile
@@ -0,0 +1,28 @@
+CC = gcc
+CFLAGS = -Wall -Wextra -std=c11 -g -O2 -fwrapv -I..
+LDFLAGS = -lz
+TARGET = controller
+
+OBJDIR = obj
+
+.PHONY: all clean
+
+.SUFFIXES:
+
+all: $(TARGET)
+
+clean:
+ @echo "Cleaning"
+ @rm -f $(TARGET)
+ @rm -rf $(OBJDIR)
+
+
+$(OBJDIR)/%.o: %.c $(wildcard *.h)
+ @mkdir -p $(OBJDIR)
+ @echo "CC $<"
+ @$(CC) $(CFLAGS) -c -o $@ $<
+
+$(TARGET): $(patsubst %.c,$(OBJDIR)/%.o,$(wildcard *.c))
+ @make -q -C .. || make -C .. --no-print-directory
+ @echo "LD -o $@"
+ @$(CC) -o $@ $^ $(wildcard ../$(OBJDIR)/*.o) $(LDFLAGS)
diff --git a/controller/main.c b/controller/main.c
new file mode 100644
index 0000000..2e37e76
--- /dev/null
+++ b/controller/main.c
@@ -0,0 +1,245 @@
+#include <stdio.h>
+#include <string.h>
+#include <time.h>
+#include <unistd.h>
+#include <errno.h>
+#include <sys/socket.h>
+#include <arpa/inet.h>
+#include <netdb.h>
+#include <sys/select.h>
+#include "global.h"
+#include "line_reader.h"
+#include "unzbuffer.h"
+
+
+static bool writeall(int sock,const char *data,i64 len){
+ i64 cursor=0;
+ while(cursor<len){
+ i64 nwr=write(sock,data+cursor,len-cursor);
+ if(nwr<=0)return false;
+ cursor+=nwr;
+ }
+ return true;
+}
+
+struct frame_process {
+ pid_t pid;
+ i64 namelen;
+ char *name;
+ i64 usertime,systime,rss;
+};
+
+struct data_frame {
+ struct timeval stamp;
+ i64 numpids;
+ struct frame_process *procs;
+};
+
+static i64 deserialise8(const u8 *buf){
+ u64 value=0;
+ for(i64 i=0;i<8;i++)value|=((u64)buf[i])<<(8*i);
+ return (i64)value;
+}
+
+static i32 deserialise4(const u8 *buf){
+ u32 value=0;
+ for(i64 i=0;i<4;i++)value|=((u32)buf[i])<<(8*i);
+ return (i32)value;
+}
+
+static void process_frame(const struct data_frame *frame){
+ fprintf(stderr,"%s: (%" PRIi64 ")\n",ctime(&frame->stamp.tv_sec),frame->numpids);
+}
+
+static i64 read_frame(const u8 *buffer,i64 len){
+ if(len<20)return 0;
+ struct data_frame *frame=malloc(1,struct data_frame);
+ frame->stamp.tv_sec=deserialise8(buffer);
+ frame->stamp.tv_usec=deserialise4(buffer+8);
+ i64 numpids=frame->numpids=deserialise8(buffer+12);
+ i64 cursor=20;
+ fprintf(stderr,"numpids=%" PRIi64 "\n",numpids);
+ if(len-cursor<numpids*36){
+ free(frame);
+ return 0;
+ }
+ // calloc to null unfilled pointers in procs
+ frame->procs=calloc(numpids,struct frame_process);
+
+ i64 pi;
+ for(pi=0;pi<frame->numpids;pi++){
+ fprintf(stderr,"pi=%" PRIi64 "\n",pi);
+ if(len-cursor<36){
+ fprintf(stderr,"insuff: pi\n");
+ goto insufficient_data;
+ }
+ frame->procs[pi].pid=deserialise4(buffer+cursor);
+ cursor+=4;
+ i64 namelen=frame->procs[pi].namelen=deserialise8(buffer+cursor);
+ cursor+=8;
+ fprintf(stderr," pid=%d namelen=%" PRIi64 "\n",frame->procs[pi].pid,namelen);
+ if(len-cursor<namelen+24){
+ fprintf(stderr,"insuff: namelen\n");
+ goto insufficient_data;
+ }
+ frame->procs[pi].name=malloc(namelen+1,char);
+ memcpy(frame->procs[pi].name,buffer+cursor,namelen);
+ cursor+=namelen;
+ frame->procs[pi].usertime=deserialise8(buffer+cursor);
+ cursor+=8;
+ frame->procs[pi].systime=deserialise8(buffer+cursor);
+ cursor+=8;
+ frame->procs[pi].rss=deserialise8(buffer+cursor);
+ cursor+=8;
+ }
+
+ process_frame(frame);
+ for(i64 i=0;i<numpids;i++)free(frame->procs[i].name);
+ free(frame->procs);
+ free(frame);
+ return cursor;
+
+insufficient_data:
+ if(frame->procs){
+ for(i64 i=0;i<=pi;i++){
+ if(frame->procs[i].name)free(frame->procs[i].name);
+ }
+ free(frame->procs);
+ }
+ free(frame);
+ return 0;
+}
+
+static void frame_data_sink(u8 *newdata,i64 newdatalen,void *payload){
+ (void)payload;
+ static u8 *buffer=NULL;
+ static i64 buffersize=0,bufferlen=0;
+
+ fprintf(stderr,"frame_data_sink(len=%" PRIi64 ")\n",newdatalen);
+
+ if(buffer==NULL){
+ buffersize=4096;
+ buffer=malloc(buffersize,u8);
+ }
+
+ if(bufferlen+newdatalen>buffersize){
+ do buffersize*=2; while(bufferlen+newdatalen>buffersize);
+ buffer=realloc(buffer,buffersize,u8);
+ }
+ memcpy(buffer+bufferlen,newdata,newdatalen);
+ bufferlen+=newdatalen;
+
+ i64 cursor=0;
+ while(cursor<bufferlen){
+ i64 nr=read_frame(buffer+cursor,bufferlen-cursor);
+ if(nr<=0)break;
+ cursor+=nr;
+ }
+ if(cursor<bufferlen){
+ memmove(buffer,buffer+cursor,bufferlen-cursor);
+ }
+ bufferlen-=cursor;
+}
+
+int main(int argc,char **argv){
+ if(argc!=2&&argc!=3){
+ fprintf(stderr,"Usage: %s <ip addr> [port=57575]\n",argv[0]);
+ return 1;
+ }
+
+ struct addrinfo hints,*res;
+ memset(&hints,0,sizeof(hints));
+ hints.ai_family=AF_UNSPEC;
+ hints.ai_socktype=SOCK_STREAM;
+ int ret=getaddrinfo(argv[1],argc==3?argv[2]:"57575",&hints,&res);
+ if(ret!=0){
+ fprintf(stderr,"getaddrinfo: %s\n",gai_strerror(ret));
+ return 1;
+ }
+
+ int sock=-1;
+ for(struct addrinfo *r=res;r!=NULL;r=r->ai_next){
+ char buf[256];
+ inet_ntop(r->ai_family,&((struct sockaddr_in*)r->ai_addr)->sin_addr,buf,sizeof buf);
+ fprintf(stderr,"Trying: %s\n",buf);
+ sock=socket(r->ai_family,r->ai_socktype,r->ai_protocol);
+ if(sock<0){
+ sock=-1;
+ perror("socket");
+ continue;
+ }
+ if(connect(sock,r->ai_addr,r->ai_addrlen)<0){
+ close(sock);
+ sock=-1;
+ perror("connect");
+ continue;
+ }
+ break;
+ }
+ freeaddrinfo(res);
+ if(sock==-1)return 1;
+ fprintf(stderr,"Connected.\n");
+
+ struct line_reader *liner=line_reader_init(STDIN_FILENO);
+ struct unzbuffer *z=unzbuffer_init(frame_data_sink,NULL);
+
+ bool haderror=false;
+ while(true){
+ fd_set inset;
+ FD_ZERO(&inset);
+ FD_SET(STDIN_FILENO,&inset);
+ FD_SET(sock,&inset);
+ int ret=select(sock+1,&inset,NULL,NULL,NULL);
+ if(ret<0){
+ if(errno==EINTR)continue;
+ perror("select");
+ break;
+ }
+
+ if(FD_ISSET(STDIN_FILENO,&inset)){
+ char buf[1024];
+ i64 nr=read(STDIN_FILENO,buf,sizeof buf);
+ if(nr<0){
+ if(errno==EINTR)continue;
+ perror("read");
+ break;
+ }
+ if(nr==0)break; // EOF
+ line_reader_supply_data(liner,buf,nr);
+ char *line;
+ while((line=line_reader_get_line(liner,true))!=NULL){
+ if(!writeall(sock,line,strlen(line))){
+ fprintf(stderr,"Socket write error\n");
+ goto cleanup;
+ }
+ // printf("Written <%s>\n",line);
+ free(line);
+ }
+ }
+
+ if(FD_ISSET(sock,&inset)){
+ u8 buf[1024];
+ i64 nr=read(sock,buf,sizeof buf);
+ if(nr<0){
+ if(errno==EINTR)continue;
+ perror("read");
+ break;
+ }
+ if(nr==0){
+ fprintf(stderr,"End-of-file on socket\n");
+ break;
+ }
+ fprintf(stderr,"unzbuffer_write(nr=%" PRIi64 ")\n",nr);
+ if(!unzbuffer_write(z,buf,nr)){
+ haderror=true;
+ break;
+ }
+ }
+ }
+
+cleanup:
+ line_reader_destroy(liner);
+ if(!unzbuffer_finish_destroy(z))return 1;
+ if(haderror)return 1;
+ return 0;
+}
diff --git a/controller/unzbuffer.c b/controller/unzbuffer.c
new file mode 100644
index 0000000..ecd38a6
--- /dev/null
+++ b/controller/unzbuffer.c
@@ -0,0 +1,95 @@
+#include <stdio.h>
+#include <assert.h>
+#include <zlib.h>
+#include "unzbuffer.h"
+
+#define BUFSZ_ZOUT (256*1024)
+
+
+struct unzbuffer {
+ u8 *out;
+ i64 outfill;
+ z_stream strm;
+
+ bool just_resetted;
+
+ unzbuffer_writefunc *wf;
+ void *payload;
+};
+
+struct unzbuffer* unzbuffer_init(unzbuffer_writefunc *wf,void *payload){
+ struct unzbuffer *z=malloc(1,struct unzbuffer);
+ z->out=malloc(BUFSZ_ZOUT,u8);
+ z->outfill=0;
+
+ z->strm.zalloc=Z_NULL;
+ z->strm.zfree=Z_NULL;
+ z->strm.opaque=Z_NULL;
+ z->strm.avail_in=0;
+ z->strm.next_in=Z_NULL;
+
+ // so unzbuffer_write can start writing immediately
+ z->strm.avail_out=BUFSZ_ZOUT;
+ z->strm.next_out=z->out;
+
+ if(inflateInit(&z->strm)!=Z_OK){
+ return NULL;
+ }
+
+ z->just_resetted=true;
+
+ z->wf=wf;
+ z->payload=payload;
+ return z;
+}
+
+bool unzbuffer_write(struct unzbuffer *z,u8 *data,i64 len){
+ z->just_resetted=false;
+ z->strm.avail_in=len;
+ z->strm.next_in=data;
+ assert(z->strm.avail_out>0);
+ while(true){
+ int ret=inflate(&z->strm,Z_NO_FLUSH);
+ if(ret!=Z_OK&&ret!=Z_STREAM_END){
+ inflateEnd(&z->strm);
+ fprintf(stderr,"unz error: %d\n",ret);
+ return false;
+ }
+ if(ret!=Z_STREAM_END&&z->strm.avail_out>0)break;
+ z->wf(z->out,BUFSZ_ZOUT,z->payload);
+ if(ret==Z_STREAM_END){
+ inflateReset(&z->strm);
+ z->strm.avail_in=0;
+ z->strm.next_in=Z_NULL;
+ z->strm.avail_out=BUFSZ_ZOUT;
+ z->strm.next_out=z->out;
+ z->just_resetted=true;
+ break;
+ }
+ z->strm.avail_out=BUFSZ_ZOUT;
+ z->strm.next_out=z->out;
+ }
+ assert(z->strm.avail_in==0);
+ return true;
+}
+
+bool unzbuffer_finish_destroy(struct unzbuffer *z){
+ if(!z->just_resetted){
+ assert(z->strm.avail_out>0);
+ while(true){
+ int ret=inflate(&z->strm,Z_FINISH);
+ if(ret!=Z_OK&&ret!=Z_STREAM_END){
+ inflateEnd(&z->strm);
+ fprintf(stderr,"unz error: %d\n",ret);
+ return false;
+ }
+ z->wf(z->out,BUFSZ_ZOUT-z->strm.avail_out,z->payload);
+ if(z->strm.avail_out>0)break;
+ z->strm.avail_out=BUFSZ_ZOUT;
+ z->strm.next_out=z->out;
+ }
+ }
+ free(z->out);
+ free(z);
+ return true;
+}
diff --git a/controller/unzbuffer.h b/controller/unzbuffer.h
new file mode 100644
index 0000000..547d2af
--- /dev/null
+++ b/controller/unzbuffer.h
@@ -0,0 +1,14 @@
+#pragma once
+
+#include "global.h"
+
+
+typedef void unzbuffer_writefunc(u8*,i64,void*);
+
+struct unzbuffer;
+
+struct unzbuffer* unzbuffer_init(unzbuffer_writefunc *wf,void *payload);
+
+// Return false on invalid stream
+bool unzbuffer_write(struct unzbuffer *z,u8 *data,i64 len);
+bool unzbuffer_finish_destroy(struct unzbuffer *z);
diff --git a/global.h b/global.h
new file mode 100644
index 0000000..b0029c6
--- /dev/null
+++ b/global.h
@@ -0,0 +1,18 @@
+#pragma once
+
+#include <stdbool.h>
+#include <stdint.h>
+#include <inttypes.h>
+#include <limits.h>
+#include "memory.h"
+
+typedef int8_t i8;
+typedef uint8_t u8;
+typedef int16_t i16;
+typedef uint16_t u16;
+typedef int32_t i32;
+typedef uint32_t u32;
+typedef int64_t i64;
+typedef uint64_t u64;
+
+#define LARGENUM (INT_MAX/32)
diff --git a/line_reader.c b/line_reader.c
new file mode 100644
index 0000000..48c8355
--- /dev/null
+++ b/line_reader.c
@@ -0,0 +1,54 @@
+#include <stdio.h>
+#include <string.h>
+#include "line_reader.h"
+
+
+struct line_reader {
+ int fd;
+ i64 bufcap,buflen;
+ char *buffer;
+};
+
+struct line_reader* line_reader_init(int fd){
+ struct line_reader *r=malloc(1,struct line_reader);
+ r->fd=fd;
+ r->bufcap=1024;
+ r->buflen=0;
+ r->buffer=malloc(r->bufcap,char);
+ return r;
+}
+
+void line_reader_supply_data(struct line_reader *r,const char *data,i64 len){
+ bool do_realloc=false;
+ while(r->buflen+len>r->bufcap){
+ if(r->bufcap*2>LARGENUM){
+ fprintf(stderr,"Data too large in line_reader_supply_data!\n");
+ return;
+ }
+ r->bufcap*=2;
+ do_realloc=true;
+ }
+ if(do_realloc){
+ r->buffer=realloc(r->buffer,r->bufcap,char);
+ }
+ memcpy(r->buffer+r->buflen,data,len);
+ r->buflen+=len;
+}
+
+char* line_reader_get_line(struct line_reader *r,bool include_lf){
+ char *p;
+ p=memchr(r->buffer,'\n',r->buflen);
+ if(p==NULL)return NULL;
+ i64 idx=p-r->buffer;
+ char *str=malloc(idx+include_lf+1,char);
+ memcpy(str,r->buffer,idx+include_lf);
+ str[idx+include_lf]='\0';
+ memmove(r->buffer,r->buffer+idx+1,r->buflen-idx-1);
+ r->buflen-=idx+1;
+ return str;
+}
+
+void line_reader_destroy(struct line_reader *r){
+ free(r->buffer);
+ free(r);
+}
diff --git a/line_reader.h b/line_reader.h
new file mode 100644
index 0000000..898801f
--- /dev/null
+++ b/line_reader.h
@@ -0,0 +1,11 @@
+#pragma once
+
+#include "global.h"
+
+
+struct line_reader;
+
+struct line_reader* line_reader_init(int fd);
+void line_reader_supply_data(struct line_reader *r,const char *data,i64 len);
+char* line_reader_get_line(struct line_reader *r,bool include_lf); // call until NULL
+void line_reader_destroy(struct line_reader *r);
diff --git a/memory.c b/memory.c
new file mode 100644
index 0000000..8734ab2
--- /dev/null
+++ b/memory.c
@@ -0,0 +1,33 @@
+#define _GNU_SOURCE
+#include <stdio.h>
+#include <stdarg.h>
+#include <assert.h>
+#include "global.h"
+#include "memory.h"
+
+void* check_after_allocation(const char *func,size_t num,size_t sz,void *ptr){
+ if(ptr==NULL){
+ fprintf(stderr,"Allocation failed: %s(%zu * %zuB = %zu)",func,num,sz,num*sz);
+ exit(1);
+ }
+ return ptr;
+}
+
+void* check_after_allocation_str(const char *func,void *ptr){
+ if(ptr==NULL){
+ fprintf(stderr,"Allocation failed: %s()",func);
+ exit(1);
+ }
+ return ptr;
+}
+
+__attribute__((format (printf, 2, 3)))
+int memory_asprintf_wrapper(char **ret,const char *format,...){
+ assert(ret!=NULL);
+ va_list ap;
+ va_start(ap,format);
+ int len=vasprintf(ret,format,ap);
+ va_end(ap);
+ check_after_allocation_str("asprintf",*ret);
+ return len;
+}
diff --git a/memory.h b/memory.h
new file mode 100644
index 0000000..9175dea
--- /dev/null
+++ b/memory.h
@@ -0,0 +1,19 @@
+#pragma once
+
+#include <stdlib.h>
+
+#define malloc(num,type) \
+ ((type*)check_after_allocation("malloc",num,sizeof(type),malloc((num)*sizeof(type))))
+#define calloc(num,type) \
+ ((type*)check_after_allocation("calloc",num,sizeof(type),calloc((num),sizeof(type))))
+#define realloc(ptr,num,type) \
+ ((type*)check_after_allocation("realloc",num,sizeof(type),realloc((ptr),(num)*sizeof(type))))
+#define strdup(str) \
+ ((char*)check_after_allocation_str("strdup",strdup(str)))
+#define asprintf(...) \
+ (memory_asprintf_wrapper(__VA_ARGS__))
+
+void* check_after_allocation(const char *func,size_t num,size_t sz,void *ptr);
+void* check_after_allocation_str(const char *func,void *ptr);
+
+int memory_asprintf_wrapper(char **ret,const char *format,...) __attribute__((format (printf, 2, 3)));
diff --git a/protocol.txt b/protocol.txt
new file mode 100644
index 0000000..fda2640
--- /dev/null
+++ b/protocol.txt
@@ -0,0 +1,16 @@
+controller -> server:
+Just lines of text, see either program's main.c.
+
+server -> controller:
+Data frames compressed using zlib. Data frames look as follows:
+- Timestamp: (as from gettimeofday)
+ - seconds (8 bytes)
+ - microseconds (4 bytes)
+- numpids (8 bytes)
+- [numpids times:]
+ - pid (4 bytes)
+ - namelen (8 bytes)
+ - process name (namelen bytes)
+ - total user time (8 bytes)
+ - total system time (8 bytes)
+ - resident size (8 bytes)
diff --git a/server/.gitignore b/server/.gitignore
new file mode 100644
index 0000000..3fbc411
--- /dev/null
+++ b/server/.gitignore
@@ -0,0 +1 @@
+pstat
diff --git a/server/Makefile b/server/Makefile
new file mode 100644
index 0000000..24cf554
--- /dev/null
+++ b/server/Makefile
@@ -0,0 +1,28 @@
+CC = gcc
+CFLAGS = -Wall -Wextra -std=c11 -g -O2 -fwrapv -I..
+LDFLAGS = -lz
+TARGET = pstat
+
+OBJDIR = obj
+
+.PHONY: all clean
+
+.SUFFIXES:
+
+all: $(TARGET)
+
+clean:
+ @echo "Cleaning"
+ @rm -f $(TARGET)
+ @rm -rf $(OBJDIR)
+
+
+$(OBJDIR)/%.o: %.c $(wildcard *.h)
+ @mkdir -p $(OBJDIR)
+ @echo "CC $<"
+ @$(CC) $(CFLAGS) -c -o $@ $<
+
+$(TARGET): $(patsubst %.c,$(OBJDIR)/%.o,$(wildcard *.c))
+ @make -q -C .. || make -C .. --no-print-directory
+ @echo "LD -o $@"
+ @$(CC) -o $@ $^ $(wildcard ../$(OBJDIR)/*.o) $(LDFLAGS)
diff --git a/server/data_stream.c b/server/data_stream.c
new file mode 100644
index 0000000..e86114e
--- /dev/null
+++ b/server/data_stream.c
@@ -0,0 +1,156 @@
+#include <stdio.h>
+#include <limits.h>
+#include <string.h>
+#include <unistd.h>
+#include <sys/time.h>
+#include <libproc.h>
+#include "data_stream.h"
+
+
+struct data_stream {
+ int fd;
+ i64 pidbufsize;
+ pid_t *pids;
+ i64 namebufsize;
+ u8 *namebuf;
+ i64 writebufsize;
+ u8 *writebuf;
+ i64 cursor;
+ struct zbuffer *z;
+};
+
+static bool callback_error=false;
+
+static void write_data_fd(u8 *data,i64 len,void *fdp){
+ int fd=*(int*)fdp;
+ i64 cursor=0;
+ while(cursor<len){
+ i64 nwr=write(fd,data+cursor,len-cursor);
+ if(nwr<0){
+ perror("write");
+ callback_error=true;
+ return;
+ }
+ cursor+=nwr;
+ }
+}
+
+static void bufappend(u8 **buf,i64 *cursor,i64 *cap,u8 *data,i64 datalen){
+ if(*cursor+datalen>=*cap){
+ if(*cap*2>LARGENUM)return;
+ *cap*=2;
+ *buf=realloc(*buf,*cap,u8);
+ }
+ memcpy(*buf+*cursor,data,datalen);
+ *cursor+=datalen;
+}
+
+static void serialise8(u8 *buf,i64 value){
+ for(i64 i=0;i<8;i++)buf[i]=(value>>(8*i))&0xff;
+}
+
+static void serialise4(u8 *buf,i32 value){
+ for(i64 i=0;i<4;i++)buf[i]=(value>>(8*i))&0xff;
+}
+
+struct data_stream* data_stream_init(int fd){
+ struct data_stream *s=malloc(1,struct data_stream);
+ s->fd=fd;
+ s->pidbufsize=1024;
+ s->pids=malloc(s->pidbufsize,pid_t);
+ s->namebufsize=64;
+ s->namebuf=malloc(s->namebufsize,u8);
+ s->writebufsize=4096;
+ s->writebuf=malloc(s->writebufsize,u8);
+ s->cursor=0;
+ s->z=zbuffer_init(write_data_fd,&s->fd);
+
+ callback_error=false;
+
+ return s;
+}
+
+int data_stream_finish_destroy(struct data_stream *s){
+ callback_error=false;
+ zbuffer_finish_destroy(s->z);
+ free(s->pids);
+ free(s->namebuf);
+ free(s->writebuf);
+ free(s);
+ if(callback_error)return -1;
+ return 0;
+}
+
+int data_stream_frame(struct data_stream *s){
+ callback_error=false;
+
+ u8 buf8[8];
+
+ struct timeval timeval;
+ if(gettimeofday(&timeval,NULL)<0){
+ perror("gettimeofday");
+ timeval.tv_sec=0;
+ timeval.tv_usec=0;
+ }
+ serialise8(buf8,timeval.tv_sec);
+ bufappend(&s->writebuf,&s->cursor,&s->writebufsize,buf8,8);
+ serialise4(buf8,timeval.tv_usec);
+ bufappend(&s->writebuf,&s->cursor,&s->writebufsize,buf8,4);
+
+ i64 numpids=proc_listallpids(s->pids,s->pidbufsize*sizeof(pid_t));
+ while(s->pidbufsize<=LARGENUM&&numpids>=s->pidbufsize){
+ s->pidbufsize*=2;
+ s->pids=realloc(s->pids,s->pidbufsize,pid_t);
+ numpids=proc_listallpids(s->pids,s->pidbufsize*sizeof(pid_t));
+ }
+ serialise8(buf8,numpids);
+ bufappend(&s->writebuf,&s->cursor,&s->writebufsize,buf8,8);
+
+ for(i64 index=0;index<numpids;index++) {
+ pid_t pid=s->pids[index];
+
+ struct proc_taskinfo info;
+ int result=proc_pidinfo(pid,PROC_PIDTASKINFO,0,&info,sizeof(info));
+ if(result!=sizeof info)continue;
+
+ serialise4(buf8,pid);
+ bufappend(&s->writebuf,&s->cursor,&s->writebufsize,buf8,4);
+
+ size_t namelen=proc_name(pid,s->namebuf,s->namebufsize);
+ while(s->namebufsize<=LARGENUM&&namelen==0){
+ s->namebufsize*=2;
+ s->namebuf=realloc(s->namebuf,s->namebufsize,u8);
+ namelen=proc_name(pid,s->namebuf,s->namebufsize);
+ }
+ if(namelen==0){
+ s->namebuf[0]='\0';
+ }
+
+ fprintf(stderr,"index=%" PRIi64 " pid=%d namelen=%zu\n",index,pid,namelen);
+ serialise8(buf8,namelen);
+ bufappend(&s->writebuf,&s->cursor,&s->writebufsize,buf8,8);
+ bufappend(&s->writebuf,&s->cursor,&s->writebufsize,s->namebuf,namelen);
+
+ serialise8(buf8,info.pti_total_user);
+ bufappend(&s->writebuf,&s->cursor,&s->writebufsize,buf8,8);
+ serialise8(buf8,info.pti_total_system);
+ bufappend(&s->writebuf,&s->cursor,&s->writebufsize,buf8,8);
+ serialise8(buf8,info.pti_resident_size);
+ bufappend(&s->writebuf,&s->cursor,&s->writebufsize,buf8,8);
+
+ // fprintf(stderr,"PID %u (%s): %llu %llu %llu\n",pid,namebuf,info.pti_total_user,info.pti_total_system,info.pti_resident_size);
+ }
+
+ zbuffer_write(s->z,s->writebuf,s->cursor);
+ s->cursor=0;
+
+ if(callback_error)return -1;
+ return 0;
+}
+
+int data_stream_flush(struct data_stream *s){
+ callback_error=false;
+ zbuffer_flush(s->z);
+ if(callback_error)return -1;
+ return 0;
+}
diff --git a/server/data_stream.h b/server/data_stream.h
new file mode 100644
index 0000000..c8baf18
--- /dev/null
+++ b/server/data_stream.h
@@ -0,0 +1,13 @@
+#pragma once
+
+#include "global.h"
+#include "zbuffer.h"
+
+
+struct data_stream;
+
+// Return NULL or -1 on error
+struct data_stream* data_stream_init(int fd);
+int data_stream_finish_destroy(struct data_stream *s);
+int data_stream_frame(struct data_stream *s);
+int data_stream_flush(struct data_stream *s); // can degrade compression
diff --git a/server/main.c b/server/main.c
new file mode 100644
index 0000000..3cf1ffd
--- /dev/null
+++ b/server/main.c
@@ -0,0 +1,200 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <errno.h>
+#include <sys/time.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <sys/select.h>
+#include "global.h"
+#include "data_stream.h"
+#include "line_reader.h"
+#include "zbuffer.h"
+
+#define PORT 57575
+
+#define POLLDELAY (10*1000000)
+
+/*struct proc_taskinfo {
+ uint64_t pti_virtual_size; virtual memory size (bytes)
+ uint64_t pti_resident_size; resident memory size (bytes)
+ uint64_t pti_total_user; total time
+ uint64_t pti_total_system;
+ uint64_t pti_threads_user; existing threads only
+ uint64_t pti_threads_system;
+ int32_t pti_policy; default policy for new threads
+ int32_t pti_faults; number of page faults
+ int32_t pti_pageins; number of actual pageins
+ int32_t pti_cow_faults; number of copy-on-write faults
+ int32_t pti_messages_sent; number of messages sent
+ int32_t pti_messages_received; number of messages received
+ int32_t pti_syscalls_mach; number of mach system calls
+ int32_t pti_syscalls_unix; number of unix system calls
+ int32_t pti_csw; number of context switches
+ int32_t pti_threadnum; number of threads in the task
+ int32_t pti_numrunning; number of running threads
+ int32_t pti_priority; task priority
+};*/
+
+static i64 make_timestamp(void){
+ struct timeval tv;
+ gettimeofday(&tv,NULL);
+ return tv.tv_sec*(i64)1000000+tv.tv_usec;
+}
+
+static void connection_handler(int sock){
+ struct data_stream *stream=data_stream_init(sock);
+ struct line_reader *reader=line_reader_init(sock);
+
+ i64 polldelay=POLLDELAY;
+ i64 timeleft=polldelay;
+
+ bool do_send_frames=false;
+
+ bool do_quit=false;
+
+ while(true){
+ struct timeval timeout={timeleft/1000000,timeleft%1000000};
+ i64 before=make_timestamp();
+
+ fd_set inset;
+ FD_ZERO(&inset);
+ FD_SET(sock,&inset);
+ int ret=select(sock+1,&inset,NULL,NULL,do_send_frames?&timeout:NULL);
+ if(ret<0){
+ if(errno==EINTR)continue;
+ perror("select");
+ break;
+ }
+ if(ret==0||!FD_ISSET(sock,&inset)){ // timeout
+ if(do_send_frames){
+ if(data_stream_frame(stream)<0){
+ printf("Error sending data\n");
+ goto cleanup;
+ }
+ printf("."); fflush(stdout);
+ }
+ timeleft=polldelay;
+ continue;
+ }
+
+ i64 after=make_timestamp();
+ if(after<before){
+ printf("Time ran backwards?\n");
+ after=before;
+ }
+ timeleft-=after-before;
+ if(timeleft<0)timeleft=0;
+
+ char buf[1024];
+ i64 nr=read(sock,buf,sizeof buf);
+ if(nr<0){
+ if(errno==EINTR)continue;
+ perror("read");
+ break;
+ }
+ if(nr==0)break; // EOF
+ line_reader_supply_data(reader,buf,nr);
+
+ char *line;
+ while((line=line_reader_get_line(reader,false))!=NULL){
+ char *word=line;
+
+ printf("Read <%s>\n",line);
+
+ char *next=strchr(word,' ');
+ if(next!=NULL){
+ *next='\0';
+ next++;
+ }
+ if(strcmp(word,"delay")==0){
+ char *endp;
+ i64 v=strtoll(next,&endp,10);
+ if(*next=='\0'||*endp!='\0'||v<=1000){
+ free(line);
+ printf("Invalid number in delay\n");
+ continue;
+ }
+ polldelay=v;
+ printf("polldelay -> %" PRIi64 "\n",polldelay);
+ if(timeleft>polldelay)timeleft=polldelay;
+ } else if(strcmp(word,"start")==0){
+ do_send_frames=true;
+ printf("Sending enabled\n");
+ } else if(strcmp(word,"stop")==0){
+ do_send_frames=false;
+ printf("Sending disabled\n");
+ } else if(strcmp(word,"flush")==0){
+ printf("Flushing\n");
+ if(data_stream_flush(stream)<0){
+ printf("Error flushing\n");
+ goto cleanup;
+ }
+ } else if(strcmp(word,"quit")==0){
+ printf("Quitting by request\n");
+ do_quit=true;
+ goto cleanup;
+ } else {
+ printf("Read unrecognised line <%s>!\n",line);
+ }
+
+ free(line);
+ }
+ }
+
+cleanup:
+ data_stream_finish_destroy(stream);
+ if(do_quit)exit(0);
+}
+
+int main(void){
+ int sock=socket(PF_INET,SOCK_STREAM,0);
+ if(sock<0){
+ perror("socket");
+ return 1;
+ }
+
+ int one=1;
+ if(setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,&one,sizeof(int))<0){
+ perror("setsockopt");
+ return 1;
+ }
+
+ struct sockaddr_in sin;
+ sin.sin_family=AF_INET;
+ sin.sin_addr.s_addr=htonl(INADDR_ANY);
+ sin.sin_port=htons(PORT);
+
+ if(bind(sock,(struct sockaddr*)&sin,sizeof(sin))<0){
+ close(sock);
+ perror("bind");
+ return 1;
+ }
+
+ if(listen(sock,1)<0){
+ perror("listen");
+ return 1;
+ }
+
+ printf("Listening on port %d\n",PORT);
+
+ while(true){
+ struct sockaddr_storage client_addr;
+ socklen_t client_addr_sz=sizeof(client_addr);
+ int clientsock=accept(sock,(struct sockaddr*)&client_addr,&client_addr_sz);
+ if(clientsock<0){
+ perror("accept");
+ continue;
+ }
+
+ char str[INET_ADDRSTRLEN];
+ inet_ntop(client_addr.ss_family,&((struct sockaddr_in*)&client_addr)->sin_addr,
+ str,sizeof(str));
+
+ printf("Accept from %s\n",str);
+ connection_handler(clientsock);
+ close(clientsock);
+ }
+}
diff --git a/server/pstat.py b/server/pstat.py
new file mode 100755
index 0000000..eed6528
--- /dev/null
+++ b/server/pstat.py
@@ -0,0 +1,14 @@
+#!/usr/bin/env python3
+import psutil, os, sys, time
+
+t=time.clock_gettime(time.CLOCK_MONOTONIC)
+
+for p in psutil.process_iter():
+ try:
+ with p.oneshot():
+ print(p.pid,p.username(),p.name(),p.memory_info().rss,p.cpu_times())
+ except Exception as e:
+ pass
+
+t=time.clock_gettime(time.CLOCK_MONOTONIC)-t
+print(t,file=sys.stderr)
diff --git a/server/zbuffer.c b/server/zbuffer.c
new file mode 100644
index 0000000..af37f74
--- /dev/null
+++ b/server/zbuffer.c
@@ -0,0 +1,79 @@
+#include <assert.h>
+#include <zlib.h>
+#include "zbuffer.h"
+
+#define BUFSZ_ZOUT (256*1024)
+
+
+struct zbuffer {
+ u8 *out;
+ i64 outfill;
+ z_stream strm;
+
+ zbuffer_writefunc *wf;
+ void *payload;
+};
+
+struct zbuffer* zbuffer_init(zbuffer_writefunc *wf,void *payload){
+ struct zbuffer *z=malloc(1,struct zbuffer);
+ z->out=malloc(BUFSZ_ZOUT,u8);
+ z->outfill=0;
+
+ z->strm.zalloc=Z_NULL;
+ z->strm.zfree=Z_NULL;
+ z->strm.opaque=Z_NULL;
+
+ // so zbuffer_write can start writing immediately
+ z->strm.avail_out=BUFSZ_ZOUT;
+ z->strm.next_out=z->out;
+
+ if(deflateInit(&z->strm,Z_DEFAULT_COMPRESSION)!=Z_OK){
+ return NULL;
+ }
+
+ z->wf=wf;
+ z->payload=payload;
+ return z;
+}
+
+void zbuffer_write(struct zbuffer *z,u8 *data,i64 len){
+ z->strm.avail_in=len;
+ z->strm.next_in=data;
+ assert(z->strm.avail_out>0);
+ while(true){
+ int ret=deflate(&z->strm,Z_NO_FLUSH);
+ assert(ret==Z_OK);
+ if(z->strm.avail_out>0)break;
+ z->wf(z->out,BUFSZ_ZOUT,z->payload);
+ z->strm.avail_out=BUFSZ_ZOUT;
+ z->strm.next_out=z->out;
+ }
+ assert(z->strm.avail_in==0);
+}
+
+void zbuffer_flush(struct zbuffer *z){
+ assert(z->strm.avail_out>0);
+ while(true){
+ int ret=deflate(&z->strm,Z_SYNC_FLUSH);
+ assert(ret==Z_OK);
+ z->wf(z->out,BUFSZ_ZOUT-z->strm.avail_out,z->payload);
+ bool done=z->strm.avail_out>0;
+ z->strm.avail_out=BUFSZ_ZOUT;
+ z->strm.next_out=z->out;
+ if(done)break;
+ }
+}
+
+void zbuffer_finish_destroy(struct zbuffer *z){
+ assert(z->strm.avail_out>0);
+ while(true){
+ int ret=deflate(&z->strm,Z_FINISH);
+ assert(ret==Z_OK||ret==Z_STREAM_END);
+ z->wf(z->out,BUFSZ_ZOUT-z->strm.avail_out,z->payload);
+ if(z->strm.avail_out>0)break;
+ z->strm.avail_out=BUFSZ_ZOUT;
+ z->strm.next_out=z->out;
+ }
+ free(z->out);
+ free(z);
+}
diff --git a/server/zbuffer.h b/server/zbuffer.h
new file mode 100644
index 0000000..2e9f707
--- /dev/null
+++ b/server/zbuffer.h
@@ -0,0 +1,13 @@
+#pragma once
+
+#include "global.h"
+
+
+typedef void zbuffer_writefunc(u8*,i64,void*);
+
+struct zbuffer;
+
+struct zbuffer* zbuffer_init(zbuffer_writefunc *wf,void *payload);
+void zbuffer_write(struct zbuffer *z,u8 *data,i64 len);
+void zbuffer_flush(struct zbuffer *z); // can degrade compression
+void zbuffer_finish_destroy(struct zbuffer *z);