“This is the 22nd day of my participation in the Gwen Challenge in November. Check out the details: The Last Gwen Challenge in 2021.”


To parallelize avg_values_jsonl, we cannot simply add par_iter() to the chain as we did for sum_of_squares(). par_iter() is only defined for specific container types, because it relies on knowing the number of items in the container and being able to divide them into subparts. General-purpose iterators do not support either of those.
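For intuition, here is a rough std-only analogy of why a slice is easy to parallelize. Because it knows its length, it can be cut in half up front, and each half can go to a different thread. The helper split_sum_of_squares is hypothetical and uses std::thread::scope rather than Rayon. It only illustrates the divide-and-conquer idea, not Rayon's actual implementation.

```rust
use std::thread;

/// Hypothetical helper: sums the squares of `data` by recursively splitting
/// the slice in half and processing the halves on separate scoped threads.
fn split_sum_of_squares(data: &[i64]) -> i64 {
    // Small pieces are cheaper to handle directly than to spawn a thread for.
    if data.len() <= 4 {
        return data.iter().map(|&x| x * x).sum();
    }
    // A slice knows its length, so it can be divided before any work starts.
    let (left, right) = data.split_at(data.len() / 2);
    thread::scope(|s| {
        // Left half on a new thread, right half on the current one.
        let left_sum = s.spawn(|| split_sum_of_squares(left));
        let right_sum = split_sum_of_squares(right);
        left_sum.join().unwrap() + right_sum
    })
}
```

An iterator over lines read from a BufReader offers no such split point. It cannot know how many lines remain until it has read them all.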

Rayon does, however, provide a separate mechanism specifically for streaming data: par_bridge(), which the ParallelBridge trait defines on iterators.

par_bridge() adapts an ordinary iterator into a parallel one. It uses a clever design that pulls individual items from the iterator and load-balances them across Rayon's threads. Most importantly, the adapter it returns implements ParallelIterator, which means you can use it exactly as you would use the value returned by par_iter() on a collection.
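The following is a simplified std-only model of how a sequential iterator can feed several threads. The iterator sits behind a Mutex, and each worker locks it just long enough to pull the next item. The worker then processes that item without holding the lock. This is an illustrative sketch (bridge_sum_of_squares is a hypothetical name), not Rayon's implementation, which runs on Rayon's own thread pool and is more sophisticated. Note that sharing the Mutex between threads already requires the iterator type to be Send.

```rust
use std::sync::Mutex;
use std::thread;

/// Hypothetical helper: sums squares of items from any `Send` iterator,
/// with `workers` threads pulling items one at a time from a shared Mutex.
fn bridge_sum_of_squares<I>(iter: I, workers: usize) -> u64
where
    I: Iterator<Item = u64> + Send,
{
    // Sharing the Mutex across threads needs Mutex<I>: Sync, i.e. I: Send.
    let shared = &Mutex::new(iter);
    thread::scope(|s| {
        let mut handles = Vec::new();
        for _ in 0..workers {
            handles.push(s.spawn(move || {
                let mut local = 0u64;
                loop {
                    // The guard is dropped at the end of this statement, so the
                    // lock is held only while pulling a single item.
                    let next = shared.lock().unwrap().next();
                    match next {
                        Some(x) => local += x * x,
                        None => break,
                    }
                }
                local
            }));
        }
        // Combine the per-worker partial sums.
        handles.into_iter().map(|h| h.join().unwrap()).sum::<u64>()
    })
}
```

Workers pull items on demand, so a fast worker simply takes more items than a slow one. That is the load-balancing effect described above.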

Let’s try inserting par_bridge() in the iteration chain:

use rayon::prelude::*;

use std::io::{BufRead, BufReader, Read};
use serde_json::{Map, Value};

fn avg_values_jsonl(input: impl Read) -> f64 {
    let input = BufReader::new(input);
    let mut cnt = 0usize;
    let total: f64 = input
        .lines()
        .map(Result::unwrap)
        .par_bridge()  // this is new
        .filter_map(|line| serde_json::from_str(&line).ok())
        .filter_map(|obj: Map<String, Value>| obj.get("value").cloned())
        .filter_map(|value| {
            cnt += 1;
            value.as_f64()
        })
        .sum();
    total / cnt as f64
}

As expected, our first attempt failed to compile.

Compiling playground v0.1.0 (/home/hniksic/work/playground)
error[E0599]: no method named `par_bridge` found for struct `std::iter::Map<std::io::Lines<std::io::BufReader<impl Read>>, fn(std::result::Result<std::string::String, std::io::Error>) -> std::string::String {std::result::Result::<std::string::String, std::io::Error>::unwrap}>` in the current scope
   --> src/main.rs:12:10
    |
12  |           .par_bridge()
    |            ^^^^^^^^^^ method not found in `std::iter::Map<std::io::Lines<std::io::BufReader<impl Read>>, fn(std::result::Result<std::string::String, std::io::Error>) -> std::string::String {std::result::Result::<std::string::String, std::io::Error>::unwrap}>`
    |
    = note: the method `par_bridge` exists but the following trait bounds were not satisfied:
            `std::iter::Map<std::io::Lines<std::io::BufReader<impl Read>>, fn(std::result::Result<std::string::String, std::io::Error>) -> std::string::String {std::result::Result::<std::string::String, std::io::Error>::unwrap}>: std::marker::Send`
            which is required by `std::iter::Map<std::io::Lines<std::io::BufReader<impl Read>>, fn(std::result::Result<std::string::String, std::io::Error>) -> std::string::String {std::result::Result::<std::string::String, std::io::Error>::unwrap}>: rayon::iter::par_bridge::ParallelBridge`
            `&std::iter::Map<std::io::Lines<std::io::BufReader<impl Read>>, fn(std::result::Result<std::string::String, std::io::Error>) -> std::string::String {std::result::Result::<std::string::String, std::io::Error>::unwrap}>: std::marker::Send`
            which is required by `&std::iter::Map<std::io::Lines<std::io::BufReader<impl Read>>, fn(std::result::Result<std::string::String, std::io::Error>) -> std::string::String {std::result::Result::<std::string::String, std::io::Error>::unwrap}>: rayon::iter::par_bridge::ParallelBridge`
            `&std::iter::Map<std::io::Lines<std::io::BufReader<impl Read>>, fn(std::result::Result<std::string::String, std::io::Error>) -> std::string::String {std::result::Result::<std::string::String, std::io::Error>::unwrap}>: std::iter::Iterator`
            which is required by `&std::iter::Map<std::io::Lines<std::io::BufReader<impl Read>>, fn(std::result::Result<std::string::String, std::io::Error>) -> std::string::String {std::result::Result::<std::string::String, std::io::Error>::unwrap}>: rayon::iter::par_bridge::ParallelBridge`
            `&mut std::iter::Map<std::io::Lines<std::io::BufReader<impl Read>>, fn(std::result::Result<std::string::String, std::io::Error>) -> std::string::String {std::result::Result::<std::string::String, std::io::Error>::unwrap}>: std::marker::Send`
            which is required by `&mut std::iter::Map<std::io::Lines<std::io::BufReader<impl Read>>, fn(std::result::Result<std::string::String, std::io::Error>) -> std::string::String {std::result::Result::<std::string::String, std::io::Error>::unwrap}>: rayon::iter::par_bridge::ParallelBridge`

Fascinating, isn’t it? (Still not as bad as some of the errors Tokio spits out, haha.)

The error message is a bit hard to read because the iterator we call par_bridge() on has a complex generic type: a map() wrapping lines() wrapping a BufReader wrapping the input.

The actual problem is explained in the "note", which says: "the method par_bridge exists but the following trait bounds were not satisfied". It is followed by a list of types that are required to implement std::marker::Send.

The iterator returned by input.lines() does not implement Send because it contains the value moved out of input, and all the compiler knows about input's type is that it implements Read. Without Send, Rayon is not permitted to send the iterator to another thread, which it might need to do.
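The same rule applies without Rayon whenever an iterator is moved to another thread. In this std-only sketch, the hypothetical helper count_nonempty_lines_elsewhere moves a lines() iterator into a scoped thread. It compiles because the reader is declared impl Read + Send. With plain impl Read, rustc rejects the spawn() call for the same reason it rejected par_bridge().

```rust
use std::io::{BufRead, BufReader, Read};
use std::thread;

/// Hypothetical helper: counts non-empty lines on a different thread.
fn count_nonempty_lines_elsewhere(input: impl Read + Send) -> usize {
    // Lines<BufReader<_>> is Send only if the underlying reader is Send.
    let lines = BufReader::new(input).lines();
    thread::scope(|s| {
        // Moving `lines` into the spawned closure requires it to be Send;
        // remove `+ Send` from the signature above and this fails to compile.
        s.spawn(move || {
            lines
                .map(Result::unwrap)
                .filter(|line| !line.is_empty())
                .count()
        })
        .join()
        .unwrap()
    })
}
```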

If the function were allowed to compile as written, someone could call it with an input that is not Send, perhaps because it contains an Rc<_> or some other non-Send type. That could cause a data race and undefined behavior, such as a crash. Fortunately, rustc prevents this by rejecting the code because of the missing bound, even if the error message could have been friendlier.

Once we understand the problem, the solution is simple: add a Send bound next to Read, declaring the parameter as input: impl Read + Send. With this change, we get a different compilation error:

error[E0594]: cannot assign to `cnt`, as it is a captured variable in a `Fn` closure
  --> src/main.rs:17:13
   |
17 |             cnt += 1;
   |             ^^^^^^^^ cannot assign

The problem here is that the closure mutates shared state: the cnt counter.

Mutating cnt requires the closure to capture it by unique (mutable) reference, which makes it an FnMut closure. That is perfectly fine in single-threaded code. Rayon, however, intends to call the closure from multiple threads, so it requires an Fn closure. The compiler rejects the assignment inside an Fn closure, saving us from a potential data race. That's Rust for you.
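The distinction can be seen in a few lines of plain Rust. The two helpers below are hypothetical: one accepts any FnMut closure, and the other requires Fn. A closure that assigns to a captured counter satisfies the first but not the second, which is the same kind of error shown above.

```rust
/// Hypothetical helper: accepts closures that may mutate captured state.
fn call_twice_mut(mut f: impl FnMut()) {
    f();
    f();
}

/// Hypothetical helper: accepts only closures callable through a shared reference.
fn call_twice(f: impl Fn()) {
    f();
    f();
}

fn count_calls() -> usize {
    let mut cnt = 0;
    // Assigning to `cnt` makes this closure FnMut, which is all we need here.
    call_twice_mut(|| cnt += 1);
    // call_twice(|| cnt += 1); // rejected: cannot assign inside an `Fn` closure
    // Merely reading captured state is fine for an Fn closure.
    call_twice(|| println!("cnt = {cnt}"));
    cnt
}
```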

Neither of these problems is specific to Rayon. We would have hit exactly the same errors if we had tried to pass the closure to multiple threads in some other way. We can solve the assignment problem by switching cnt to an AtomicUsize, which can be safely modified through a shared reference.
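To see that the fix does not depend on Rayon, here is a std-only sketch in which several scoped threads call one shared closure. Because many threads call the closure through a shared reference, it must be Fn + Sync, so a plain usize counter would be rejected here too. AtomicUsize::fetch_add only needs &self, so the closure stays Fn. The function name count_from_threads is hypothetical.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

/// Hypothetical helper: `threads` threads each call a shared closure
/// `calls_per_thread` times; returns the total number of calls.
fn count_from_threads(threads: usize, calls_per_thread: usize) -> usize {
    let cnt = AtomicUsize::new(0);
    // fetch_add takes &self, so this closure only needs a shared borrow: it is Fn.
    let bump = || {
        cnt.fetch_add(1, Ordering::Relaxed);
    };
    thread::scope(|s| {
        for _ in 0..threads {
            // Sharing `&bump` across threads requires the closure to be Sync.
            s.spawn(|| {
                for _ in 0..calls_per_thread {
                    bump();
                }
            });
        }
    });
    // thread::scope joins every spawned thread before returning,
    // so the count is final here.
    cnt.into_inner()
}
```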

use rayon::prelude::*;

use std::sync::atomic::{AtomicUsize, Ordering};
use std::io::{BufRead, BufReader, Read};
use serde_json::{Map, Value};

fn avg_values_jsonl(input: impl Read + Send) -> f64 {
    let input = BufReader::new(input);

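    // An atomic counter can be updated through a shared reference,
    // so the closure below remains Fn and can be called from many threads.
    let cnt = AtomicUsize::new(0);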
    let total: f64 = input
        .lines()
        .map(Result::unwrap)
        .par_bridge()
        .filter_map(|line| serde_json::from_str(&line).ok())
        .filter_map(|obj: Map<String, Value>| obj.get("value").cloned())
        .filter_map(|value| {
            // Relaxed is enough: the count is only read after the
            // parallel pipeline has finished.
            cnt.fetch_add(1, Ordering::Relaxed);
            value.as_f64()
        })
        .sum();
    total / cnt.into_inner() as f64
}