From 7549d07933091417b225d094c1648e1382287f93 Mon Sep 17 00:00:00 2001 From: tomsmeding Date: Wed, 11 Oct 2017 08:32:09 +0200 Subject: Initial --- .gitignore | 1 + Makefile | 21 +++++ controller/.gitignore | 1 + controller/Makefile | 28 ++++++ controller/main.c | 245 +++++++++++++++++++++++++++++++++++++++++++++++++ controller/unzbuffer.c | 95 +++++++++++++++++++ controller/unzbuffer.h | 14 +++ global.h | 18 ++++ line_reader.c | 54 +++++++++++ line_reader.h | 11 +++ memory.c | 33 +++++++ memory.h | 19 ++++ protocol.txt | 16 ++++ server/.gitignore | 1 + server/Makefile | 28 ++++++ server/data_stream.c | 156 +++++++++++++++++++++++++++++++ server/data_stream.h | 13 +++ server/main.c | 200 ++++++++++++++++++++++++++++++++++++++++ server/pstat.py | 14 +++ server/zbuffer.c | 79 ++++++++++++++++ server/zbuffer.h | 13 +++ 21 files changed, 1060 insertions(+) create mode 100644 .gitignore create mode 100644 Makefile create mode 100644 controller/.gitignore create mode 100644 controller/Makefile create mode 100644 controller/main.c create mode 100644 controller/unzbuffer.c create mode 100644 controller/unzbuffer.h create mode 100644 global.h create mode 100644 line_reader.c create mode 100644 line_reader.h create mode 100644 memory.c create mode 100644 memory.h create mode 100644 protocol.txt create mode 100644 server/.gitignore create mode 100644 server/Makefile create mode 100644 server/data_stream.c create mode 100644 server/data_stream.h create mode 100644 server/main.c create mode 100755 server/pstat.py create mode 100644 server/zbuffer.c create mode 100644 server/zbuffer.h diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..b672fde --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +obj diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..7d82ff5 --- /dev/null +++ b/Makefile @@ -0,0 +1,21 @@ +CC = gcc +CFLAGS = -Wall -Wextra -std=c11 -g -O2 -fwrapv +LDFLAGS = -lz + +OBJDIR = obj + +.PHONY: all clean + +.SUFFIXES: + +all: $(patsubst %.c,$(OBJDIR)/%.o,$(wildcard *.c)) + +clean: + @echo "Cleaning" + @rm -rf $(OBJDIR) + + +$(OBJDIR)/%.o: %.c $(wildcard *.h) + @mkdir -p $(OBJDIR) + @echo "CC $<" + @$(CC) $(CFLAGS) -c -o $@ $< diff --git a/controller/.gitignore b/controller/.gitignore new file mode 100644 index 0000000..97f90d8 --- /dev/null +++ b/controller/.gitignore @@ -0,0 +1 @@ +controller diff --git a/controller/Makefile b/controller/Makefile new file mode 100644 index 0000000..7282260 --- /dev/null +++ b/controller/Makefile @@ -0,0 +1,28 @@ +CC = gcc +CFLAGS = -Wall -Wextra -std=c11 -g -O2 -fwrapv -I.. +LDFLAGS = -lz +TARGET = controller + +OBJDIR = obj + +.PHONY: all clean + +.SUFFIXES: + +all: $(TARGET) + +clean: + @echo "Cleaning" + @rm -f $(TARGET) + @rm -rf $(OBJDIR) + + +$(OBJDIR)/%.o: %.c $(wildcard *.h) + @mkdir -p $(OBJDIR) + @echo "CC $<" + @$(CC) $(CFLAGS) -c -o $@ $< + +$(TARGET): $(patsubst %.c,$(OBJDIR)/%.o,$(wildcard *.c)) + @make -q -C .. || make -C .. --no-print-directory + @echo "LD -o $@" + @$(CC) -o $@ $^ $(wildcard ../$(OBJDIR)/*.o) $(LDFLAGS) diff --git a/controller/main.c b/controller/main.c new file mode 100644 index 0000000..2e37e76 --- /dev/null +++ b/controller/main.c @@ -0,0 +1,245 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "global.h" +#include "line_reader.h" +#include "unzbuffer.h" + + +static bool writeall(int sock,const char *data,i64 len){ + i64 cursor=0; + while(cursorstamp.tv_sec),frame->numpids); +} + +static i64 read_frame(const u8 *buffer,i64 len){ + if(len<20)return 0; + struct data_frame *frame=malloc(1,struct data_frame); + frame->stamp.tv_sec=deserialise8(buffer); + frame->stamp.tv_usec=deserialise4(buffer+8); + i64 numpids=frame->numpids=deserialise8(buffer+12); + i64 cursor=20; + fprintf(stderr,"numpids=%" PRIi64 "\n",numpids); + if(len-cursorprocs=calloc(numpids,struct frame_process); + + i64 pi; + for(pi=0;pinumpids;pi++){ + fprintf(stderr,"pi=%" PRIi64 "\n",pi); + if(len-cursor<36){ + fprintf(stderr,"insuff: pi\n"); + goto insufficient_data; + } + frame->procs[pi].pid=deserialise4(buffer+cursor); + cursor+=4; + i64 namelen=frame->procs[pi].namelen=deserialise8(buffer+cursor); + cursor+=8; + fprintf(stderr," pid=%d namelen=%" PRIi64 "\n",frame->procs[pi].pid,namelen); + if(len-cursorprocs[pi].name=malloc(namelen+1,char); + memcpy(frame->procs[pi].name,buffer+cursor,namelen); + cursor+=namelen; + frame->procs[pi].usertime=deserialise8(buffer+cursor); + cursor+=8; + frame->procs[pi].systime=deserialise8(buffer+cursor); + cursor+=8; + frame->procs[pi].rss=deserialise8(buffer+cursor); + cursor+=8; + } + + process_frame(frame); + for(i64 i=0;iprocs[i].name); + free(frame->procs); + free(frame); + return cursor; + +insufficient_data: + if(frame->procs){ + for(i64 i=0;i<=pi;i++){ + if(frame->procs[i].name)free(frame->procs[i].name); + } + free(frame->procs); + } + free(frame); + return 0; +} + +static void frame_data_sink(u8 *newdata,i64 newdatalen,void *payload){ + (void)payload; + static u8 *buffer=NULL; + static i64 buffersize=0,bufferlen=0; + + fprintf(stderr,"frame_data_sink(len=%" PRIi64 ")\n",newdatalen); + + if(buffer==NULL){ + buffersize=4096; + buffer=malloc(buffersize,u8); + } + + if(bufferlen+newdatalen>buffersize){ + do buffersize*=2; while(bufferlen+newdatalen>buffersize); + buffer=realloc(buffer,buffersize,u8); + } + memcpy(buffer+bufferlen,newdata,newdatalen); + bufferlen+=newdatalen; + + i64 cursor=0; + while(cursor [port=57575]\n",argv[0]); + return 1; + } + + struct addrinfo hints,*res; + memset(&hints,0,sizeof(hints)); + hints.ai_family=AF_UNSPEC; + hints.ai_socktype=SOCK_STREAM; + int ret=getaddrinfo(argv[1],argc==3?argv[2]:"57575",&hints,&res); + if(ret!=0){ + fprintf(stderr,"getaddrinfo: %s\n",gai_strerror(ret)); + return 1; + } + + int sock=-1; + for(struct addrinfo *r=res;r!=NULL;r=r->ai_next){ + char buf[256]; + inet_ntop(r->ai_family,&((struct sockaddr_in*)r->ai_addr)->sin_addr,buf,sizeof buf); + fprintf(stderr,"Trying: %s\n",buf); + sock=socket(r->ai_family,r->ai_socktype,r->ai_protocol); + if(sock<0){ + sock=-1; + perror("socket"); + continue; + } + if(connect(sock,r->ai_addr,r->ai_addrlen)<0){ + close(sock); + sock=-1; + perror("connect"); + continue; + } + break; + } + freeaddrinfo(res); + if(sock==-1)return 1; + fprintf(stderr,"Connected.\n"); + + struct line_reader *liner=line_reader_init(STDIN_FILENO); + struct unzbuffer *z=unzbuffer_init(frame_data_sink,NULL); + + bool haderror=false; + while(true){ + fd_set inset; + FD_ZERO(&inset); + FD_SET(STDIN_FILENO,&inset); + FD_SET(sock,&inset); + int ret=select(sock+1,&inset,NULL,NULL,NULL); + if(ret<0){ + if(errno==EINTR)continue; + perror("select"); + break; + } + + if(FD_ISSET(STDIN_FILENO,&inset)){ + char buf[1024]; + i64 nr=read(STDIN_FILENO,buf,sizeof buf); + if(nr<0){ + if(errno==EINTR)continue; + perror("read"); + break; + } + if(nr==0)break; // EOF + line_reader_supply_data(liner,buf,nr); + char *line; + while((line=line_reader_get_line(liner,true))!=NULL){ + if(!writeall(sock,line,strlen(line))){ + fprintf(stderr,"Socket write error\n"); + goto cleanup; + } + // printf("Written <%s>\n",line); + free(line); + } + } + + if(FD_ISSET(sock,&inset)){ + u8 buf[1024]; + i64 nr=read(sock,buf,sizeof buf); + if(nr<0){ + if(errno==EINTR)continue; + perror("read"); + break; + } + if(nr==0){ + fprintf(stderr,"End-of-file on socket\n"); + break; + } + fprintf(stderr,"unzbuffer_write(nr=%" PRIi64 ")\n",nr); + if(!unzbuffer_write(z,buf,nr)){ + haderror=true; + break; + } + } + } + +cleanup: + line_reader_destroy(liner); + if(!unzbuffer_finish_destroy(z))return 1; + if(haderror)return 1; + return 0; +} diff --git a/controller/unzbuffer.c b/controller/unzbuffer.c new file mode 100644 index 0000000..ecd38a6 --- /dev/null +++ b/controller/unzbuffer.c @@ -0,0 +1,95 @@ +#include +#include +#include +#include "unzbuffer.h" + +#define BUFSZ_ZOUT (256*1024) + + +struct unzbuffer { + u8 *out; + i64 outfill; + z_stream strm; + + bool just_resetted; + + unzbuffer_writefunc *wf; + void *payload; +}; + +struct unzbuffer* unzbuffer_init(unzbuffer_writefunc *wf,void *payload){ + struct unzbuffer *z=malloc(1,struct unzbuffer); + z->out=malloc(BUFSZ_ZOUT,u8); + z->outfill=0; + + z->strm.zalloc=Z_NULL; + z->strm.zfree=Z_NULL; + z->strm.opaque=Z_NULL; + z->strm.avail_in=0; + z->strm.next_in=Z_NULL; + + // so unzbuffer_write can start writing immediately + z->strm.avail_out=BUFSZ_ZOUT; + z->strm.next_out=z->out; + + if(inflateInit(&z->strm)!=Z_OK){ + return NULL; + } + + z->just_resetted=true; + + z->wf=wf; + z->payload=payload; + return z; +} + +bool unzbuffer_write(struct unzbuffer *z,u8 *data,i64 len){ + z->just_resetted=false; + z->strm.avail_in=len; + z->strm.next_in=data; + assert(z->strm.avail_out>0); + while(true){ + int ret=inflate(&z->strm,Z_NO_FLUSH); + if(ret!=Z_OK&&ret!=Z_STREAM_END){ + inflateEnd(&z->strm); + fprintf(stderr,"unz error: %d\n",ret); + return false; + } + if(ret!=Z_STREAM_END&&z->strm.avail_out>0)break; + z->wf(z->out,BUFSZ_ZOUT,z->payload); + if(ret==Z_STREAM_END){ + inflateReset(&z->strm); + z->strm.avail_in=0; + z->strm.next_in=Z_NULL; + z->strm.avail_out=BUFSZ_ZOUT; + z->strm.next_out=z->out; + z->just_resetted=true; + break; + } + z->strm.avail_out=BUFSZ_ZOUT; + z->strm.next_out=z->out; + } + assert(z->strm.avail_in==0); + return true; +} + +bool unzbuffer_finish_destroy(struct unzbuffer *z){ + if(!z->just_resetted){ + assert(z->strm.avail_out>0); + while(true){ + int ret=inflate(&z->strm,Z_FINISH); + if(ret!=Z_OK&&ret!=Z_STREAM_END){ + inflateEnd(&z->strm); + fprintf(stderr,"unz error: %d\n",ret); + return false; + } + z->wf(z->out,BUFSZ_ZOUT-z->strm.avail_out,z->payload); + if(z->strm.avail_out>0)break; + z->strm.avail_out=BUFSZ_ZOUT; + z->strm.next_out=z->out; + } + } + free(z->out); + free(z); + return true; +} diff --git a/controller/unzbuffer.h b/controller/unzbuffer.h new file mode 100644 index 0000000..547d2af --- /dev/null +++ b/controller/unzbuffer.h @@ -0,0 +1,14 @@ +#pragma once + +#include "global.h" + + +typedef void unzbuffer_writefunc(u8*,i64,void*); + +struct unzbuffer; + +struct unzbuffer* unzbuffer_init(unzbuffer_writefunc *wf,void *payload); + +// Return false on invalid stream +bool unzbuffer_write(struct unzbuffer *z,u8 *data,i64 len); +bool unzbuffer_finish_destroy(struct unzbuffer *z); diff --git a/global.h b/global.h new file mode 100644 index 0000000..b0029c6 --- /dev/null +++ b/global.h @@ -0,0 +1,18 @@ +#pragma once + +#include +#include +#include +#include +#include "memory.h" + +typedef int8_t i8; +typedef uint8_t u8; +typedef int16_t i16; +typedef uint16_t u16; +typedef int32_t i32; +typedef uint32_t u32; +typedef int64_t i64; +typedef uint64_t u64; + +#define LARGENUM (INT_MAX/32) diff --git a/line_reader.c b/line_reader.c new file mode 100644 index 0000000..48c8355 --- /dev/null +++ b/line_reader.c @@ -0,0 +1,54 @@ +#include +#include +#include "line_reader.h" + + +struct line_reader { + int fd; + i64 bufcap,buflen; + char *buffer; +}; + +struct line_reader* line_reader_init(int fd){ + struct line_reader *r=malloc(1,struct line_reader); + r->fd=fd; + r->bufcap=1024; + r->buflen=0; + r->buffer=malloc(r->bufcap,char); + return r; +} + +void line_reader_supply_data(struct line_reader *r,const char *data,i64 len){ + bool do_realloc=false; + while(r->buflen+len>r->bufcap){ + if(r->bufcap*2>LARGENUM){ + fprintf(stderr,"Data too large in line_reader_supply_data!\n"); + return; + } + r->bufcap*=2; + do_realloc=true; + } + if(do_realloc){ + r->buffer=realloc(r->buffer,r->bufcap,char); + } + memcpy(r->buffer+r->buflen,data,len); + r->buflen+=len; +} + +char* line_reader_get_line(struct line_reader *r,bool include_lf){ + char *p; + p=memchr(r->buffer,'\n',r->buflen); + if(p==NULL)return NULL; + i64 idx=p-r->buffer; + char *str=malloc(idx+include_lf+1,char); + memcpy(str,r->buffer,idx+include_lf); + str[idx+include_lf]='\0'; + memmove(r->buffer,r->buffer+idx+1,r->buflen-idx-1); + r->buflen-=idx+1; + return str; +} + +void line_reader_destroy(struct line_reader *r){ + free(r->buffer); + free(r); +} diff --git a/line_reader.h b/line_reader.h new file mode 100644 index 0000000..898801f --- /dev/null +++ b/line_reader.h @@ -0,0 +1,11 @@ +#pragma once + +#include "global.h" + + +struct line_reader; + +struct line_reader* line_reader_init(int fd); +void line_reader_supply_data(struct line_reader *r,const char *data,i64 len); +char* line_reader_get_line(struct line_reader *r,bool include_lf); // call until NULL +void line_reader_destroy(struct line_reader *r); diff --git a/memory.c b/memory.c new file mode 100644 index 0000000..8734ab2 --- /dev/null +++ b/memory.c @@ -0,0 +1,33 @@ +#define _GNU_SOURCE +#include +#include +#include +#include "global.h" +#include "memory.h" + +void* check_after_allocation(const char *func,size_t num,size_t sz,void *ptr){ + if(ptr==NULL){ + fprintf(stderr,"Allocation failed: %s(%zu * %zuB = %zu)",func,num,sz,num*sz); + exit(1); + } + return ptr; +} + +void* check_after_allocation_str(const char *func,void *ptr){ + if(ptr==NULL){ + fprintf(stderr,"Allocation failed: %s()",func); + exit(1); + } + return ptr; +} + +__attribute__((format (printf, 2, 3))) +int memory_asprintf_wrapper(char **ret,const char *format,...){ + assert(ret!=NULL); + va_list ap; + va_start(ap,format); + int len=vasprintf(ret,format,ap); + va_end(ap); + check_after_allocation_str("asprintf",*ret); + return len; +} diff --git a/memory.h b/memory.h new file mode 100644 index 0000000..9175dea --- /dev/null +++ b/memory.h @@ -0,0 +1,19 @@ +#pragma once + +#include + +#define malloc(num,type) \ + ((type*)check_after_allocation("malloc",num,sizeof(type),malloc((num)*sizeof(type)))) +#define calloc(num,type) \ + ((type*)check_after_allocation("calloc",num,sizeof(type),calloc((num),sizeof(type)))) +#define realloc(ptr,num,type) \ + ((type*)check_after_allocation("realloc",num,sizeof(type),realloc((ptr),(num)*sizeof(type)))) +#define strdup(str) \ + ((char*)check_after_allocation_str("strdup",strdup(str))) +#define asprintf(...) \ + (memory_asprintf_wrapper(__VA_ARGS__)) + +void* check_after_allocation(const char *func,size_t num,size_t sz,void *ptr); +void* check_after_allocation_str(const char *func,void *ptr); + +int memory_asprintf_wrapper(char **ret,const char *format,...) __attribute__((format (printf, 2, 3))); diff --git a/protocol.txt b/protocol.txt new file mode 100644 index 0000000..fda2640 --- /dev/null +++ b/protocol.txt @@ -0,0 +1,16 @@ +controller -> server: +Just lines of text, see either program's main.c. + +server -> controller: +Data frames compressed using zlib. Data frames look as follows: +- Timestamp: (as from gettimeofday) + - seconds (8 bytes) + - microseconds (4 bytes) +- numpids (8 bytes) +- [numpids times:] + - pid (4 bytes) + - namelen (8 bytes) + - process name (namelen bytes) + - total user time (8 bytes) + - total system time (8 bytes) + - resident size (8 bytes) diff --git a/server/.gitignore b/server/.gitignore new file mode 100644 index 0000000..3fbc411 --- /dev/null +++ b/server/.gitignore @@ -0,0 +1 @@ +pstat diff --git a/server/Makefile b/server/Makefile new file mode 100644 index 0000000..24cf554 --- /dev/null +++ b/server/Makefile @@ -0,0 +1,28 @@ +CC = gcc +CFLAGS = -Wall -Wextra -std=c11 -g -O2 -fwrapv -I.. +LDFLAGS = -lz +TARGET = pstat + +OBJDIR = obj + +.PHONY: all clean + +.SUFFIXES: + +all: $(TARGET) + +clean: + @echo "Cleaning" + @rm -f $(TARGET) + @rm -rf $(OBJDIR) + + +$(OBJDIR)/%.o: %.c $(wildcard *.h) + @mkdir -p $(OBJDIR) + @echo "CC $<" + @$(CC) $(CFLAGS) -c -o $@ $< + +$(TARGET): $(patsubst %.c,$(OBJDIR)/%.o,$(wildcard *.c)) + @make -q -C .. || make -C .. --no-print-directory + @echo "LD -o $@" + @$(CC) -o $@ $^ $(wildcard ../$(OBJDIR)/*.o) $(LDFLAGS) diff --git a/server/data_stream.c b/server/data_stream.c new file mode 100644 index 0000000..e86114e --- /dev/null +++ b/server/data_stream.c @@ -0,0 +1,156 @@ +#include +#include +#include +#include +#include +#include +#include "data_stream.h" + + +struct data_stream { + int fd; + i64 pidbufsize; + pid_t *pids; + i64 namebufsize; + u8 *namebuf; + i64 writebufsize; + u8 *writebuf; + i64 cursor; + struct zbuffer *z; +}; + +static bool callback_error=false; + +static void write_data_fd(u8 *data,i64 len,void *fdp){ + int fd=*(int*)fdp; + i64 cursor=0; + while(cursor=*cap){ + if(*cap*2>LARGENUM)return; + *cap*=2; + *buf=realloc(*buf,*cap,u8); + } + memcpy(*buf+*cursor,data,datalen); + *cursor+=datalen; +} + +static void serialise8(u8 *buf,i64 value){ + for(i64 i=0;i<8;i++)buf[i]=(value>>(8*i))&0xff; +} + +static void serialise4(u8 *buf,i32 value){ + for(i64 i=0;i<4;i++)buf[i]=(value>>(8*i))&0xff; +} + +struct data_stream* data_stream_init(int fd){ + struct data_stream *s=malloc(1,struct data_stream); + s->fd=fd; + s->pidbufsize=1024; + s->pids=malloc(s->pidbufsize,pid_t); + s->namebufsize=64; + s->namebuf=malloc(s->namebufsize,u8); + s->writebufsize=4096; + s->writebuf=malloc(s->writebufsize,u8); + s->cursor=0; + s->z=zbuffer_init(write_data_fd,&s->fd); + + callback_error=false; + + return s; +} + +int data_stream_finish_destroy(struct data_stream *s){ + callback_error=false; + zbuffer_finish_destroy(s->z); + free(s->pids); + free(s->namebuf); + free(s->writebuf); + free(s); + if(callback_error)return -1; + return 0; +} + +int data_stream_frame(struct data_stream *s){ + callback_error=false; + + u8 buf8[8]; + + struct timeval timeval; + if(gettimeofday(&timeval,NULL)<0){ + perror("gettimeofday"); + timeval.tv_sec=0; + timeval.tv_usec=0; + } + serialise8(buf8,timeval.tv_sec); + bufappend(&s->writebuf,&s->cursor,&s->writebufsize,buf8,8); + serialise4(buf8,timeval.tv_usec); + bufappend(&s->writebuf,&s->cursor,&s->writebufsize,buf8,4); + + i64 numpids=proc_listallpids(s->pids,s->pidbufsize*sizeof(pid_t)); + while(s->pidbufsize<=LARGENUM&&numpids>=s->pidbufsize){ + s->pidbufsize*=2; + s->pids=realloc(s->pids,s->pidbufsize,pid_t); + numpids=proc_listallpids(s->pids,s->pidbufsize*sizeof(pid_t)); + } + serialise8(buf8,numpids); + bufappend(&s->writebuf,&s->cursor,&s->writebufsize,buf8,8); + + for(i64 index=0;indexpids[index]; + + struct proc_taskinfo info; + int result=proc_pidinfo(pid,PROC_PIDTASKINFO,0,&info,sizeof(info)); + if(result!=sizeof info)continue; + + serialise4(buf8,pid); + bufappend(&s->writebuf,&s->cursor,&s->writebufsize,buf8,4); + + size_t namelen=proc_name(pid,s->namebuf,s->namebufsize); + while(s->namebufsize<=LARGENUM&&namelen==0){ + s->namebufsize*=2; + s->namebuf=realloc(s->namebuf,s->namebufsize,u8); + namelen=proc_name(pid,s->namebuf,s->namebufsize); + } + if(namelen==0){ + s->namebuf[0]='\0'; + } + + fprintf(stderr,"index=%" PRIi64 " pid=%d namelen=%zu\n",index,pid,namelen); + serialise8(buf8,namelen); + bufappend(&s->writebuf,&s->cursor,&s->writebufsize,buf8,8); + bufappend(&s->writebuf,&s->cursor,&s->writebufsize,s->namebuf,namelen); + + serialise8(buf8,info.pti_total_user); + bufappend(&s->writebuf,&s->cursor,&s->writebufsize,buf8,8); + serialise8(buf8,info.pti_total_system); + bufappend(&s->writebuf,&s->cursor,&s->writebufsize,buf8,8); + serialise8(buf8,info.pti_resident_size); + bufappend(&s->writebuf,&s->cursor,&s->writebufsize,buf8,8); + + // fprintf(stderr,"PID %u (%s): %llu %llu %llu\n",pid,namebuf,info.pti_total_user,info.pti_total_system,info.pti_resident_size); + } + + zbuffer_write(s->z,s->writebuf,s->cursor); + s->cursor=0; + + if(callback_error)return -1; + return 0; +} + +int data_stream_flush(struct data_stream *s){ + callback_error=false; + zbuffer_flush(s->z); + if(callback_error)return -1; + return 0; +} diff --git a/server/data_stream.h b/server/data_stream.h new file mode 100644 index 0000000..c8baf18 --- /dev/null +++ b/server/data_stream.h @@ -0,0 +1,13 @@ +#pragma once + +#include "global.h" +#include "zbuffer.h" + + +struct data_stream; + +// Return NULL or -1 on error +struct data_stream* data_stream_init(int fd); +int data_stream_finish_destroy(struct data_stream *s); +int data_stream_frame(struct data_stream *s); +int data_stream_flush(struct data_stream *s); // can degrade compression diff --git a/server/main.c b/server/main.c new file mode 100644 index 0000000..3cf1ffd --- /dev/null +++ b/server/main.c @@ -0,0 +1,200 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "global.h" +#include "data_stream.h" +#include "line_reader.h" +#include "zbuffer.h" + +#define PORT 57575 + +#define POLLDELAY (10*1000000) + +/*struct proc_taskinfo { + uint64_t pti_virtual_size; virtual memory size (bytes) + uint64_t pti_resident_size; resident memory size (bytes) + uint64_t pti_total_user; total time + uint64_t pti_total_system; + uint64_t pti_threads_user; existing threads only + uint64_t pti_threads_system; + int32_t pti_policy; default policy for new threads + int32_t pti_faults; number of page faults + int32_t pti_pageins; number of actual pageins + int32_t pti_cow_faults; number of copy-on-write faults + int32_t pti_messages_sent; number of messages sent + int32_t pti_messages_received; number of messages received + int32_t pti_syscalls_mach; number of mach system calls + int32_t pti_syscalls_unix; number of unix system calls + int32_t pti_csw; number of context switches + int32_t pti_threadnum; number of threads in the task + int32_t pti_numrunning; number of running threads + int32_t pti_priority; task priority +};*/ + +static i64 make_timestamp(void){ + struct timeval tv; + gettimeofday(&tv,NULL); + return tv.tv_sec*(i64)1000000+tv.tv_usec; +} + +static void connection_handler(int sock){ + struct data_stream *stream=data_stream_init(sock); + struct line_reader *reader=line_reader_init(sock); + + i64 polldelay=POLLDELAY; + i64 timeleft=polldelay; + + bool do_send_frames=false; + + bool do_quit=false; + + while(true){ + struct timeval timeout={timeleft/1000000,timeleft%1000000}; + i64 before=make_timestamp(); + + fd_set inset; + FD_ZERO(&inset); + FD_SET(sock,&inset); + int ret=select(sock+1,&inset,NULL,NULL,do_send_frames?&timeout:NULL); + if(ret<0){ + if(errno==EINTR)continue; + perror("select"); + break; + } + if(ret==0||!FD_ISSET(sock,&inset)){ // timeout + if(do_send_frames){ + if(data_stream_frame(stream)<0){ + printf("Error sending data\n"); + goto cleanup; + } + printf("."); fflush(stdout); + } + timeleft=polldelay; + continue; + } + + i64 after=make_timestamp(); + if(after\n",line); + + char *next=strchr(word,' '); + if(next!=NULL){ + *next='\0'; + next++; + } + if(strcmp(word,"delay")==0){ + char *endp; + i64 v=strtoll(next,&endp,10); + if(*next=='\0'||*endp!='\0'||v<=1000){ + free(line); + printf("Invalid number in delay\n"); + continue; + } + polldelay=v; + printf("polldelay -> %" PRIi64 "\n",polldelay); + if(timeleft>polldelay)timeleft=polldelay; + } else if(strcmp(word,"start")==0){ + do_send_frames=true; + printf("Sending enabled\n"); + } else if(strcmp(word,"stop")==0){ + do_send_frames=false; + printf("Sending disabled\n"); + } else if(strcmp(word,"flush")==0){ + printf("Flushing\n"); + if(data_stream_flush(stream)<0){ + printf("Error flushing\n"); + goto cleanup; + } + } else if(strcmp(word,"quit")==0){ + printf("Quitting by request\n"); + do_quit=true; + goto cleanup; + } else { + printf("Read unrecognised line <%s>!\n",line); + } + + free(line); + } + } + +cleanup: + data_stream_finish_destroy(stream); + if(do_quit)exit(0); +} + +int main(void){ + int sock=socket(PF_INET,SOCK_STREAM,0); + if(sock<0){ + perror("socket"); + return 1; + } + + int one=1; + if(setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,&one,sizeof(int))<0){ + perror("setsockopt"); + return 1; + } + + struct sockaddr_in sin; + sin.sin_family=AF_INET; + sin.sin_addr.s_addr=htonl(INADDR_ANY); + sin.sin_port=htons(PORT); + + if(bind(sock,(struct sockaddr*)&sin,sizeof(sin))<0){ + close(sock); + perror("bind"); + return 1; + } + + if(listen(sock,1)<0){ + perror("listen"); + return 1; + } + + printf("Listening on port %d\n",PORT); + + while(true){ + struct sockaddr_storage client_addr; + socklen_t client_addr_sz=sizeof(client_addr); + int clientsock=accept(sock,(struct sockaddr*)&client_addr,&client_addr_sz); + if(clientsock<0){ + perror("accept"); + continue; + } + + char str[INET_ADDRSTRLEN]; + inet_ntop(client_addr.ss_family,&((struct sockaddr_in*)&client_addr)->sin_addr, + str,sizeof(str)); + + printf("Accept from %s\n",str); + connection_handler(clientsock); + close(clientsock); + } +} diff --git a/server/pstat.py b/server/pstat.py new file mode 100755 index 0000000..eed6528 --- /dev/null +++ b/server/pstat.py @@ -0,0 +1,14 @@ +#!/usr/bin/env python3 +import psutil, os, sys, time + +t=time.clock_gettime(time.CLOCK_MONOTONIC) + +for p in psutil.process_iter(): + try: + with p.oneshot(): + print(p.pid,p.username(),p.name(),p.memory_info().rss,p.cpu_times()) + except Exception as e: + pass + +t=time.clock_gettime(time.CLOCK_MONOTONIC)-t +print(t,file=sys.stderr) diff --git a/server/zbuffer.c b/server/zbuffer.c new file mode 100644 index 0000000..af37f74 --- /dev/null +++ b/server/zbuffer.c @@ -0,0 +1,79 @@ +#include +#include +#include "zbuffer.h" + +#define BUFSZ_ZOUT (256*1024) + + +struct zbuffer { + u8 *out; + i64 outfill; + z_stream strm; + + zbuffer_writefunc *wf; + void *payload; +}; + +struct zbuffer* zbuffer_init(zbuffer_writefunc *wf,void *payload){ + struct zbuffer *z=malloc(1,struct zbuffer); + z->out=malloc(BUFSZ_ZOUT,u8); + z->outfill=0; + + z->strm.zalloc=Z_NULL; + z->strm.zfree=Z_NULL; + z->strm.opaque=Z_NULL; + + // so zbuffer_write can start writing immediately + z->strm.avail_out=BUFSZ_ZOUT; + z->strm.next_out=z->out; + + if(deflateInit(&z->strm,Z_DEFAULT_COMPRESSION)!=Z_OK){ + return NULL; + } + + z->wf=wf; + z->payload=payload; + return z; +} + +void zbuffer_write(struct zbuffer *z,u8 *data,i64 len){ + z->strm.avail_in=len; + z->strm.next_in=data; + assert(z->strm.avail_out>0); + while(true){ + int ret=deflate(&z->strm,Z_NO_FLUSH); + assert(ret==Z_OK); + if(z->strm.avail_out>0)break; + z->wf(z->out,BUFSZ_ZOUT,z->payload); + z->strm.avail_out=BUFSZ_ZOUT; + z->strm.next_out=z->out; + } + assert(z->strm.avail_in==0); +} + +void zbuffer_flush(struct zbuffer *z){ + assert(z->strm.avail_out>0); + while(true){ + int ret=deflate(&z->strm,Z_SYNC_FLUSH); + assert(ret==Z_OK); + z->wf(z->out,BUFSZ_ZOUT-z->strm.avail_out,z->payload); + bool done=z->strm.avail_out>0; + z->strm.avail_out=BUFSZ_ZOUT; + z->strm.next_out=z->out; + if(done)break; + } +} + +void zbuffer_finish_destroy(struct zbuffer *z){ + assert(z->strm.avail_out>0); + while(true){ + int ret=deflate(&z->strm,Z_FINISH); + assert(ret==Z_OK||ret==Z_STREAM_END); + z->wf(z->out,BUFSZ_ZOUT-z->strm.avail_out,z->payload); + if(z->strm.avail_out>0)break; + z->strm.avail_out=BUFSZ_ZOUT; + z->strm.next_out=z->out; + } + free(z->out); + free(z); +} diff --git a/server/zbuffer.h b/server/zbuffer.h new file mode 100644 index 0000000..2e9f707 --- /dev/null +++ b/server/zbuffer.h @@ -0,0 +1,13 @@ +#pragma once + +#include "global.h" + + +typedef void zbuffer_writefunc(u8*,i64,void*); + +struct zbuffer; + +struct zbuffer* zbuffer_init(zbuffer_writefunc *wf,void *payload); +void zbuffer_write(struct zbuffer *z,u8 *data,i64 len); +void zbuffer_flush(struct zbuffer *z); // can degrade compression +void zbuffer_finish_destroy(struct zbuffer *z); -- cgit v1.2.3