1use std::{
2 panic::resume_unwind,
3 pin::Pin,
4 task::{Context, Poll},
5 thread,
6};
7
8use anyhow::Result;
9use futures::{FutureExt, ready};
10use tokio::runtime::Handle;
11use tracing::{Instrument, Span, info_span};
12
13use crate::{
14 TurboTasksPanic, capture_future::CaptureFuture, manager::turbo_tasks_future_scope, turbo_tasks,
15 turbo_tasks_scope,
16};
17
18pub struct JoinHandle<T> {
19 join_handle: tokio::task::JoinHandle<Result<T, TurboTasksPanic>>,
20}
21
22impl<T: Send + 'static> JoinHandle<T> {
23 pub fn join(self) -> T {
24 block_for_future(self)
25 }
26}
27
28impl<T> Future for JoinHandle<T> {
29 type Output = T;
30
31 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
32 let this = self.get_mut();
33 match ready!(this.join_handle.poll_unpin(cx)) {
34 Ok(res) => match res {
35 Ok(res) => Poll::Ready(res),
36 Err(e) => resume_unwind(e.into_panic()),
37 },
38 Err(e) => resume_unwind(e.into_panic()),
39 }
40 }
41}
42
43pub fn spawn<T: Send + 'static>(future: impl Future<Output = T> + Send + 'static) -> JoinHandle<T> {
47 let turbo_tasks = turbo_tasks();
48 let span = Span::current();
49 let join_handle = tokio::task::spawn(
50 turbo_tasks_future_scope(turbo_tasks, CaptureFuture::new(future)).instrument(span),
51 );
52 JoinHandle { join_handle }
53}
54
55pub fn spawn_blocking<T: Send + 'static>(
60 func: impl FnOnce() -> T + Send + 'static,
61) -> JoinHandle<T> {
62 let turbo_tasks = turbo_tasks();
63 let span = Span::current();
64 let join_handle = tokio::task::spawn_blocking(|| {
65 let _guard = span.entered();
66 Ok(turbo_tasks_scope(turbo_tasks, func))
67 });
68 JoinHandle { join_handle }
69}
70
71pub fn spawn_thread(func: impl FnOnce() + Send + 'static) {
74 let handle = Handle::current();
75 let span = info_span!("thread").or_current();
76 let turbo_tasks = turbo_tasks();
77 thread::spawn(move || {
78 let _span = span.entered();
79 turbo_tasks_scope(turbo_tasks, || {
80 let _guard = handle.enter();
81 func();
82 })
83 });
84}
85
86pub fn block_in_place<R>(f: impl FnOnce() -> R + Send) -> R
89where
90 R: Send,
91{
92 tokio::task::block_in_place(f)
93}
94
95pub fn block_for_future<T: Send>(future: impl Future<Output = T> + Send + 'static) -> T {
99 let handle = Handle::current();
100 block_in_place(|| {
101 let _span = info_span!("blocking").entered();
102 handle.block_on(future)
103 })
104}