1pub mod retry;
4mod run;
5
6use std::{
7 future::Future,
8 mem::replace,
9 panic::AssertUnwindSafe,
10 pin::Pin,
11 sync::{Arc, Mutex, Weak},
12};
13
14use anyhow::{Result, anyhow, bail};
15use futures::FutureExt;
16use rustc_hash::FxHashMap;
17use smallvec::SmallVec;
18use tokio::sync::mpsc::Receiver;
19use turbo_tasks::{
20 CellId, DynTaskInputs, ExecutionId, InvalidationReason, LocalTaskId, RawVc, ReadCellOptions,
21 ReadOutputOptions, StackDynTaskInputs, TaskId, TaskPersistence, TraitTypeId, TurboTasksApi,
22 TurboTasksCallApi,
23 backend::{CellContent, TaskCollectiblesMap, TypedCellContent, VerificationMode},
24 event::{Event, EventListener},
25 message_queue::CompilationEvent,
26 test_helpers::with_turbo_tasks_for_testing,
27 util::{SharedError, StaticOrArc},
28};
29
30pub use crate::run::{
31 Registration, TestInstance, run, run_once, run_once_without_cache_check, run_with_tt,
32 run_without_cache_check, test_instance,
33};
34
35enum Task {
36 Spawned(Event),
37 Finished(Result<RawVc, SharedError>),
38}
39
40#[derive(Default)]
41pub struct VcStorage {
42 this: Weak<Self>,
43 cells: Mutex<FxHashMap<(TaskId, CellId), CellContent>>,
44 tasks: Mutex<Vec<Task>>,
45}
46
47impl VcStorage {
48 fn dynamic_call(
49 &self,
50 func: &'static turbo_tasks::macro_helpers::NativeFunction,
51 this_arg: Option<RawVc>,
52 arg: Box<dyn DynTaskInputs>,
53 ) -> RawVc {
54 let this = self.this.upgrade().unwrap();
55 let handle = tokio::runtime::Handle::current();
56 let future = func.execute(this_arg, &*arg);
57 let i = {
58 let mut tasks = self.tasks.lock().unwrap();
59 let i = tasks.len();
60 tasks.push(Task::Spawned(Event::new(move || {
61 move || format!("Task({i})::event")
62 })));
63 i
64 };
65 let task_id = TaskId::try_from(u32::try_from(i + 1).unwrap()).unwrap();
66 let execution_id = ExecutionId::try_from(u16::try_from(i + 1).unwrap()).unwrap();
67 handle.spawn(with_turbo_tasks_for_testing(
68 this.clone(),
69 task_id,
70 execution_id,
71 async move {
72 let result = AssertUnwindSafe(future).catch_unwind().await;
73
74 let result = match result {
76 Ok(Ok(raw_vc)) => Ok(raw_vc.to_non_local().await),
77 _ => result,
78 };
79 let result = result
80 .map_err(|any| match any.downcast::<String>() {
81 Ok(owned) => anyhow!(owned),
82 Err(any) => match any.downcast::<&'static str>() {
83 Ok(str) => anyhow!(str),
84 Err(_) => anyhow!("unknown panic"),
85 },
86 })
87 .and_then(|r| r)
88 .map_err(SharedError::new);
89
90 let mut tasks = this.tasks.lock().unwrap();
91 if let Task::Spawned(event) = replace(&mut tasks[i], Task::Finished(result)) {
92 event.notify(usize::MAX);
93 }
94 },
95 ));
96 RawVc::TaskOutput(task_id)
97 }
98}
99
100impl TurboTasksCallApi for VcStorage {
101 fn dynamic_call(
102 &self,
103 func: &'static turbo_tasks::macro_helpers::NativeFunction,
104 this: Option<RawVc>,
105 arg: &mut dyn StackDynTaskInputs,
106 _persistence: TaskPersistence,
107 ) -> RawVc {
108 self.dynamic_call(func, this, arg.take_box())
109 }
110 fn native_call(
111 &self,
112 func: &'static turbo_tasks::macro_helpers::NativeFunction,
113 this: Option<RawVc>,
114 arg: &mut dyn StackDynTaskInputs,
115 _persistence: TaskPersistence,
116 ) -> RawVc {
117 self.dynamic_call(func, this, arg.take_box())
118 }
119
120 fn trait_call(
121 &self,
122 _trait_type: &'static turbo_tasks::TraitMethod,
123 _this: RawVc,
124 _arg: &mut dyn StackDynTaskInputs,
125 _persistence: TaskPersistence,
126 ) -> RawVc {
127 unreachable!()
128 }
129
130 fn run(
131 &self,
132 _future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
133 ) -> Pin<
134 Box<dyn Future<Output = Result<(), turbo_tasks::backend::TurboTasksExecutionError>> + Send>,
135 > {
136 unreachable!()
137 }
138
139 fn run_once(
140 &self,
141 _future: std::pin::Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
142 ) -> Pin<
143 Box<dyn futures::Future<Output = Result<(), anyhow::Error>> + std::marker::Send + 'static>,
144 > {
145 unreachable!()
146 }
147
148 fn run_once_with_reason(
149 &self,
150 _reason: StaticOrArc<dyn InvalidationReason>,
151 _future: std::pin::Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
152 ) -> Pin<
153 Box<dyn futures::Future<Output = Result<(), anyhow::Error>> + std::marker::Send + 'static>,
154 > {
155 unreachable!()
156 }
157
158 fn start_once_process(
159 &self,
160 _future: std::pin::Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
161 ) {
162 unreachable!()
163 }
164
165 fn send_compilation_event(&self, _event: Arc<dyn CompilationEvent>) {
168 unimplemented!()
169 }
170
171 fn get_task_name(&self, task: TaskId) -> String {
172 format!("Task({})", task)
173 }
174}
175
176impl TurboTasksApi for VcStorage {
177 fn invalidate(&self, _task: TaskId) {
178 unreachable!()
179 }
180
181 fn invalidate_with_reason(
182 &self,
183 _task: TaskId,
184 _reason: turbo_tasks::util::StaticOrArc<dyn turbo_tasks::InvalidationReason>,
185 ) {
186 unreachable!()
187 }
188
189 fn invalidate_serialization(&self, _task: TaskId) {
190 }
192
193 fn try_read_task_output(
194 &self,
195 id: TaskId,
196 _options: ReadOutputOptions,
197 ) -> Result<Result<RawVc, EventListener>> {
198 let tasks = self.tasks.lock().unwrap();
199 let i = *id - 1;
200 let task = tasks.get(i as usize).unwrap();
201 match task {
202 Task::Spawned(event) => Ok(Err(event.listen())),
203 Task::Finished(result) => match result {
204 Ok(vc) => Ok(Ok(*vc)),
205 Err(err) => bail!(err.clone()),
206 },
207 }
208 }
209
210 fn try_read_task_cell(
211 &self,
212 task: TaskId,
213 index: CellId,
214 _options: ReadCellOptions,
215 ) -> Result<Result<TypedCellContent, EventListener>> {
216 let map = self.cells.lock().unwrap();
217 Ok(Ok(if let Some(cell) = map.get(&(task, index)) {
218 cell.clone()
219 } else {
220 Default::default()
221 }
222 .into_typed(index.type_id)))
223 }
224 fn try_read_own_task_cell(
225 &self,
226 current_task: TaskId,
227 index: CellId,
228 ) -> Result<TypedCellContent> {
229 self.read_own_task_cell(current_task, index)
230 }
231
232 fn try_read_local_output(
233 &self,
234 _execution_id: ExecutionId,
235 _local_task_id: LocalTaskId,
236 ) -> Result<Result<RawVc, EventListener>> {
237 unimplemented!()
238 }
239
240 fn emit_collectible(&self, _trait_type: turbo_tasks::TraitTypeId, _collectible: RawVc) {
241 unimplemented!()
242 }
243
244 fn unemit_collectible(
245 &self,
246 _trait_type: turbo_tasks::TraitTypeId,
247 _collectible: RawVc,
248 _count: u32,
249 ) {
250 unimplemented!()
251 }
252
253 fn unemit_collectibles(
254 &self,
255 _trait_type: turbo_tasks::TraitTypeId,
256 _collectibles: &TaskCollectiblesMap,
257 ) {
258 unimplemented!()
259 }
260
261 fn read_task_collectibles(&self, _task: TaskId, _trait_id: TraitTypeId) -> TaskCollectiblesMap {
262 unimplemented!()
263 }
264
265 fn read_own_task_cell(&self, task: TaskId, index: CellId) -> Result<TypedCellContent> {
266 let map = self.cells.lock().unwrap();
267 Ok(if let Some(cell) = map.get(&(task, index)) {
268 cell.to_owned()
269 } else {
270 Default::default()
271 }
272 .into_typed(index.type_id))
273 }
274
275 fn update_own_task_cell(
276 &self,
277 task: TaskId,
278 index: CellId,
279 content: CellContent,
280 _updated_key_hashes: Option<SmallVec<[u64; 2]>>,
281 _content_hash: Option<[u8; 16]>,
282 _verification_mode: VerificationMode,
283 ) {
284 let mut map = self.cells.lock().unwrap();
285 let cell = map.entry((task, index)).or_default();
286 *cell = content;
287 }
288
289 fn connect_task(&self, _task: TaskId) {
290 }
292
293 fn mark_own_task_as_finished(&self, _task: TaskId) {
294 }
296
297 fn spawn_detached_for_testing(
298 &self,
299 _f: std::pin::Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
300 ) {
301 unimplemented!()
302 }
303
304 fn task_statistics(&self) -> &turbo_tasks::task_statistics::TaskStatisticsApi {
305 unimplemented!()
306 }
307
308 fn stop_and_wait(&self) -> std::pin::Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
309 Box::pin(async {})
310 }
311
312 fn subscribe_to_compilation_events(
315 &self,
316 _event_types: Option<Vec<String>>,
317 ) -> Receiver<Arc<dyn CompilationEvent>> {
318 unimplemented!()
319 }
320
321 fn is_tracking_dependencies(&self) -> bool {
322 false
323 }
324}
325
326impl VcStorage {
327 pub fn with<T>(f: impl Future<Output = T>) -> impl Future<Output = T> {
328 with_turbo_tasks_for_testing(
329 Arc::new_cyclic(|weak| VcStorage {
330 this: weak.clone(),
331 ..Default::default()
332 }),
333 TaskId::MAX,
334 ExecutionId::MIN,
335 f,
336 )
337 }
338}