diff options
Diffstat (limited to 'server')
-rw-r--r-- | server/.gitignore | 1 | ||||
-rw-r--r-- | server/Makefile | 28 | ||||
-rw-r--r-- | server/data_stream.c | 156 | ||||
-rw-r--r-- | server/data_stream.h | 13 | ||||
-rw-r--r-- | server/main.c | 200 | ||||
-rwxr-xr-x | server/pstat.py | 14 | ||||
-rw-r--r-- | server/zbuffer.c | 79 | ||||
-rw-r--r-- | server/zbuffer.h | 13 |
8 files changed, 504 insertions, 0 deletions
diff --git a/server/.gitignore b/server/.gitignore new file mode 100644 index 0000000..3fbc411 --- /dev/null +++ b/server/.gitignore @@ -0,0 +1 @@ +pstat diff --git a/server/Makefile b/server/Makefile new file mode 100644 index 0000000..24cf554 --- /dev/null +++ b/server/Makefile @@ -0,0 +1,28 @@ +CC = gcc +CFLAGS = -Wall -Wextra -std=c11 -g -O2 -fwrapv -I.. +LDFLAGS = -lz +TARGET = pstat + +OBJDIR = obj + +.PHONY: all clean + +.SUFFIXES: + +all: $(TARGET) + +clean: + @echo "Cleaning" + @rm -f $(TARGET) + @rm -rf $(OBJDIR) + + +$(OBJDIR)/%.o: %.c $(wildcard *.h) + @mkdir -p $(OBJDIR) + @echo "CC $<" + @$(CC) $(CFLAGS) -c -o $@ $< + +$(TARGET): $(patsubst %.c,$(OBJDIR)/%.o,$(wildcard *.c)) + @make -q -C .. || make -C .. --no-print-directory + @echo "LD -o $@" + @$(CC) -o $@ $^ $(wildcard ../$(OBJDIR)/*.o) $(LDFLAGS) diff --git a/server/data_stream.c b/server/data_stream.c new file mode 100644 index 0000000..e86114e --- /dev/null +++ b/server/data_stream.c @@ -0,0 +1,156 @@ +#include <stdio.h> +#include <limits.h> +#include <string.h> +#include <unistd.h> +#include <sys/time.h> +#include <libproc.h> +#include "data_stream.h" + + +struct data_stream { + int fd; + i64 pidbufsize; + pid_t *pids; + i64 namebufsize; + u8 *namebuf; + i64 writebufsize; + u8 *writebuf; + i64 cursor; + struct zbuffer *z; +}; + +static bool callback_error=false; + +static void write_data_fd(u8 *data,i64 len,void *fdp){ + int fd=*(int*)fdp; + i64 cursor=0; + while(cursor<len){ + i64 nwr=write(fd,data+cursor,len-cursor); + if(nwr<0){ + perror("write"); + callback_error=true; + return; + } + cursor+=nwr; + } +} + +static void bufappend(u8 **buf,i64 *cursor,i64 *cap,u8 *data,i64 datalen){ + if(*cursor+datalen>=*cap){ + if(*cap*2>LARGENUM)return; + *cap*=2; + *buf=realloc(*buf,*cap,u8); + } + memcpy(*buf+*cursor,data,datalen); + *cursor+=datalen; +} + +static void serialise8(u8 *buf,i64 value){ + for(i64 i=0;i<8;i++)buf[i]=(value>>(8*i))&0xff; +} + +static void serialise4(u8 *buf,i32 value){ + for(i64 i=0;i<4;i++)buf[i]=(value>>(8*i))&0xff; +} + +struct data_stream* data_stream_init(int fd){ + struct data_stream *s=malloc(1,struct data_stream); + s->fd=fd; + s->pidbufsize=1024; + s->pids=malloc(s->pidbufsize,pid_t); + s->namebufsize=64; + s->namebuf=malloc(s->namebufsize,u8); + s->writebufsize=4096; + s->writebuf=malloc(s->writebufsize,u8); + s->cursor=0; + s->z=zbuffer_init(write_data_fd,&s->fd); + + callback_error=false; + + return s; +} + +int data_stream_finish_destroy(struct data_stream *s){ + callback_error=false; + zbuffer_finish_destroy(s->z); + free(s->pids); + free(s->namebuf); + free(s->writebuf); + free(s); + if(callback_error)return -1; + return 0; +} + +int data_stream_frame(struct data_stream *s){ + callback_error=false; + + u8 buf8[8]; + + struct timeval timeval; + if(gettimeofday(&timeval,NULL)<0){ + perror("gettimeofday"); + timeval.tv_sec=0; + timeval.tv_usec=0; + } + serialise8(buf8,timeval.tv_sec); + bufappend(&s->writebuf,&s->cursor,&s->writebufsize,buf8,8); + serialise4(buf8,timeval.tv_usec); + bufappend(&s->writebuf,&s->cursor,&s->writebufsize,buf8,4); + + i64 numpids=proc_listallpids(s->pids,s->pidbufsize*sizeof(pid_t)); + while(s->pidbufsize<=LARGENUM&&numpids>=s->pidbufsize){ + s->pidbufsize*=2; + s->pids=realloc(s->pids,s->pidbufsize,pid_t); + numpids=proc_listallpids(s->pids,s->pidbufsize*sizeof(pid_t)); + } + serialise8(buf8,numpids); + bufappend(&s->writebuf,&s->cursor,&s->writebufsize,buf8,8); + + for(i64 index=0;index<numpids;index++) { + pid_t pid=s->pids[index]; + + struct proc_taskinfo info; + int result=proc_pidinfo(pid,PROC_PIDTASKINFO,0,&info,sizeof(info)); + if(result!=sizeof info)continue; + + serialise4(buf8,pid); + bufappend(&s->writebuf,&s->cursor,&s->writebufsize,buf8,4); + + size_t namelen=proc_name(pid,s->namebuf,s->namebufsize); + while(s->namebufsize<=LARGENUM&&namelen==0){ + s->namebufsize*=2; + s->namebuf=realloc(s->namebuf,s->namebufsize,u8); + namelen=proc_name(pid,s->namebuf,s->namebufsize); + } + if(namelen==0){ + s->namebuf[0]='\0'; + } + + fprintf(stderr,"index=%" PRIi64 " pid=%d namelen=%zu\n",index,pid,namelen); + serialise8(buf8,namelen); + bufappend(&s->writebuf,&s->cursor,&s->writebufsize,buf8,8); + bufappend(&s->writebuf,&s->cursor,&s->writebufsize,s->namebuf,namelen); + + serialise8(buf8,info.pti_total_user); + bufappend(&s->writebuf,&s->cursor,&s->writebufsize,buf8,8); + serialise8(buf8,info.pti_total_system); + bufappend(&s->writebuf,&s->cursor,&s->writebufsize,buf8,8); + serialise8(buf8,info.pti_resident_size); + bufappend(&s->writebuf,&s->cursor,&s->writebufsize,buf8,8); + + // fprintf(stderr,"PID %u (%s): %llu %llu %llu\n",pid,namebuf,info.pti_total_user,info.pti_total_system,info.pti_resident_size); + } + + zbuffer_write(s->z,s->writebuf,s->cursor); + s->cursor=0; + + if(callback_error)return -1; + return 0; +} + +int data_stream_flush(struct data_stream *s){ + callback_error=false; + zbuffer_flush(s->z); + if(callback_error)return -1; + return 0; +} diff --git a/server/data_stream.h b/server/data_stream.h new file mode 100644 index 0000000..c8baf18 --- /dev/null +++ b/server/data_stream.h @@ -0,0 +1,13 @@ +#pragma once + +#include "global.h" +#include "zbuffer.h" + + +struct data_stream; + +// Return NULL or -1 on error +struct data_stream* data_stream_init(int fd); +int data_stream_finish_destroy(struct data_stream *s); +int data_stream_frame(struct data_stream *s); +int data_stream_flush(struct data_stream *s); // can degrade compression diff --git a/server/main.c b/server/main.c new file mode 100644 index 0000000..3cf1ffd --- /dev/null +++ b/server/main.c @@ -0,0 +1,200 @@ +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <unistd.h> +#include <errno.h> +#include <sys/time.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <arpa/inet.h> +#include <sys/select.h> +#include "global.h" +#include "data_stream.h" +#include "line_reader.h" +#include "zbuffer.h" + +#define PORT 57575 + +#define POLLDELAY (10*1000000) + +/*struct proc_taskinfo { + uint64_t pti_virtual_size; virtual memory size (bytes) + uint64_t pti_resident_size; resident memory size (bytes) + uint64_t pti_total_user; total time + uint64_t pti_total_system; + uint64_t pti_threads_user; existing threads only + uint64_t pti_threads_system; + int32_t pti_policy; default policy for new threads + int32_t pti_faults; number of page faults + int32_t pti_pageins; number of actual pageins + int32_t pti_cow_faults; number of copy-on-write faults + int32_t pti_messages_sent; number of messages sent + int32_t pti_messages_received; number of messages received + int32_t pti_syscalls_mach; number of mach system calls + int32_t pti_syscalls_unix; number of unix system calls + int32_t pti_csw; number of context switches + int32_t pti_threadnum; number of threads in the task + int32_t pti_numrunning; number of running threads + int32_t pti_priority; task priority +};*/ + +static i64 make_timestamp(void){ + struct timeval tv; + gettimeofday(&tv,NULL); + return tv.tv_sec*(i64)1000000+tv.tv_usec; +} + +static void connection_handler(int sock){ + struct data_stream *stream=data_stream_init(sock); + struct line_reader *reader=line_reader_init(sock); + + i64 polldelay=POLLDELAY; + i64 timeleft=polldelay; + + bool do_send_frames=false; + + bool do_quit=false; + + while(true){ + struct timeval timeout={timeleft/1000000,timeleft%1000000}; + i64 before=make_timestamp(); + + fd_set inset; + FD_ZERO(&inset); + FD_SET(sock,&inset); + int ret=select(sock+1,&inset,NULL,NULL,do_send_frames?&timeout:NULL); + if(ret<0){ + if(errno==EINTR)continue; + perror("select"); + break; + } + if(ret==0||!FD_ISSET(sock,&inset)){ // timeout + if(do_send_frames){ + if(data_stream_frame(stream)<0){ + printf("Error sending data\n"); + goto cleanup; + } + printf("."); fflush(stdout); + } + timeleft=polldelay; + continue; + } + + i64 after=make_timestamp(); + if(after<before){ + printf("Time ran backwards?\n"); + after=before; + } + timeleft-=after-before; + if(timeleft<0)timeleft=0; + + char buf[1024]; + i64 nr=read(sock,buf,sizeof buf); + if(nr<0){ + if(errno==EINTR)continue; + perror("read"); + break; + } + if(nr==0)break; // EOF + line_reader_supply_data(reader,buf,nr); + + char *line; + while((line=line_reader_get_line(reader,false))!=NULL){ + char *word=line; + + printf("Read <%s>\n",line); + + char *next=strchr(word,' '); + if(next!=NULL){ + *next='\0'; + next++; + } + if(strcmp(word,"delay")==0){ + char *endp; + i64 v=strtoll(next,&endp,10); + if(*next=='\0'||*endp!='\0'||v<=1000){ + free(line); + printf("Invalid number in delay\n"); + continue; + } + polldelay=v; + printf("polldelay -> %" PRIi64 "\n",polldelay); + if(timeleft>polldelay)timeleft=polldelay; + } else if(strcmp(word,"start")==0){ + do_send_frames=true; + printf("Sending enabled\n"); + } else if(strcmp(word,"stop")==0){ + do_send_frames=false; + printf("Sending disabled\n"); + } else if(strcmp(word,"flush")==0){ + printf("Flushing\n"); + if(data_stream_flush(stream)<0){ + printf("Error flushing\n"); + goto cleanup; + } + } else if(strcmp(word,"quit")==0){ + printf("Quitting by request\n"); + do_quit=true; + goto cleanup; + } else { + printf("Read unrecognised line <%s>!\n",line); + } + + free(line); + } + } + +cleanup: + data_stream_finish_destroy(stream); + if(do_quit)exit(0); +} + +int main(void){ + int sock=socket(PF_INET,SOCK_STREAM,0); + if(sock<0){ + perror("socket"); + return 1; + } + + int one=1; + if(setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,&one,sizeof(int))<0){ + perror("setsockopt"); + return 1; + } + + struct sockaddr_in sin; + sin.sin_family=AF_INET; + sin.sin_addr.s_addr=htonl(INADDR_ANY); + sin.sin_port=htons(PORT); + + if(bind(sock,(struct sockaddr*)&sin,sizeof(sin))<0){ + close(sock); + perror("bind"); + return 1; + } + + if(listen(sock,1)<0){ + perror("listen"); + return 1; + } + + printf("Listening on port %d\n",PORT); + + while(true){ + struct sockaddr_storage client_addr; + socklen_t client_addr_sz=sizeof(client_addr); + int clientsock=accept(sock,(struct sockaddr*)&client_addr,&client_addr_sz); + if(clientsock<0){ + perror("accept"); + continue; + } + + char str[INET_ADDRSTRLEN]; + inet_ntop(client_addr.ss_family,&((struct sockaddr_in*)&client_addr)->sin_addr, + str,sizeof(str)); + + printf("Accept from %s\n",str); + connection_handler(clientsock); + close(clientsock); + } +} diff --git a/server/pstat.py b/server/pstat.py new file mode 100755 index 0000000..eed6528 --- /dev/null +++ b/server/pstat.py @@ -0,0 +1,14 @@ +#!/usr/bin/env python3 +import psutil, os, sys, time + +t=time.clock_gettime(time.CLOCK_MONOTONIC) + +for p in psutil.process_iter(): + try: + with p.oneshot(): + print(p.pid,p.username(),p.name(),p.memory_info().rss,p.cpu_times()) + except Exception as e: + pass + +t=time.clock_gettime(time.CLOCK_MONOTONIC)-t +print(t,file=sys.stderr) diff --git a/server/zbuffer.c b/server/zbuffer.c new file mode 100644 index 0000000..af37f74 --- /dev/null +++ b/server/zbuffer.c @@ -0,0 +1,79 @@ +#include <assert.h> +#include <zlib.h> +#include "zbuffer.h" + +#define BUFSZ_ZOUT (256*1024) + + +struct zbuffer { + u8 *out; + i64 outfill; + z_stream strm; + + zbuffer_writefunc *wf; + void *payload; +}; + +struct zbuffer* zbuffer_init(zbuffer_writefunc *wf,void *payload){ + struct zbuffer *z=malloc(1,struct zbuffer); + z->out=malloc(BUFSZ_ZOUT,u8); + z->outfill=0; + + z->strm.zalloc=Z_NULL; + z->strm.zfree=Z_NULL; + z->strm.opaque=Z_NULL; + + // so zbuffer_write can start writing immediately + z->strm.avail_out=BUFSZ_ZOUT; + z->strm.next_out=z->out; + + if(deflateInit(&z->strm,Z_DEFAULT_COMPRESSION)!=Z_OK){ + return NULL; + } + + z->wf=wf; + z->payload=payload; + return z; +} + +void zbuffer_write(struct zbuffer *z,u8 *data,i64 len){ + z->strm.avail_in=len; + z->strm.next_in=data; + assert(z->strm.avail_out>0); + while(true){ + int ret=deflate(&z->strm,Z_NO_FLUSH); + assert(ret==Z_OK); + if(z->strm.avail_out>0)break; + z->wf(z->out,BUFSZ_ZOUT,z->payload); + z->strm.avail_out=BUFSZ_ZOUT; + z->strm.next_out=z->out; + } + assert(z->strm.avail_in==0); +} + +void zbuffer_flush(struct zbuffer *z){ + assert(z->strm.avail_out>0); + while(true){ + int ret=deflate(&z->strm,Z_SYNC_FLUSH); + assert(ret==Z_OK); + z->wf(z->out,BUFSZ_ZOUT-z->strm.avail_out,z->payload); + bool done=z->strm.avail_out>0; + z->strm.avail_out=BUFSZ_ZOUT; + z->strm.next_out=z->out; + if(done)break; + } +} + +void zbuffer_finish_destroy(struct zbuffer *z){ + assert(z->strm.avail_out>0); + while(true){ + int ret=deflate(&z->strm,Z_FINISH); + assert(ret==Z_OK||ret==Z_STREAM_END); + z->wf(z->out,BUFSZ_ZOUT-z->strm.avail_out,z->payload); + if(z->strm.avail_out>0)break; + z->strm.avail_out=BUFSZ_ZOUT; + z->strm.next_out=z->out; + } + free(z->out); + free(z); +} diff --git a/server/zbuffer.h b/server/zbuffer.h new file mode 100644 index 0000000..2e9f707 --- /dev/null +++ b/server/zbuffer.h @@ -0,0 +1,13 @@ +#pragma once + +#include "global.h" + + +typedef void zbuffer_writefunc(u8*,i64,void*); + +struct zbuffer; + +struct zbuffer* zbuffer_init(zbuffer_writefunc *wf,void *payload); +void zbuffer_write(struct zbuffer *z,u8 *data,i64 len); +void zbuffer_flush(struct zbuffer *z); // can degrade compression +void zbuffer_finish_destroy(struct zbuffer *z); |