summaryrefslogtreecommitdiff
path: root/server
diff options
context:
space:
mode:
Diffstat (limited to 'server')
-rw-r--r--server/.gitignore1
-rw-r--r--server/Makefile28
-rw-r--r--server/data_stream.c156
-rw-r--r--server/data_stream.h13
-rw-r--r--server/main.c200
-rwxr-xr-xserver/pstat.py14
-rw-r--r--server/zbuffer.c79
-rw-r--r--server/zbuffer.h13
8 files changed, 504 insertions, 0 deletions
diff --git a/server/.gitignore b/server/.gitignore
new file mode 100644
index 0000000..3fbc411
--- /dev/null
+++ b/server/.gitignore
@@ -0,0 +1 @@
+pstat
diff --git a/server/Makefile b/server/Makefile
new file mode 100644
index 0000000..24cf554
--- /dev/null
+++ b/server/Makefile
@@ -0,0 +1,28 @@
+CC = gcc
+CFLAGS = -Wall -Wextra -std=c11 -g -O2 -fwrapv -I..
+LDFLAGS = -lz
+TARGET = pstat
+
+OBJDIR = obj
+
+.PHONY: all clean
+
+.SUFFIXES:
+
+all: $(TARGET)
+
+clean:
+ @echo "Cleaning"
+ @rm -f $(TARGET)
+ @rm -rf $(OBJDIR)
+
+
+$(OBJDIR)/%.o: %.c $(wildcard *.h)
+ @mkdir -p $(OBJDIR)
+ @echo "CC $<"
+ @$(CC) $(CFLAGS) -c -o $@ $<
+
+$(TARGET): $(patsubst %.c,$(OBJDIR)/%.o,$(wildcard *.c))
+ @make -q -C .. || make -C .. --no-print-directory
+ @echo "LD -o $@"
+ @$(CC) -o $@ $^ $(wildcard ../$(OBJDIR)/*.o) $(LDFLAGS)
diff --git a/server/data_stream.c b/server/data_stream.c
new file mode 100644
index 0000000..e86114e
--- /dev/null
+++ b/server/data_stream.c
@@ -0,0 +1,156 @@
+#include <stdio.h>
+#include <limits.h>
+#include <string.h>
+#include <unistd.h>
+#include <sys/time.h>
+#include <libproc.h>
+#include "data_stream.h"
+
+
+struct data_stream {
+ int fd;
+ i64 pidbufsize;
+ pid_t *pids;
+ i64 namebufsize;
+ u8 *namebuf;
+ i64 writebufsize;
+ u8 *writebuf;
+ i64 cursor;
+ struct zbuffer *z;
+};
+
+static bool callback_error=false;
+
+static void write_data_fd(u8 *data,i64 len,void *fdp){
+ int fd=*(int*)fdp;
+ i64 cursor=0;
+ while(cursor<len){
+ i64 nwr=write(fd,data+cursor,len-cursor);
+ if(nwr<0){
+ perror("write");
+ callback_error=true;
+ return;
+ }
+ cursor+=nwr;
+ }
+}
+
+static void bufappend(u8 **buf,i64 *cursor,i64 *cap,u8 *data,i64 datalen){
+ if(*cursor+datalen>=*cap){
+ if(*cap*2>LARGENUM)return;
+ *cap*=2;
+ *buf=realloc(*buf,*cap,u8);
+ }
+ memcpy(*buf+*cursor,data,datalen);
+ *cursor+=datalen;
+}
+
+static void serialise8(u8 *buf,i64 value){
+ for(i64 i=0;i<8;i++)buf[i]=(value>>(8*i))&0xff;
+}
+
+static void serialise4(u8 *buf,i32 value){
+ for(i64 i=0;i<4;i++)buf[i]=(value>>(8*i))&0xff;
+}
+
+struct data_stream* data_stream_init(int fd){
+ struct data_stream *s=malloc(1,struct data_stream);
+ s->fd=fd;
+ s->pidbufsize=1024;
+ s->pids=malloc(s->pidbufsize,pid_t);
+ s->namebufsize=64;
+ s->namebuf=malloc(s->namebufsize,u8);
+ s->writebufsize=4096;
+ s->writebuf=malloc(s->writebufsize,u8);
+ s->cursor=0;
+ s->z=zbuffer_init(write_data_fd,&s->fd);
+
+ callback_error=false;
+
+ return s;
+}
+
+int data_stream_finish_destroy(struct data_stream *s){
+ callback_error=false;
+ zbuffer_finish_destroy(s->z);
+ free(s->pids);
+ free(s->namebuf);
+ free(s->writebuf);
+ free(s);
+ if(callback_error)return -1;
+ return 0;
+}
+
+int data_stream_frame(struct data_stream *s){
+ callback_error=false;
+
+ u8 buf8[8];
+
+ struct timeval timeval;
+ if(gettimeofday(&timeval,NULL)<0){
+ perror("gettimeofday");
+ timeval.tv_sec=0;
+ timeval.tv_usec=0;
+ }
+ serialise8(buf8,timeval.tv_sec);
+ bufappend(&s->writebuf,&s->cursor,&s->writebufsize,buf8,8);
+ serialise4(buf8,timeval.tv_usec);
+ bufappend(&s->writebuf,&s->cursor,&s->writebufsize,buf8,4);
+
+ i64 numpids=proc_listallpids(s->pids,s->pidbufsize*sizeof(pid_t));
+ while(s->pidbufsize<=LARGENUM&&numpids>=s->pidbufsize){
+ s->pidbufsize*=2;
+ s->pids=realloc(s->pids,s->pidbufsize,pid_t);
+ numpids=proc_listallpids(s->pids,s->pidbufsize*sizeof(pid_t));
+ }
+ serialise8(buf8,numpids);
+ bufappend(&s->writebuf,&s->cursor,&s->writebufsize,buf8,8);
+
+ for(i64 index=0;index<numpids;index++) {
+ pid_t pid=s->pids[index];
+
+ struct proc_taskinfo info;
+ int result=proc_pidinfo(pid,PROC_PIDTASKINFO,0,&info,sizeof(info));
+ if(result!=sizeof info)continue;
+
+ serialise4(buf8,pid);
+ bufappend(&s->writebuf,&s->cursor,&s->writebufsize,buf8,4);
+
+ size_t namelen=proc_name(pid,s->namebuf,s->namebufsize);
+ while(s->namebufsize<=LARGENUM&&namelen==0){
+ s->namebufsize*=2;
+ s->namebuf=realloc(s->namebuf,s->namebufsize,u8);
+ namelen=proc_name(pid,s->namebuf,s->namebufsize);
+ }
+ if(namelen==0){
+ s->namebuf[0]='\0';
+ }
+
+ fprintf(stderr,"index=%" PRIi64 " pid=%d namelen=%zu\n",index,pid,namelen);
+ serialise8(buf8,namelen);
+ bufappend(&s->writebuf,&s->cursor,&s->writebufsize,buf8,8);
+ bufappend(&s->writebuf,&s->cursor,&s->writebufsize,s->namebuf,namelen);
+
+ serialise8(buf8,info.pti_total_user);
+ bufappend(&s->writebuf,&s->cursor,&s->writebufsize,buf8,8);
+ serialise8(buf8,info.pti_total_system);
+ bufappend(&s->writebuf,&s->cursor,&s->writebufsize,buf8,8);
+ serialise8(buf8,info.pti_resident_size);
+ bufappend(&s->writebuf,&s->cursor,&s->writebufsize,buf8,8);
+
+ // fprintf(stderr,"PID %u (%s): %llu %llu %llu\n",pid,namebuf,info.pti_total_user,info.pti_total_system,info.pti_resident_size);
+ }
+
+ zbuffer_write(s->z,s->writebuf,s->cursor);
+ s->cursor=0;
+
+ if(callback_error)return -1;
+ return 0;
+}
+
+int data_stream_flush(struct data_stream *s){
+ callback_error=false;
+ zbuffer_flush(s->z);
+ if(callback_error)return -1;
+ return 0;
+}
diff --git a/server/data_stream.h b/server/data_stream.h
new file mode 100644
index 0000000..c8baf18
--- /dev/null
+++ b/server/data_stream.h
@@ -0,0 +1,13 @@
+#pragma once
+
+#include "global.h"
+#include "zbuffer.h"
+
+
+struct data_stream;
+
+// Return NULL or -1 on error
+struct data_stream* data_stream_init(int fd);
+int data_stream_finish_destroy(struct data_stream *s);
+int data_stream_frame(struct data_stream *s);
+int data_stream_flush(struct data_stream *s); // can degrade compression
diff --git a/server/main.c b/server/main.c
new file mode 100644
index 0000000..3cf1ffd
--- /dev/null
+++ b/server/main.c
@@ -0,0 +1,200 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <errno.h>
+#include <sys/time.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <sys/select.h>
+#include "global.h"
+#include "data_stream.h"
+#include "line_reader.h"
+#include "zbuffer.h"
+
+#define PORT 57575
+
+#define POLLDELAY (10*1000000)
+
+/*struct proc_taskinfo {
+ uint64_t pti_virtual_size; virtual memory size (bytes)
+ uint64_t pti_resident_size; resident memory size (bytes)
+ uint64_t pti_total_user; total time
+ uint64_t pti_total_system;
+ uint64_t pti_threads_user; existing threads only
+ uint64_t pti_threads_system;
+ int32_t pti_policy; default policy for new threads
+ int32_t pti_faults; number of page faults
+ int32_t pti_pageins; number of actual pageins
+ int32_t pti_cow_faults; number of copy-on-write faults
+ int32_t pti_messages_sent; number of messages sent
+ int32_t pti_messages_received; number of messages received
+ int32_t pti_syscalls_mach; number of mach system calls
+ int32_t pti_syscalls_unix; number of unix system calls
+ int32_t pti_csw; number of context switches
+ int32_t pti_threadnum; number of threads in the task
+ int32_t pti_numrunning; number of running threads
+ int32_t pti_priority; task priority
+};*/
+
+static i64 make_timestamp(void){
+ struct timeval tv;
+ gettimeofday(&tv,NULL);
+ return tv.tv_sec*(i64)1000000+tv.tv_usec;
+}
+
+static void connection_handler(int sock){
+ struct data_stream *stream=data_stream_init(sock);
+ struct line_reader *reader=line_reader_init(sock);
+
+ i64 polldelay=POLLDELAY;
+ i64 timeleft=polldelay;
+
+ bool do_send_frames=false;
+
+ bool do_quit=false;
+
+ while(true){
+ struct timeval timeout={timeleft/1000000,timeleft%1000000};
+ i64 before=make_timestamp();
+
+ fd_set inset;
+ FD_ZERO(&inset);
+ FD_SET(sock,&inset);
+ int ret=select(sock+1,&inset,NULL,NULL,do_send_frames?&timeout:NULL);
+ if(ret<0){
+ if(errno==EINTR)continue;
+ perror("select");
+ break;
+ }
+ if(ret==0||!FD_ISSET(sock,&inset)){ // timeout
+ if(do_send_frames){
+ if(data_stream_frame(stream)<0){
+ printf("Error sending data\n");
+ goto cleanup;
+ }
+ printf("."); fflush(stdout);
+ }
+ timeleft=polldelay;
+ continue;
+ }
+
+ i64 after=make_timestamp();
+ if(after<before){
+ printf("Time ran backwards?\n");
+ after=before;
+ }
+ timeleft-=after-before;
+ if(timeleft<0)timeleft=0;
+
+ char buf[1024];
+ i64 nr=read(sock,buf,sizeof buf);
+ if(nr<0){
+ if(errno==EINTR)continue;
+ perror("read");
+ break;
+ }
+ if(nr==0)break; // EOF
+ line_reader_supply_data(reader,buf,nr);
+
+ char *line;
+ while((line=line_reader_get_line(reader,false))!=NULL){
+ char *word=line;
+
+ printf("Read <%s>\n",line);
+
+ char *next=strchr(word,' ');
+ if(next!=NULL){
+ *next='\0';
+ next++;
+ }
+ if(strcmp(word,"delay")==0){
+ char *endp;
+ i64 v=strtoll(next,&endp,10);
+ if(*next=='\0'||*endp!='\0'||v<=1000){
+ free(line);
+ printf("Invalid number in delay\n");
+ continue;
+ }
+ polldelay=v;
+ printf("polldelay -> %" PRIi64 "\n",polldelay);
+ if(timeleft>polldelay)timeleft=polldelay;
+ } else if(strcmp(word,"start")==0){
+ do_send_frames=true;
+ printf("Sending enabled\n");
+ } else if(strcmp(word,"stop")==0){
+ do_send_frames=false;
+ printf("Sending disabled\n");
+ } else if(strcmp(word,"flush")==0){
+ printf("Flushing\n");
+ if(data_stream_flush(stream)<0){
+ printf("Error flushing\n");
+ goto cleanup;
+ }
+ } else if(strcmp(word,"quit")==0){
+ printf("Quitting by request\n");
+ do_quit=true;
+ goto cleanup;
+ } else {
+ printf("Read unrecognised line <%s>!\n",line);
+ }
+
+ free(line);
+ }
+ }
+
+cleanup:
+ data_stream_finish_destroy(stream);
+ if(do_quit)exit(0);
+}
+
+int main(void){
+ int sock=socket(PF_INET,SOCK_STREAM,0);
+ if(sock<0){
+ perror("socket");
+ return 1;
+ }
+
+ int one=1;
+ if(setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,&one,sizeof(int))<0){
+ perror("setsockopt");
+ return 1;
+ }
+
+ struct sockaddr_in sin;
+ sin.sin_family=AF_INET;
+ sin.sin_addr.s_addr=htonl(INADDR_ANY);
+ sin.sin_port=htons(PORT);
+
+ if(bind(sock,(struct sockaddr*)&sin,sizeof(sin))<0){
+ close(sock);
+ perror("bind");
+ return 1;
+ }
+
+ if(listen(sock,1)<0){
+ perror("listen");
+ return 1;
+ }
+
+ printf("Listening on port %d\n",PORT);
+
+ while(true){
+ struct sockaddr_storage client_addr;
+ socklen_t client_addr_sz=sizeof(client_addr);
+ int clientsock=accept(sock,(struct sockaddr*)&client_addr,&client_addr_sz);
+ if(clientsock<0){
+ perror("accept");
+ continue;
+ }
+
+ char str[INET_ADDRSTRLEN];
+ inet_ntop(client_addr.ss_family,&((struct sockaddr_in*)&client_addr)->sin_addr,
+ str,sizeof(str));
+
+ printf("Accept from %s\n",str);
+ connection_handler(clientsock);
+ close(clientsock);
+ }
+}
diff --git a/server/pstat.py b/server/pstat.py
new file mode 100755
index 0000000..eed6528
--- /dev/null
+++ b/server/pstat.py
@@ -0,0 +1,14 @@
+#!/usr/bin/env python3
+import psutil, os, sys, time
+
+t=time.clock_gettime(time.CLOCK_MONOTONIC)
+
+for p in psutil.process_iter():
+ try:
+ with p.oneshot():
+ print(p.pid,p.username(),p.name(),p.memory_info().rss,p.cpu_times())
+ except Exception as e:
+ pass
+
+t=time.clock_gettime(time.CLOCK_MONOTONIC)-t
+print(t,file=sys.stderr)
diff --git a/server/zbuffer.c b/server/zbuffer.c
new file mode 100644
index 0000000..af37f74
--- /dev/null
+++ b/server/zbuffer.c
@@ -0,0 +1,79 @@
+#include <assert.h>
+#include <zlib.h>
+#include "zbuffer.h"
+
+#define BUFSZ_ZOUT (256*1024)
+
+
+struct zbuffer {
+ u8 *out;
+ i64 outfill;
+ z_stream strm;
+
+ zbuffer_writefunc *wf;
+ void *payload;
+};
+
+struct zbuffer* zbuffer_init(zbuffer_writefunc *wf,void *payload){
+ struct zbuffer *z=malloc(1,struct zbuffer);
+ z->out=malloc(BUFSZ_ZOUT,u8);
+ z->outfill=0;
+
+ z->strm.zalloc=Z_NULL;
+ z->strm.zfree=Z_NULL;
+ z->strm.opaque=Z_NULL;
+
+ // so zbuffer_write can start writing immediately
+ z->strm.avail_out=BUFSZ_ZOUT;
+ z->strm.next_out=z->out;
+
+ if(deflateInit(&z->strm,Z_DEFAULT_COMPRESSION)!=Z_OK){
+ return NULL;
+ }
+
+ z->wf=wf;
+ z->payload=payload;
+ return z;
+}
+
+void zbuffer_write(struct zbuffer *z,u8 *data,i64 len){
+ z->strm.avail_in=len;
+ z->strm.next_in=data;
+ assert(z->strm.avail_out>0);
+ while(true){
+ int ret=deflate(&z->strm,Z_NO_FLUSH);
+ assert(ret==Z_OK);
+ if(z->strm.avail_out>0)break;
+ z->wf(z->out,BUFSZ_ZOUT,z->payload);
+ z->strm.avail_out=BUFSZ_ZOUT;
+ z->strm.next_out=z->out;
+ }
+ assert(z->strm.avail_in==0);
+}
+
+void zbuffer_flush(struct zbuffer *z){
+ assert(z->strm.avail_out>0);
+ while(true){
+ int ret=deflate(&z->strm,Z_SYNC_FLUSH);
+ assert(ret==Z_OK);
+ z->wf(z->out,BUFSZ_ZOUT-z->strm.avail_out,z->payload);
+ bool done=z->strm.avail_out>0;
+ z->strm.avail_out=BUFSZ_ZOUT;
+ z->strm.next_out=z->out;
+ if(done)break;
+ }
+}
+
+void zbuffer_finish_destroy(struct zbuffer *z){
+ assert(z->strm.avail_out>0);
+ while(true){
+ int ret=deflate(&z->strm,Z_FINISH);
+ assert(ret==Z_OK||ret==Z_STREAM_END);
+ z->wf(z->out,BUFSZ_ZOUT-z->strm.avail_out,z->payload);
+ if(z->strm.avail_out>0)break;
+ z->strm.avail_out=BUFSZ_ZOUT;
+ z->strm.next_out=z->out;
+ }
+ free(z->out);
+ free(z);
+}
diff --git a/server/zbuffer.h b/server/zbuffer.h
new file mode 100644
index 0000000..2e9f707
--- /dev/null
+++ b/server/zbuffer.h
@@ -0,0 +1,13 @@
+#pragma once
+
+#include "global.h"
+
+
+typedef void zbuffer_writefunc(u8*,i64,void*);
+
+struct zbuffer;
+
+struct zbuffer* zbuffer_init(zbuffer_writefunc *wf,void *payload);
+void zbuffer_write(struct zbuffer *z,u8 *data,i64 len);
+void zbuffer_flush(struct zbuffer *z); // can degrade compression
+void zbuffer_finish_destroy(struct zbuffer *z);