turbo_tasks/
spawn.rs

1use std::{
2    panic::resume_unwind,
3    pin::Pin,
4    task::{Context, Poll},
5    thread,
6    time::{Duration, Instant},
7};
8
9use anyhow::Result;
10use futures::{FutureExt, ready};
11use tokio::runtime::Handle;
12use tracing::{Instrument, Span, info_span};
13use turbo_tasks_malloc::{AllocationInfo, TurboMalloc};
14
15use crate::{
16    TurboTasksPanic,
17    capture_future::{self, CaptureFuture},
18    manager::turbo_tasks_future_scope,
19    turbo_tasks, turbo_tasks_scope,
20};
21
22pub struct JoinHandle<T> {
23    join_handle: tokio::task::JoinHandle<(Result<T, TurboTasksPanic>, Duration, AllocationInfo)>,
24}
25
26impl<T: Send + 'static> JoinHandle<T> {
27    pub fn join(self) -> T {
28        block_for_future(self)
29    }
30}
31
32impl<T> Future for JoinHandle<T> {
33    type Output = T;
34
35    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
36        let this = self.get_mut();
37        match ready!(this.join_handle.poll_unpin(cx)) {
38            Ok((res, duration, alloc_info)) => {
39                capture_future::add_duration(duration);
40                capture_future::add_allocation_info(alloc_info);
41                match res {
42                    Ok(res) => Poll::Ready(res),
43                    Err(e) => resume_unwind(e.into_panic()),
44                }
45            }
46            Err(e) => resume_unwind(e.into_panic()),
47        }
48    }
49}
50
51/// Spawns a future as separate task and returns a JoinHandle which can be used to await the result.
52/// The future has access to the current TurboTasks context and runs in the same tracing span.
53/// Allocations and cpu time is accounted to the current turbo-tasks function.
54pub fn spawn<T: Send + 'static>(future: impl Future<Output = T> + Send + 'static) -> JoinHandle<T> {
55    let turbo_tasks = turbo_tasks();
56    let span = Span::current();
57    let join_handle = tokio::task::spawn(
58        turbo_tasks_future_scope(turbo_tasks, CaptureFuture::new(future)).instrument(span),
59    );
60    JoinHandle { join_handle }
61}
62
63/// Spawns a blocking function in a separate task using the blocking pool and returns a JoinHandle
64/// which can be used to await the result. The function has access to the current TurboTasks context
65/// and runs in the same tracing span.
66/// Allocations and cpu time is accounted to the current turbo-tasks function.
67pub fn spawn_blocking<T: Send + 'static>(
68    func: impl FnOnce() -> T + Send + 'static,
69) -> JoinHandle<T> {
70    let turbo_tasks = turbo_tasks();
71    let span = Span::current();
72    let join_handle = tokio::task::spawn_blocking(|| {
73        let _guard = span.entered();
74        let start = Instant::now();
75        let start_allocations = TurboMalloc::allocation_counters();
76        let r = turbo_tasks_scope(turbo_tasks, func);
77        (Ok(r), start.elapsed(), start_allocations.until_now())
78    });
79    JoinHandle { join_handle }
80}
81
82/// Spawns a thread which runs in background. It has access to the current TurboTasks context, but
83/// is not accounted towards the current turbo-tasks function.
84pub fn spawn_thread(func: impl FnOnce() + Send + 'static) {
85    let handle = Handle::current();
86    let span = info_span!("thread").or_current();
87    let turbo_tasks = turbo_tasks();
88    thread::spawn(move || {
89        let _span = span.entered();
90        turbo_tasks_scope(turbo_tasks, || {
91            let _guard = handle.enter();
92            func();
93        })
94    });
95}
96
97/// Tells the scheduler about blocking work happening in the current thread.
98/// It will make sure to allocate extra threads for the pool.
99pub fn block_in_place<R>(f: impl FnOnce() -> R + Send) -> R
100where
101    R: Send,
102{
103    tokio::task::block_in_place(f)
104}
105
106/// Blocking waits for a future to complete. This blocks the current thread potentially staling
107/// other concurrent futures (but not other concurrent tasks). Try to avoid this method infavor of
108/// awaiting the future instead.
109pub fn block_for_future<T: Send>(future: impl Future<Output = T> + Send + 'static) -> T {
110    let handle = Handle::current();
111    block_in_place(|| {
112        let _span = info_span!("blocking").entered();
113        handle.block_on(future)
114    })
115}