From 7549d07933091417b225d094c1648e1382287f93 Mon Sep 17 00:00:00 2001 From: tomsmeding Date: Wed, 11 Oct 2017 08:32:09 +0200 Subject: Initial --- server/data_stream.c | 156 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 156 insertions(+) create mode 100644 server/data_stream.c (limited to 'server/data_stream.c') diff --git a/server/data_stream.c b/server/data_stream.c new file mode 100644 index 0000000..e86114e --- /dev/null +++ b/server/data_stream.c @@ -0,0 +1,156 @@ +#include +#include +#include +#include +#include +#include +#include "data_stream.h" + + +struct data_stream { + int fd; + i64 pidbufsize; + pid_t *pids; + i64 namebufsize; + u8 *namebuf; + i64 writebufsize; + u8 *writebuf; + i64 cursor; + struct zbuffer *z; +}; + +static bool callback_error=false; + +static void write_data_fd(u8 *data,i64 len,void *fdp){ + int fd=*(int*)fdp; + i64 cursor=0; + while(cursor=*cap){ + if(*cap*2>LARGENUM)return; + *cap*=2; + *buf=realloc(*buf,*cap,u8); + } + memcpy(*buf+*cursor,data,datalen); + *cursor+=datalen; +} + +static void serialise8(u8 *buf,i64 value){ + for(i64 i=0;i<8;i++)buf[i]=(value>>(8*i))&0xff; +} + +static void serialise4(u8 *buf,i32 value){ + for(i64 i=0;i<4;i++)buf[i]=(value>>(8*i))&0xff; +} + +struct data_stream* data_stream_init(int fd){ + struct data_stream *s=malloc(1,struct data_stream); + s->fd=fd; + s->pidbufsize=1024; + s->pids=malloc(s->pidbufsize,pid_t); + s->namebufsize=64; + s->namebuf=malloc(s->namebufsize,u8); + s->writebufsize=4096; + s->writebuf=malloc(s->writebufsize,u8); + s->cursor=0; + s->z=zbuffer_init(write_data_fd,&s->fd); + + callback_error=false; + + return s; +} + +int data_stream_finish_destroy(struct data_stream *s){ + callback_error=false; + zbuffer_finish_destroy(s->z); + free(s->pids); + free(s->namebuf); + free(s->writebuf); + free(s); + if(callback_error)return -1; + return 0; +} + +int data_stream_frame(struct data_stream *s){ + callback_error=false; + + u8 buf8[8]; + + struct timeval timeval; + if(gettimeofday(&timeval,NULL)<0){ + perror("gettimeofday"); + timeval.tv_sec=0; + timeval.tv_usec=0; + } + serialise8(buf8,timeval.tv_sec); + bufappend(&s->writebuf,&s->cursor,&s->writebufsize,buf8,8); + serialise4(buf8,timeval.tv_usec); + bufappend(&s->writebuf,&s->cursor,&s->writebufsize,buf8,4); + + i64 numpids=proc_listallpids(s->pids,s->pidbufsize*sizeof(pid_t)); + while(s->pidbufsize<=LARGENUM&&numpids>=s->pidbufsize){ + s->pidbufsize*=2; + s->pids=realloc(s->pids,s->pidbufsize,pid_t); + numpids=proc_listallpids(s->pids,s->pidbufsize*sizeof(pid_t)); + } + serialise8(buf8,numpids); + bufappend(&s->writebuf,&s->cursor,&s->writebufsize,buf8,8); + + for(i64 index=0;indexpids[index]; + + struct proc_taskinfo info; + int result=proc_pidinfo(pid,PROC_PIDTASKINFO,0,&info,sizeof(info)); + if(result!=sizeof info)continue; + + serialise4(buf8,pid); + bufappend(&s->writebuf,&s->cursor,&s->writebufsize,buf8,4); + + size_t namelen=proc_name(pid,s->namebuf,s->namebufsize); + while(s->namebufsize<=LARGENUM&&namelen==0){ + s->namebufsize*=2; + s->namebuf=realloc(s->namebuf,s->namebufsize,u8); + namelen=proc_name(pid,s->namebuf,s->namebufsize); + } + if(namelen==0){ + s->namebuf[0]='\0'; + } + + fprintf(stderr,"index=%" PRIi64 " pid=%d namelen=%zu\n",index,pid,namelen); + serialise8(buf8,namelen); + bufappend(&s->writebuf,&s->cursor,&s->writebufsize,buf8,8); + bufappend(&s->writebuf,&s->cursor,&s->writebufsize,s->namebuf,namelen); + + serialise8(buf8,info.pti_total_user); + bufappend(&s->writebuf,&s->cursor,&s->writebufsize,buf8,8); + serialise8(buf8,info.pti_total_system); + bufappend(&s->writebuf,&s->cursor,&s->writebufsize,buf8,8); + serialise8(buf8,info.pti_resident_size); + bufappend(&s->writebuf,&s->cursor,&s->writebufsize,buf8,8); + + // fprintf(stderr,"PID %u (%s): %llu %llu %llu\n",pid,namebuf,info.pti_total_user,info.pti_total_system,info.pti_resident_size); + } + + zbuffer_write(s->z,s->writebuf,s->cursor); + s->cursor=0; + + if(callback_error)return -1; + return 0; +} + +int data_stream_flush(struct data_stream *s){ + callback_error=false; + zbuffer_flush(s->z); + if(callback_error)return -1; + return 0; +} -- cgit v1.2.3-54-g00ecf