// Network client: connects to a host over TCP, forwards lines read from stdin
// to the socket, and feeds everything received from the socket through an
// unzbuffer whose output is parsed into data frames.
//
// NOTE(review): this copy of the file appears damaged. Line breaks have been
// lost, and every span from a '<' to the next '>' seems to be missing: the
// system header names, the body of writeall(), the start of process_frame(),
// several conditions inside read_frame(), the loop at the end of
// frame_data_sink(), and the signature/usage check of main(). Restore the
// file from version control before changing any logic.
//
// writeall(sock,data,len): only the signature and a cursor initialised to 0
// are visible; main() treats a false return as a socket write error.
//
// read_frame(buffer,len): parses one frame from buffer.
//   - Returns 0 immediately when fewer than 20 bytes are available.
//   - Header layout as read here: 8 bytes -> stamp.tv_sec, 4 bytes at offset 8
//     -> stamp.tv_usec, 8 bytes at offset 12 -> numpids; parsing continues at
//     offset 20.
//   - Per process: 4-byte pid, 8-byte namelen, namelen bytes of name, then
//     three 8-byte fields (usertime, systime, rss).
//   - On success the frame is passed to process_frame(), everything is freed,
//     and the number of bytes consumed (cursor) is returned.
//   - On the insufficient_data path the partially built frame is freed and 0
//     is returned.
//   - malloc/calloc/realloc are called with a type argument here, so they are
//     presumably project macros (from global.h?) -- TODO confirm.
//   - NOTE(review): name is allocated with namelen+1 bytes but only namelen
//     bytes are copied; no terminator is written in the visible code. Confirm
//     whether the allocator zero-fills, otherwise name is not NUL-terminated.
//   - NOTE(review): frame comes from malloc, and a jump to insufficient_data
//     appears possible before frame->procs and pi are assigned (the check is
//     partly lost); the cleanup code reads both. Verify against the intact file.
//
// frame_data_sink(newdata,newdatalen,payload): data callback registered with
// unzbuffer_init(); payload is unused. It appends newdata to a function-static
// buffer (see the continuation on the next line).
#include #include #include #include #include #include #include #include #include #include "global.h" #include "line_reader.h" #include "unzbuffer.h" static bool writeall(int sock,const char *data,i64 len){ i64 cursor=0; while(cursorstamp.tv_sec),frame->numpids); } static i64 read_frame(const u8 *buffer,i64 len){ if(len<20)return 0; struct data_frame *frame=malloc(1,struct data_frame); frame->stamp.tv_sec=deserialise8(buffer); frame->stamp.tv_usec=deserialise4(buffer+8); i64 numpids=frame->numpids=deserialise8(buffer+12); i64 cursor=20; fprintf(stderr,"numpids=%" PRIi64 "\n",numpids); if(len-cursorprocs=calloc(numpids,struct frame_process); i64 pi; for(pi=0;pinumpids;pi++){ fprintf(stderr,"pi=%" PRIi64 "\n",pi); if(len-cursor<36){ fprintf(stderr,"insuff: pi\n"); goto insufficient_data; } frame->procs[pi].pid=deserialise4(buffer+cursor); cursor+=4; i64 namelen=frame->procs[pi].namelen=deserialise8(buffer+cursor); cursor+=8; fprintf(stderr," pid=%d namelen=%" PRIi64 "\n",frame->procs[pi].pid,namelen); if(len-cursorprocs[pi].name=malloc(namelen+1,char); memcpy(frame->procs[pi].name,buffer+cursor,namelen); cursor+=namelen; frame->procs[pi].usertime=deserialise8(buffer+cursor); cursor+=8; frame->procs[pi].systime=deserialise8(buffer+cursor); cursor+=8; frame->procs[pi].rss=deserialise8(buffer+cursor); cursor+=8; } process_frame(frame); for(i64 i=0;iprocs[i].name); free(frame->procs); free(frame); return cursor; insufficient_data: if(frame->procs){ for(i64 i=0;i<=pi;i++){ if(frame->procs[i].name)free(frame->procs[i].name); } free(frame->procs); } free(frame); return 0; } static void frame_data_sink(u8 *newdata,i64 newdatalen,void *payload){ (void)payload; static u8 *buffer=NULL; static i64 buffersize=0,bufferlen=0; fprintf(stderr,"frame_data_sink(len=%" PRIi64 ")\n",newdatalen); if(buffer==NULL){ buffersize=4096; buffer=malloc(buffersize,u8); } if(bufferlen+newdatalen>buffersize){ do buffersize*=2; while(bufferlen+newdatalen>buffersize); 
// Continuation of frame_data_sink(): the static buffer starts at 4096 bytes
// and is doubled until the new data fits, then the data is appended. What the
// function does with the accumulated bytes afterwards is lost in this copy
// (presumably it calls read_frame() in a loop -- TODO confirm).
//
// main (signature lost): resolves argv[1] with getaddrinfo(), using argv[2] as
// the port when argc==3 and "57575" otherwise, and connects to the first
// address that accepts a TCP connection. It then select()s on stdin and the
// socket:
//   - stdin data is handed to the line reader and each complete line is sent
//     with writeall(); EOF on stdin ends the loop.
//   - socket data is passed to unzbuffer_write(); EOF on the socket ends the
//     loop, and a failed unzbuffer_write() sets haderror and ends the loop.
// NOTE(review): inet_ntop() is always given a sockaddr_in's sin_addr, while
// hints.ai_family is AF_UNSPEC; for AF_INET6 results the printed address is
// probably wrong -- confirm.
// NOTE(review): the "goto cleanup" on a write error skips free(line) and
// leaves haderror false, so that path can return 0; no close(sock) is visible
// in the cleanup code either -- verify against the intact file.
// NOTE(review): with the line breaks gone, the "// EOF" comment below swallows
// the rest of this physical line.
buffer=realloc(buffer,buffersize,u8); } memcpy(buffer+bufferlen,newdata,newdatalen); bufferlen+=newdatalen; i64 cursor=0; while(cursor [port=57575]\n",argv[0]); return 1; } struct addrinfo hints,*res; memset(&hints,0,sizeof(hints)); hints.ai_family=AF_UNSPEC; hints.ai_socktype=SOCK_STREAM; int ret=getaddrinfo(argv[1],argc==3?argv[2]:"57575",&hints,&res); if(ret!=0){ fprintf(stderr,"getaddrinfo: %s\n",gai_strerror(ret)); return 1; } int sock=-1; for(struct addrinfo *r=res;r!=NULL;r=r->ai_next){ char buf[256]; inet_ntop(r->ai_family,&((struct sockaddr_in*)r->ai_addr)->sin_addr,buf,sizeof buf); fprintf(stderr,"Trying: %s\n",buf); sock=socket(r->ai_family,r->ai_socktype,r->ai_protocol); if(sock<0){ sock=-1; perror("socket"); continue; } if(connect(sock,r->ai_addr,r->ai_addrlen)<0){ close(sock); sock=-1; perror("connect"); continue; } break; } freeaddrinfo(res); if(sock==-1)return 1; fprintf(stderr,"Connected.\n"); struct line_reader *liner=line_reader_init(STDIN_FILENO); struct unzbuffer *z=unzbuffer_init(frame_data_sink,NULL); bool haderror=false; while(true){ fd_set inset; FD_ZERO(&inset); FD_SET(STDIN_FILENO,&inset); FD_SET(sock,&inset); int ret=select(sock+1,&inset,NULL,NULL,NULL); if(ret<0){ if(errno==EINTR)continue; perror("select"); break; } if(FD_ISSET(STDIN_FILENO,&inset)){ char buf[1024]; i64 nr=read(STDIN_FILENO,buf,sizeof buf); if(nr<0){ if(errno==EINTR)continue; perror("read"); break; } if(nr==0)break; // EOF line_reader_supply_data(liner,buf,nr); char *line; while((line=line_reader_get_line(liner,true))!=NULL){ if(!writeall(sock,line,strlen(line))){ fprintf(stderr,"Socket write error\n"); goto cleanup; } // printf("Written <%s>\n",line); free(line); } } if(FD_ISSET(sock,&inset)){ u8 buf[1024]; i64 nr=read(sock,buf,sizeof buf); if(nr<0){ if(errno==EINTR)continue; perror("read"); break; } if(nr==0){ fprintf(stderr,"End-of-file on socket\n"); break; } fprintf(stderr,"unzbuffer_write(nr=%" PRIi64 ")\n",nr); if(!unzbuffer_write(z,buf,nr)){ haderror=true; 
// Cleanup: destroys the line reader and finishes/destroys the unzbuffer.
// Exit status is 1 if unzbuffer_finish_destroy() fails or haderror is set,
// otherwise 0.
break; } } } cleanup: line_reader_destroy(liner); if(!unzbuffer_finish_destroy(z))return 1; if(haderror)return 1; return 0; }