Conversation
Add a new worker called "script", which compiles previously introduced parser using LLVM into bytecode for execution. It also provides a set of functionality available at runtime for such scripts. Note that it's a breaking change in terms of the interface: previously the workload configuration was passed as the first positional argument, but now it has to be passed via `-c workload.toml`, to distinguish it from passing a script via `-f script.ber`.
Molter73
left a comment
There was a problem hiding this comment.
Still need to go through script.rs, but it is late on a Friday, so I'll come back to this later. Overall looks good!
I'm getting an error when trying to compile:
error: No suitable version of LLVM was found system-wide or pointed
to by LLVM_SYS_201_PREFIX.
Consider using `llvmenv` to compile an appropriate copy of LLVM, and
refer to the llvm-sys documentation for more information.
llvm-sys: https://crates.io/crates/llvm-sys
llvmenv: https://crates.io/crates/llvmenv
--> /home/mmoltras/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/llvm-sys-201.0.1/src/lib.rs:533:1
|
533 | / std::compile_error!(concat!(
534 | | "No suitable version of LLVM was found system-wide or pointed
535 | | to by LLVM_SYS_",
536 | | env!("CARGO_PKG_VERSION_MAJOR"),
... |
543 | | llvmenv: https://crates.io/crates/llvmenv"
544 | | ));
| |__^
Do we have any guidance on what version of LLVM we should have available?
| flag_f: Option<String>, | ||
| } | ||
|
|
||
| fn run_script(script_path: String) -> Vec<(i32, u64)> { |
There was a problem hiding this comment.
Probably a good idea to move this function and run_workload to a separate file; we should aim to have a simple main function in here and nothing else.
| let works = apply_rules(works); | ||
|
|
||
| works.into_iter().for_each(|node| { | ||
| debug!("AST NODE: {:?}", node); | ||
|
|
||
| let Node::Work { | ||
| name: _, | ||
| args, | ||
| instructions: _, | ||
| dist: _, | ||
| } = node | ||
| else { | ||
| unreachable!() | ||
| }; | ||
|
|
||
| let workers: u32 = args | ||
| .get("workers") | ||
| .cloned() | ||
| .unwrap_or(String::from("0")) | ||
| .parse() | ||
| .unwrap(); | ||
|
|
||
| info!("Config: {:?}", config); | ||
| let duration: u64 = args | ||
| .get("duration") | ||
| .cloned() | ||
| .unwrap_or(String::from("0")) | ||
| .parse() | ||
| .unwrap(); | ||
|
|
||
| let h: Vec<_> = (0..workers) | ||
| .map(|_| { | ||
| let worker = new_script_worker(node.clone()); | ||
|
|
||
| match fork() { | ||
| Ok(Fork::Parent(child)) => { | ||
| info!("Child {}", child); | ||
| Some((child, duration)) | ||
| } | ||
| Ok(Fork::Child) => { | ||
| worker.run_payload().unwrap(); | ||
| None | ||
| } | ||
| Err(e) => { | ||
| warn!("Failed: {e:?}"); | ||
| None | ||
| } | ||
| } | ||
| }) | ||
| .collect(); | ||
|
|
||
| handles.extend(h); | ||
| }); | ||
|
|
||
| handles.iter().filter_map(|i| *i).collect() |
There was a problem hiding this comment.
I think this is equivalent and (maybe) slightly easier to read:
| let works = apply_rules(works); | |
| works.into_iter().for_each(|node| { | |
| debug!("AST NODE: {:?}", node); | |
| let Node::Work { | |
| name: _, | |
| args, | |
| instructions: _, | |
| dist: _, | |
| } = node | |
| else { | |
| unreachable!() | |
| }; | |
| let workers: u32 = args | |
| .get("workers") | |
| .cloned() | |
| .unwrap_or(String::from("0")) | |
| .parse() | |
| .unwrap(); | |
| info!("Config: {:?}", config); | |
| let duration: u64 = args | |
| .get("duration") | |
| .cloned() | |
| .unwrap_or(String::from("0")) | |
| .parse() | |
| .unwrap(); | |
| let h: Vec<_> = (0..workers) | |
| .map(|_| { | |
| let worker = new_script_worker(node.clone()); | |
| match fork() { | |
| Ok(Fork::Parent(child)) => { | |
| info!("Child {}", child); | |
| Some((child, duration)) | |
| } | |
| Ok(Fork::Child) => { | |
| worker.run_payload().unwrap(); | |
| None | |
| } | |
| Err(e) => { | |
| warn!("Failed: {e:?}"); | |
| None | |
| } | |
| } | |
| }) | |
| .collect(); | |
| handles.extend(h); | |
| }); | |
| handles.iter().filter_map(|i| *i).collect() | |
| apply_rules(works) | |
| .into_iter() | |
| .map(|node| { | |
| let Node::Work { args, .. } = node else { | |
| unreachable!() | |
| }; | |
| let workers: u32 = args | |
| .get("workers") | |
| .cloned() | |
| .unwrap_or(String::from("0")) | |
| .parse() | |
| .unwrap(); | |
| let duration: u64 = args | |
| .get("duration") | |
| .cloned() | |
| .unwrap_or(String::from("0")) | |
| .parse() | |
| .unwrap(); | |
| (0..workers) | |
| .filter_map(|_| { | |
| let worker = new_script_worker(node.clone()); | |
| match fork() { | |
| Ok(Fork::Parent(child)) => { | |
| info!("Child {}", child); | |
| Some((child, duration)) | |
| } | |
| Ok(Fork::Child) => { | |
| worker.run_payload().unwrap(); | |
| None | |
| } | |
| Err(e) => { | |
| warn!("Failed: {e:?}"); | |
| None | |
| } | |
| } | |
| }) | |
| .collect::<Vec<_>>() | |
| }) | |
| .flatten() | |
| .collect() |
| let workers: u32 = args | ||
| .get("workers") | ||
| .cloned() | ||
| .unwrap_or(String::from("0")) | ||
| .parse() | ||
| .unwrap(); | ||
|
|
||
| info!("Config: {:?}", config); | ||
| let duration: u64 = args | ||
| .get("duration") | ||
| .cloned() | ||
| .unwrap_or(String::from("0")) | ||
| .parse() | ||
| .unwrap(); |
There was a problem hiding this comment.
I wonder if we could give args a more concise type that handles all this logic under the hood 🤔
| .collect(); | ||
|
|
||
| info!("In total: {}", upper); | ||
| handles.iter().filter_map(|i| *i).collect() |
There was a problem hiding this comment.
We can probably change the map operation in line 72 to filter_map and replace this line with just handles.
This will also save an iteration and allocation, since this line will iterate over all the elements of handles, remove the None elements and allocate a new vector for the remaining items, all of which can be done during the original iteration.
| // Ignore processes without specified duration -- we don't want | ||
| // neither terminate them, nor count against processes to compare. | ||
| let watched_processes = processes | ||
| .iter() | ||
| .filter(|(_, duration)| *duration > 0) | ||
| .collect::<Vec<_>>(); | ||
|
|
||
| // Find all processes with expired duration. If we've received | ||
| // SIGTERM, get all processes. | ||
| let expired = watched_processes | ||
| .iter() | ||
| .filter(|(_, duration)| { | ||
| *duration < elapsed | ||
| || terminating.load(Ordering::Relaxed) | ||
| }) | ||
| .collect::<Vec<_>>(); | ||
|
|
||
| for (handle, _) in &expired { | ||
| info!("Terminating: {}", *handle); | ||
| let _ = kill(Pid::from_raw(*handle), Signal::SIGKILL); | ||
| } | ||
| }); | ||
| } | ||
|
|
||
| if expired.len() == watched_processes.len() { | ||
| break; | ||
| } |
There was a problem hiding this comment.
I think this is equivalent (barring some comments):
| // Ignore processes without specified duration -- we don't want | |
| // neither terminate them, nor count against processes to compare. | |
| let watched_processes = processes | |
| .iter() | |
| .filter(|(_, duration)| *duration > 0) | |
| .collect::<Vec<_>>(); | |
| // Find all processes with expired duration. If we've received | |
| // SIGTERM, get all processes. | |
| let expired = watched_processes | |
| .iter() | |
| .filter(|(_, duration)| { | |
| *duration < elapsed | |
| || terminating.load(Ordering::Relaxed) | |
| }) | |
| .collect::<Vec<_>>(); | |
| for (handle, _) in &expired { | |
| info!("Terminating: {}", *handle); | |
| let _ = kill(Pid::from_raw(*handle), Signal::SIGKILL); | |
| } | |
| }); | |
| } | |
| if expired.len() == watched_processes.len() { | |
| break; | |
| } | |
| for (handle, _) in processes.extract_if(.., |(_, duration)| { | |
| (*duration > 0 && *duration < elapsed) | |
| || terminating.load(Ordering::Relaxed) | |
| }) { | |
| let _ = kill(Pid::from_raw(*handle), Signal::SIGKILL); | |
| } | |
| if processes.is_empty() | |
| || processes.iter().all(|(_, duration)| *duration == 0) | |
| { | |
| break; | |
| } | |
Add a new worker called "script", which compiles previously introduced parser using LLVM into bytecode for execution. It also provides a set of functionality available at runtime for such scripts. It doesn't implement the machine state part yet.
Note that it's a breaking change in terms of the interface. If before the workload configuration was passed as a first positional argument, now it has to be passed via `-c workload.toml`, to distinguish from passing a script via `-f script.ber`.