summaryrefslogtreecommitdiff
path: root/controller/main.c
diff options
context:
space:
mode:
Diffstat (limited to 'controller/main.c')
-rw-r--r--controller/main.c245
1 files changed, 245 insertions, 0 deletions
diff --git a/controller/main.c b/controller/main.c
new file mode 100644
index 0000000..2e37e76
--- /dev/null
+++ b/controller/main.c
@@ -0,0 +1,245 @@
+#include <stdio.h>
+#include <string.h>
+#include <time.h>
+#include <unistd.h>
+#include <errno.h>
+#include <sys/socket.h>
+#include <arpa/inet.h>
+#include <netdb.h>
+#include <sys/select.h>
+#include "global.h"
+#include "line_reader.h"
+#include "unzbuffer.h"
+
+
+static bool writeall(int sock,const char *data,i64 len){
+ i64 cursor=0;
+ while(cursor<len){
+ i64 nwr=write(sock,data+cursor,len-cursor);
+ if(nwr<=0)return false;
+ cursor+=nwr;
+ }
+ return true;
+}
+
+struct frame_process {
+ pid_t pid;
+ i64 namelen;
+ char *name;
+ i64 usertime,systime,rss;
+};
+
+struct data_frame {
+ struct timeval stamp;
+ i64 numpids;
+ struct frame_process *procs;
+};
+
+static i64 deserialise8(const u8 *buf){
+ u64 value=0;
+ for(i64 i=0;i<8;i++)value|=((u64)buf[i])<<(8*i);
+ return (i64)value;
+}
+
+static i32 deserialise4(const u8 *buf){
+ u32 value=0;
+ for(i64 i=0;i<4;i++)value|=((u32)buf[i])<<(8*i);
+ return (i32)value;
+}
+
+static void process_frame(const struct data_frame *frame){
+ fprintf(stderr,"%s: (%" PRIi64 ")\n",ctime(&frame->stamp.tv_sec),frame->numpids);
+}
+
+static i64 read_frame(const u8 *buffer,i64 len){
+ if(len<20)return 0;
+ struct data_frame *frame=malloc(1,struct data_frame);
+ frame->stamp.tv_sec=deserialise8(buffer);
+ frame->stamp.tv_usec=deserialise4(buffer+8);
+ i64 numpids=frame->numpids=deserialise8(buffer+12);
+ i64 cursor=20;
+ fprintf(stderr,"numpids=%" PRIi64 "\n",numpids);
+ if(len-cursor<numpids*36){
+ free(frame);
+ return 0;
+ }
+ // calloc to null unfilled pointers in procs
+ frame->procs=calloc(numpids,struct frame_process);
+
+ i64 pi;
+ for(pi=0;pi<frame->numpids;pi++){
+ fprintf(stderr,"pi=%" PRIi64 "\n",pi);
+ if(len-cursor<36){
+ fprintf(stderr,"insuff: pi\n");
+ goto insufficient_data;
+ }
+ frame->procs[pi].pid=deserialise4(buffer+cursor);
+ cursor+=4;
+ i64 namelen=frame->procs[pi].namelen=deserialise8(buffer+cursor);
+ cursor+=8;
+ fprintf(stderr," pid=%d namelen=%" PRIi64 "\n",frame->procs[pi].pid,namelen);
+ if(len-cursor<namelen+24){
+ fprintf(stderr,"insuff: namelen\n");
+ goto insufficient_data;
+ }
+ frame->procs[pi].name=malloc(namelen+1,char);
+ memcpy(frame->procs[pi].name,buffer+cursor,namelen);
+ cursor+=namelen;
+ frame->procs[pi].usertime=deserialise8(buffer+cursor);
+ cursor+=8;
+ frame->procs[pi].systime=deserialise8(buffer+cursor);
+ cursor+=8;
+ frame->procs[pi].rss=deserialise8(buffer+cursor);
+ cursor+=8;
+ }
+
+ process_frame(frame);
+ for(i64 i=0;i<numpids;i++)free(frame->procs[i].name);
+ free(frame->procs);
+ free(frame);
+ return cursor;
+
+insufficient_data:
+ if(frame->procs){
+ for(i64 i=0;i<=pi;i++){
+ if(frame->procs[i].name)free(frame->procs[i].name);
+ }
+ free(frame->procs);
+ }
+ free(frame);
+ return 0;
+}
+
+static void frame_data_sink(u8 *newdata,i64 newdatalen,void *payload){
+ (void)payload;
+ static u8 *buffer=NULL;
+ static i64 buffersize=0,bufferlen=0;
+
+ fprintf(stderr,"frame_data_sink(len=%" PRIi64 ")\n",newdatalen);
+
+ if(buffer==NULL){
+ buffersize=4096;
+ buffer=malloc(buffersize,u8);
+ }
+
+ if(bufferlen+newdatalen>buffersize){
+ do buffersize*=2; while(bufferlen+newdatalen>buffersize);
+ buffer=realloc(buffer,buffersize,u8);
+ }
+ memcpy(buffer+bufferlen,newdata,newdatalen);
+ bufferlen+=newdatalen;
+
+ i64 cursor=0;
+ while(cursor<bufferlen){
+ i64 nr=read_frame(buffer+cursor,bufferlen-cursor);
+ if(nr<=0)break;
+ cursor+=nr;
+ }
+ if(cursor<bufferlen){
+ memmove(buffer,buffer+cursor,bufferlen-cursor);
+ }
+ bufferlen-=cursor;
+}
+
+int main(int argc,char **argv){
+ if(argc!=2&&argc!=3){
+ fprintf(stderr,"Usage: %s <ip addr> [port=57575]\n",argv[0]);
+ return 1;
+ }
+
+ struct addrinfo hints,*res;
+ memset(&hints,0,sizeof(hints));
+ hints.ai_family=AF_UNSPEC;
+ hints.ai_socktype=SOCK_STREAM;
+ int ret=getaddrinfo(argv[1],argc==3?argv[2]:"57575",&hints,&res);
+ if(ret!=0){
+ fprintf(stderr,"getaddrinfo: %s\n",gai_strerror(ret));
+ return 1;
+ }
+
+ int sock=-1;
+ for(struct addrinfo *r=res;r!=NULL;r=r->ai_next){
+ char buf[256];
+ inet_ntop(r->ai_family,&((struct sockaddr_in*)r->ai_addr)->sin_addr,buf,sizeof buf);
+ fprintf(stderr,"Trying: %s\n",buf);
+ sock=socket(r->ai_family,r->ai_socktype,r->ai_protocol);
+ if(sock<0){
+ sock=-1;
+ perror("socket");
+ continue;
+ }
+ if(connect(sock,r->ai_addr,r->ai_addrlen)<0){
+ close(sock);
+ sock=-1;
+ perror("connect");
+ continue;
+ }
+ break;
+ }
+ freeaddrinfo(res);
+ if(sock==-1)return 1;
+ fprintf(stderr,"Connected.\n");
+
+ struct line_reader *liner=line_reader_init(STDIN_FILENO);
+ struct unzbuffer *z=unzbuffer_init(frame_data_sink,NULL);
+
+ bool haderror=false;
+ while(true){
+ fd_set inset;
+ FD_ZERO(&inset);
+ FD_SET(STDIN_FILENO,&inset);
+ FD_SET(sock,&inset);
+ int ret=select(sock+1,&inset,NULL,NULL,NULL);
+ if(ret<0){
+ if(errno==EINTR)continue;
+ perror("select");
+ break;
+ }
+
+ if(FD_ISSET(STDIN_FILENO,&inset)){
+ char buf[1024];
+ i64 nr=read(STDIN_FILENO,buf,sizeof buf);
+ if(nr<0){
+ if(errno==EINTR)continue;
+ perror("read");
+ break;
+ }
+ if(nr==0)break; // EOF
+ line_reader_supply_data(liner,buf,nr);
+ char *line;
+ while((line=line_reader_get_line(liner,true))!=NULL){
+ if(!writeall(sock,line,strlen(line))){
+ fprintf(stderr,"Socket write error\n");
+ goto cleanup;
+ }
+ // printf("Written <%s>\n",line);
+ free(line);
+ }
+ }
+
+ if(FD_ISSET(sock,&inset)){
+ u8 buf[1024];
+ i64 nr=read(sock,buf,sizeof buf);
+ if(nr<0){
+ if(errno==EINTR)continue;
+ perror("read");
+ break;
+ }
+ if(nr==0){
+ fprintf(stderr,"End-of-file on socket\n");
+ break;
+ }
+ fprintf(stderr,"unzbuffer_write(nr=%" PRIi64 ")\n",nr);
+ if(!unzbuffer_write(z,buf,nr)){
+ haderror=true;
+ break;
+ }
+ }
+ }
+
+cleanup:
+ line_reader_destroy(liner);
+ if(!unzbuffer_finish_destroy(z))return 1;
+ if(haderror)return 1;
+ return 0;
+}