1pub mod retry;
4mod run;
5
6use std::{
7 future::Future,
8 mem::replace,
9 panic::AssertUnwindSafe,
10 sync::{Arc, Mutex, Weak},
11};
12
13use anyhow::{Result, anyhow};
14use futures::FutureExt;
15use rustc_hash::FxHashMap;
16use tokio::sync::mpsc::Receiver;
17use turbo_tasks::{
18 CellId, ExecutionId, InvalidationReason, LocalTaskId, MagicAny, RawVc, ReadCellOptions,
19 ReadConsistency, TaskId, TaskPersistence, TraitTypeId, TurboTasksApi, TurboTasksCallApi,
20 backend::{CellContent, TaskCollectiblesMap, TypedCellContent},
21 event::{Event, EventListener},
22 message_queue::CompilationEvent,
23 test_helpers::with_turbo_tasks_for_testing,
24 util::{SharedError, StaticOrArc},
25};
26
27pub use crate::run::{Registration, run, run_with_tt, run_without_cache_check};
28
/// State of a task slot tracked by [`VcStorage`].
enum Task {
    /// The task has been spawned but has not produced a result yet. The
    /// event is notified once the slot is replaced by [`Task::Finished`].
    Spawned(Event),
    /// The task has completed, either with its output or with a shareable
    /// error (which also covers panics caught while running the task).
    Finished(Result<RawVc, SharedError>),
}
33
/// In-memory storage for cells and task results, used as the turbo-tasks
/// API implementation in this file.
#[derive(Default)]
pub struct VcStorage {
    /// Weak self-reference, upgraded in `dynamic_call` so spawned tasks can
    /// hold a strong handle to the storage. Populated by `VcStorage::with`.
    this: Weak<Self>,
    /// Cell contents keyed by the owning task and the cell id.
    cells: Mutex<FxHashMap<(TaskId, CellId), CellContent>>,
    /// Task slots in spawn order; the task id is the slot index plus one.
    tasks: Mutex<Vec<Task>>,
}
40
41impl VcStorage {
42 fn dynamic_call(
43 &self,
44 func: &'static turbo_tasks::macro_helpers::NativeFunction,
45 this_arg: Option<RawVc>,
46 arg: Box<dyn MagicAny>,
47 ) -> RawVc {
48 let this = self.this.upgrade().unwrap();
49 let handle = tokio::runtime::Handle::current();
50 let future = func.execute(this_arg, &*arg);
51 let i = {
52 let mut tasks = self.tasks.lock().unwrap();
53 let i = tasks.len();
54 tasks.push(Task::Spawned(Event::new(move || {
55 format!("Task({i})::event")
56 })));
57 i
58 };
59 let task_id = TaskId::try_from(u32::try_from(i + 1).unwrap()).unwrap();
60 let execution_id = ExecutionId::try_from(u16::try_from(i + 1).unwrap()).unwrap();
61 handle.spawn(with_turbo_tasks_for_testing(
62 this.clone(),
63 task_id,
64 execution_id,
65 async move {
66 let result = AssertUnwindSafe(future).catch_unwind().await;
67
68 let result = result
70 .map_err(|any| match any.downcast::<String>() {
71 Ok(owned) => anyhow!(owned),
72 Err(any) => match any.downcast::<&'static str>() {
73 Ok(str) => anyhow!(str),
74 Err(_) => anyhow!("unknown panic"),
75 },
76 })
77 .and_then(|r| r)
78 .map_err(SharedError::new);
79
80 let mut tasks = this.tasks.lock().unwrap();
81 if let Task::Spawned(event) = replace(&mut tasks[i], Task::Finished(result)) {
82 event.notify(usize::MAX);
83 }
84 },
85 ));
86 RawVc::TaskOutput(task_id)
87 }
88}
89
90impl TurboTasksCallApi for VcStorage {
91 fn dynamic_call(
92 &self,
93 func: &'static turbo_tasks::macro_helpers::NativeFunction,
94 this: Option<RawVc>,
95 arg: Box<dyn MagicAny>,
96 _persistence: TaskPersistence,
97 ) -> RawVc {
98 self.dynamic_call(func, this, arg)
99 }
100 fn native_call(
101 &self,
102 _func: &'static turbo_tasks::macro_helpers::NativeFunction,
103 _this: Option<RawVc>,
104 _arg: Box<dyn MagicAny>,
105 _persistence: TaskPersistence,
106 ) -> RawVc {
107 unreachable!()
108 }
109
110 fn trait_call(
111 &self,
112 _trait_type: &'static turbo_tasks::TraitMethod,
113 _this: RawVc,
114 _arg: Box<dyn MagicAny>,
115 _persistence: TaskPersistence,
116 ) -> RawVc {
117 unreachable!()
118 }
119
120 fn run_once(
121 &self,
122 _future: std::pin::Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
123 ) -> TaskId {
124 unreachable!()
125 }
126
127 fn run_once_with_reason(
128 &self,
129 _reason: StaticOrArc<dyn InvalidationReason>,
130 _future: std::pin::Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
131 ) -> TaskId {
132 unreachable!()
133 }
134
135 fn run_once_process(
136 &self,
137 _future: std::pin::Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
138 ) -> TaskId {
139 unreachable!()
140 }
141}
142
143impl TurboTasksApi for VcStorage {
144 fn invalidate(&self, _task: TaskId) {
145 unreachable!()
146 }
147
148 fn invalidate_with_reason(
149 &self,
150 _task: TaskId,
151 _reason: turbo_tasks::util::StaticOrArc<dyn turbo_tasks::InvalidationReason>,
152 ) {
153 unreachable!()
154 }
155
156 fn invalidate_serialization(&self, _task: TaskId) {
157 }
159
160 fn notify_scheduled_tasks(&self) {
161 }
163
164 fn try_read_task_output(
165 &self,
166 id: TaskId,
167 _consistency: ReadConsistency,
168 ) -> Result<Result<RawVc, EventListener>> {
169 let tasks = self.tasks.lock().unwrap();
170 let i = *id - 1;
171 let task = tasks.get(i as usize).unwrap();
172 match task {
173 Task::Spawned(event) => Ok(Err(event.listen())),
174 Task::Finished(result) => match result {
175 Ok(vc) => Ok(Ok(*vc)),
176 Err(err) => Err(anyhow!(err.clone())),
177 },
178 }
179 }
180
181 fn try_read_task_output_untracked(
182 &self,
183 task: TaskId,
184 consistency: ReadConsistency,
185 ) -> Result<Result<RawVc, EventListener>> {
186 self.try_read_task_output(task, consistency)
187 }
188
189 fn try_read_task_cell(
190 &self,
191 task: TaskId,
192 index: CellId,
193 _options: ReadCellOptions,
194 ) -> Result<Result<TypedCellContent, EventListener>> {
195 let map = self.cells.lock().unwrap();
196 Ok(Ok(if let Some(cell) = map.get(&(task, index)) {
197 cell.clone()
198 } else {
199 Default::default()
200 }
201 .into_typed(index.type_id)))
202 }
203
204 fn try_read_task_cell_untracked(
205 &self,
206 task: TaskId,
207 index: CellId,
208 _options: ReadCellOptions,
209 ) -> Result<Result<TypedCellContent, EventListener>> {
210 let map = self.cells.lock().unwrap();
211 Ok(Ok(if let Some(cell) = map.get(&(task, index)) {
212 cell.to_owned()
213 } else {
214 Default::default()
215 }
216 .into_typed(index.type_id)))
217 }
218
219 fn try_read_own_task_cell_untracked(
220 &self,
221 current_task: TaskId,
222 index: CellId,
223 options: ReadCellOptions,
224 ) -> Result<TypedCellContent> {
225 self.read_own_task_cell(current_task, index, options)
226 }
227
228 fn try_read_local_output(
229 &self,
230 _execution_id: ExecutionId,
231 _local_task_id: LocalTaskId,
232 ) -> Result<Result<RawVc, EventListener>> {
233 unimplemented!()
234 }
235
236 fn emit_collectible(&self, _trait_type: turbo_tasks::TraitTypeId, _collectible: RawVc) {
237 unimplemented!()
238 }
239
240 fn unemit_collectible(
241 &self,
242 _trait_type: turbo_tasks::TraitTypeId,
243 _collectible: RawVc,
244 _count: u32,
245 ) {
246 unimplemented!()
247 }
248
249 fn unemit_collectibles(
250 &self,
251 _trait_type: turbo_tasks::TraitTypeId,
252 _collectibles: &TaskCollectiblesMap,
253 ) {
254 unimplemented!()
255 }
256
257 fn read_task_collectibles(&self, _task: TaskId, _trait_id: TraitTypeId) -> TaskCollectiblesMap {
258 unimplemented!()
259 }
260
261 fn read_own_task_cell(
262 &self,
263 task: TaskId,
264 index: CellId,
265 _options: ReadCellOptions,
266 ) -> Result<TypedCellContent> {
267 let map = self.cells.lock().unwrap();
268 Ok(if let Some(cell) = map.get(&(task, index)) {
269 cell.to_owned()
270 } else {
271 Default::default()
272 }
273 .into_typed(index.type_id))
274 }
275
276 fn update_own_task_cell(&self, task: TaskId, index: CellId, content: CellContent) {
277 let mut map = self.cells.lock().unwrap();
278 let cell = map.entry((task, index)).or_default();
279 *cell = content;
280 }
281
282 fn connect_task(&self, _task: TaskId) {
283 }
285
286 fn mark_own_task_as_finished(&self, _task: TaskId) {
287 }
289
290 fn mark_own_task_as_session_dependent(&self, _task: TaskId) {
291 }
293
294 fn set_own_task_aggregation_number(&self, _task: TaskId, _aggregation_number: u32) {
295 }
297
298 fn detached_for_testing(
299 &self,
300 _f: std::pin::Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
301 ) -> std::pin::Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>> {
302 unimplemented!()
303 }
304
305 fn task_statistics(&self) -> &turbo_tasks::task_statistics::TaskStatisticsApi {
306 unimplemented!()
307 }
308
309 fn stop_and_wait(&self) -> std::pin::Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
310 Box::pin(async {})
311 }
312
313 fn subscribe_to_compilation_events(
316 &self,
317 _event_types: Option<Vec<String>>,
318 ) -> Receiver<Arc<dyn CompilationEvent>> {
319 unimplemented!()
320 }
321
322 fn send_compilation_event(&self, _event: Arc<dyn CompilationEvent>) {
325 unimplemented!()
326 }
327}
328
329impl VcStorage {
330 pub fn with<T>(f: impl Future<Output = T>) -> impl Future<Output = T> {
331 with_turbo_tasks_for_testing(
332 Arc::new_cyclic(|weak| VcStorage {
333 this: weak.clone(),
334 ..Default::default()
335 }),
336 TaskId::MAX,
337 ExecutionId::MIN,
338 f,
339 )
340 }
341}