From 7549d07933091417b225d094c1648e1382287f93 Mon Sep 17 00:00:00 2001 From: tomsmeding Date: Wed, 11 Oct 2017 08:32:09 +0200 Subject: Initial --- controller/main.c | 245 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 245 insertions(+) create mode 100644 controller/main.c (limited to 'controller/main.c') diff --git a/controller/main.c b/controller/main.c new file mode 100644 index 0000000..2e37e76 --- /dev/null +++ b/controller/main.c @@ -0,0 +1,245 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "global.h" +#include "line_reader.h" +#include "unzbuffer.h" + + +static bool writeall(int sock,const char *data,i64 len){ + i64 cursor=0; + while(cursorstamp.tv_sec),frame->numpids); +} + +static i64 read_frame(const u8 *buffer,i64 len){ + if(len<20)return 0; + struct data_frame *frame=malloc(1,struct data_frame); + frame->stamp.tv_sec=deserialise8(buffer); + frame->stamp.tv_usec=deserialise4(buffer+8); + i64 numpids=frame->numpids=deserialise8(buffer+12); + i64 cursor=20; + fprintf(stderr,"numpids=%" PRIi64 "\n",numpids); + if(len-cursorprocs=calloc(numpids,struct frame_process); + + i64 pi; + for(pi=0;pinumpids;pi++){ + fprintf(stderr,"pi=%" PRIi64 "\n",pi); + if(len-cursor<36){ + fprintf(stderr,"insuff: pi\n"); + goto insufficient_data; + } + frame->procs[pi].pid=deserialise4(buffer+cursor); + cursor+=4; + i64 namelen=frame->procs[pi].namelen=deserialise8(buffer+cursor); + cursor+=8; + fprintf(stderr," pid=%d namelen=%" PRIi64 "\n",frame->procs[pi].pid,namelen); + if(len-cursorprocs[pi].name=malloc(namelen+1,char); + memcpy(frame->procs[pi].name,buffer+cursor,namelen); + cursor+=namelen; + frame->procs[pi].usertime=deserialise8(buffer+cursor); + cursor+=8; + frame->procs[pi].systime=deserialise8(buffer+cursor); + cursor+=8; + frame->procs[pi].rss=deserialise8(buffer+cursor); + cursor+=8; + } + + process_frame(frame); + for(i64 i=0;iprocs[i].name); + free(frame->procs); + free(frame); + return cursor; + +insufficient_data: + if(frame->procs){ + for(i64 i=0;i<=pi;i++){ + if(frame->procs[i].name)free(frame->procs[i].name); + } + free(frame->procs); + } + free(frame); + return 0; +} + +static void frame_data_sink(u8 *newdata,i64 newdatalen,void *payload){ + (void)payload; + static u8 *buffer=NULL; + static i64 buffersize=0,bufferlen=0; + + fprintf(stderr,"frame_data_sink(len=%" PRIi64 ")\n",newdatalen); + + if(buffer==NULL){ + buffersize=4096; + buffer=malloc(buffersize,u8); + } + + if(bufferlen+newdatalen>buffersize){ + do buffersize*=2; while(bufferlen+newdatalen>buffersize); + buffer=realloc(buffer,buffersize,u8); + } + memcpy(buffer+bufferlen,newdata,newdatalen); + bufferlen+=newdatalen; + + i64 cursor=0; + while(cursor [port=57575]\n",argv[0]); + return 1; + } + + struct addrinfo hints,*res; + memset(&hints,0,sizeof(hints)); + hints.ai_family=AF_UNSPEC; + hints.ai_socktype=SOCK_STREAM; + int ret=getaddrinfo(argv[1],argc==3?argv[2]:"57575",&hints,&res); + if(ret!=0){ + fprintf(stderr,"getaddrinfo: %s\n",gai_strerror(ret)); + return 1; + } + + int sock=-1; + for(struct addrinfo *r=res;r!=NULL;r=r->ai_next){ + char buf[256]; + inet_ntop(r->ai_family,&((struct sockaddr_in*)r->ai_addr)->sin_addr,buf,sizeof buf); + fprintf(stderr,"Trying: %s\n",buf); + sock=socket(r->ai_family,r->ai_socktype,r->ai_protocol); + if(sock<0){ + sock=-1; + perror("socket"); + continue; + } + if(connect(sock,r->ai_addr,r->ai_addrlen)<0){ + close(sock); + sock=-1; + perror("connect"); + continue; + } + break; + } + freeaddrinfo(res); + if(sock==-1)return 1; + fprintf(stderr,"Connected.\n"); + + struct line_reader *liner=line_reader_init(STDIN_FILENO); + struct unzbuffer *z=unzbuffer_init(frame_data_sink,NULL); + + bool haderror=false; + while(true){ + fd_set inset; + FD_ZERO(&inset); + FD_SET(STDIN_FILENO,&inset); + FD_SET(sock,&inset); + int ret=select(sock+1,&inset,NULL,NULL,NULL); + if(ret<0){ + if(errno==EINTR)continue; + perror("select"); + break; + } + + if(FD_ISSET(STDIN_FILENO,&inset)){ + char buf[1024]; + i64 nr=read(STDIN_FILENO,buf,sizeof buf); + if(nr<0){ + if(errno==EINTR)continue; + perror("read"); + break; + } + if(nr==0)break; // EOF + line_reader_supply_data(liner,buf,nr); + char *line; + while((line=line_reader_get_line(liner,true))!=NULL){ + if(!writeall(sock,line,strlen(line))){ + fprintf(stderr,"Socket write error\n"); + goto cleanup; + } + // printf("Written <%s>\n",line); + free(line); + } + } + + if(FD_ISSET(sock,&inset)){ + u8 buf[1024]; + i64 nr=read(sock,buf,sizeof buf); + if(nr<0){ + if(errno==EINTR)continue; + perror("read"); + break; + } + if(nr==0){ + fprintf(stderr,"End-of-file on socket\n"); + break; + } + fprintf(stderr,"unzbuffer_write(nr=%" PRIi64 ")\n",nr); + if(!unzbuffer_write(z,buf,nr)){ + haderror=true; + break; + } + } + } + +cleanup: + line_reader_destroy(liner); + if(!unzbuffer_finish_destroy(z))return 1; + if(haderror)return 1; + return 0; +} -- cgit v1.2.3-54-g00ecf