summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--controller/main.c63
-rw-r--r--controller/unzbuffer.c2
-rw-r--r--server/data_stream.c18
-rw-r--r--server/main.c9
-rw-r--r--server/zbuffer.c3
5 files changed, 80 insertions, 15 deletions
diff --git a/controller/main.c b/controller/main.c
index 2e37e76..d289ce9 100644
--- a/controller/main.c
+++ b/controller/main.c
@@ -22,6 +22,8 @@ static bool writeall(int sock,const char *data,i64 len){
return true;
}
+FILE *logfile=NULL;
+
struct frame_process {
pid_t pid;
i64 namelen;
@@ -48,7 +50,35 @@ static i32 deserialise4(const u8 *buf){
}
static void process_frame(const struct data_frame *frame){
- fprintf(stderr,"%s: (%" PRIi64 ")\n",ctime(&frame->stamp.tv_sec),frame->numpids);
+ fprintf(logfile,"{\"stamp\":[%" PRIi64 ",%d],\"procs\":[",
+ (i64)frame->stamp.tv_sec,frame->stamp.tv_usec);
+ for(i64 i=0;i<frame->numpids;i++){
+ const struct frame_process *p=&frame->procs[i];
+ if(i!=0)fputc(',',logfile);
+ fprintf(logfile,"{\"pid\":%d,\"name\":\"",p->pid);
+ for(i64 j=0;j<p->namelen;j++){
+ if(p->name[j]>=32&&p->name[j]<=126&&p->name[j]!='"'&&p->name[j]!='\\'){
+ fputc(p->name[j],logfile);
+ } else {
+ fprintf(logfile,"\\x%c%c",
+ "0123456789abcdef"[(unsigned char)p->name[j]/16],
+ "0123456789abcdef"[(unsigned char)p->name[j]%16]);
+ }
+ }
+ fprintf(logfile,"\",\"usertime\":%" PRIi64 ",\"systime\":%" PRIi64 ",\"rss\":%" PRIi64 "}",
+ p->usertime,p->systime,p->rss);
+ }
+ fprintf(logfile,"]}\n");
+
+ /*char buf[26];
+ memcpy(buf,ctime(&frame->stamp.tv_sec),26);
+ buf[24]='\0';
+ fprintf(stderr,"%s: (%" PRIi64 ")\n",buf,frame->numpids);
+ for(i64 i=0;i<frame->numpids;i++){
+ const struct frame_process *p=&frame->procs[i];
+ fprintf(stderr," (%d) \"%s\" u=%.2f s=%.2f rss=%" PRIi64 "\n",
+ p->pid,p->name,p->usertime/1000000000.0,p->systime/1000000000.0,p->rss);
+ }*/
}
static i64 read_frame(const u8 *buffer,i64 len){
@@ -58,7 +88,7 @@ static i64 read_frame(const u8 *buffer,i64 len){
frame->stamp.tv_usec=deserialise4(buffer+8);
i64 numpids=frame->numpids=deserialise8(buffer+12);
i64 cursor=20;
- fprintf(stderr,"numpids=%" PRIi64 "\n",numpids);
+ // fprintf(stderr,"numpids=%" PRIi64 "\n",numpids);
if(len-cursor<numpids*36){
free(frame);
return 0;
@@ -68,22 +98,23 @@ static i64 read_frame(const u8 *buffer,i64 len){
i64 pi;
for(pi=0;pi<frame->numpids;pi++){
- fprintf(stderr,"pi=%" PRIi64 "\n",pi);
+ // fprintf(stderr,"pi=%" PRIi64 "\n",pi);
if(len-cursor<36){
- fprintf(stderr,"insuff: pi\n");
+ // fprintf(stderr,"insuff: pi\n");
goto insufficient_data;
}
frame->procs[pi].pid=deserialise4(buffer+cursor);
cursor+=4;
i64 namelen=frame->procs[pi].namelen=deserialise8(buffer+cursor);
cursor+=8;
- fprintf(stderr," pid=%d namelen=%" PRIi64 "\n",frame->procs[pi].pid,namelen);
+ // fprintf(stderr," pid=%d namelen=%" PRIi64 "\n",frame->procs[pi].pid,namelen);
if(len-cursor<namelen+24){
- fprintf(stderr,"insuff: namelen\n");
+ // fprintf(stderr,"insuff: namelen\n");
goto insufficient_data;
}
frame->procs[pi].name=malloc(namelen+1,char);
memcpy(frame->procs[pi].name,buffer+cursor,namelen);
+ frame->procs[pi].name[namelen]='\0';
cursor+=namelen;
frame->procs[pi].usertime=deserialise8(buffer+cursor);
cursor+=8;
@@ -115,7 +146,7 @@ static void frame_data_sink(u8 *newdata,i64 newdatalen,void *payload){
static u8 *buffer=NULL;
static i64 buffersize=0,bufferlen=0;
- fprintf(stderr,"frame_data_sink(len=%" PRIi64 ")\n",newdatalen);
+ // fprintf(stderr,"frame_data_sink(len=%" PRIi64 ")\n",newdatalen);
if(buffer==NULL){
buffersize=4096;
@@ -142,8 +173,18 @@ static void frame_data_sink(u8 *newdata,i64 newdatalen,void *payload){
}
int main(int argc,char **argv){
- if(argc!=2&&argc!=3){
- fprintf(stderr,"Usage: %s <ip addr> [port=57575]\n",argv[0]);
+ if(argc<3||argc>4){
+ fprintf(stderr,"Usage: %s <logfile> <ip addr> [port=57575]\n",argv[0]);
+ return 1;
+ }
+
+ const char *logfname=argv[1];
+ const char *ipaddr=argv[2];
+ const char *port=argc==4?argv[4]:"57575";
+
+ logfile=fopen(logfname,"w");
+ if(logfile==NULL){
+ fprintf(stderr,"Cannot open log file '%s'\n",logfname);
return 1;
}
@@ -151,7 +192,7 @@ int main(int argc,char **argv){
memset(&hints,0,sizeof(hints));
hints.ai_family=AF_UNSPEC;
hints.ai_socktype=SOCK_STREAM;
- int ret=getaddrinfo(argv[1],argc==3?argv[2]:"57575",&hints,&res);
+ int ret=getaddrinfo(ipaddr,port,&hints,&res);
if(ret!=0){
fprintf(stderr,"getaddrinfo: %s\n",gai_strerror(ret));
return 1;
@@ -229,7 +270,7 @@ int main(int argc,char **argv){
fprintf(stderr,"End-of-file on socket\n");
break;
}
- fprintf(stderr,"unzbuffer_write(nr=%" PRIi64 ")\n",nr);
+ // fprintf(stderr,"unzbuffer_write(nr=%" PRIi64 ")\n",nr);
if(!unzbuffer_write(z,buf,nr)){
haderror=true;
break;
diff --git a/controller/unzbuffer.c b/controller/unzbuffer.c
index ecd38a6..6578e61 100644
--- a/controller/unzbuffer.c
+++ b/controller/unzbuffer.c
@@ -56,7 +56,7 @@ bool unzbuffer_write(struct unzbuffer *z,u8 *data,i64 len){
return false;
}
if(ret!=Z_STREAM_END&&z->strm.avail_out>0)break;
- z->wf(z->out,BUFSZ_ZOUT,z->payload);
+ z->wf(z->out,BUFSZ_ZOUT-z->strm.avail_out,z->payload);
if(ret==Z_STREAM_END){
inflateReset(&z->strm);
z->strm.avail_in=0;
diff --git a/server/data_stream.c b/server/data_stream.c
index e86114e..3d25104 100644
--- a/server/data_stream.c
+++ b/server/data_stream.c
@@ -33,6 +33,7 @@ static void write_data_fd(u8 *data,i64 len,void *fdp){
}
cursor+=nwr;
}
+ fprintf(stderr,"write_data_fd(len=%" PRIi64 ")\n",len);
}
static void bufappend(u8 **buf,i64 *cursor,i64 *cap,u8 *data,i64 datalen){
@@ -103,15 +104,21 @@ int data_stream_frame(struct data_stream *s){
s->pids=realloc(s->pids,s->pidbufsize,pid_t);
numpids=proc_listallpids(s->pids,s->pidbufsize*sizeof(pid_t));
}
- serialise8(buf8,numpids);
+
+ // correct value will be written later
+ i64 numpids_buf_cursor=s->cursor;
bufappend(&s->writebuf,&s->cursor,&s->writebufsize,buf8,8);
+ i64 nfailed=0;
for(i64 index=0;index<numpids;index++) {
pid_t pid=s->pids[index];
struct proc_taskinfo info;
int result=proc_pidinfo(pid,PROC_PIDTASKINFO,0,&info,sizeof(info));
- if(result!=sizeof info)continue;
+ if(result!=sizeof info){
+ nfailed++;
+ continue;
+ }
serialise4(buf8,pid);
bufappend(&s->writebuf,&s->cursor,&s->writebufsize,buf8,4);
@@ -126,7 +133,7 @@ int data_stream_frame(struct data_stream *s){
s->namebuf[0]='\0';
}
- fprintf(stderr,"index=%" PRIi64 " pid=%d namelen=%zu\n",index,pid,namelen);
+ // fprintf(stderr,"pi=%" PRIi64 " pid=%d namelen=%zu\n",index-nfailed,pid,namelen);
serialise8(buf8,namelen);
bufappend(&s->writebuf,&s->cursor,&s->writebufsize,buf8,8);
bufappend(&s->writebuf,&s->cursor,&s->writebufsize,s->namebuf,namelen);
@@ -141,7 +148,12 @@ int data_stream_frame(struct data_stream *s){
// fprintf(stderr,"PID %u (%s): %llu %llu %llu\n",pid,namebuf,info.pti_total_user,info.pti_total_system,info.pti_resident_size);
}
+ numpids-=nfailed;
+ serialise8(buf8,numpids);
+ memcpy(s->writebuf+numpids_buf_cursor,buf8,8);
+
zbuffer_write(s->z,s->writebuf,s->cursor);
+ // fprintf(stderr,"zbuffer_write(cursor=%" PRIi64 ")\n",s->cursor);
s->cursor=0;
if(callback_error)return -1;
diff --git a/server/main.c b/server/main.c
index 3cf1ffd..ffa16cd 100644
--- a/server/main.c
+++ b/server/main.c
@@ -69,6 +69,7 @@ static void connection_handler(int sock){
break;
}
if(ret==0||!FD_ISSET(sock,&inset)){ // timeout
+ before=make_timestamp();
if(do_send_frames){
if(data_stream_frame(stream)<0){
printf("Error sending data\n");
@@ -77,6 +78,14 @@ static void connection_handler(int sock){
printf("."); fflush(stdout);
}
timeleft=polldelay;
+
+ i64 after=make_timestamp();
+ if(after<before){
+ printf("Time ran backwards?\n");
+ after=before;
+ }
+ timeleft-=after-before;
+ if(timeleft<0)timeleft=0;
continue;
}
diff --git a/server/zbuffer.c b/server/zbuffer.c
index af37f74..d65c7cc 100644
--- a/server/zbuffer.c
+++ b/server/zbuffer.c
@@ -1,3 +1,4 @@
+#include <stdio.h>
#include <assert.h>
#include <zlib.h>
#include "zbuffer.h"
@@ -48,6 +49,7 @@ void zbuffer_write(struct zbuffer *z,u8 *data,i64 len){
z->strm.avail_out=BUFSZ_ZOUT;
z->strm.next_out=z->out;
}
+ // fprintf(stderr,"zbuffer_write: avail_out = %d\n",z->strm.avail_out);
assert(z->strm.avail_in==0);
}
@@ -74,6 +76,7 @@ void zbuffer_finish_destroy(struct zbuffer *z){
z->strm.avail_out=BUFSZ_ZOUT;
z->strm.next_out=z->out;
}
+ deflateEnd(&z->strm);
free(z->out);
free(z);
}