turbo_tasks_testing/
lib.rs

1//! Testing utilities and macros for turbo-tasks and applications based on it.
2
3pub mod retry;
4mod run;
5
6use std::{
7    future::Future,
8    mem::replace,
9    panic::AssertUnwindSafe,
10    sync::{Arc, Mutex, Weak},
11};
12
13use anyhow::{Result, anyhow};
14use futures::FutureExt;
15use rustc_hash::FxHashMap;
16use tokio::sync::mpsc::Receiver;
17use turbo_tasks::{
18    CellId, ExecutionId, InvalidationReason, LocalTaskId, MagicAny, RawVc, ReadCellOptions,
19    ReadConsistency, TaskId, TaskPersistence, TraitTypeId, TurboTasksApi, TurboTasksCallApi,
20    backend::{CellContent, TaskCollectiblesMap, TypedCellContent},
21    event::{Event, EventListener},
22    message_queue::CompilationEvent,
23    test_helpers::with_turbo_tasks_for_testing,
24    util::{SharedError, StaticOrArc},
25};
26
27pub use crate::run::{Registration, run, run_with_tt, run_without_cache_check};
28
29enum Task {
30    Spawned(Event),
31    Finished(Result<RawVc, SharedError>),
32}
33
34#[derive(Default)]
35pub struct VcStorage {
36    this: Weak<Self>,
37    cells: Mutex<FxHashMap<(TaskId, CellId), CellContent>>,
38    tasks: Mutex<Vec<Task>>,
39}
40
41impl VcStorage {
42    fn dynamic_call(
43        &self,
44        func: &'static turbo_tasks::macro_helpers::NativeFunction,
45        this_arg: Option<RawVc>,
46        arg: Box<dyn MagicAny>,
47    ) -> RawVc {
48        let this = self.this.upgrade().unwrap();
49        let handle = tokio::runtime::Handle::current();
50        let future = func.execute(this_arg, &*arg);
51        let i = {
52            let mut tasks = self.tasks.lock().unwrap();
53            let i = tasks.len();
54            tasks.push(Task::Spawned(Event::new(move || {
55                format!("Task({i})::event")
56            })));
57            i
58        };
59        let task_id = TaskId::try_from(u32::try_from(i + 1).unwrap()).unwrap();
60        let execution_id = ExecutionId::try_from(u16::try_from(i + 1).unwrap()).unwrap();
61        handle.spawn(with_turbo_tasks_for_testing(
62            this.clone(),
63            task_id,
64            execution_id,
65            async move {
66                let result = AssertUnwindSafe(future).catch_unwind().await;
67
68                // Convert the unwind panic to an anyhow error that can be cloned.
69                let result = result
70                    .map_err(|any| match any.downcast::<String>() {
71                        Ok(owned) => anyhow!(owned),
72                        Err(any) => match any.downcast::<&'static str>() {
73                            Ok(str) => anyhow!(str),
74                            Err(_) => anyhow!("unknown panic"),
75                        },
76                    })
77                    .and_then(|r| r)
78                    .map_err(SharedError::new);
79
80                let mut tasks = this.tasks.lock().unwrap();
81                if let Task::Spawned(event) = replace(&mut tasks[i], Task::Finished(result)) {
82                    event.notify(usize::MAX);
83                }
84            },
85        ));
86        RawVc::TaskOutput(task_id)
87    }
88}
89
90impl TurboTasksCallApi for VcStorage {
91    fn dynamic_call(
92        &self,
93        func: &'static turbo_tasks::macro_helpers::NativeFunction,
94        this: Option<RawVc>,
95        arg: Box<dyn MagicAny>,
96        _persistence: TaskPersistence,
97    ) -> RawVc {
98        self.dynamic_call(func, this, arg)
99    }
100    fn native_call(
101        &self,
102        _func: &'static turbo_tasks::macro_helpers::NativeFunction,
103        _this: Option<RawVc>,
104        _arg: Box<dyn MagicAny>,
105        _persistence: TaskPersistence,
106    ) -> RawVc {
107        unreachable!()
108    }
109
110    fn trait_call(
111        &self,
112        _trait_type: &'static turbo_tasks::TraitMethod,
113        _this: RawVc,
114        _arg: Box<dyn MagicAny>,
115        _persistence: TaskPersistence,
116    ) -> RawVc {
117        unreachable!()
118    }
119
120    fn run_once(
121        &self,
122        _future: std::pin::Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
123    ) -> TaskId {
124        unreachable!()
125    }
126
127    fn run_once_with_reason(
128        &self,
129        _reason: StaticOrArc<dyn InvalidationReason>,
130        _future: std::pin::Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
131    ) -> TaskId {
132        unreachable!()
133    }
134
135    fn run_once_process(
136        &self,
137        _future: std::pin::Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
138    ) -> TaskId {
139        unreachable!()
140    }
141}
142
143impl TurboTasksApi for VcStorage {
144    fn invalidate(&self, _task: TaskId) {
145        unreachable!()
146    }
147
148    fn invalidate_with_reason(
149        &self,
150        _task: TaskId,
151        _reason: turbo_tasks::util::StaticOrArc<dyn turbo_tasks::InvalidationReason>,
152    ) {
153        unreachable!()
154    }
155
156    fn invalidate_serialization(&self, _task: TaskId) {
157        // ignore
158    }
159
160    fn notify_scheduled_tasks(&self) {
161        // ignore
162    }
163
164    fn try_read_task_output(
165        &self,
166        id: TaskId,
167        _consistency: ReadConsistency,
168    ) -> Result<Result<RawVc, EventListener>> {
169        let tasks = self.tasks.lock().unwrap();
170        let i = *id - 1;
171        let task = tasks.get(i as usize).unwrap();
172        match task {
173            Task::Spawned(event) => Ok(Err(event.listen())),
174            Task::Finished(result) => match result {
175                Ok(vc) => Ok(Ok(*vc)),
176                Err(err) => Err(anyhow!(err.clone())),
177            },
178        }
179    }
180
181    fn try_read_task_output_untracked(
182        &self,
183        task: TaskId,
184        consistency: ReadConsistency,
185    ) -> Result<Result<RawVc, EventListener>> {
186        self.try_read_task_output(task, consistency)
187    }
188
189    fn try_read_task_cell(
190        &self,
191        task: TaskId,
192        index: CellId,
193        _options: ReadCellOptions,
194    ) -> Result<Result<TypedCellContent, EventListener>> {
195        let map = self.cells.lock().unwrap();
196        Ok(Ok(if let Some(cell) = map.get(&(task, index)) {
197            cell.clone()
198        } else {
199            Default::default()
200        }
201        .into_typed(index.type_id)))
202    }
203
204    fn try_read_task_cell_untracked(
205        &self,
206        task: TaskId,
207        index: CellId,
208        _options: ReadCellOptions,
209    ) -> Result<Result<TypedCellContent, EventListener>> {
210        let map = self.cells.lock().unwrap();
211        Ok(Ok(if let Some(cell) = map.get(&(task, index)) {
212            cell.to_owned()
213        } else {
214            Default::default()
215        }
216        .into_typed(index.type_id)))
217    }
218
219    fn try_read_own_task_cell_untracked(
220        &self,
221        current_task: TaskId,
222        index: CellId,
223        options: ReadCellOptions,
224    ) -> Result<TypedCellContent> {
225        self.read_own_task_cell(current_task, index, options)
226    }
227
228    fn try_read_local_output(
229        &self,
230        _execution_id: ExecutionId,
231        _local_task_id: LocalTaskId,
232    ) -> Result<Result<RawVc, EventListener>> {
233        unimplemented!()
234    }
235
236    fn emit_collectible(&self, _trait_type: turbo_tasks::TraitTypeId, _collectible: RawVc) {
237        unimplemented!()
238    }
239
240    fn unemit_collectible(
241        &self,
242        _trait_type: turbo_tasks::TraitTypeId,
243        _collectible: RawVc,
244        _count: u32,
245    ) {
246        unimplemented!()
247    }
248
249    fn unemit_collectibles(
250        &self,
251        _trait_type: turbo_tasks::TraitTypeId,
252        _collectibles: &TaskCollectiblesMap,
253    ) {
254        unimplemented!()
255    }
256
257    fn read_task_collectibles(&self, _task: TaskId, _trait_id: TraitTypeId) -> TaskCollectiblesMap {
258        unimplemented!()
259    }
260
261    fn read_own_task_cell(
262        &self,
263        task: TaskId,
264        index: CellId,
265        _options: ReadCellOptions,
266    ) -> Result<TypedCellContent> {
267        let map = self.cells.lock().unwrap();
268        Ok(if let Some(cell) = map.get(&(task, index)) {
269            cell.to_owned()
270        } else {
271            Default::default()
272        }
273        .into_typed(index.type_id))
274    }
275
276    fn update_own_task_cell(&self, task: TaskId, index: CellId, content: CellContent) {
277        let mut map = self.cells.lock().unwrap();
278        let cell = map.entry((task, index)).or_default();
279        *cell = content;
280    }
281
282    fn connect_task(&self, _task: TaskId) {
283        // no-op
284    }
285
286    fn mark_own_task_as_finished(&self, _task: TaskId) {
287        // no-op
288    }
289
290    fn mark_own_task_as_session_dependent(&self, _task: TaskId) {
291        // no-op
292    }
293
294    fn set_own_task_aggregation_number(&self, _task: TaskId, _aggregation_number: u32) {
295        // no-op
296    }
297
298    fn detached_for_testing(
299        &self,
300        _f: std::pin::Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
301    ) -> std::pin::Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>> {
302        unimplemented!()
303    }
304
305    fn task_statistics(&self) -> &turbo_tasks::task_statistics::TaskStatisticsApi {
306        unimplemented!()
307    }
308
309    fn stop_and_wait(&self) -> std::pin::Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
310        Box::pin(async {})
311    }
312
313    /// Should not be called on the testing VcStorage. These methods are only implemented for
314    /// structs with access to a `MessageQueue` like `TurboTasks`.
315    fn subscribe_to_compilation_events(
316        &self,
317        _event_types: Option<Vec<String>>,
318    ) -> Receiver<Arc<dyn CompilationEvent>> {
319        unimplemented!()
320    }
321
322    /// Should not be called on the testing VcStorage. These methods are only implemented for
323    /// structs with access to a `MessageQueue` like `TurboTasks`.
324    fn send_compilation_event(&self, _event: Arc<dyn CompilationEvent>) {
325        unimplemented!()
326    }
327}
328
329impl VcStorage {
330    pub fn with<T>(f: impl Future<Output = T>) -> impl Future<Output = T> {
331        with_turbo_tasks_for_testing(
332            Arc::new_cyclic(|weak| VcStorage {
333                this: weak.clone(),
334                ..Default::default()
335            }),
336            TaskId::MAX,
337            ExecutionId::MIN,
338            f,
339        )
340    }
341}