From 7549d07933091417b225d094c1648e1382287f93 Mon Sep 17 00:00:00 2001 From: tomsmeding Date: Wed, 11 Oct 2017 08:32:09 +0200 Subject: Initial --- controller/.gitignore | 1 + controller/Makefile | 28 ++++++ controller/main.c | 245 +++++++++++++++++++++++++++++++++++++++++++++++++ controller/unzbuffer.c | 95 +++++++++++++++++++ controller/unzbuffer.h | 14 +++ 5 files changed, 383 insertions(+) create mode 100644 controller/.gitignore create mode 100644 controller/Makefile create mode 100644 controller/main.c create mode 100644 controller/unzbuffer.c create mode 100644 controller/unzbuffer.h (limited to 'controller') diff --git a/controller/.gitignore b/controller/.gitignore new file mode 100644 index 0000000..97f90d8 --- /dev/null +++ b/controller/.gitignore @@ -0,0 +1 @@ +controller diff --git a/controller/Makefile b/controller/Makefile new file mode 100644 index 0000000..7282260 --- /dev/null +++ b/controller/Makefile @@ -0,0 +1,28 @@ +CC = gcc +CFLAGS = -Wall -Wextra -std=c11 -g -O2 -fwrapv -I.. +LDFLAGS = -lz +TARGET = controller + +OBJDIR = obj + +.PHONY: all clean + +.SUFFIXES: + +all: $(TARGET) + +clean: + @echo "Cleaning" + @rm -f $(TARGET) + @rm -rf $(OBJDIR) + + +$(OBJDIR)/%.o: %.c $(wildcard *.h) + @mkdir -p $(OBJDIR) + @echo "CC $<" + @$(CC) $(CFLAGS) -c -o $@ $< + +$(TARGET): $(patsubst %.c,$(OBJDIR)/%.o,$(wildcard *.c)) + @make -q -C .. || make -C .. --no-print-directory + @echo "LD -o $@" + @$(CC) -o $@ $^ $(wildcard ../$(OBJDIR)/*.o) $(LDFLAGS) diff --git a/controller/main.c b/controller/main.c new file mode 100644 index 0000000..2e37e76 --- /dev/null +++ b/controller/main.c @@ -0,0 +1,245 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "global.h" +#include "line_reader.h" +#include "unzbuffer.h" + + +static bool writeall(int sock,const char *data,i64 len){ + i64 cursor=0; + while(cursorstamp.tv_sec),frame->numpids); +} + +static i64 read_frame(const u8 *buffer,i64 len){ + if(len<20)return 0; + struct data_frame *frame=malloc(1,struct data_frame); + frame->stamp.tv_sec=deserialise8(buffer); + frame->stamp.tv_usec=deserialise4(buffer+8); + i64 numpids=frame->numpids=deserialise8(buffer+12); + i64 cursor=20; + fprintf(stderr,"numpids=%" PRIi64 "\n",numpids); + if(len-cursorprocs=calloc(numpids,struct frame_process); + + i64 pi; + for(pi=0;pinumpids;pi++){ + fprintf(stderr,"pi=%" PRIi64 "\n",pi); + if(len-cursor<36){ + fprintf(stderr,"insuff: pi\n"); + goto insufficient_data; + } + frame->procs[pi].pid=deserialise4(buffer+cursor); + cursor+=4; + i64 namelen=frame->procs[pi].namelen=deserialise8(buffer+cursor); + cursor+=8; + fprintf(stderr," pid=%d namelen=%" PRIi64 "\n",frame->procs[pi].pid,namelen); + if(len-cursorprocs[pi].name=malloc(namelen+1,char); + memcpy(frame->procs[pi].name,buffer+cursor,namelen); + cursor+=namelen; + frame->procs[pi].usertime=deserialise8(buffer+cursor); + cursor+=8; + frame->procs[pi].systime=deserialise8(buffer+cursor); + cursor+=8; + frame->procs[pi].rss=deserialise8(buffer+cursor); + cursor+=8; + } + + process_frame(frame); + for(i64 i=0;iprocs[i].name); + free(frame->procs); + free(frame); + return cursor; + +insufficient_data: + if(frame->procs){ + for(i64 i=0;i<=pi;i++){ + if(frame->procs[i].name)free(frame->procs[i].name); + } + free(frame->procs); + } + free(frame); + return 0; +} + +static void frame_data_sink(u8 *newdata,i64 newdatalen,void *payload){ + (void)payload; + static u8 *buffer=NULL; + static i64 buffersize=0,bufferlen=0; + + fprintf(stderr,"frame_data_sink(len=%" PRIi64 ")\n",newdatalen); + + if(buffer==NULL){ + buffersize=4096; + buffer=malloc(buffersize,u8); + } + + if(bufferlen+newdatalen>buffersize){ + do buffersize*=2; while(bufferlen+newdatalen>buffersize); + buffer=realloc(buffer,buffersize,u8); + } + memcpy(buffer+bufferlen,newdata,newdatalen); + bufferlen+=newdatalen; + + i64 cursor=0; + while(cursor [port=57575]\n",argv[0]); + return 1; + } + + struct addrinfo hints,*res; + memset(&hints,0,sizeof(hints)); + hints.ai_family=AF_UNSPEC; + hints.ai_socktype=SOCK_STREAM; + int ret=getaddrinfo(argv[1],argc==3?argv[2]:"57575",&hints,&res); + if(ret!=0){ + fprintf(stderr,"getaddrinfo: %s\n",gai_strerror(ret)); + return 1; + } + + int sock=-1; + for(struct addrinfo *r=res;r!=NULL;r=r->ai_next){ + char buf[256]; + inet_ntop(r->ai_family,&((struct sockaddr_in*)r->ai_addr)->sin_addr,buf,sizeof buf); + fprintf(stderr,"Trying: %s\n",buf); + sock=socket(r->ai_family,r->ai_socktype,r->ai_protocol); + if(sock<0){ + sock=-1; + perror("socket"); + continue; + } + if(connect(sock,r->ai_addr,r->ai_addrlen)<0){ + close(sock); + sock=-1; + perror("connect"); + continue; + } + break; + } + freeaddrinfo(res); + if(sock==-1)return 1; + fprintf(stderr,"Connected.\n"); + + struct line_reader *liner=line_reader_init(STDIN_FILENO); + struct unzbuffer *z=unzbuffer_init(frame_data_sink,NULL); + + bool haderror=false; + while(true){ + fd_set inset; + FD_ZERO(&inset); + FD_SET(STDIN_FILENO,&inset); + FD_SET(sock,&inset); + int ret=select(sock+1,&inset,NULL,NULL,NULL); + if(ret<0){ + if(errno==EINTR)continue; + perror("select"); + break; + } + + if(FD_ISSET(STDIN_FILENO,&inset)){ + char buf[1024]; + i64 nr=read(STDIN_FILENO,buf,sizeof buf); + if(nr<0){ + if(errno==EINTR)continue; + perror("read"); + break; + } + if(nr==0)break; // EOF + line_reader_supply_data(liner,buf,nr); + char *line; + while((line=line_reader_get_line(liner,true))!=NULL){ + if(!writeall(sock,line,strlen(line))){ + fprintf(stderr,"Socket write error\n"); + goto cleanup; + } + // printf("Written <%s>\n",line); + free(line); + } + } + + if(FD_ISSET(sock,&inset)){ + u8 buf[1024]; + i64 nr=read(sock,buf,sizeof buf); + if(nr<0){ + if(errno==EINTR)continue; + perror("read"); + break; + } + if(nr==0){ + fprintf(stderr,"End-of-file on socket\n"); + break; + } + fprintf(stderr,"unzbuffer_write(nr=%" PRIi64 ")\n",nr); + if(!unzbuffer_write(z,buf,nr)){ + haderror=true; + break; + } + } + } + +cleanup: + line_reader_destroy(liner); + if(!unzbuffer_finish_destroy(z))return 1; + if(haderror)return 1; + return 0; +} diff --git a/controller/unzbuffer.c b/controller/unzbuffer.c new file mode 100644 index 0000000..ecd38a6 --- /dev/null +++ b/controller/unzbuffer.c @@ -0,0 +1,95 @@ +#include +#include +#include +#include "unzbuffer.h" + +#define BUFSZ_ZOUT (256*1024) + + +struct unzbuffer { + u8 *out; + i64 outfill; + z_stream strm; + + bool just_resetted; + + unzbuffer_writefunc *wf; + void *payload; +}; + +struct unzbuffer* unzbuffer_init(unzbuffer_writefunc *wf,void *payload){ + struct unzbuffer *z=malloc(1,struct unzbuffer); + z->out=malloc(BUFSZ_ZOUT,u8); + z->outfill=0; + + z->strm.zalloc=Z_NULL; + z->strm.zfree=Z_NULL; + z->strm.opaque=Z_NULL; + z->strm.avail_in=0; + z->strm.next_in=Z_NULL; + + // so unzbuffer_write can start writing immediately + z->strm.avail_out=BUFSZ_ZOUT; + z->strm.next_out=z->out; + + if(inflateInit(&z->strm)!=Z_OK){ + return NULL; + } + + z->just_resetted=true; + + z->wf=wf; + z->payload=payload; + return z; +} + +bool unzbuffer_write(struct unzbuffer *z,u8 *data,i64 len){ + z->just_resetted=false; + z->strm.avail_in=len; + z->strm.next_in=data; + assert(z->strm.avail_out>0); + while(true){ + int ret=inflate(&z->strm,Z_NO_FLUSH); + if(ret!=Z_OK&&ret!=Z_STREAM_END){ + inflateEnd(&z->strm); + fprintf(stderr,"unz error: %d\n",ret); + return false; + } + if(ret!=Z_STREAM_END&&z->strm.avail_out>0)break; + z->wf(z->out,BUFSZ_ZOUT,z->payload); + if(ret==Z_STREAM_END){ + inflateReset(&z->strm); + z->strm.avail_in=0; + z->strm.next_in=Z_NULL; + z->strm.avail_out=BUFSZ_ZOUT; + z->strm.next_out=z->out; + z->just_resetted=true; + break; + } + z->strm.avail_out=BUFSZ_ZOUT; + z->strm.next_out=z->out; + } + assert(z->strm.avail_in==0); + return true; +} + +bool unzbuffer_finish_destroy(struct unzbuffer *z){ + if(!z->just_resetted){ + assert(z->strm.avail_out>0); + while(true){ + int ret=inflate(&z->strm,Z_FINISH); + if(ret!=Z_OK&&ret!=Z_STREAM_END){ + inflateEnd(&z->strm); + fprintf(stderr,"unz error: %d\n",ret); + return false; + } + z->wf(z->out,BUFSZ_ZOUT-z->strm.avail_out,z->payload); + if(z->strm.avail_out>0)break; + z->strm.avail_out=BUFSZ_ZOUT; + z->strm.next_out=z->out; + } + } + free(z->out); + free(z); + return true; +} diff --git a/controller/unzbuffer.h b/controller/unzbuffer.h new file mode 100644 index 0000000..547d2af --- /dev/null +++ b/controller/unzbuffer.h @@ -0,0 +1,14 @@ +#pragma once + +#include "global.h" + + +typedef void unzbuffer_writefunc(u8*,i64,void*); + +struct unzbuffer; + +struct unzbuffer* unzbuffer_init(unzbuffer_writefunc *wf,void *payload); + +// Return false on invalid stream +bool unzbuffer_write(struct unzbuffer *z,u8 *data,i64 len); +bool unzbuffer_finish_destroy(struct unzbuffer *z); -- cgit v1.2.3-70-g09d2