From 34b0e1a8adf19f695a85280ebb69b062d71b48f7 Mon Sep 17 00:00:00 2001 From: tomsmeding Date: Wed, 11 Oct 2017 19:12:14 +0200 Subject: Second --- controller/main.c | 63 +++++++++++++++++++++++++++++++++++++++++--------- controller/unzbuffer.c | 2 +- server/data_stream.c | 18 ++++++++++++--- server/main.c | 9 ++++++++ server/zbuffer.c | 3 +++ 5 files changed, 80 insertions(+), 15 deletions(-) diff --git a/controller/main.c b/controller/main.c index 2e37e76..d289ce9 100644 --- a/controller/main.c +++ b/controller/main.c @@ -22,6 +22,8 @@ static bool writeall(int sock,const char *data,i64 len){ return true; } +FILE *logfile=NULL; + struct frame_process { pid_t pid; i64 namelen; @@ -48,7 +50,35 @@ static i32 deserialise4(const u8 *buf){ } static void process_frame(const struct data_frame *frame){ - fprintf(stderr,"%s: (%" PRIi64 ")\n",ctime(&frame->stamp.tv_sec),frame->numpids); + fprintf(logfile,"{\"stamp\":[%" PRIi64 ",%d],\"procs\":[", + (i64)frame->stamp.tv_sec,frame->stamp.tv_usec); + for(i64 i=0;inumpids;i++){ + const struct frame_process *p=&frame->procs[i]; + if(i!=0)fputc(',',logfile); + fprintf(logfile,"{\"pid\":%d,\"name\":\"",p->pid); + for(i64 j=0;jnamelen;j++){ + if(p->name[j]>=32&&p->name[j]<=126&&p->name[j]!='"'&&p->name[j]!='\\'){ + fputc(p->name[j],logfile); + } else { + fprintf(logfile,"\\x%c%c", + "0123456789abcdef"[(unsigned char)p->name[j]/16], + "0123456789abcdef"[(unsigned char)p->name[j]%16]); + } + } + fprintf(logfile,"\",\"usertime\":%" PRIi64 ",\"systime\":%" PRIi64 ",\"rss\":%" PRIi64 "}", + p->usertime,p->systime,p->rss); + } + fprintf(logfile,"]}\n"); + + /*char buf[26]; + memcpy(buf,ctime(&frame->stamp.tv_sec),26); + buf[24]='\0'; + fprintf(stderr,"%s: (%" PRIi64 ")\n",buf,frame->numpids); + for(i64 i=0;inumpids;i++){ + const struct frame_process *p=&frame->procs[i]; + fprintf(stderr," (%d) \"%s\" u=%.2f s=%.2f rss=%" PRIi64 "\n", + p->pid,p->name,p->usertime/1000000000.0,p->systime/1000000000.0,p->rss); + }*/ } static i64 read_frame(const u8 *buffer,i64 len){ @@ -58,7 +88,7 @@ static i64 read_frame(const u8 *buffer,i64 len){ frame->stamp.tv_usec=deserialise4(buffer+8); i64 numpids=frame->numpids=deserialise8(buffer+12); i64 cursor=20; - fprintf(stderr,"numpids=%" PRIi64 "\n",numpids); + // fprintf(stderr,"numpids=%" PRIi64 "\n",numpids); if(len-cursornumpids;pi++){ - fprintf(stderr,"pi=%" PRIi64 "\n",pi); + // fprintf(stderr,"pi=%" PRIi64 "\n",pi); if(len-cursor<36){ - fprintf(stderr,"insuff: pi\n"); + // fprintf(stderr,"insuff: pi\n"); goto insufficient_data; } frame->procs[pi].pid=deserialise4(buffer+cursor); cursor+=4; i64 namelen=frame->procs[pi].namelen=deserialise8(buffer+cursor); cursor+=8; - fprintf(stderr," pid=%d namelen=%" PRIi64 "\n",frame->procs[pi].pid,namelen); + // fprintf(stderr," pid=%d namelen=%" PRIi64 "\n",frame->procs[pi].pid,namelen); if(len-cursorprocs[pi].name=malloc(namelen+1,char); memcpy(frame->procs[pi].name,buffer+cursor,namelen); + frame->procs[pi].name[namelen]='\0'; cursor+=namelen; frame->procs[pi].usertime=deserialise8(buffer+cursor); cursor+=8; @@ -115,7 +146,7 @@ static void frame_data_sink(u8 *newdata,i64 newdatalen,void *payload){ static u8 *buffer=NULL; static i64 buffersize=0,bufferlen=0; - fprintf(stderr,"frame_data_sink(len=%" PRIi64 ")\n",newdatalen); + // fprintf(stderr,"frame_data_sink(len=%" PRIi64 ")\n",newdatalen); if(buffer==NULL){ buffersize=4096; @@ -142,8 +173,18 @@ static void frame_data_sink(u8 *newdata,i64 newdatalen,void *payload){ } int main(int argc,char **argv){ - if(argc!=2&&argc!=3){ - fprintf(stderr,"Usage: %s [port=57575]\n",argv[0]); + if(argc<3||argc>4){ + fprintf(stderr,"Usage: %s [port=57575]\n",argv[0]); + return 1; + } + + const char *logfname=argv[1]; + const char *ipaddr=argv[2]; + const char *port=argc==4?argv[4]:"57575"; + + logfile=fopen(logfname,"w"); + if(logfile==NULL){ + fprintf(stderr,"Cannot open log file '%s'\n",logfname); return 1; } @@ -151,7 +192,7 @@ int main(int argc,char **argv){ memset(&hints,0,sizeof(hints)); hints.ai_family=AF_UNSPEC; hints.ai_socktype=SOCK_STREAM; - int ret=getaddrinfo(argv[1],argc==3?argv[2]:"57575",&hints,&res); + int ret=getaddrinfo(ipaddr,port,&hints,&res); if(ret!=0){ fprintf(stderr,"getaddrinfo: %s\n",gai_strerror(ret)); return 1; @@ -229,7 +270,7 @@ int main(int argc,char **argv){ fprintf(stderr,"End-of-file on socket\n"); break; } - fprintf(stderr,"unzbuffer_write(nr=%" PRIi64 ")\n",nr); + // fprintf(stderr,"unzbuffer_write(nr=%" PRIi64 ")\n",nr); if(!unzbuffer_write(z,buf,nr)){ haderror=true; break; diff --git a/controller/unzbuffer.c b/controller/unzbuffer.c index ecd38a6..6578e61 100644 --- a/controller/unzbuffer.c +++ b/controller/unzbuffer.c @@ -56,7 +56,7 @@ bool unzbuffer_write(struct unzbuffer *z,u8 *data,i64 len){ return false; } if(ret!=Z_STREAM_END&&z->strm.avail_out>0)break; - z->wf(z->out,BUFSZ_ZOUT,z->payload); + z->wf(z->out,BUFSZ_ZOUT-z->strm.avail_out,z->payload); if(ret==Z_STREAM_END){ inflateReset(&z->strm); z->strm.avail_in=0; diff --git a/server/data_stream.c b/server/data_stream.c index e86114e..3d25104 100644 --- a/server/data_stream.c +++ b/server/data_stream.c @@ -33,6 +33,7 @@ static void write_data_fd(u8 *data,i64 len,void *fdp){ } cursor+=nwr; } + fprintf(stderr,"write_data_fd(len=%" PRIi64 ")\n",len); } static void bufappend(u8 **buf,i64 *cursor,i64 *cap,u8 *data,i64 datalen){ @@ -103,15 +104,21 @@ int data_stream_frame(struct data_stream *s){ s->pids=realloc(s->pids,s->pidbufsize,pid_t); numpids=proc_listallpids(s->pids,s->pidbufsize*sizeof(pid_t)); } - serialise8(buf8,numpids); + + // correct value will be written later + i64 numpids_buf_cursor=s->cursor; bufappend(&s->writebuf,&s->cursor,&s->writebufsize,buf8,8); + i64 nfailed=0; for(i64 index=0;indexpids[index]; struct proc_taskinfo info; int result=proc_pidinfo(pid,PROC_PIDTASKINFO,0,&info,sizeof(info)); - if(result!=sizeof info)continue; + if(result!=sizeof info){ + nfailed++; + continue; + } serialise4(buf8,pid); bufappend(&s->writebuf,&s->cursor,&s->writebufsize,buf8,4); @@ -126,7 +133,7 @@ int data_stream_frame(struct data_stream *s){ s->namebuf[0]='\0'; } - fprintf(stderr,"index=%" PRIi64 " pid=%d namelen=%zu\n",index,pid,namelen); + // fprintf(stderr,"pi=%" PRIi64 " pid=%d namelen=%zu\n",index-nfailed,pid,namelen); serialise8(buf8,namelen); bufappend(&s->writebuf,&s->cursor,&s->writebufsize,buf8,8); bufappend(&s->writebuf,&s->cursor,&s->writebufsize,s->namebuf,namelen); @@ -141,7 +148,12 @@ int data_stream_frame(struct data_stream *s){ // fprintf(stderr,"PID %u (%s): %llu %llu %llu\n",pid,namebuf,info.pti_total_user,info.pti_total_system,info.pti_resident_size); } + numpids-=nfailed; + serialise8(buf8,numpids); + memcpy(s->writebuf+numpids_buf_cursor,buf8,8); + zbuffer_write(s->z,s->writebuf,s->cursor); + // fprintf(stderr,"zbuffer_write(cursor=%" PRIi64 ")\n",s->cursor); s->cursor=0; if(callback_error)return -1; diff --git a/server/main.c b/server/main.c index 3cf1ffd..ffa16cd 100644 --- a/server/main.c +++ b/server/main.c @@ -69,6 +69,7 @@ static void connection_handler(int sock){ break; } if(ret==0||!FD_ISSET(sock,&inset)){ // timeout + before=make_timestamp(); if(do_send_frames){ if(data_stream_frame(stream)<0){ printf("Error sending data\n"); @@ -77,6 +78,14 @@ static void connection_handler(int sock){ printf("."); fflush(stdout); } timeleft=polldelay; + + i64 after=make_timestamp(); + if(after #include #include #include "zbuffer.h" @@ -48,6 +49,7 @@ void zbuffer_write(struct zbuffer *z,u8 *data,i64 len){ z->strm.avail_out=BUFSZ_ZOUT; z->strm.next_out=z->out; } + // fprintf(stderr,"zbuffer_write: avail_out = %d\n",z->strm.avail_out); assert(z->strm.avail_in==0); } @@ -74,6 +76,7 @@ void zbuffer_finish_destroy(struct zbuffer *z){ z->strm.avail_out=BUFSZ_ZOUT; z->strm.next_out=z->out; } + deflateEnd(&z->strm); free(z->out); free(z); } -- cgit v1.2.3