//! Threads that can borrow variables from the stack.
|
|
//!
|
|
//! Create a scope when spawned threads need to access variables on the stack:
|
|
//!
|
|
//! ```
//! use crossbeam_utils::thread;
//!
//! let people = vec![
//!     "Alice".to_string(),
//!     "Bob".to_string(),
//!     "Carol".to_string(),
//! ];
//!
//! thread::scope(|s| {
//!     for person in &people {
//!         s.spawn(move |_| {
//!             println!("Hello, {}!", person);
//!         });
//!     }
//! }).unwrap();
//! ```
|
|
//!
|
|
//! # Why scoped threads?
|
|
//!
|
|
//! Suppose we wanted to re-write the previous example using plain threads:
|
|
//!
|
|
//! ```compile_fail,E0597
//! use std::thread;
//!
//! let people = vec![
//!     "Alice".to_string(),
//!     "Bob".to_string(),
//!     "Carol".to_string(),
//! ];
//!
//! let mut threads = Vec::new();
//!
//! for person in &people {
//!     threads.push(thread::spawn(move || {
//!         println!("Hello, {}!", person);
//!     }));
//! }
//!
//! for thread in threads {
//!     thread.join().unwrap();
//! }
//! ```
|
|
//!
|
|
//! This doesn't work because the borrow checker complains about `people` not living long enough:
|
|
//!
|
|
//! ```text
//! error[E0597]: `people` does not live long enough
//!   --> src/main.rs:12:20
//!    |
//! 12 |     for person in &people {
//!    |                    ^^^^^^ borrowed value does not live long enough
//! ...
//! 21 | }
//!    | - borrowed value only lives until here
//!    |
//!    = note: borrowed value must be valid for the static lifetime...
//! ```
|
|
//!
|
|
//! The problem here is that spawned threads are not allowed to borrow variables on the stack
//! because the compiler cannot prove they will be joined before `people` is destroyed.
|
|
//!
|
|
//! Scoped threads are a mechanism to guarantee to the compiler that spawned threads will be joined
|
|
//! before the scope ends.
|
|
//!
|
|
//! # How scoped threads work
|
|
//!
|
|
//! If a variable is borrowed by a thread, the thread must complete before the variable is
|
|
//! destroyed. Threads spawned using [`std::thread::spawn`] can only borrow variables with the
|
|
//! `'static` lifetime because the borrow checker cannot be sure when the thread will complete.
|
|
//!
|
|
//! A scope creates a clear boundary between variables outside the scope and threads inside the
|
|
//! scope. Whenever a scope spawns a thread, it promises to join the thread before the scope ends.
|
|
//! This way we guarantee to the borrow checker that scoped threads only live within the scope and
|
|
//! can safely access variables outside it.
|
|
//!
|
|
//! # Nesting scoped threads
|
|
//!
|
|
//! Sometimes scoped threads need to spawn more threads within the same scope. This is a little
|
|
//! tricky because argument `s` lives *inside* the invocation of `thread::scope()` and as such
|
|
//! cannot be borrowed by scoped threads:
|
|
//!
|
|
//! ```compile_fail,E0373,E0521
//! use crossbeam_utils::thread;
//!
//! thread::scope(|s| {
//!     s.spawn(|_| {
//!         // Not going to compile because we're trying to borrow `s`,
//!         // which lives *inside* the scope! :(
//!         s.spawn(|_| println!("nested thread"));
//!     });
//! });
//! ```
|
|
//!
|
|
//! Fortunately, there is a solution. Every scoped thread is passed a reference to its scope as an
|
|
//! argument, which can be used for spawning nested threads:
|
|
//!
|
|
//! ```
//! use crossbeam_utils::thread;
//!
//! thread::scope(|s| {
//!     // Note the `|s|` here.
//!     s.spawn(|s| {
//!         // Yay, this works because we're using a fresh argument `s`! :)
//!         s.spawn(|_| println!("nested thread"));
//!     });
//! }).unwrap();
//! ```
|
|
|
|
use std::fmt;
|
|
use std::io;
|
|
use std::marker::PhantomData;
|
|
use std::mem;
|
|
use std::panic;
|
|
use std::sync::{Arc, Mutex};
|
|
use std::thread;
|
|
|
|
use crate::sync::WaitGroup;
|
|
use cfg_if::cfg_if;
|
|
|
|
/// A vector that is shared between threads and guarded by a mutex.
type SharedVec<T> = Arc<Mutex<Vec<T>>>;

/// An optional value that is shared between threads and guarded by a mutex.
///
/// An empty slot is represented by `None`, so a value can be moved out with `Option::take`
/// while the slot itself stays shared.
type SharedOption<T> = Arc<Mutex<Option<T>>>;
|
|
|
|
/// Creates a new scope for spawning threads.
|
|
///
|
|
/// All child threads that haven't been manually joined will be automatically joined just before
|
|
/// this function invocation ends. If all joined threads have successfully completed, `Ok` is
|
|
/// returned with the return value of `f`. If any of the joined threads has panicked, an `Err` is
|
|
/// returned containing errors from panicked threads. Note that if panics are implemented by
|
|
/// aborting the process, no error is returned; see the notes of [std::panic::catch_unwind].
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// use crossbeam_utils::thread;
|
|
///
|
|
/// let var = vec![1, 2, 3];
|
|
///
|
|
/// thread::scope(|s| {
|
|
/// s.spawn(|_| {
|
|
/// println!("A child thread borrowing `var`: {:?}", var);
|
|
/// });
|
|
/// }).unwrap();
|
|
/// ```
|
|
pub fn scope<'env, F, R>(f: F) -> thread::Result<R>
|
|
where
|
|
F: FnOnce(&Scope<'env>) -> R,
|
|
{
|
|
let wg = WaitGroup::new();
|
|
let scope = Scope::<'env> {
|
|
handles: SharedVec::default(),
|
|
wait_group: wg.clone(),
|
|
_marker: PhantomData,
|
|
};
|
|
|
|
// Execute the scoped function, but catch any panics.
|
|
let result = panic::catch_unwind(panic::AssertUnwindSafe(|| f(&scope)));
|
|
|
|
// Wait until all nested scopes are dropped.
|
|
drop(scope.wait_group);
|
|
wg.wait();
|
|
|
|
// Join all remaining spawned threads.
|
|
let panics: Vec<_> = scope
|
|
.handles
|
|
.lock()
|
|
.unwrap()
|
|
// Filter handles that haven't been joined, join them, and collect errors.
|
|
.drain(..)
|
|
.filter_map(|handle| handle.lock().unwrap().take())
|
|
.filter_map(|handle| handle.join().err())
|
|
.collect();
|
|
|
|
// If `f` has panicked, resume unwinding.
|
|
// If any of the child threads have panicked, return the panic errors.
|
|
// Otherwise, everything is OK and return the result of `f`.
|
|
match result {
|
|
Err(err) => panic::resume_unwind(err),
|
|
Ok(res) => {
|
|
if panics.is_empty() {
|
|
Ok(res)
|
|
} else {
|
|
Err(Box::new(panics))
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/// A scope for spawning threads.
|
|
pub struct Scope<'env> {
|
|
/// The list of the thread join handles.
|
|
handles: SharedVec<SharedOption<thread::JoinHandle<()>>>,
|
|
|
|
/// Used to wait until all subscopes all dropped.
|
|
wait_group: WaitGroup,
|
|
|
|
/// Borrows data with invariant lifetime `'env`.
|
|
_marker: PhantomData<&'env mut &'env ()>,
|
|
}
|
|
|
|
// NOTE(review): `Scope` holds its join handles behind `Arc<Mutex<..>>`, plus a `WaitGroup` and a
// `PhantomData` marker. This impl presumably relies on `WaitGroup` being safe to share between
// threads — confirm against its definition.
unsafe impl Sync for Scope<'_> {}
|
|
|
|
impl<'env> Scope<'env> {
|
|
/// Spawns a scoped thread.
|
|
///
|
|
/// This method is similar to the [`spawn`] function in Rust's standard library. The difference
|
|
/// is that this thread is scoped, meaning it's guaranteed to terminate before the scope exits,
|
|
/// allowing it to reference variables outside the scope.
|
|
///
|
|
/// The scoped thread is passed a reference to this scope as an argument, which can be used for
|
|
/// spawning nested threads.
|
|
///
|
|
/// The returned [handle](ScopedJoinHandle) can be used to manually
|
|
/// [join](ScopedJoinHandle::join) the thread before the scope exits.
|
|
///
|
|
/// This will create a thread using default parameters of [`ScopedThreadBuilder`], if you want to specify the
|
|
/// stack size or the name of the thread, use this API instead.
|
|
///
|
|
/// [`spawn`]: std::thread::spawn
|
|
///
|
|
/// # Panics
|
|
///
|
|
/// Panics if the OS fails to create a thread; use [`ScopedThreadBuilder::spawn`]
|
|
/// to recover from such errors.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// use crossbeam_utils::thread;
|
|
///
|
|
/// thread::scope(|s| {
|
|
/// let handle = s.spawn(|_| {
|
|
/// println!("A child thread is running");
|
|
/// 42
|
|
/// });
|
|
///
|
|
/// // Join the thread and retrieve its result.
|
|
/// let res = handle.join().unwrap();
|
|
/// assert_eq!(res, 42);
|
|
/// }).unwrap();
|
|
/// ```
|
|
pub fn spawn<'scope, F, T>(&'scope self, f: F) -> ScopedJoinHandle<'scope, T>
|
|
where
|
|
F: FnOnce(&Scope<'env>) -> T,
|
|
F: Send + 'env,
|
|
T: Send + 'env,
|
|
{
|
|
self.builder()
|
|
.spawn(f)
|
|
.expect("failed to spawn scoped thread")
|
|
}
|
|
|
|
/// Creates a builder that can configure a thread before spawning.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// use crossbeam_utils::thread;
|
|
///
|
|
/// thread::scope(|s| {
|
|
/// s.builder()
|
|
/// .spawn(|_| println!("A child thread is running"))
|
|
/// .unwrap();
|
|
/// }).unwrap();
|
|
/// ```
|
|
pub fn builder<'scope>(&'scope self) -> ScopedThreadBuilder<'scope, 'env> {
|
|
ScopedThreadBuilder {
|
|
scope: self,
|
|
builder: thread::Builder::new(),
|
|
}
|
|
}
|
|
}
|
|
|
|
impl fmt::Debug for Scope<'_> {
|
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
f.pad("Scope { .. }")
|
|
}
|
|
}
|
|
|
|
/// Configures the properties of a new thread.
|
|
///
|
|
/// The two configurable properties are:
|
|
///
|
|
/// - [`name`]: Specifies an [associated name for the thread][naming-threads].
|
|
/// - [`stack_size`]: Specifies the [desired stack size for the thread][stack-size].
|
|
///
|
|
/// The [`spawn`] method will take ownership of the builder and return an [`io::Result`] of the
|
|
/// thread handle with the given configuration.
|
|
///
|
|
/// The [`Scope::spawn`] method uses a builder with default configuration and unwraps its return
|
|
/// value. You may want to use this builder when you want to recover from a failure to launch a
|
|
/// thread.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// use crossbeam_utils::thread;
|
|
///
|
|
/// thread::scope(|s| {
|
|
/// s.builder()
|
|
/// .spawn(|_| println!("Running a child thread"))
|
|
/// .unwrap();
|
|
/// }).unwrap();
|
|
/// ```
|
|
///
|
|
/// [`name`]: ScopedThreadBuilder::name
|
|
/// [`stack_size`]: ScopedThreadBuilder::stack_size
|
|
/// [`spawn`]: ScopedThreadBuilder::spawn
|
|
/// [`io::Result`]: std::io::Result
|
|
/// [naming-threads]: std::thread#naming-threads
|
|
/// [stack-size]: std::thread#stack-size
|
|
#[derive(Debug)]
|
|
pub struct ScopedThreadBuilder<'scope, 'env> {
|
|
scope: &'scope Scope<'env>,
|
|
builder: thread::Builder,
|
|
}
|
|
|
|
impl<'scope, 'env> ScopedThreadBuilder<'scope, 'env> {
|
|
/// Sets the name for the new thread.
|
|
///
|
|
/// The name must not contain null bytes (`\0`).
|
|
///
|
|
/// For more information about named threads, see [here][naming-threads].
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// use crossbeam_utils::thread;
|
|
/// use std::thread::current;
|
|
///
|
|
/// thread::scope(|s| {
|
|
/// s.builder()
|
|
/// .name("my thread".to_string())
|
|
/// .spawn(|_| assert_eq!(current().name(), Some("my thread")))
|
|
/// .unwrap();
|
|
/// }).unwrap();
|
|
/// ```
|
|
///
|
|
/// [naming-threads]: std::thread#naming-threads
|
|
pub fn name(mut self, name: String) -> ScopedThreadBuilder<'scope, 'env> {
|
|
self.builder = self.builder.name(name);
|
|
self
|
|
}
|
|
|
|
/// Sets the size of the stack for the new thread.
|
|
///
|
|
/// The stack size is measured in bytes.
|
|
///
|
|
/// For more information about the stack size for threads, see [here][stack-size].
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// use crossbeam_utils::thread;
|
|
///
|
|
/// thread::scope(|s| {
|
|
/// s.builder()
|
|
/// .stack_size(32 * 1024)
|
|
/// .spawn(|_| println!("Running a child thread"))
|
|
/// .unwrap();
|
|
/// }).unwrap();
|
|
/// ```
|
|
///
|
|
/// [stack-size]: std::thread#stack-size
|
|
pub fn stack_size(mut self, size: usize) -> ScopedThreadBuilder<'scope, 'env> {
|
|
self.builder = self.builder.stack_size(size);
|
|
self
|
|
}
|
|
|
|
/// Spawns a scoped thread with this configuration.
|
|
///
|
|
/// The scoped thread is passed a reference to this scope as an argument, which can be used for
|
|
/// spawning nested threads.
|
|
///
|
|
/// The returned handle can be used to manually join the thread before the scope exits.
|
|
///
|
|
/// # Errors
|
|
///
|
|
/// Unlike the [`Scope::spawn`] method, this method yields an
|
|
/// [`io::Result`] to capture any failure to create the thread at
|
|
/// the OS level.
|
|
///
|
|
/// [`io::Result`]: std::io::Result
|
|
///
|
|
/// # Panics
|
|
///
|
|
/// Panics if a thread name was set and it contained null bytes.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// use crossbeam_utils::thread;
|
|
///
|
|
/// thread::scope(|s| {
|
|
/// let handle = s.builder()
|
|
/// .spawn(|_| {
|
|
/// println!("A child thread is running");
|
|
/// 42
|
|
/// })
|
|
/// .unwrap();
|
|
///
|
|
/// // Join the thread and retrieve its result.
|
|
/// let res = handle.join().unwrap();
|
|
/// assert_eq!(res, 42);
|
|
/// }).unwrap();
|
|
/// ```
|
|
pub fn spawn<F, T>(self, f: F) -> io::Result<ScopedJoinHandle<'scope, T>>
|
|
where
|
|
F: FnOnce(&Scope<'env>) -> T,
|
|
F: Send + 'env,
|
|
T: Send + 'env,
|
|
{
|
|
// The result of `f` will be stored here.
|
|
let result = SharedOption::default();
|
|
|
|
// Spawn the thread and grab its join handle and thread handle.
|
|
let (handle, thread) = {
|
|
let result = Arc::clone(&result);
|
|
|
|
// A clone of the scope that will be moved into the new thread.
|
|
let scope = Scope::<'env> {
|
|
handles: Arc::clone(&self.scope.handles),
|
|
wait_group: self.scope.wait_group.clone(),
|
|
_marker: PhantomData,
|
|
};
|
|
|
|
// Spawn the thread.
|
|
let handle = {
|
|
let closure = move || {
|
|
// Make sure the scope is inside the closure with the proper `'env` lifetime.
|
|
let scope: Scope<'env> = scope;
|
|
|
|
// Run the closure.
|
|
let res = f(&scope);
|
|
|
|
// Store the result if the closure didn't panic.
|
|
*result.lock().unwrap() = Some(res);
|
|
};
|
|
|
|
// Allocate `closure` on the heap and erase the `'env` bound.
|
|
let closure: Box<dyn FnOnce() + Send + 'env> = Box::new(closure);
|
|
let closure: Box<dyn FnOnce() + Send + 'static> =
|
|
unsafe { mem::transmute(closure) };
|
|
|
|
// Finally, spawn the closure.
|
|
self.builder.spawn(closure)?
|
|
};
|
|
|
|
let thread = handle.thread().clone();
|
|
let handle = Arc::new(Mutex::new(Some(handle)));
|
|
(handle, thread)
|
|
};
|
|
|
|
// Add the handle to the shared list of join handles.
|
|
self.scope.handles.lock().unwrap().push(Arc::clone(&handle));
|
|
|
|
Ok(ScopedJoinHandle {
|
|
handle,
|
|
result,
|
|
thread,
|
|
_marker: PhantomData,
|
|
})
|
|
}
|
|
}
|
|
|
|
// SAFETY: handles are constructed by `ScopedThreadBuilder::spawn`, which requires `T: Send`, and
// the stored result lives behind an `Arc<Mutex<..>>`.
// NOTE(review): the impl itself places no bound on `T` — confirm no other constructor exists.
unsafe impl<T> Send for ScopedJoinHandle<'_, T> {}
|
|
// SAFETY: handles are constructed by `ScopedThreadBuilder::spawn`, which requires `T: Send`, and
// both the join handle and the result are only reached through a `Mutex`.
// NOTE(review): the impl itself places no bound on `T` — confirm no other constructor exists.
unsafe impl<T> Sync for ScopedJoinHandle<'_, T> {}
|
|
|
|
/// A handle that can be used to join its scoped thread.
|
|
///
|
|
/// This struct is created by the [`Scope::spawn`] method and the
|
|
/// [`ScopedThreadBuilder::spawn`] method.
|
|
pub struct ScopedJoinHandle<'scope, T> {
|
|
/// A join handle to the spawned thread.
|
|
handle: SharedOption<thread::JoinHandle<()>>,
|
|
|
|
/// Holds the result of the inner closure.
|
|
result: SharedOption<T>,
|
|
|
|
/// A handle to the the spawned thread.
|
|
thread: thread::Thread,
|
|
|
|
/// Borrows the parent scope with lifetime `'scope`.
|
|
_marker: PhantomData<&'scope ()>,
|
|
}
|
|
|
|
impl<T> ScopedJoinHandle<'_, T> {
|
|
/// Waits for the thread to finish and returns its result.
|
|
///
|
|
/// If the child thread panics, an error is returned. Note that if panics are implemented by
|
|
/// aborting the process, no error is returned; see the notes of [std::panic::catch_unwind].
|
|
///
|
|
/// # Panics
|
|
///
|
|
/// This function may panic on some platforms if a thread attempts to join itself or otherwise
|
|
/// may create a deadlock with joining threads.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// use crossbeam_utils::thread;
|
|
///
|
|
/// thread::scope(|s| {
|
|
/// let handle1 = s.spawn(|_| println!("I'm a happy thread :)"));
|
|
/// let handle2 = s.spawn(|_| panic!("I'm a sad thread :("));
|
|
///
|
|
/// // Join the first thread and verify that it succeeded.
|
|
/// let res = handle1.join();
|
|
/// assert!(res.is_ok());
|
|
///
|
|
/// // Join the second thread and verify that it panicked.
|
|
/// let res = handle2.join();
|
|
/// assert!(res.is_err());
|
|
/// }).unwrap();
|
|
/// ```
|
|
pub fn join(self) -> thread::Result<T> {
|
|
// Take out the handle. The handle will surely be available because the root scope waits
|
|
// for nested scopes before joining remaining threads.
|
|
let handle = self.handle.lock().unwrap().take().unwrap();
|
|
|
|
// Join the thread and then take the result out of its inner closure.
|
|
handle
|
|
.join()
|
|
.map(|()| self.result.lock().unwrap().take().unwrap())
|
|
}
|
|
|
|
/// Returns a handle to the underlying thread.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// use crossbeam_utils::thread;
|
|
///
|
|
/// thread::scope(|s| {
|
|
/// let handle = s.spawn(|_| println!("A child thread is running"));
|
|
/// println!("The child thread ID: {:?}", handle.thread().id());
|
|
/// }).unwrap();
|
|
/// ```
|
|
pub fn thread(&self) -> &thread::Thread {
|
|
&self.thread
|
|
}
|
|
}
|
|
|
|
// Platform-specific access to the raw OS thread handle of a scoped thread.
cfg_if! {
    if #[cfg(unix)] {
        use std::os::unix::thread::{JoinHandleExt, RawPthread};

        impl<T> JoinHandleExt for ScopedJoinHandle<'_, T> {
            fn as_pthread_t(&self) -> RawPthread {
                // Look at the handle without taking it. It is expected to be present because the
                // root scope waits for nested scopes before joining remaining threads.
                let guard = self.handle.lock().unwrap();
                let join_handle = guard.as_ref().unwrap();
                join_handle.as_pthread_t()
            }

            fn into_pthread_t(self) -> RawPthread {
                // The join handle stays in its shared slot; only the raw id is returned.
                JoinHandleExt::as_pthread_t(&self)
            }
        }
    } else if #[cfg(windows)] {
        use std::os::windows::io::{AsRawHandle, IntoRawHandle, RawHandle};

        impl<T> AsRawHandle for ScopedJoinHandle<'_, T> {
            fn as_raw_handle(&self) -> RawHandle {
                // Look at the handle without taking it. It is expected to be present because the
                // root scope waits for nested scopes before joining remaining threads.
                let guard = self.handle.lock().unwrap();
                let join_handle = guard.as_ref().unwrap();
                join_handle.as_raw_handle()
            }
        }

        impl<T> IntoRawHandle for ScopedJoinHandle<'_, T> {
            fn into_raw_handle(self) -> RawHandle {
                // The join handle stays in its shared slot; only the raw handle is returned.
                AsRawHandle::as_raw_handle(&self)
            }
        }
    }
}
|
|
|
|
impl<T> fmt::Debug for ScopedJoinHandle<'_, T> {
|
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
f.pad("ScopedJoinHandle { .. }")
|
|
}
|
|
}
|