aboutsummaryrefslogtreecommitdiff
path: root/worker/src/main.rs
blob: f35a297228c9176a2a2683c802cf179197527da7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
use std::env;
use std::fs::File;
use std::io::{self, Write};
use std::process;
use tempfile;
use dd_utils::protocol::*;
use crate::connection::Connection;
use crate::compute_core::ComputeCore;

mod compute_core;
mod connection;

/// Parameters describing one Mandelbrot rendering job.
///
/// `#[repr(C)]` pins the field order and padding to the C layout rules, so the
/// struct can be handed to non-Rust code as a plain block of bytes.
///
/// NOTE(review): the field meanings below are inferred from their names only;
/// confirm against the compute core that consumes this struct.
// The lower-case type name is kept because renaming it would break existing
// references to the type; the lint is silenced rather than the name changed.
#[allow(non_camel_case_types)]
#[repr(C)]
#[derive(Debug, Clone, Copy, PartialEq)]
struct mandel_input {
    /// Presumably the x coordinate of the left-top corner of the viewport.
    ltx: f64,
    /// Presumably the y coordinate of the left-top corner of the viewport.
    lty: f64,
    /// Presumably the x coordinate of the right-bottom corner of the viewport.
    rbx: f64,
    /// Presumably the y coordinate of the right-bottom corner of the viewport.
    rby: f64,
    /// Presumably the output width in pixels.
    width: usize,
    /// Presumably the output height in pixels.
    height: usize,
    /// Presumably the per-point iteration limit.
    maxiter: usize,
}

/// Carves a `num_bytes`-long slice out of `backend` whose start address is a
/// multiple of `2^align_bits`.
///
/// `backend` is resized (zero-filling any new bytes) to
/// `num_bytes + 2^align_bits - 1` bytes, which is always enough room to skip
/// forward to the next aligned address and still fit `num_bytes` after it.
///
/// # Panics
///
/// Panics if `align_bits` is zero.
fn aligned_buffer(num_bytes: usize, align_bits: usize, backend: &mut Vec<u8>) -> &mut [u8] {
    assert!(align_bits > 0);

    let alignment: usize = 1 << align_bits;
    let low_bits = alignment - 1;

    // Over-allocate by `alignment - 1` bytes so an aligned start always exists.
    backend.resize(num_bytes + alignment - 1, 0u8);

    // Round the base address up to the next multiple of `alignment`; an
    // already-aligned base stays where it is.
    let base = backend.as_ptr() as usize;
    let aligned = (base + low_bits) & !low_bits;
    let skip = aligned - base;

    &mut backend[skip..skip + num_bytes]
}

/// Worker entry point: connects to the controller given on the command line
/// and serves its messages until `Connection::receive` yields no more.
///
/// Handled messages:
/// * `Version` - acknowledges protocol version 1; any other version is
///   refused, the connection is closed and the worker stops.
/// * `NewCore` - writes the received library bytes to `core.so` inside a
///   private temporary directory and loads it as the active compute core.
/// * `Job` - runs the job input through the active core and replies with its
///   return value and output, or with an error if no core is loaded.
///
/// Exits with status 1 after printing a usage line when the argument count is
/// wrong.
///
/// # Errors
///
/// Returns any I/O error from creating the temporary directory, connecting,
/// receiving a message or sending a reply. Failures while writing or loading
/// a core are reported back to the controller instead of aborting the worker.
fn main() -> io::Result<()> {
    let args = env::args().collect::<Vec<_>>();
    if args.len() != 2 {
        // The program name is not guaranteed to be present in argv, so avoid
        // indexing `args[0]` and fall back to a fixed name instead of panicking.
        let program = args.first().map(String::as_str).unwrap_or("worker");
        eprintln!("Usage: {} <controller ip:port>", program);
        process::exit(1);
    }

    let controller_addr = &args[1];

    // Holds the received core library; must outlive every loaded core.
    let tempdir = tempfile::tempdir()?;

    let mut conn = Connection::new(controller_addr)?;

    let mut ccore: Option<ComputeCore> = None;

    loop {
        // `None` means there is nothing more to receive: stop serving.
        let msg = match conn.receive()? {
            Some(msg) => msg,
            None => break,
        };

        match msg.body {
            MessageBody::Version(version) => {
                // Only protocol version 1 is understood by this worker.
                let supported = version == 1;
                conn.reply(msg.id, Ok(Reply::Version(supported)))?;
                if !supported {
                    conn.close();
                    break;
                }
            }

            MessageBody::NewCore(_name, libfile) => {
                // Release the previous core first: the new library is written
                // to the same path the old one was loaded from.
                if let Some(old_core) = ccore.take() {
                    println!("Unloading current core");
                    drop(old_core);
                }

                let path = tempdir.path().join("core.so");

                // The file handle is dropped (closed) when the first closure
                // returns, i.e. before the library is loaded from disk.
                let loaded = File::create(&path)
                    .and_then(|mut file| file.write_all(&libfile))
                    .and_then(|()| {
                        println!("Wrote new core to {}, loading", path.display());
                        ComputeCore::load(&path)
                    });

                match loaded {
                    Ok(new_ccore) => {
                        ccore = Some(new_ccore);
                        conn.reply(msg.id, Ok(Reply::NewCore))?;
                    }
                    Err(err) => {
                        // A bad core is the controller's problem; keep serving.
                        conn.reply(msg.id, Err(format!("Could not load core: {}", err)))?;
                    }
                }
            }

            MessageBody::Job(_jobid, mut input) => match &mut ccore {
                Some(active_core) => {
                    let (ret, output) = active_core.run_job(&mut input);
                    conn.reply(msg.id, Ok(Reply::Job(ret, output)))?;
                }
                None => {
                    conn.reply(msg.id, Err("No core loaded".to_string()))?;
                }
            },
        }
    }

    // NOTE: commented-out local smoke-test scaffolding used to live here. It
    // loaded a core straight from disk and fed it either a literal byte string
    // or a `mandel_input` copied into an `aligned_buffer`. It referenced names
    // that no longer exist in this function; recover it from version control
    // if it is needed again.

    Ok(())
}