1use std::{
2 panic::resume_unwind,
3 pin::Pin,
4 task::{Context, Poll},
5 thread,
6 time::{Duration, Instant},
7};
8
9use anyhow::Result;
10use futures::{FutureExt, ready};
11use tokio::runtime::Handle;
12use tracing::{Instrument, Span, info_span};
13use turbo_tasks_malloc::{AllocationInfo, TurboMalloc};
14
15use crate::{
16 TurboTasksPanic,
17 capture_future::{self, CaptureFuture},
18 manager::turbo_tasks_future_scope,
19 turbo_tasks, turbo_tasks_scope,
20};
21
22pub struct JoinHandle<T> {
23 join_handle: tokio::task::JoinHandle<(Result<T, TurboTasksPanic>, Duration, AllocationInfo)>,
24}
25
26impl<T: Send + 'static> JoinHandle<T> {
27 pub fn join(self) -> T {
28 block_for_future(self)
29 }
30}
31
32impl<T> Future for JoinHandle<T> {
33 type Output = T;
34
35 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
36 let this = self.get_mut();
37 match ready!(this.join_handle.poll_unpin(cx)) {
38 Ok((res, duration, alloc_info)) => {
39 capture_future::add_duration(duration);
40 capture_future::add_allocation_info(alloc_info);
41 match res {
42 Ok(res) => Poll::Ready(res),
43 Err(e) => resume_unwind(e.into_panic()),
44 }
45 }
46 Err(e) => resume_unwind(e.into_panic()),
47 }
48 }
49}
50
51pub fn spawn<T: Send + 'static>(future: impl Future<Output = T> + Send + 'static) -> JoinHandle<T> {
55 let turbo_tasks = turbo_tasks();
56 let span = Span::current();
57 let join_handle = tokio::task::spawn(
58 turbo_tasks_future_scope(turbo_tasks, CaptureFuture::new(future)).instrument(span),
59 );
60 JoinHandle { join_handle }
61}
62
63pub fn spawn_blocking<T: Send + 'static>(
68 func: impl FnOnce() -> T + Send + 'static,
69) -> JoinHandle<T> {
70 let turbo_tasks = turbo_tasks();
71 let span = Span::current();
72 let join_handle = tokio::task::spawn_blocking(|| {
73 let _guard = span.entered();
74 let start = Instant::now();
75 let start_allocations = TurboMalloc::allocation_counters();
76 let r = turbo_tasks_scope(turbo_tasks, func);
77 (Ok(r), start.elapsed(), start_allocations.until_now())
78 });
79 JoinHandle { join_handle }
80}
81
82pub fn spawn_thread(func: impl FnOnce() + Send + 'static) {
85 let handle = Handle::current();
86 let span = info_span!("thread").or_current();
87 let turbo_tasks = turbo_tasks();
88 thread::spawn(move || {
89 let _span = span.entered();
90 turbo_tasks_scope(turbo_tasks, || {
91 let _guard = handle.enter();
92 func();
93 })
94 });
95}
96
97pub fn block_in_place<R>(f: impl FnOnce() -> R + Send) -> R
100where
101 R: Send,
102{
103 tokio::task::block_in_place(f)
104}
105
106pub fn block_for_future<T: Send>(future: impl Future<Output = T> + Send + 'static) -> T {
110 let handle = Handle::current();
111 block_in_place(|| {
112 let _span = info_span!("blocking").entered();
113 handle.block_on(future)
114 })
115}