1pub mod retry;
4mod run;
5
6use std::{
7 future::Future,
8 mem::replace,
9 panic::AssertUnwindSafe,
10 pin::Pin,
11 sync::{Arc, Mutex, Weak},
12};
13
14use anyhow::{Result, anyhow, bail};
15use futures::FutureExt;
16use rustc_hash::FxHashMap;
17use smallvec::SmallVec;
18use tokio::sync::mpsc::Receiver;
19use turbo_tasks::{
20 CellId, DynTaskInputs, ExecutionId, InvalidationReason, LocalTaskId, RawVc, ReadCellOptions,
21 ReadOutputOptions, StackDynTaskInputs, TaskId, TaskPersistence, TraitTypeId, TurboTasksApi,
22 TurboTasksCallApi,
23 backend::{CellContent, TaskCollectiblesMap, TypedCellContent, VerificationMode},
24 event::{Event, EventListener},
25 message_queue::CompilationEvent,
26 test_helpers::with_turbo_tasks_for_testing,
27 util::{SharedError, StaticOrArc},
28};
29
30pub use crate::run::{
31 Registration, run, run_once, run_once_without_cache_check, run_with_tt, run_without_cache_check,
32};
33
/// State of a task started through [`VcStorage`].
enum Task {
    /// The task has been spawned and has not finished yet. The [`Event`] is notified when the
    /// entry is replaced by [`Task::Finished`].
    Spawned(Event),
    /// The task has completed, either with its output or with a shareable error.
    Finished(Result<RawVc, SharedError>),
}
38
/// In-memory `TurboTasksApi` implementation that keeps cell contents and task results in
/// mutex-guarded collections.
#[derive(Default)]
pub struct VcStorage {
    /// Weak self-reference (populated by [`VcStorage::with`]); upgraded when a spawned task needs
    /// an owning handle to the storage.
    this: Weak<Self>,
    /// Cell contents, keyed by the task and the cell id.
    cells: Mutex<FxHashMap<(TaskId, CellId), CellContent>>,
    /// One entry per spawned task, in spawn order. The task with numeric id `n` is stored at
    /// index `n - 1`.
    tasks: Mutex<Vec<Task>>,
}
45
46impl VcStorage {
47 fn dynamic_call(
48 &self,
49 func: &'static turbo_tasks::macro_helpers::NativeFunction,
50 this_arg: Option<RawVc>,
51 arg: Box<dyn DynTaskInputs>,
52 ) -> RawVc {
53 let this = self.this.upgrade().unwrap();
54 let handle = tokio::runtime::Handle::current();
55 let future = func.execute(this_arg, &*arg);
56 let i = {
57 let mut tasks = self.tasks.lock().unwrap();
58 let i = tasks.len();
59 tasks.push(Task::Spawned(Event::new(move || {
60 move || format!("Task({i})::event")
61 })));
62 i
63 };
64 let task_id = TaskId::try_from(u32::try_from(i + 1).unwrap()).unwrap();
65 let execution_id = ExecutionId::try_from(u16::try_from(i + 1).unwrap()).unwrap();
66 handle.spawn(with_turbo_tasks_for_testing(
67 this.clone(),
68 task_id,
69 execution_id,
70 async move {
71 let result = AssertUnwindSafe(future).catch_unwind().await;
72
73 let result = result
75 .map_err(|any| match any.downcast::<String>() {
76 Ok(owned) => anyhow!(owned),
77 Err(any) => match any.downcast::<&'static str>() {
78 Ok(str) => anyhow!(str),
79 Err(_) => anyhow!("unknown panic"),
80 },
81 })
82 .and_then(|r| r)
83 .map_err(SharedError::new);
84
85 let mut tasks = this.tasks.lock().unwrap();
86 if let Task::Spawned(event) = replace(&mut tasks[i], Task::Finished(result)) {
87 event.notify(usize::MAX);
88 }
89 },
90 ));
91 RawVc::TaskOutput(task_id)
92 }
93}
94
95impl TurboTasksCallApi for VcStorage {
96 fn dynamic_call(
97 &self,
98 func: &'static turbo_tasks::macro_helpers::NativeFunction,
99 this: Option<RawVc>,
100 arg: &mut dyn StackDynTaskInputs,
101 _persistence: TaskPersistence,
102 ) -> RawVc {
103 self.dynamic_call(func, this, arg.take_box())
104 }
105 fn native_call(
106 &self,
107 func: &'static turbo_tasks::macro_helpers::NativeFunction,
108 this: Option<RawVc>,
109 arg: &mut dyn StackDynTaskInputs,
110 _persistence: TaskPersistence,
111 ) -> RawVc {
112 self.dynamic_call(func, this, arg.take_box())
113 }
114
115 fn trait_call(
116 &self,
117 _trait_type: &'static turbo_tasks::TraitMethod,
118 _this: RawVc,
119 _arg: &mut dyn StackDynTaskInputs,
120 _persistence: TaskPersistence,
121 ) -> RawVc {
122 unreachable!()
123 }
124
125 fn run(
126 &self,
127 _future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
128 ) -> Pin<
129 Box<dyn Future<Output = Result<(), turbo_tasks::backend::TurboTasksExecutionError>> + Send>,
130 > {
131 unreachable!()
132 }
133
134 fn run_once(
135 &self,
136 _future: std::pin::Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
137 ) -> Pin<
138 Box<dyn futures::Future<Output = Result<(), anyhow::Error>> + std::marker::Send + 'static>,
139 > {
140 unreachable!()
141 }
142
143 fn run_once_with_reason(
144 &self,
145 _reason: StaticOrArc<dyn InvalidationReason>,
146 _future: std::pin::Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
147 ) -> Pin<
148 Box<dyn futures::Future<Output = Result<(), anyhow::Error>> + std::marker::Send + 'static>,
149 > {
150 unreachable!()
151 }
152
153 fn start_once_process(
154 &self,
155 _future: std::pin::Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
156 ) {
157 unreachable!()
158 }
159
160 fn send_compilation_event(&self, _event: Arc<dyn CompilationEvent>) {
163 unimplemented!()
164 }
165
166 fn get_task_name(&self, task: TaskId) -> String {
167 format!("Task({})", task)
168 }
169}
170
171impl TurboTasksApi for VcStorage {
172 fn invalidate(&self, _task: TaskId) {
173 unreachable!()
174 }
175
176 fn invalidate_with_reason(
177 &self,
178 _task: TaskId,
179 _reason: turbo_tasks::util::StaticOrArc<dyn turbo_tasks::InvalidationReason>,
180 ) {
181 unreachable!()
182 }
183
184 fn invalidate_serialization(&self, _task: TaskId) {
185 }
187
188 fn try_read_task_output(
189 &self,
190 id: TaskId,
191 _options: ReadOutputOptions,
192 ) -> Result<Result<RawVc, EventListener>> {
193 let tasks = self.tasks.lock().unwrap();
194 let i = *id - 1;
195 let task = tasks.get(i as usize).unwrap();
196 match task {
197 Task::Spawned(event) => Ok(Err(event.listen())),
198 Task::Finished(result) => match result {
199 Ok(vc) => Ok(Ok(*vc)),
200 Err(err) => bail!(err.clone()),
201 },
202 }
203 }
204
205 fn try_read_task_cell(
206 &self,
207 task: TaskId,
208 index: CellId,
209 _options: ReadCellOptions,
210 ) -> Result<Result<TypedCellContent, EventListener>> {
211 let map = self.cells.lock().unwrap();
212 Ok(Ok(if let Some(cell) = map.get(&(task, index)) {
213 cell.clone()
214 } else {
215 Default::default()
216 }
217 .into_typed(index.type_id)))
218 }
219 fn try_read_own_task_cell(
220 &self,
221 current_task: TaskId,
222 index: CellId,
223 options: ReadCellOptions,
224 ) -> Result<TypedCellContent> {
225 self.read_own_task_cell(current_task, index, options)
226 }
227
228 fn try_read_local_output(
229 &self,
230 _execution_id: ExecutionId,
231 _local_task_id: LocalTaskId,
232 ) -> Result<Result<RawVc, EventListener>> {
233 unimplemented!()
234 }
235
236 fn emit_collectible(&self, _trait_type: turbo_tasks::TraitTypeId, _collectible: RawVc) {
237 unimplemented!()
238 }
239
240 fn unemit_collectible(
241 &self,
242 _trait_type: turbo_tasks::TraitTypeId,
243 _collectible: RawVc,
244 _count: u32,
245 ) {
246 unimplemented!()
247 }
248
249 fn unemit_collectibles(
250 &self,
251 _trait_type: turbo_tasks::TraitTypeId,
252 _collectibles: &TaskCollectiblesMap,
253 ) {
254 unimplemented!()
255 }
256
257 fn read_task_collectibles(&self, _task: TaskId, _trait_id: TraitTypeId) -> TaskCollectiblesMap {
258 unimplemented!()
259 }
260
261 fn read_own_task_cell(
262 &self,
263 task: TaskId,
264 index: CellId,
265 _options: ReadCellOptions,
266 ) -> Result<TypedCellContent> {
267 let map = self.cells.lock().unwrap();
268 Ok(if let Some(cell) = map.get(&(task, index)) {
269 cell.to_owned()
270 } else {
271 Default::default()
272 }
273 .into_typed(index.type_id))
274 }
275
276 fn update_own_task_cell(
277 &self,
278 task: TaskId,
279 index: CellId,
280 _is_serializable_cell_content: bool,
281 content: CellContent,
282 _updated_key_hashes: Option<SmallVec<[u64; 2]>>,
283 _content_hash: Option<[u8; 16]>,
284 _verification_mode: VerificationMode,
285 ) {
286 let mut map = self.cells.lock().unwrap();
287 let cell = map.entry((task, index)).or_default();
288 *cell = content;
289 }
290
291 fn connect_task(&self, _task: TaskId) {
292 }
294
295 fn mark_own_task_as_finished(&self, _task: TaskId) {
296 }
298
299 fn mark_own_task_as_session_dependent(&self, _task: TaskId) {
300 }
302
303 fn spawn_detached_for_testing(
304 &self,
305 _f: std::pin::Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
306 ) {
307 unimplemented!()
308 }
309
310 fn task_statistics(&self) -> &turbo_tasks::task_statistics::TaskStatisticsApi {
311 unimplemented!()
312 }
313
314 fn stop_and_wait(&self) -> std::pin::Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
315 Box::pin(async {})
316 }
317
318 fn subscribe_to_compilation_events(
321 &self,
322 _event_types: Option<Vec<String>>,
323 ) -> Receiver<Arc<dyn CompilationEvent>> {
324 unimplemented!()
325 }
326
327 fn is_tracking_dependencies(&self) -> bool {
328 false
329 }
330}
331
332impl VcStorage {
333 pub fn with<T>(f: impl Future<Output = T>) -> impl Future<Output = T> {
334 with_turbo_tasks_for_testing(
335 Arc::new_cyclic(|weak| VcStorage {
336 this: weak.clone(),
337 ..Default::default()
338 }),
339 TaskId::MAX,
340 ExecutionId::MIN,
341 f,
342 )
343 }
344}