Downloading 100,000 Files Using Async Rust


[Image: Rust's new async/await feature makes it easy to stop and start asynchronous tasks (from: Wikimedia Commons)]

Imagine if you had a text file containing thousands of URLs:

$ cat urls.txt
https://example.com/1.html
https://example.com/2.html
https://example.com/3.html

etc...

https://example.com/99999.html
https://example.com/100000.html

…and you needed to download all of those HTML pages efficiently. How would you do it? Maybe a shell script using xargs and curl? Maybe a simple Golang program? Go’s powerful concurrency features would work well for this.

Instead, I decided to try to use Rust. I’ve read a lot about safe concurrency in Rust, but I’ve never tried it. I also wanted to learn what Rust’s new “async/await” feature was all about. This seemed like the perfect task for asynchronous Rust code.

TL/DR: Here’s the code. The rest of this post will explain how it works.

Getting Started With Reqwest

There are many different Rust HTTP clients to choose from, and apparently some controversy about which works best. Because I’m a Rust newbie, I decided to simply pick the most popular: reqwest. Reqwest is a high-level, easy-to-use HTTP client, written by Sean McArthur. He just updated it to work with Tokio, Rust’s new async/await runtime, so this is the perfect time to try using it. Here’s the example from the readme:

use std::collections::HashMap;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let resp = reqwest::get("https://httpbin.org/ip")
        .await?
        .json::<HashMap<String, String>>()
        .await?;
    println!("{:#?}", resp);
    Ok(())
}

This version downloads some JSON and parses it. Notice the new async and await keywords. The main function is marked async, which means it becomes part of a large state machine run by Tokio. Once a function is marked asynchronous, you can call await inside it, which pauses that function temporarily and allows other asynchronous functions to run on the same thread.
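To get a feel for that pausing, here’s a minimal sketch of my own (not from the reqwest readme) that uses timers instead of HTTP requests; it assumes a recent Tokio with the full feature set enabled in Cargo.toml:

use tokio::time::{sleep, Duration};

async fn step(name: &str) {
    println!("{}: starting", name);
    // While this task is paused on .await, Tokio is free to run the other task.
    sleep(Duration::from_millis(100)).await;
    println!("{}: done", name);
}

#[tokio::main]
async fn main() {
    // Both futures make progress concurrently, interleaved on the same thread.
    tokio::join!(step("a"), step("b"));
}

In the readme example the awaits pause on network I/O rather than a timer, but the mechanics are the same.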

I decided to modify the readme example to print out the number of bytes downloaded instead; you could easily change it to save the data to a file or do whatever you want.

let path = "https://httpbin.org/ip";
match reqwest::get(path).await {
    Ok(resp) => {
        match resp.text().await {
            Ok(text) => {
                println!("RESPONSE: {} bytes from {}", text.len(), path);
            }
            Err(_) => println!("ERROR reading {}", path),
        }
    }
    Err(_) => println!("ERROR downloading {}", path),
}
Ok(())

This is a two step process:

  • First I call get(path) to send the HTTP GET request. Then I use await to wait for the request to finish and return a result.

  • Second, if the request was successful, I call resp.text() to get the contents of the response body. And I wait again while that is loaded.

I handle the errors explicitly and always return a unit result Ok(()) because that makes the code below simpler when I start downloading more than one page concurrently.
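For contrast, here’s a sketch of my own (not code from this project) of the same two steps written as a hypothetical helper that propagates errors with ? instead of matching on them; the explicit matches above trade this brevity for a distinct error message at each step:

// Hypothetical helper, shown only for comparison with the matches above.
async fn fetch_len(path: &str) -> Result<usize, reqwest::Error> {
    // Step 1: send the GET request and wait for the response.
    let resp = reqwest::get(path).await?;
    // Step 2: wait for the response body to arrive as text.
    let text = resp.text().await?;
    Ok(text.len())
}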

Visually, I can draw the get and text calls like this:

[Diagram: first I call get and wait, then I call text and wait.]

But what is asynchronous about this? This reads like normal, single threaded code. I do one thing, then I do another.

Sending 3 Concurrent Requests

The magic happens when I have more than one request I want to make in parallel. Let’s use three hard coded path strings:

let paths = vec![
    "https://example.com/1.html".to_string(),
    "https://example.com/2.html".to_string(),
    "https://example.com/3.html".to_string(),
];

To download the 3 HTML files in parallel, I spawn three Tokio “tasks” and wait for them all to complete. (This requires adding the futures crate, which provides join_all, to Cargo.toml.)
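The code below also assumes a couple of imports at the top of the file, roughly these (exact module paths can vary a little between versions):

use futures::future::join_all;  // join_all comes from the futures crate
use tokio::task::JoinHandle;    // the handle type returned by tokio::spawn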

// Iterate over the paths.
let mut tasks: Vec<JoinHandle<Result<(), ()>>> = vec![];
for path in paths {

    // Copy each path into a new string
    // that can be consumed/captured by the task closure
    let path = path.clone();

    // Create a Tokio task for each path
    tasks.push(tokio::spawn(async move {
        match reqwest::get(&path).await {
            Ok(resp) => {
                match resp.text().await {
                    Ok(text) => {
                        println!("RESPONSE: {} bytes from {}", text.len(), path);
                    }
                    Err(_) => println!("ERROR reading {}", path),
                }
            }
            Err(_) => println!("ERROR downloading {}", path),
        }
        Ok(())
    }));
}

// Wait for them all to finish
println!("Started {} tasks. Waiting...", tasks.len());
join_all(tasks).await;

Each Tokio task is an async block passed to the tokio::spawn function, marked async move. I create a copy of each path, using path.clone(), so each task has its own copy of the path string with its own lifetime.

The complex type annotation on the tasks vector indicates what each call to spawn returns: a JoinHandle wrapping a Result. To keep things simple, I handle all errors inside the async block and just return Ok(()). This means each JoinHandle contains a trivial result: Result<(), ()>. I could have written the task to return some meaningful value and/or error value instead.

After the loop is finished and all three tasks have been spawned, I call join_all(tasks).await to wait for them all to finish.
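As an aside, here’s a sketch of what returning a real value from each task might look like (my own variation, not the code above): each task returns the number of bytes it downloaded, and awaiting each JoinHandle then yields a nested Result, the outer one from Tokio (an error if the task panicked) and the inner one from my task:

let mut tasks: Vec<JoinHandle<Result<usize, String>>> = vec![];
for path in paths {
    // path is moved into the task, giving it its own lifetime.
    tasks.push(tokio::spawn(async move {
        // Convert reqwest errors to strings so the task returns a simple Result.
        let resp = reqwest::get(&path).await.map_err(|e| e.to_string())?;
        let text = resp.text().await.map_err(|e| e.to_string())?;
        Ok(text.len())
    }));
}
for handle in join_all(tasks).await {
    match handle {
        Ok(Ok(len)) => println!("downloaded {} bytes", len),
        Ok(Err(e)) => println!("request failed: {}", e),
        Err(e) => println!("task panicked: {}", e),
    }
}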

Asynchronous vs Multithreaded

At first glance, it looks like this code is spawning three different threads. I even call a spawn function. A multithreaded download might look like this:

[Diagram: we have 3 paths, so we have 3 threads; each thread calls get and waits, then calls text and waits.]

However, Rust’s Tokio engine doesn’t work that way. Instead of launching an entirely new thread for each task, it runs all three tasks on the same thread.

Update: Wesley Moore pointed out on Twitter that: "Tokio multiplexes m tasks into a pool of n threads so it’s able to use all available cores. (M:N threading)." It looks like Tokio supports both a Basic (single threaded) and Threaded (thread pool) Scheduler; see the docs for more information.
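As a rough sketch of what choosing between those two schedulers looks like with the runtime builder (I’m using the method names from recent Tokio releases; older versions name them differently):

use tokio::runtime::Builder;

fn main() {
    // Single-threaded scheduler: every task is multiplexed onto the current thread.
    let single = Builder::new_current_thread().enable_all().build().unwrap();
    single.block_on(async { /* spawn and await tasks here */ });

    // Multi-threaded scheduler: tasks are spread across a pool of worker threads.
    let pool = Builder::new_multi_thread()
        .worker_threads(4)
        .enable_all()
        .build()
        .unwrap();
    pool.block_on(async { /* spawn and await tasks here */ });
}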

I imagine the three tasks running interleaved on a single thread: each time I call await, Rust pauses one task and starts another on the same thread. In fact, depending on how long each task takes to complete, they might run in a completely different order.

There’s no way to predict ahead of time what order the tasks will run in. That’s why I needed to copy each path string above: each task needs its own copy of the string, with its own independent lifetime, because it might be run at any time.

The only guarantee I have is that the join_all call at the bottom will block until all of the tasks have finished; that is, until all of the futures I pushed onto the tasks array have completed.

Sending 100,000 Concurrent Requests

I can scale this up to 100,000 requests by reading the URLs in from a file instead:

use std::error::Error;
use std::fs::File;
use std::io::{BufRead, BufReader};

fn read_lines(path: &str) -> Result<Vec<String>, Box<dyn Error>> {
    let file = File::open(path)?;
    let reader = BufReader::new(file);
    Ok(
        reader.lines().filter_map(Result::ok).collect()
    )
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {

    let paths: Vec<String> = read_lines("urls.txt")?;

etc...

When I tried this out for the first time I was excited: How long would it take to download 100,000 HTML pages simultaneously like this? Would it be 100,000x faster than downloading one file? I typed cargo run --release to build my code in release mode and get the best possible performance out of Rust. Asynchronous code, zero cost abstractions, no garbage collector, this was going to be great!

Of course, it didn’t work.

What happened? The problem is that the web server can't handle so many concurrent network connections. Using my thread/task diagram, launching all 100,000 tasks might look like this:

I spawn 100,000 tasks all onto the same thread, and Tokio starts executing them all. Each time my code above calls get(&path).await, Tokio pauses that task and starts another, which calls get(&path).await again, opening yet another HTTP request. My laptop quickly runs out of network resources and these tasks start to fail.

Sending a Buffered, Concurrent Stream of 100,000 Requests

Instead, I need to limit the number of concurrent Tokio tasks, and so the number of concurrent HTTP requests. I need the schedule to look something like this: after the first 8 tasks are started, Tokio waits for at least one of them to complete before starting a 9th task. That limit of 8 is the maximum concurrency.

Once one of the first 8 calls to reqwest::get completes, Tokio is free to run a 9th task. And once that 9th task, or any other task, completes, Tokio starts a 10th, and so on, in this manner processing all 100,000 tasks 8 at a time.

To achieve this, I can use the StreamExt trait’s buffer_unordered function:

use futures::stream::StreamExt; // buffer_unordered (and collect) come from this trait

let fetches = futures::stream::iter(
    paths.into_iter().map(|path| {
        async move {
            match reqwest::get(&path).await {
                Ok(resp) => {
                    match resp.text().await {
                        Ok(text) => {
                            println!("RESPONSE: {} bytes from {}", text.len(), path);
                        }
                        Err(_) => println!("ERROR reading {}", path),
                    }
                }
                Err(_) => println!("ERROR downloading {}", path),
            }
        }
    })
).buffer_unordered(8).collect::<Vec<()>>();
println!("Waiting...");
fetches.await;

First I create an iterator that maps each of the paths to an async block, and pass it to futures::stream::iter. This creates a stream of futures, each one executing my download code for one path.

At the bottom I call buffer_unordered and pass in 8. buffer_unordered will run up to 8 futures from the stream concurrently, holding the rest back. As each future completes (each HTTP request, in my example), buffer_unordered pulls the next one from the stream and starts executing it.
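An equivalent way to express this (a sketch of my own, not the code I ended up with) is the StreamExt trait’s for_each_concurrent, which takes the concurrency limit directly and avoids collecting a Vec of unit values:

use futures::stream::{self, StreamExt};

// Somewhere inside async main, with `paths: Vec<String>` in scope:
stream::iter(paths)
    .for_each_concurrent(8, |path| async move {
        match reqwest::get(&path).await {
            Ok(resp) => match resp.text().await {
                Ok(text) => println!("RESPONSE: {} bytes from {}", text.len(), path),
                Err(_) => println!("ERROR reading {}", path),
            },
            Err(_) => println!("ERROR downloading {}", path),
        }
    })
    .await;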

This code will slowly but steadily iterate over the 100,000 URLs, downloading them in parallel. Experimenting with this, the exact level of concurrency I pick doesn’t seem to matter very much; I found the best performance with a concurrency of around 50. Using 50 concurrent Tokio tasks, it took about 30 minutes to download all one hundred thousand HTML files.

However, none of that matters. I’m not measuring the performance of Rust, Tokio, or Reqwest. These numbers have more to do with the web server and network connection I’m using. The real performance gain here was my own developer performance: with just a few lines of code, I was able to write an asynchronous I/O program that can scale as much as I would like. The async and await keywords make this code easy to write and easy to read.