Asynchronicity
Theory
We could spend more time playing with HDF5’s many settings. But my personal experiments suggest that overall, we’re not going to get much faster on NVMe storage with lossless compression.1
Instead, let us try to solve another issue with our storage I/O code, namely the fact that whenever the combination of HDF5 and the Linux kernel decides to actually send some of our data to storage, our code ends up waiting for that I/O to complete inside of HDF5Writer::write().
This is a shame, because instead of twiddling its thumbs like that, our code could be computing some extra simulation steps ahead. This way, whenever the I/O device is ready again, our code would be able to submit data to it faster, reducing the compute pause between two I/O transactions. In an ideal world, we would get to the point where whenever storage can accept new data from our code, our code has a new image ready to submit immediately.
We must not overdo this optimization, however. The storage device is slower than the computation, so if we let the simulation run arbitrarily far ahead, we will just stack up more and more images in RAM until we run out of memory. To avoid this, we will want to cap the number of pre-generated images that can sit in RAM waiting to be picked up by I/O. This is called backpressure, and it is an omnipresent concern in I/O-bound computations.
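As a minimal standalone illustration of the idea (a toy example, not part of the simulation code: the item type, capacity and timings are made up), a bounded channel from the standard library blocks the producer as soon as the configured number of items is already waiting, which is exactly the backpressure behaviour we are after:
use std::{sync::mpsc, thread, time::Duration};

fn main() {
    // Bounded channel with room for at most 2 in-flight items
    let (sender, receiver) = mpsc::sync_channel::<u64>(2);

    // Fast producer: send() blocks whenever 2 items are already waiting
    let producer = thread::spawn(move || {
        for i in 0..10 {
            sender.send(i).expect("Consumer stopped unexpectedly");
            println!("Produced item {i}");
        }
    });

    // Slow consumer, standing in for the storage device
    for item in receiver {
        thread::sleep(Duration::from_millis(100));
        println!("Consumed item {item}");
    }
    producer.join().expect("Producer thread panicked");
}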
That’s it for the theory, now let’s see how this is done in practice.
Implementation
Here we will want to set up a dedicated I/O thread, which is in charge of offloading HDF5Writer::write() calls away from the main thread in such a way that only a bounded number of pre-computed images may be waiting to be written to storage. To this end, the I/O thread will be provided with two FIFO channels (a standalone sketch of the resulting pattern is given after the list):
- One channel goes from the main thread to the I/O thread, and is used to submit computed images to the I/O thread so that they get written to storage. This channel is bounded to a certain length to prevent endless accumulation of computed images.
- One channel goes from the I/O thread to the main thread, and is used to recycle previously allocated images after they have been written to disk. This channel does not need to be bounded because the other channel is enough to limit the number of images in flight.
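Before wiring this into the simulation, here is a hedged toy sketch of that two-channel pattern in isolation, where plain Vec<u8> buffers stand in for images and all names are made up:
use std::{sync::mpsc, thread};

// Toy stand-in for one simulation image
type Image = Vec<u8>;

fn main() {
    // Bounded submission channel: at most 1 pre-computed image in flight
    // (capacity hardcoded for the toy, the real code makes it configurable)
    let (result_send, result_recv) = mpsc::sync_channel::<Image>(1);
    // Unbounded recycling channel: bounded indirectly by the channel above
    let (recycle_send, recycle_recv) = mpsc::channel::<Image>();

    // "I/O thread": writes images to storage, then sends the allocations back
    let io_thread = thread::spawn(move || {
        for image in result_recv {
            println!("Pretending to write {} bytes to storage", image.len());
            let _ = recycle_send.send(image);
        }
    });

    // "Main thread": reuses a recycled allocation whenever one is available
    for step in 0..5u8 {
        let mut image = recycle_recv.try_recv().unwrap_or_default();
        image.clear();
        image.resize(1024, step);
        result_send
            .send(image)
            .expect("I/O thread stopped unexpectedly");
    }

    // Dropping result_send is what ends the I/O thread's loop
    drop(result_send);
    io_thread.join().expect("I/O thread panicked");
}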
Of course, we should not hardcode the FIFO length. So instead, we will make it configurable as yet another runner option:
/// Maximal number of images waiting to be written to storage
#[arg(long, default_value_t = 1)]
pub storage_buffer_len: usize,
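For context, the new option might sit in a clap-derived options struct along the following lines; the RunnerOptions name and the other field shown here are illustrative assumptions, not the project's actual code:
use clap::Parser;

/// Simulation runner options (sketch, field set is assumed)
#[derive(Debug, Parser)]
pub struct RunnerOptions {
    /// Name of the output HDF5 file (assumed pre-existing option)
    #[arg(long, default_value = "output.h5")]
    pub file_name: String,

    /// Maximal number of images waiting to be written to storage
    #[arg(long, default_value_t = 1)]
    pub storage_buffer_len: usize,
}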
Inside of run_simulation(), we will then set up a threading scope. This is the easiest API for spawning threads in Rust, because it lets us borrow data from the outer scope, in exchange for joining all spawned threads at the end of the scope.
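To see what scoped threads buy us, here is a tiny standalone example (unrelated to the simulation, names are made up): the spawned thread borrows a local variable, which is only safe because the scope joins every spawned thread before that variable goes out of scope.
use std::thread;

fn main() {
    let data = vec![1, 2, 3];
    thread::scope(|s| {
        // The closure may borrow `data` from the outer scope...
        s.spawn(|| println!("Sum from spawned thread: {}", data.iter().sum::<i32>()));
        // ...because every spawned thread is joined when the scope ends
    });
    println!("`data` is still usable afterwards: {data:?}");
}
With that in mind, here is how the scope slots into run_simulation():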
// Set up HDF5 I/O (unchanged, only repeated to show where the scope goes)
let mut hdf5 =
    HDF5Writer::create(&opts.runner.file_name, concentrations.shape(), &opts.runner)?;

// Set up a scope for the I/O thread
std::thread::scope(|s| {
    // TODO: Set up the I/O thread, then perform overlapped compute and I/O

    // Need to hint the compiler about the output error type
    Ok::<_, Box<dyn Error>>(())
})?;
// Remove final hdf5.close(), this will now be handled by the I/O thread
Ok(())
Then we set up FIFO communication channels with the I/O thread, making sure that the channel used to send images from the main thread to the I/O thread has a bounded capacity.
use std::sync::mpsc;
let (result_send, result_recv) =
    mpsc::sync_channel::<Array2<Float>>(opts.runner.storage_buffer_len);
let (recycle_send, recycle_recv) = mpsc::channel();
After this, we start the I/O thread. This thread acquires ownership of the HDF5Writer, of the FIFO endpoint that receives computed images, and of the FIFO endpoint that sends images back after they have been written to storage. It then iterates over computed images from the main thread until the main thread drops result_send, sending each image back after writing it to storage, and at the end it closes the HDF5 writer.
s.spawn(move || {
    for image in result_recv {
        hdf5.write(image.view()).expect("Failed to write image");
        // Ignore failures to recycle images: it only means that the main thread
        // has stopped running, which is normal at the end of the simulation.
        let _ = recycle_send.send(image);
    }
    hdf5.close().expect("Failed to close HDF5 file");
});
Notice that we allow ourselves to handle errors via panicking here. This is fine because if the I/O thread does panic, its result_recv FIFO endpoint will automatically be dropped, which disconnects the matching result_send endpoint on the main thread side. The main thread will thus detect that the I/O thread is not able to accept images anymore, and will be able to handle it as an error.
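That standard library behaviour is easy to check in isolation with a toy channel (names and types made up): once the receiving end is gone, send() returns a recoverable error on the sender side rather than blocking or panicking.
use std::sync::mpsc;

fn main() {
    let (send, recv) = mpsc::sync_channel::<i32>(1);
    // Simulate the I/O thread dying, e.g. through a panic: its receiver is dropped
    drop(recv);
    // The sender now gets an error that can be propagated with `?`
    assert!(send.send(42).is_err());
    println!("send() failed as expected after the receiver was dropped");
}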
After this comes a tricky part: for everything to work out smoothly, we must start a new scope and move the main thread’s FIFO endpoints into it:
{
    // Move the I/O channels here to ensure they get dropped
    let (result_send, recycle_recv) = (result_send, recycle_recv);
    // ... compute and send images
}
We do this because it ensures that once the main thread is done, its FIFO endpoints get dropped. The I/O thread will detect this, iteration over result_recv will stop, and the I/O thread will then move on, close the HDF5 writer, and terminate.
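The converse of the previous toy example also holds and can be checked in isolation (again with made-up names): iterating over a Receiver simply stops once every matching Sender has been dropped, no explicit shutdown message is needed.
use std::sync::mpsc;

fn main() {
    let (send, recv) = mpsc::sync_channel::<i32>(4);
    send.send(1).expect("Receiver is alive, this cannot fail");
    send.send(2).expect("Receiver is alive, this cannot fail");
    // Dropping the last sender is what ends the loop below
    drop(send);
    for value in recv {
        println!("Received {value}");
    }
    println!("Iteration ended cleanly");
}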
Finally, on the main thread side, we set up sending of owned images to the I/O thread, with proper allocation recycling…
use crate::data::Float;
use ndarray::{Array2, ArrayView2};
use std::sync::mpsc::TryRecvError;

let shape = concentrations.shape();
let data = concentrations.current_v()?;
let computed_image = ArrayView2::from_shape(shape, &data[..])?;
let mut owned_image = match recycle_recv.try_recv() {
    // Reuse an image allocation that the I/O thread sent back...
    Ok(owned_image) => owned_image,
    // ...or allocate a new one if none has been recycled yet...
    Err(TryRecvError::Empty) => Array2::default(shape),
    // ...and treat a vanished I/O thread as a fatal error
    Err(TryRecvError::Disconnected) => panic!("I/O thread stopped unexpectedly"),
};
owned_image.assign(&computed_image);
result_send.send(owned_image)?;
Exercise
Implement this optimization in your code and measure its performance impact on your hardware.
If your storage is fast, you may find that the sequential copy of the image data from computed_image to owned_image becomes a bottleneck. In that case, you will want to look into parallelizing this operation with Rayon so that it puts your RAM bandwidth to better use.
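As a starting point, here is a minimal sketch of what such a parallel copy could look like; it assumes that ndarray is built with its rayon feature enabled (which this project may or may not do) and would replace the owned_image.assign(&computed_image) line above:
use ndarray::Zip;

// Parallel equivalent of owned_image.assign(&computed_image),
// assuming ndarray's "rayon" feature is enabled
Zip::from(&mut owned_image)
    .and(&computed_image)
    .par_for_each(|owned, &computed| *owned = computed);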
Lossy compression can get us a lot further, but it tends to trigger the anger of our scientist colleagues and only be accepted after long hours of political negotiation, so we should only consider it as a last resort.