1pub mod retry;
4mod run;
5
6use std::{
7 borrow::Cow,
8 future::Future,
9 mem::replace,
10 panic::AssertUnwindSafe,
11 sync::{Arc, Mutex, Weak},
12};
13
14use anyhow::{Result, anyhow};
15use futures::FutureExt;
16use rustc_hash::FxHashMap;
17use tokio::sync::mpsc::Receiver;
18use turbo_tasks::{
19 CellId, ExecutionId, InvalidationReason, LocalTaskId, MagicAny, RawVc, ReadCellOptions,
20 ReadConsistency, TaskId, TaskPersistence, TraitTypeId, TurboTasksApi, TurboTasksCallApi,
21 backend::{CellContent, TaskCollectiblesMap, TypedCellContent},
22 event::{Event, EventListener},
23 message_queue::CompilationEvent,
24 registry,
25 test_helpers::with_turbo_tasks_for_testing,
26 util::{SharedError, StaticOrArc},
27};
28
29pub use crate::run::{Registration, run, run_with_tt, run_without_cache_check};
30
31enum Task {
32 Spawned(Event),
33 Finished(Result<RawVc, SharedError>),
34}
35
36#[derive(Default)]
37pub struct VcStorage {
38 this: Weak<Self>,
39 cells: Mutex<FxHashMap<(TaskId, CellId), CellContent>>,
40 tasks: Mutex<Vec<Task>>,
41}
42
43impl VcStorage {
44 fn dynamic_call(
45 &self,
46 func: turbo_tasks::FunctionId,
47 this_arg: Option<RawVc>,
48 arg: Box<dyn MagicAny>,
49 ) -> RawVc {
50 let this = self.this.upgrade().unwrap();
51 let handle = tokio::runtime::Handle::current();
52 let future = registry::get_function(func).execute(this_arg, &*arg);
53 let i = {
54 let mut tasks = self.tasks.lock().unwrap();
55 let i = tasks.len();
56 tasks.push(Task::Spawned(Event::new(move || {
57 format!("Task({i})::event")
58 })));
59 i
60 };
61 let task_id = TaskId::try_from(u32::try_from(i + 1).unwrap()).unwrap();
62 let execution_id = ExecutionId::try_from(u16::try_from(i + 1).unwrap()).unwrap();
63 handle.spawn(with_turbo_tasks_for_testing(
64 this.clone(),
65 task_id,
66 execution_id,
67 async move {
68 let result = AssertUnwindSafe(future).catch_unwind().await;
69
70 let result = result
72 .map_err(|any| match any.downcast::<String>() {
73 Ok(owned) => anyhow!(owned),
74 Err(any) => match any.downcast::<&'static str>() {
75 Ok(str) => anyhow!(str),
76 Err(_) => anyhow!("unknown panic"),
77 },
78 })
79 .and_then(|r| r)
80 .map_err(SharedError::new);
81
82 let mut tasks = this.tasks.lock().unwrap();
83 if let Task::Spawned(event) = replace(&mut tasks[i], Task::Finished(result)) {
84 event.notify(usize::MAX);
85 }
86 },
87 ));
88 RawVc::TaskOutput(task_id)
89 }
90}
91
92impl TurboTasksCallApi for VcStorage {
93 fn dynamic_call(
94 &self,
95 func: turbo_tasks::FunctionId,
96 this: Option<RawVc>,
97 arg: Box<dyn MagicAny>,
98 _persistence: TaskPersistence,
99 ) -> RawVc {
100 self.dynamic_call(func, this, arg)
101 }
102 fn native_call(
103 &self,
104 _func: turbo_tasks::FunctionId,
105 _this: Option<RawVc>,
106 _arg: Box<dyn MagicAny>,
107 _persistence: TaskPersistence,
108 ) -> RawVc {
109 unreachable!()
110 }
111
112 fn trait_call(
113 &self,
114 _trait_type: turbo_tasks::TraitTypeId,
115 _trait_fn_name: Cow<'static, str>,
116 _this: RawVc,
117 _arg: Box<dyn MagicAny>,
118 _persistence: TaskPersistence,
119 ) -> RawVc {
120 unreachable!()
121 }
122
123 fn run_once(
124 &self,
125 _future: std::pin::Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
126 ) -> TaskId {
127 unreachable!()
128 }
129
130 fn run_once_with_reason(
131 &self,
132 _reason: StaticOrArc<dyn InvalidationReason>,
133 _future: std::pin::Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
134 ) -> TaskId {
135 unreachable!()
136 }
137
138 fn run_once_process(
139 &self,
140 _future: std::pin::Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
141 ) -> TaskId {
142 unreachable!()
143 }
144}
145
146impl TurboTasksApi for VcStorage {
147 fn pin(&self) -> Arc<dyn TurboTasksApi> {
148 self.this.upgrade().unwrap()
149 }
150
151 fn invalidate(&self, _task: TaskId) {
152 unreachable!()
153 }
154
155 fn invalidate_with_reason(
156 &self,
157 _task: TaskId,
158 _reason: turbo_tasks::util::StaticOrArc<dyn turbo_tasks::InvalidationReason>,
159 ) {
160 unreachable!()
161 }
162
163 fn invalidate_serialization(&self, _task: TaskId) {
164 }
166
167 fn notify_scheduled_tasks(&self) {
168 }
170
171 fn try_read_task_output(
172 &self,
173 id: TaskId,
174 _consistency: ReadConsistency,
175 ) -> Result<Result<RawVc, EventListener>> {
176 let tasks = self.tasks.lock().unwrap();
177 let i = *id - 1;
178 let task = tasks.get(i as usize).unwrap();
179 match task {
180 Task::Spawned(event) => Ok(Err(event.listen())),
181 Task::Finished(result) => match result {
182 Ok(vc) => Ok(Ok(*vc)),
183 Err(err) => Err(anyhow!(err.clone())),
184 },
185 }
186 }
187
188 fn try_read_task_output_untracked(
189 &self,
190 task: TaskId,
191 consistency: ReadConsistency,
192 ) -> Result<Result<RawVc, EventListener>> {
193 self.try_read_task_output(task, consistency)
194 }
195
196 fn try_read_task_cell(
197 &self,
198 task: TaskId,
199 index: CellId,
200 _options: ReadCellOptions,
201 ) -> Result<Result<TypedCellContent, EventListener>> {
202 let map = self.cells.lock().unwrap();
203 Ok(Ok(if let Some(cell) = map.get(&(task, index)) {
204 cell.clone()
205 } else {
206 Default::default()
207 }
208 .into_typed(index.type_id)))
209 }
210
211 fn try_read_task_cell_untracked(
212 &self,
213 task: TaskId,
214 index: CellId,
215 _options: ReadCellOptions,
216 ) -> Result<Result<TypedCellContent, EventListener>> {
217 let map = self.cells.lock().unwrap();
218 Ok(Ok(if let Some(cell) = map.get(&(task, index)) {
219 cell.to_owned()
220 } else {
221 Default::default()
222 }
223 .into_typed(index.type_id)))
224 }
225
226 fn try_read_own_task_cell_untracked(
227 &self,
228 current_task: TaskId,
229 index: CellId,
230 options: ReadCellOptions,
231 ) -> Result<TypedCellContent> {
232 self.read_own_task_cell(current_task, index, options)
233 }
234
235 fn try_read_local_output(
236 &self,
237 _execution_id: ExecutionId,
238 _local_task_id: LocalTaskId,
239 ) -> Result<Result<RawVc, EventListener>> {
240 unimplemented!()
241 }
242
243 fn emit_collectible(&self, _trait_type: turbo_tasks::TraitTypeId, _collectible: RawVc) {
244 unimplemented!()
245 }
246
247 fn unemit_collectible(
248 &self,
249 _trait_type: turbo_tasks::TraitTypeId,
250 _collectible: RawVc,
251 _count: u32,
252 ) {
253 unimplemented!()
254 }
255
256 fn unemit_collectibles(
257 &self,
258 _trait_type: turbo_tasks::TraitTypeId,
259 _collectibles: &TaskCollectiblesMap,
260 ) {
261 unimplemented!()
262 }
263
264 fn read_task_collectibles(&self, _task: TaskId, _trait_id: TraitTypeId) -> TaskCollectiblesMap {
265 unimplemented!()
266 }
267
268 fn read_own_task_cell(
269 &self,
270 task: TaskId,
271 index: CellId,
272 _options: ReadCellOptions,
273 ) -> Result<TypedCellContent> {
274 let map = self.cells.lock().unwrap();
275 Ok(if let Some(cell) = map.get(&(task, index)) {
276 cell.to_owned()
277 } else {
278 Default::default()
279 }
280 .into_typed(index.type_id))
281 }
282
283 fn update_own_task_cell(&self, task: TaskId, index: CellId, content: CellContent) {
284 let mut map = self.cells.lock().unwrap();
285 let cell = map.entry((task, index)).or_default();
286 *cell = content;
287 }
288
289 fn connect_task(&self, _task: TaskId) {
290 }
292
293 fn mark_own_task_as_finished(&self, _task: TaskId) {
294 }
296
297 fn mark_own_task_as_session_dependent(&self, _task: TaskId) {
298 }
300
301 fn set_own_task_aggregation_number(&self, _task: TaskId, _aggregation_number: u32) {
302 }
304
305 fn detached_for_testing(
306 &self,
307 _f: std::pin::Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
308 ) -> std::pin::Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>> {
309 unimplemented!()
310 }
311
312 fn task_statistics(&self) -> &turbo_tasks::task_statistics::TaskStatisticsApi {
313 unimplemented!()
314 }
315
316 fn stop_and_wait(&self) -> std::pin::Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
317 Box::pin(async {})
318 }
319
320 fn subscribe_to_compilation_events(
323 &self,
324 _event_types: Option<Vec<String>>,
325 ) -> Receiver<Arc<dyn CompilationEvent>> {
326 unimplemented!()
327 }
328
329 fn send_compilation_event(&self, _event: Arc<dyn CompilationEvent>) {
332 unimplemented!()
333 }
334}
335
336impl VcStorage {
337 pub fn with<T>(f: impl Future<Output = T>) -> impl Future<Output = T> {
338 with_turbo_tasks_for_testing(
339 Arc::new_cyclic(|weak| VcStorage {
340 this: weak.clone(),
341 ..Default::default()
342 }),
343 TaskId::MAX,
344 ExecutionId::MIN,
345 f,
346 )
347 }
348}