#include #include #include #include #include #include #include "data_stream.h" struct data_stream { int fd; i64 pidbufsize; pid_t *pids; i64 namebufsize; u8 *namebuf; i64 writebufsize; u8 *writebuf; i64 cursor; struct zbuffer *z; }; static bool callback_error=false; static void write_data_fd(u8 *data,i64 len,void *fdp){ int fd=*(int*)fdp; i64 cursor=0; while(cursor=*cap){ if(*cap*2>LARGENUM)return; *cap*=2; *buf=realloc(*buf,*cap,u8); } memcpy(*buf+*cursor,data,datalen); *cursor+=datalen; } static void serialise8(u8 *buf,i64 value){ for(i64 i=0;i<8;i++)buf[i]=(value>>(8*i))&0xff; } static void serialise4(u8 *buf,i32 value){ for(i64 i=0;i<4;i++)buf[i]=(value>>(8*i))&0xff; } struct data_stream* data_stream_init(int fd){ struct data_stream *s=malloc(1,struct data_stream); s->fd=fd; s->pidbufsize=1024; s->pids=malloc(s->pidbufsize,pid_t); s->namebufsize=64; s->namebuf=malloc(s->namebufsize,u8); s->writebufsize=4096; s->writebuf=malloc(s->writebufsize,u8); s->cursor=0; s->z=zbuffer_init(write_data_fd,&s->fd); callback_error=false; return s; } int data_stream_finish_destroy(struct data_stream *s){ callback_error=false; zbuffer_finish_destroy(s->z); free(s->pids); free(s->namebuf); free(s->writebuf); free(s); if(callback_error)return -1; return 0; } int data_stream_frame(struct data_stream *s){ callback_error=false; u8 buf8[8]; struct timeval timeval; if(gettimeofday(&timeval,NULL)<0){ perror("gettimeofday"); timeval.tv_sec=0; timeval.tv_usec=0; } serialise8(buf8,timeval.tv_sec); bufappend(&s->writebuf,&s->cursor,&s->writebufsize,buf8,8); serialise4(buf8,timeval.tv_usec); bufappend(&s->writebuf,&s->cursor,&s->writebufsize,buf8,4); i64 numpids=proc_listallpids(s->pids,s->pidbufsize*sizeof(pid_t)); while(s->pidbufsize<=LARGENUM&&numpids>=s->pidbufsize){ s->pidbufsize*=2; s->pids=realloc(s->pids,s->pidbufsize,pid_t); numpids=proc_listallpids(s->pids,s->pidbufsize*sizeof(pid_t)); } // correct value will be written later i64 numpids_buf_cursor=s->cursor; bufappend(&s->writebuf,&s->cursor,&s->writebufsize,buf8,8); i64 nfailed=0; for(i64 index=0;indexpids[index]; struct proc_taskinfo info; int result=proc_pidinfo(pid,PROC_PIDTASKINFO,0,&info,sizeof(info)); if(result!=sizeof info){ nfailed++; continue; } serialise4(buf8,pid); bufappend(&s->writebuf,&s->cursor,&s->writebufsize,buf8,4); size_t namelen=proc_name(pid,s->namebuf,s->namebufsize); while(s->namebufsize<=LARGENUM&&namelen==0){ s->namebufsize*=2; s->namebuf=realloc(s->namebuf,s->namebufsize,u8); namelen=proc_name(pid,s->namebuf,s->namebufsize); } if(namelen==0){ s->namebuf[0]='\0'; } // fprintf(stderr,"pi=%" PRIi64 " pid=%d namelen=%zu\n",index-nfailed,pid,namelen); serialise8(buf8,namelen); bufappend(&s->writebuf,&s->cursor,&s->writebufsize,buf8,8); bufappend(&s->writebuf,&s->cursor,&s->writebufsize,s->namebuf,namelen); serialise8(buf8,info.pti_total_user); bufappend(&s->writebuf,&s->cursor,&s->writebufsize,buf8,8); serialise8(buf8,info.pti_total_system); bufappend(&s->writebuf,&s->cursor,&s->writebufsize,buf8,8); serialise8(buf8,info.pti_resident_size); bufappend(&s->writebuf,&s->cursor,&s->writebufsize,buf8,8); // fprintf(stderr,"PID %u (%s): %llu %llu %llu\n",pid,namebuf,info.pti_total_user,info.pti_total_system,info.pti_resident_size); } numpids-=nfailed; serialise8(buf8,numpids); memcpy(s->writebuf+numpids_buf_cursor,buf8,8); zbuffer_write(s->z,s->writebuf,s->cursor); // fprintf(stderr,"zbuffer_write(cursor=%" PRIi64 ")\n",s->cursor); s->cursor=0; if(callback_error)return -1; return 0; } int data_stream_flush(struct data_stream *s){ callback_error=false; zbuffer_flush(s->z); if(callback_error)return -1; return 0; }