summaryrefslogtreecommitdiff
path: root/server/data_stream.c
diff options
context:
space:
mode:
Diffstat (limited to 'server/data_stream.c')
-rw-r--r--server/data_stream.c156
1 files changed, 156 insertions, 0 deletions
diff --git a/server/data_stream.c b/server/data_stream.c
new file mode 100644
index 0000000..e86114e
--- /dev/null
+++ b/server/data_stream.c
@@ -0,0 +1,156 @@
+#include <stdio.h>
+#include <limits.h>
+#include <string.h>
+#include <unistd.h>
+#include <sys/time.h>
+#include <libproc.h>
+#include "data_stream.h"
+
+
+struct data_stream {
+ int fd;
+ i64 pidbufsize;
+ pid_t *pids;
+ i64 namebufsize;
+ u8 *namebuf;
+ i64 writebufsize;
+ u8 *writebuf;
+ i64 cursor;
+ struct zbuffer *z;
+};
+
+static bool callback_error=false;
+
+static void write_data_fd(u8 *data,i64 len,void *fdp){
+ int fd=*(int*)fdp;
+ i64 cursor=0;
+ while(cursor<len){
+ i64 nwr=write(fd,data+cursor,len-cursor);
+ if(nwr<0){
+ perror("write");
+ callback_error=true;
+ return;
+ }
+ cursor+=nwr;
+ }
+}
+
+static void bufappend(u8 **buf,i64 *cursor,i64 *cap,u8 *data,i64 datalen){
+ if(*cursor+datalen>=*cap){
+ if(*cap*2>LARGENUM)return;
+ *cap*=2;
+ *buf=realloc(*buf,*cap,u8);
+ }
+ memcpy(*buf+*cursor,data,datalen);
+ *cursor+=datalen;
+}
+
+static void serialise8(u8 *buf,i64 value){
+ for(i64 i=0;i<8;i++)buf[i]=(value>>(8*i))&0xff;
+}
+
+static void serialise4(u8 *buf,i32 value){
+ for(i64 i=0;i<4;i++)buf[i]=(value>>(8*i))&0xff;
+}
+
+struct data_stream* data_stream_init(int fd){
+ struct data_stream *s=malloc(1,struct data_stream);
+ s->fd=fd;
+ s->pidbufsize=1024;
+ s->pids=malloc(s->pidbufsize,pid_t);
+ s->namebufsize=64;
+ s->namebuf=malloc(s->namebufsize,u8);
+ s->writebufsize=4096;
+ s->writebuf=malloc(s->writebufsize,u8);
+ s->cursor=0;
+ s->z=zbuffer_init(write_data_fd,&s->fd);
+
+ callback_error=false;
+
+ return s;
+}
+
+int data_stream_finish_destroy(struct data_stream *s){
+ callback_error=false;
+ zbuffer_finish_destroy(s->z);
+ free(s->pids);
+ free(s->namebuf);
+ free(s->writebuf);
+ free(s);
+ if(callback_error)return -1;
+ return 0;
+}
+
+int data_stream_frame(struct data_stream *s){
+ callback_error=false;
+
+ u8 buf8[8];
+
+ struct timeval timeval;
+ if(gettimeofday(&timeval,NULL)<0){
+ perror("gettimeofday");
+ timeval.tv_sec=0;
+ timeval.tv_usec=0;
+ }
+ serialise8(buf8,timeval.tv_sec);
+ bufappend(&s->writebuf,&s->cursor,&s->writebufsize,buf8,8);
+ serialise4(buf8,timeval.tv_usec);
+ bufappend(&s->writebuf,&s->cursor,&s->writebufsize,buf8,4);
+
+ i64 numpids=proc_listallpids(s->pids,s->pidbufsize*sizeof(pid_t));
+ while(s->pidbufsize<=LARGENUM&&numpids>=s->pidbufsize){
+ s->pidbufsize*=2;
+ s->pids=realloc(s->pids,s->pidbufsize,pid_t);
+ numpids=proc_listallpids(s->pids,s->pidbufsize*sizeof(pid_t));
+ }
+ serialise8(buf8,numpids);
+ bufappend(&s->writebuf,&s->cursor,&s->writebufsize,buf8,8);
+
+ for(i64 index=0;index<numpids;index++) {
+ pid_t pid=s->pids[index];
+
+ struct proc_taskinfo info;
+ int result=proc_pidinfo(pid,PROC_PIDTASKINFO,0,&info,sizeof(info));
+ if(result!=sizeof info)continue;
+
+ serialise4(buf8,pid);
+ bufappend(&s->writebuf,&s->cursor,&s->writebufsize,buf8,4);
+
+ size_t namelen=proc_name(pid,s->namebuf,s->namebufsize);
+ while(s->namebufsize<=LARGENUM&&namelen==0){
+ s->namebufsize*=2;
+ s->namebuf=realloc(s->namebuf,s->namebufsize,u8);
+ namelen=proc_name(pid,s->namebuf,s->namebufsize);
+ }
+ if(namelen==0){
+ s->namebuf[0]='\0';
+ }
+
+ fprintf(stderr,"index=%" PRIi64 " pid=%d namelen=%zu\n",index,pid,namelen);
+ serialise8(buf8,namelen);
+ bufappend(&s->writebuf,&s->cursor,&s->writebufsize,buf8,8);
+ bufappend(&s->writebuf,&s->cursor,&s->writebufsize,s->namebuf,namelen);
+
+ serialise8(buf8,info.pti_total_user);
+ bufappend(&s->writebuf,&s->cursor,&s->writebufsize,buf8,8);
+ serialise8(buf8,info.pti_total_system);
+ bufappend(&s->writebuf,&s->cursor,&s->writebufsize,buf8,8);
+ serialise8(buf8,info.pti_resident_size);
+ bufappend(&s->writebuf,&s->cursor,&s->writebufsize,buf8,8);
+
+ // fprintf(stderr,"PID %u (%s): %llu %llu %llu\n",pid,namebuf,info.pti_total_user,info.pti_total_system,info.pti_resident_size);
+ }
+
+ zbuffer_write(s->z,s->writebuf,s->cursor);
+ s->cursor=0;
+
+ if(callback_error)return -1;
+ return 0;
+}
+
+int data_stream_flush(struct data_stream *s){
+ callback_error=false;
+ zbuffer_flush(s->z);
+ if(callback_error)return -1;
+ return 0;
+}