Here is a little fact for you (I originally intended to call it a “fun fact,” but realized that’s stretching the concept of “fun” just a little too far!). In Rust’s TcpStream, and, more importantly, most APIs that aim to mimic the standard library’s TcpStream, such as mio or Tokio, the stream is created with the TCP_NODELAY flag set to false. In practice, this means that Nagle’s algorithm is used, which can cause latency outliers and possibly reduced throughput on some workloads.
Nagle’s algorithm is an algorithm that aims to reduce network congestion by coalescing small network packets. If you look at non-blocking I/O implementations in other languages, many, if not most, disable this algorithm by default. This is not the case in most Rust implementations and is worth being aware of. You can disable it by simply calling TcpStream::set_nodelay(true). If you create your own async library or rely on Tokio/mio, and observe lower throughput than expected or latency problems, it’s worth checking whether this flag is set.
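As a minimal sketch of what disabling Nagle’s algorithm looks like (the local listener here is just an assumption so the example is self-contained; in practice you’d connect to a real server):

```rust
use std::net::{TcpListener, TcpStream};

fn main() -> std::io::Result<()> {
    // A local listener so we have something to connect to in this sketch.
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let addr = listener.local_addr()?;

    let stream = TcpStream::connect(addr)?;
    // Disable Nagle's algorithm: small writes are sent immediately
    // instead of being buffered and coalesced into larger segments.
    stream.set_nodelay(true)?;
    assert!(stream.nodelay()?);
    Ok(())
}
```

Note that the flag is per-stream, so it must be set on every stream you create.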
To continue with the code, the next step is setting the TcpStream to non-blocking by calling TcpStream::set_nonblocking(true).
After that, we write our request to the server before we register interest in read events by setting the EPOLLIN flag bit in the interests bitmask.
For each iteration, we push the stream to the end of our streams collection.
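The steps above can be sketched with the standard library alone (the local listener and the request string are assumptions to keep the sketch self-contained; the epoll registration, which in the real example happens between writing the request and pushing the stream, is only indicated in a comment):

```rust
use std::io::Write;
use std::net::{TcpListener, TcpStream};

fn main() -> std::io::Result<()> {
    // Stand-in for the server in the real example (assumption: a local
    // listener so we have something to connect to).
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let addr = listener.local_addr()?;

    let mut streams = vec![];
    for i in 0..5 {
        let mut stream = TcpStream::connect(addr)?;
        // Switch to non-blocking mode before registering with epoll.
        stream.set_nonblocking(true)?;
        // Write the request up front; afterwards we only wait for
        // read events on this stream.
        let request = format!("GET /{i} HTTP/1.1\r\nHost: localhost\r\n\r\n");
        stream.write_all(request.as_bytes())?;
        // Here the real example registers interest in read events by
        // setting the EPOLLIN bit in the interests bitmask.
        streams.push(stream);
    }
    assert_eq!(streams.len(), 5);
    Ok(())
}
```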
The next part of the main function is handling incoming events.
Let’s take a look at the last part of our main function:
let mut handled_events = 0;

while handled_events < n_events {
    let mut events = Vec::with_capacity(10);
    poll.poll(&mut events, None)?;

    if events.is_empty() {
        println!("TIMEOUT (OR SPURIOUS EVENT NOTIFICATION)");
        continue;
    }

    handled_events += handle_events(&events, &mut streams)?;
}

println!("FINISHED");
Ok(())
}
The first thing we do is create a variable called handled_events to track how many events we have handled.
Next is our event loop. We loop as long as handled_events is less than n_events, the number of events we expect. Once all events are handled, we exit the loop.
Inside the loop, we create a Vec<Event> with the capacity to store 10 events. It’s important that we create this using Vec::with_capacity since the operating system will assume that we pass it memory that we’ve allocated. We could choose any number of events here and it would work just fine, but setting too low a number would limit how many events the operating system could notify us about on each wakeup.
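The distinction between capacity and length is worth spelling out: Vec::with_capacity allocates the backing memory up front but leaves the vector empty, and it is this spare capacity the operating system writes events into. A tiny sketch of that distinction:

```rust
fn main() {
    // usize stands in for the Event type here (an assumption to keep
    // the sketch self-contained).
    let events: Vec<usize> = Vec::with_capacity(10);

    // The vector is empty, but the backing allocation for at least
    // 10 elements already exists.
    assert_eq!(events.len(), 0);
    assert!(events.capacity() >= 10);
}
```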
Next is our blocking call to Poll::poll. As you know, this will actually tell the operating system to park our thread and wake us up when an event has occurred.
If we’re woken up but there are no events in the list, it’s either a timeout or a spurious wakeup (which can happen, so if timeouts matter to us, we need a way to check whether one has actually elapsed). In that case, we simply call Poll::poll once more.
If there are events to be handled, we pass these on to the handle_events function together with a mutable reference to our streams collection.
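To give an idea of what such a handler might look like, here is a hedged sketch (assumptions: events are plain indices into the streams slice rather than the real Event type, and a stream counts as handled once the peer closes the connection):

```rust
use std::io::{self, ErrorKind, Read};
use std::net::TcpStream;

// Sketch of an event handler: drain each readable stream and return
// how many streams finished (the peer closed the connection).
fn handle_events(events: &[usize], streams: &mut [TcpStream]) -> io::Result<usize> {
    let mut handled = 0;
    for &index in events {
        let mut buf = vec![0u8; 4096];
        loop {
            match streams[index].read(&mut buf) {
                // 0 bytes read means the other side closed: we're done.
                Ok(0) => {
                    handled += 1;
                    break;
                }
                Ok(n) => println!("RECEIVED {n} bytes on stream {index}"),
                // The stream is drained for now; wait for the next event.
                Err(e) if e.kind() == ErrorKind::WouldBlock => break,
                Err(e) if e.kind() == ErrorKind::Interrupted => continue,
                Err(e) => return Err(e),
            }
        }
    }
    Ok(handled)
}

fn main() {
    // Driven by the event loop shown in the surrounding text.
}
```

Note the WouldBlock arm: on a non-blocking stream, a read that would otherwise block returns this error instead, which is our signal to go back to waiting.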