Asynchronicity

Theory

We could spend more time playing with HDF5’s many settings. But my personal experiments suggest that overall, we’re not going to get much faster on NVMe storage with lossless compression.1

Instead, let us try to solve another issue with our storage I/O code: namely, the fact that whenever the combination of HDF5 and the Linux kernel decides to actually send some of our data to storage, our code ends up waiting for that I/O to complete inside of HDF5Writer::write().

This is a shame because instead of twiddling its thumbs like that, our code could be computing some extra simulation steps ahead. This way, whenever the I/O device is ready again, our code would be able to submit data to it faster, reducing the compute pause between two I/O transactions. In an ideal world, we should be able to get to the point where whenever storage is able to accept new data from our code, our code can submit a new image immediately.

We must not overdo this optimization, however. The storage device is slower than the computation, so if we let the simulation run arbitrarily far ahead, we will just stack up more and more images in RAM until we run out of memory. To avoid this, we will want to cap the number of pre-generated images that can be resident in RAM waiting to be picked up by I/O. This is called backpressure, and it is an omnipresent concern in I/O-bound computations.
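
To make the notion of backpressure concrete, here is a minimal standalone sketch, not part of the simulation code, in which a bounded channel forces a fast producer to wait for a slow consumer. The capacity of 2 and the sleep duration are arbitrary illustration values:

use std::{sync::mpsc, thread, time::Duration};

fn main() {
    // Bounded channel: at most 2 items may be waiting at any point in time
    let (sender, receiver) = mpsc::sync_channel::<usize>(2);

    // Fast producer: send() blocks whenever the channel is already full
    let producer = thread::spawn(move || {
        for i in 0..10 {
            sender.send(i).expect("Consumer stopped unexpectedly");
            println!("Produced item {i}");
        }
    });

    // Slow consumer: stands in for a storage device that lags behind
    for item in receiver {
        thread::sleep(Duration::from_millis(100));
        println!("Consumed item {item}");
    }
    producer.join().expect("Producer thread panicked");
}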

That’s it for the theory, now let’s see how this is done in practice.

Implementation

Here we will want to set up a dedicated I/O thread, which is in charge of offloading HDF5Writer::write() calls away from the main thread in such a way that only a bounded number of pre-computed images may be waiting to be written to storage. To this end, the I/O thread will be provided with two FIFO channels:

  • One channel goes from the main thread to the I/O thread, and is used to submit computed images to the I/O thread so that they get written to storage. This channel is bounded to a certain length to prevent endless accumulation of computed images.
  • One channel goes from the I/O thread to the main thread, and is used to recycle previously allocated images after they have been written to disk. This channel does not need to be bounded because the other channel is enough to limit the number of images in flight.

Of course, we should not hardcode the FIFO length. So instead, we will make it configurable as yet another runner option:

/// Maximal number of images waiting to be written to storage
#[arg(long, default_value_t = 1)]
pub storage_buffer_len: usize,

Inside of run_simulation(), we will then set up a threading scope. This is the easiest API for spawning threads in Rust, because it lets us borrow data from the outer scope, in exchange for joining all spawned threads at the end.

// Set up HDF5 I/O (unchanged, only repeated to show where the scope goes)
let mut hdf5 =
    HDF5Writer::create(&opts.runner.file_name, concentrations.shape(), &opts.runner)?;

// Set up a scope for the I/O thread
std::thread::scope(|s| {
    // TODO: Set up I/O thread, then perform overlapped compute and I/O

    // Need to hint the compiler about the output error type
    Ok::<_, Box<dyn Error>>(())
})?;

// Remove final hdf5.close(), this will now be handled by the I/O thread
Ok(())

Then we set up FIFO communication channels with the I/O thread, making sure that the channel used to send images from the main thread to the I/O thread has a bounded capacity.

use std::sync::mpsc;

let (result_send, result_recv) =
    mpsc::sync_channel::<Array2<Float>>(opts.runner.storage_buffer_len);
let (recycle_send, recycle_recv) = mpsc::channel();

After this, we start the I/O thread. This thread acquires ownership of the HDF5Writer, the FIFO endpoint that receives computed images, and the FIFO endpoint that sends back images after they have been written to storage. It then iterates over computed images from the main thread until the main thread drops result_send, sending each image back after writing it to storage, and finally closes the HDF5 writer.

s.spawn(move || {
    for image in result_recv {
        hdf5.write(image.view()).expect("Failed to write image");
        // Ignore failures to recycle images: it only means that the main thread
        // has stopped running, which is normal at the end of the simulation.
        let _ = recycle_send.send(image);
    }
    hdf5.close().expect("Failed to close HDF5 file");
});

Notice that we allow ourselves to handle errors via panicking here. This is fine because if the I/O thread does panic, it will automatically drop the result_recv FIFO endpoint, which disconnects the channel on the main thread side. The main thread will thus detect, through a failed send on result_send, that the I/O thread is not able to accept images anymore, and will be able to handle it as an error.
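
If we wanted a more descriptive error message in that situation, the image-sending call that we will write at the end of this section could map the channel-disconnection error before propagating it. This is just an optional variant of the send call shown later, not something the rest of the chapter relies on:

// Optional variant of the later `result_send.send(owned_image)?` call that
// reports the I/O thread shutdown more explicitly
result_send
    .send(owned_image)
    .map_err(|_| "the I/O thread stopped accepting images")?;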

After this comes a tricky part: for everything to work out smoothly, we must start a new scope and move the main thread’s FIFO endpoints into it:

{
    // Move the I/O channels here to ensure they get dropped
    let (result_send, recycle_recv) = (result_send, recycle_recv);

    // ... compute and send images
}

We do this because it ensures that once the main thread is done computing and sending images, its FIFO endpoints get dropped. This will be detected by the I/O thread, and will result in iteration over result_recv stopping. As a result, the I/O thread will move on, close the HDF5 writer, and terminate.
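
To see how these pieces fit together, here is a rough sketch of the whole scope. It only rearranges code that has already been shown above, with the simulation loop elided:

// `hdf5` is the HDF5Writer that was created before the scope
std::thread::scope(|s| {
    // FIFO channels between the main thread and the I/O thread
    let (result_send, result_recv) =
        mpsc::sync_channel::<Array2<Float>>(opts.runner.storage_buffer_len);
    let (recycle_send, recycle_recv) = mpsc::channel();

    // I/O thread: write images to storage, then recycle their allocations
    s.spawn(move || {
        for image in result_recv {
            hdf5.write(image.view()).expect("Failed to write image");
            let _ = recycle_send.send(image);
        }
        hdf5.close().expect("Failed to close HDF5 file");
    });

    // Main thread: drop its channel endpoints once it is done with them
    {
        let (result_send, recycle_recv) = (result_send, recycle_recv);

        // ... simulation loop: compute images and send them to the I/O
        //     thread as shown in the next code block ...
    }

    Ok::<_, Box<dyn Error>>(())
})?;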

Finally, on the main thread side, we send owned images to the I/O thread, recycling previously allocated images whenever possible…

use crate::data::Float;
use ndarray::{Array2, ArrayView2};
use std::sync::mpsc::TryRecvError;

let shape = concentrations.shape();
let data = concentrations.current_v()?;
let computed_image = ArrayView2::from_shape(shape, &data[..])?;
let mut owned_image = match recycle_recv.try_recv() {
    Ok(owned_image) => owned_image,
    Err(TryRecvError::Empty) => Array2::default(shape),
    Err(TryRecvError::Disconnected) => panic!("I/O thread stopped unexpectedly"),
};
owned_image.assign(&computed_image);
result_send.send(owned_image)?;

Exercise

Implement this optimization in your code and measure its performance impact on your hardware.

If your storage is fast, you may find that the sequential copy of the image data from computed_image to owned_image becomes a bottleneck. In that case, you will want to look into parallelizing this operation with Rayon so that it puts your RAM bandwidth to better use.
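
One possible direction, assuming the ndarray crate is built with its rayon feature enabled, is to perform the copy element-wise in parallel with ndarray::Zip. This is only a sketch of the idea, replacing the owned_image.assign(&computed_image) call from the previous code block:

use ndarray::Zip;

// Parallel element-wise copy from the computed image into the recycled
// owned buffer, splitting the work across Rayon worker threads
Zip::from(&mut owned_image)
    .and(&computed_image)
    .par_for_each(|owned, &computed| *owned = computed);

Whether this actually helps depends on how much of your RAM bandwidth a single core can already saturate, so measure before and after.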


1

Lossy compression can get us a lot further, but it tends to trigger the anger of our scientist colleagues and only be accepted after long hours of political negotiation, so we should only consider it as a last resort.