summaryrefslogtreecommitdiff
path: root/controller
diff options
context:
space:
mode:
authortomsmeding <tom.smeding@gmail.com>2017-10-11 08:32:09 +0200
committertomsmeding <tom.smeding@gmail.com>2017-10-11 08:32:09 +0200
commit7549d07933091417b225d094c1648e1382287f93 (patch)
treef533b99745beb808d1cbe226a38e0232076d7b53 /controller
Initial
Diffstat (limited to 'controller')
-rw-r--r--controller/.gitignore1
-rw-r--r--controller/Makefile28
-rw-r--r--controller/main.c245
-rw-r--r--controller/unzbuffer.c95
-rw-r--r--controller/unzbuffer.h14
5 files changed, 383 insertions, 0 deletions
diff --git a/controller/.gitignore b/controller/.gitignore
new file mode 100644
index 0000000..97f90d8
--- /dev/null
+++ b/controller/.gitignore
@@ -0,0 +1 @@
+controller
diff --git a/controller/Makefile b/controller/Makefile
new file mode 100644
index 0000000..7282260
--- /dev/null
+++ b/controller/Makefile
@@ -0,0 +1,28 @@
+CC = gcc
+CFLAGS = -Wall -Wextra -std=c11 -g -O2 -fwrapv -I..
+LDFLAGS = -lz
+TARGET = controller
+
+OBJDIR = obj
+
+.PHONY: all clean
+
+.SUFFIXES:
+
+all: $(TARGET)
+
+clean:
+ @echo "Cleaning"
+ @rm -f $(TARGET)
+ @rm -rf $(OBJDIR)
+
+
+$(OBJDIR)/%.o: %.c $(wildcard *.h)
+ @mkdir -p $(OBJDIR)
+ @echo "CC $<"
+ @$(CC) $(CFLAGS) -c -o $@ $<
+
+$(TARGET): $(patsubst %.c,$(OBJDIR)/%.o,$(wildcard *.c))
+ @make -q -C .. || make -C .. --no-print-directory
+ @echo "LD -o $@"
+ @$(CC) -o $@ $^ $(wildcard ../$(OBJDIR)/*.o) $(LDFLAGS)
diff --git a/controller/main.c b/controller/main.c
new file mode 100644
index 0000000..2e37e76
--- /dev/null
+++ b/controller/main.c
@@ -0,0 +1,245 @@
+#include <stdio.h>
+#include <string.h>
+#include <time.h>
+#include <unistd.h>
+#include <errno.h>
+#include <sys/socket.h>
+#include <arpa/inet.h>
+#include <netdb.h>
+#include <sys/select.h>
+#include "global.h"
+#include "line_reader.h"
+#include "unzbuffer.h"
+
+
+static bool writeall(int sock,const char *data,i64 len){
+ i64 cursor=0;
+ while(cursor<len){
+ i64 nwr=write(sock,data+cursor,len-cursor);
+ if(nwr<=0)return false;
+ cursor+=nwr;
+ }
+ return true;
+}
+
+struct frame_process {
+ pid_t pid;
+ i64 namelen;
+ char *name;
+ i64 usertime,systime,rss;
+};
+
+struct data_frame {
+ struct timeval stamp;
+ i64 numpids;
+ struct frame_process *procs;
+};
+
+static i64 deserialise8(const u8 *buf){
+ u64 value=0;
+ for(i64 i=0;i<8;i++)value|=((u64)buf[i])<<(8*i);
+ return (i64)value;
+}
+
+static i32 deserialise4(const u8 *buf){
+ u32 value=0;
+ for(i64 i=0;i<4;i++)value|=((u32)buf[i])<<(8*i);
+ return (i32)value;
+}
+
+static void process_frame(const struct data_frame *frame){
+ fprintf(stderr,"%s: (%" PRIi64 ")\n",ctime(&frame->stamp.tv_sec),frame->numpids);
+}
+
+static i64 read_frame(const u8 *buffer,i64 len){
+ if(len<20)return 0;
+ struct data_frame *frame=malloc(1,struct data_frame);
+ frame->stamp.tv_sec=deserialise8(buffer);
+ frame->stamp.tv_usec=deserialise4(buffer+8);
+ i64 numpids=frame->numpids=deserialise8(buffer+12);
+ i64 cursor=20;
+ fprintf(stderr,"numpids=%" PRIi64 "\n",numpids);
+ if(len-cursor<numpids*36){
+ free(frame);
+ return 0;
+ }
+ // calloc to null unfilled pointers in procs
+ frame->procs=calloc(numpids,struct frame_process);
+
+ i64 pi;
+ for(pi=0;pi<frame->numpids;pi++){
+ fprintf(stderr,"pi=%" PRIi64 "\n",pi);
+ if(len-cursor<36){
+ fprintf(stderr,"insuff: pi\n");
+ goto insufficient_data;
+ }
+ frame->procs[pi].pid=deserialise4(buffer+cursor);
+ cursor+=4;
+ i64 namelen=frame->procs[pi].namelen=deserialise8(buffer+cursor);
+ cursor+=8;
+ fprintf(stderr," pid=%d namelen=%" PRIi64 "\n",frame->procs[pi].pid,namelen);
+ if(len-cursor<namelen+24){
+ fprintf(stderr,"insuff: namelen\n");
+ goto insufficient_data;
+ }
+ frame->procs[pi].name=malloc(namelen+1,char);
+ memcpy(frame->procs[pi].name,buffer+cursor,namelen);
+ cursor+=namelen;
+ frame->procs[pi].usertime=deserialise8(buffer+cursor);
+ cursor+=8;
+ frame->procs[pi].systime=deserialise8(buffer+cursor);
+ cursor+=8;
+ frame->procs[pi].rss=deserialise8(buffer+cursor);
+ cursor+=8;
+ }
+
+ process_frame(frame);
+ for(i64 i=0;i<numpids;i++)free(frame->procs[i].name);
+ free(frame->procs);
+ free(frame);
+ return cursor;
+
+insufficient_data:
+ if(frame->procs){
+ for(i64 i=0;i<=pi;i++){
+ if(frame->procs[i].name)free(frame->procs[i].name);
+ }
+ free(frame->procs);
+ }
+ free(frame);
+ return 0;
+}
+
+static void frame_data_sink(u8 *newdata,i64 newdatalen,void *payload){
+ (void)payload;
+ static u8 *buffer=NULL;
+ static i64 buffersize=0,bufferlen=0;
+
+ fprintf(stderr,"frame_data_sink(len=%" PRIi64 ")\n",newdatalen);
+
+ if(buffer==NULL){
+ buffersize=4096;
+ buffer=malloc(buffersize,u8);
+ }
+
+ if(bufferlen+newdatalen>buffersize){
+ do buffersize*=2; while(bufferlen+newdatalen>buffersize);
+ buffer=realloc(buffer,buffersize,u8);
+ }
+ memcpy(buffer+bufferlen,newdata,newdatalen);
+ bufferlen+=newdatalen;
+
+ i64 cursor=0;
+ while(cursor<bufferlen){
+ i64 nr=read_frame(buffer+cursor,bufferlen-cursor);
+ if(nr<=0)break;
+ cursor+=nr;
+ }
+ if(cursor<bufferlen){
+ memmove(buffer,buffer+cursor,bufferlen-cursor);
+ }
+ bufferlen-=cursor;
+}
+
+int main(int argc,char **argv){
+ if(argc!=2&&argc!=3){
+ fprintf(stderr,"Usage: %s <ip addr> [port=57575]\n",argv[0]);
+ return 1;
+ }
+
+ struct addrinfo hints,*res;
+ memset(&hints,0,sizeof(hints));
+ hints.ai_family=AF_UNSPEC;
+ hints.ai_socktype=SOCK_STREAM;
+ int ret=getaddrinfo(argv[1],argc==3?argv[2]:"57575",&hints,&res);
+ if(ret!=0){
+ fprintf(stderr,"getaddrinfo: %s\n",gai_strerror(ret));
+ return 1;
+ }
+
+ int sock=-1;
+ for(struct addrinfo *r=res;r!=NULL;r=r->ai_next){
+ char buf[256];
+ inet_ntop(r->ai_family,&((struct sockaddr_in*)r->ai_addr)->sin_addr,buf,sizeof buf);
+ fprintf(stderr,"Trying: %s\n",buf);
+ sock=socket(r->ai_family,r->ai_socktype,r->ai_protocol);
+ if(sock<0){
+ sock=-1;
+ perror("socket");
+ continue;
+ }
+ if(connect(sock,r->ai_addr,r->ai_addrlen)<0){
+ close(sock);
+ sock=-1;
+ perror("connect");
+ continue;
+ }
+ break;
+ }
+ freeaddrinfo(res);
+ if(sock==-1)return 1;
+ fprintf(stderr,"Connected.\n");
+
+ struct line_reader *liner=line_reader_init(STDIN_FILENO);
+ struct unzbuffer *z=unzbuffer_init(frame_data_sink,NULL);
+
+ bool haderror=false;
+ while(true){
+ fd_set inset;
+ FD_ZERO(&inset);
+ FD_SET(STDIN_FILENO,&inset);
+ FD_SET(sock,&inset);
+ int ret=select(sock+1,&inset,NULL,NULL,NULL);
+ if(ret<0){
+ if(errno==EINTR)continue;
+ perror("select");
+ break;
+ }
+
+ if(FD_ISSET(STDIN_FILENO,&inset)){
+ char buf[1024];
+ i64 nr=read(STDIN_FILENO,buf,sizeof buf);
+ if(nr<0){
+ if(errno==EINTR)continue;
+ perror("read");
+ break;
+ }
+ if(nr==0)break; // EOF
+ line_reader_supply_data(liner,buf,nr);
+ char *line;
+ while((line=line_reader_get_line(liner,true))!=NULL){
+ if(!writeall(sock,line,strlen(line))){
+ fprintf(stderr,"Socket write error\n");
+ goto cleanup;
+ }
+ // printf("Written <%s>\n",line);
+ free(line);
+ }
+ }
+
+ if(FD_ISSET(sock,&inset)){
+ u8 buf[1024];
+ i64 nr=read(sock,buf,sizeof buf);
+ if(nr<0){
+ if(errno==EINTR)continue;
+ perror("read");
+ break;
+ }
+ if(nr==0){
+ fprintf(stderr,"End-of-file on socket\n");
+ break;
+ }
+ fprintf(stderr,"unzbuffer_write(nr=%" PRIi64 ")\n",nr);
+ if(!unzbuffer_write(z,buf,nr)){
+ haderror=true;
+ break;
+ }
+ }
+ }
+
+cleanup:
+ line_reader_destroy(liner);
+ if(!unzbuffer_finish_destroy(z))return 1;
+ if(haderror)return 1;
+ return 0;
+}
diff --git a/controller/unzbuffer.c b/controller/unzbuffer.c
new file mode 100644
index 0000000..ecd38a6
--- /dev/null
+++ b/controller/unzbuffer.c
@@ -0,0 +1,95 @@
+#include <stdio.h>
+#include <assert.h>
+#include <zlib.h>
+#include "unzbuffer.h"
+
+#define BUFSZ_ZOUT (256*1024)
+
+
+struct unzbuffer {
+ u8 *out;
+ i64 outfill;
+ z_stream strm;
+
+ bool just_resetted;
+
+ unzbuffer_writefunc *wf;
+ void *payload;
+};
+
+struct unzbuffer* unzbuffer_init(unzbuffer_writefunc *wf,void *payload){
+ struct unzbuffer *z=malloc(1,struct unzbuffer);
+ z->out=malloc(BUFSZ_ZOUT,u8);
+ z->outfill=0;
+
+ z->strm.zalloc=Z_NULL;
+ z->strm.zfree=Z_NULL;
+ z->strm.opaque=Z_NULL;
+ z->strm.avail_in=0;
+ z->strm.next_in=Z_NULL;
+
+ // so unzbuffer_write can start writing immediately
+ z->strm.avail_out=BUFSZ_ZOUT;
+ z->strm.next_out=z->out;
+
+ if(inflateInit(&z->strm)!=Z_OK){
+ return NULL;
+ }
+
+ z->just_resetted=true;
+
+ z->wf=wf;
+ z->payload=payload;
+ return z;
+}
+
+bool unzbuffer_write(struct unzbuffer *z,u8 *data,i64 len){
+ z->just_resetted=false;
+ z->strm.avail_in=len;
+ z->strm.next_in=data;
+ assert(z->strm.avail_out>0);
+ while(true){
+ int ret=inflate(&z->strm,Z_NO_FLUSH);
+ if(ret!=Z_OK&&ret!=Z_STREAM_END){
+ inflateEnd(&z->strm);
+ fprintf(stderr,"unz error: %d\n",ret);
+ return false;
+ }
+ if(ret!=Z_STREAM_END&&z->strm.avail_out>0)break;
+ z->wf(z->out,BUFSZ_ZOUT,z->payload);
+ if(ret==Z_STREAM_END){
+ inflateReset(&z->strm);
+ z->strm.avail_in=0;
+ z->strm.next_in=Z_NULL;
+ z->strm.avail_out=BUFSZ_ZOUT;
+ z->strm.next_out=z->out;
+ z->just_resetted=true;
+ break;
+ }
+ z->strm.avail_out=BUFSZ_ZOUT;
+ z->strm.next_out=z->out;
+ }
+ assert(z->strm.avail_in==0);
+ return true;
+}
+
+bool unzbuffer_finish_destroy(struct unzbuffer *z){
+ if(!z->just_resetted){
+ assert(z->strm.avail_out>0);
+ while(true){
+ int ret=inflate(&z->strm,Z_FINISH);
+ if(ret!=Z_OK&&ret!=Z_STREAM_END){
+ inflateEnd(&z->strm);
+ fprintf(stderr,"unz error: %d\n",ret);
+ return false;
+ }
+ z->wf(z->out,BUFSZ_ZOUT-z->strm.avail_out,z->payload);
+ if(z->strm.avail_out>0)break;
+ z->strm.avail_out=BUFSZ_ZOUT;
+ z->strm.next_out=z->out;
+ }
+ }
+ free(z->out);
+ free(z);
+ return true;
+}
diff --git a/controller/unzbuffer.h b/controller/unzbuffer.h
new file mode 100644
index 0000000..547d2af
--- /dev/null
+++ b/controller/unzbuffer.h
@@ -0,0 +1,14 @@
+#pragma once
+
+#include "global.h"
+
+
+typedef void unzbuffer_writefunc(u8*,i64,void*);
+
+struct unzbuffer;
+
+struct unzbuffer* unzbuffer_init(unzbuffer_writefunc *wf,void *payload);
+
+// Return false on invalid stream
+bool unzbuffer_write(struct unzbuffer *z,u8 *data,i64 len);
+bool unzbuffer_finish_destroy(struct unzbuffer *z);