turbo_tasks/
spawn.rs

1use std::{
2    panic::resume_unwind,
3    pin::Pin,
4    task::{Context, Poll},
5    thread,
6};
7
8use anyhow::Result;
9use futures::{FutureExt, ready};
10use tokio::runtime::Handle;
11use tracing::{Instrument, Span, info_span};
12
13use crate::{
14    TurboTasksPanic, capture_future::CaptureFuture, manager::turbo_tasks_future_scope, turbo_tasks,
15    turbo_tasks_scope,
16};
17
18pub struct JoinHandle<T> {
19    join_handle: tokio::task::JoinHandle<Result<T, TurboTasksPanic>>,
20}
21
22impl<T: Send + 'static> JoinHandle<T> {
23    pub fn join(self) -> T {
24        block_for_future(self)
25    }
26}
27
28impl<T> Future for JoinHandle<T> {
29    type Output = T;
30
31    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
32        let this = self.get_mut();
33        match ready!(this.join_handle.poll_unpin(cx)) {
34            Ok(res) => match res {
35                Ok(res) => Poll::Ready(res),
36                Err(e) => resume_unwind(e.into_panic()),
37            },
38            Err(e) => resume_unwind(e.into_panic()),
39        }
40    }
41}
42
43/// Spawns a future as separate task and returns a JoinHandle which can be used to await the result.
44/// The future has access to the current TurboTasks context and runs in the same tracing span.
45/// Allocations and cpu time is accounted to the current turbo-tasks function.
46pub fn spawn<T: Send + 'static>(future: impl Future<Output = T> + Send + 'static) -> JoinHandle<T> {
47    let turbo_tasks = turbo_tasks();
48    let span = Span::current();
49    let join_handle = tokio::task::spawn(
50        turbo_tasks_future_scope(turbo_tasks, CaptureFuture::new(future)).instrument(span),
51    );
52    JoinHandle { join_handle }
53}
54
55/// Spawns a blocking function in a separate task using the blocking pool and returns a JoinHandle
56/// which can be used to await the result. The function has access to the current TurboTasks context
57/// and runs in the same tracing span.
58/// Allocations and cpu time is accounted to the current turbo-tasks function.
59pub fn spawn_blocking<T: Send + 'static>(
60    func: impl FnOnce() -> T + Send + 'static,
61) -> JoinHandle<T> {
62    let turbo_tasks = turbo_tasks();
63    let span = Span::current();
64    let join_handle = tokio::task::spawn_blocking(|| {
65        let _guard = span.entered();
66        Ok(turbo_tasks_scope(turbo_tasks, func))
67    });
68    JoinHandle { join_handle }
69}
70
71/// Spawns a thread which runs in background. It has access to the current TurboTasks context, but
72/// is not accounted towards the current turbo-tasks function.
73pub fn spawn_thread(func: impl FnOnce() + Send + 'static) {
74    let handle = Handle::current();
75    let span = info_span!("thread").or_current();
76    let turbo_tasks = turbo_tasks();
77    thread::spawn(move || {
78        let _span = span.entered();
79        turbo_tasks_scope(turbo_tasks, || {
80            let _guard = handle.enter();
81            func();
82        })
83    });
84}
85
86/// Tells the scheduler about blocking work happening in the current thread.
87/// It will make sure to allocate extra threads for the pool.
88pub fn block_in_place<R>(f: impl FnOnce() -> R + Send) -> R
89where
90    R: Send,
91{
92    tokio::task::block_in_place(f)
93}
94
95/// Blocking waits for a future to complete. This blocks the current thread potentially staling
96/// other concurrent futures (but not other concurrent tasks). Try to avoid this method infavor of
97/// awaiting the future instead.
98pub fn block_for_future<T: Send>(future: impl Future<Output = T> + Send + 'static) -> T {
99    let handle = Handle::current();
100    block_in_place(|| {
101        let _span = info_span!("blocking").entered();
102        handle.block_on(future)
103    })
104}