turbo_tasks_testing/
lib.rs

1//! Testing utilities and macros for turbo-tasks and applications based on it.
2
3pub mod retry;
4mod run;
5
6use std::{
7    future::Future,
8    mem::replace,
9    panic::AssertUnwindSafe,
10    pin::Pin,
11    sync::{Arc, Mutex, Weak},
12};
13
14use anyhow::{Result, anyhow};
15use futures::FutureExt;
16use rustc_hash::FxHashMap;
17use smallvec::SmallVec;
18use tokio::sync::mpsc::Receiver;
19use turbo_tasks::{
20    CellId, ExecutionId, InvalidationReason, LocalTaskId, MagicAny, RawVc, ReadCellOptions,
21    ReadOutputOptions, TaskId, TaskPersistence, TraitTypeId, TurboTasksApi, TurboTasksCallApi,
22    backend::{CellContent, TaskCollectiblesMap, TypedCellContent, VerificationMode},
23    event::{Event, EventListener},
24    message_queue::CompilationEvent,
25    test_helpers::with_turbo_tasks_for_testing,
26    util::{SharedError, StaticOrArc},
27};
28
29pub use crate::run::{
30    Registration, run, run_once, run_once_without_cache_check, run_with_tt, run_without_cache_check,
31};
32
33enum Task {
34    Spawned(Event),
35    Finished(Result<RawVc, SharedError>),
36}
37
38#[derive(Default)]
39pub struct VcStorage {
40    this: Weak<Self>,
41    cells: Mutex<FxHashMap<(TaskId, CellId), CellContent>>,
42    tasks: Mutex<Vec<Task>>,
43}
44
45impl VcStorage {
46    fn dynamic_call(
47        &self,
48        func: &'static turbo_tasks::macro_helpers::NativeFunction,
49        this_arg: Option<RawVc>,
50        arg: Box<dyn MagicAny>,
51    ) -> RawVc {
52        let this = self.this.upgrade().unwrap();
53        let handle = tokio::runtime::Handle::current();
54        let future = func.execute(this_arg, &*arg);
55        let i = {
56            let mut tasks = self.tasks.lock().unwrap();
57            let i = tasks.len();
58            tasks.push(Task::Spawned(Event::new(move || {
59                move || format!("Task({i})::event")
60            })));
61            i
62        };
63        let task_id = TaskId::try_from(u32::try_from(i + 1).unwrap()).unwrap();
64        let execution_id = ExecutionId::try_from(u16::try_from(i + 1).unwrap()).unwrap();
65        handle.spawn(with_turbo_tasks_for_testing(
66            this.clone(),
67            task_id,
68            execution_id,
69            async move {
70                let result = AssertUnwindSafe(future).catch_unwind().await;
71
72                // Convert the unwind panic to an anyhow error that can be cloned.
73                let result = result
74                    .map_err(|any| match any.downcast::<String>() {
75                        Ok(owned) => anyhow!(owned),
76                        Err(any) => match any.downcast::<&'static str>() {
77                            Ok(str) => anyhow!(str),
78                            Err(_) => anyhow!("unknown panic"),
79                        },
80                    })
81                    .and_then(|r| r)
82                    .map_err(SharedError::new);
83
84                let mut tasks = this.tasks.lock().unwrap();
85                if let Task::Spawned(event) = replace(&mut tasks[i], Task::Finished(result)) {
86                    event.notify(usize::MAX);
87                }
88            },
89        ));
90        RawVc::TaskOutput(task_id)
91    }
92}
93
94impl TurboTasksCallApi for VcStorage {
95    fn dynamic_call(
96        &self,
97        func: &'static turbo_tasks::macro_helpers::NativeFunction,
98        this: Option<RawVc>,
99        arg: Box<dyn MagicAny>,
100        _persistence: TaskPersistence,
101    ) -> RawVc {
102        self.dynamic_call(func, this, arg)
103    }
104    fn native_call(
105        &self,
106        _func: &'static turbo_tasks::macro_helpers::NativeFunction,
107        _this: Option<RawVc>,
108        _arg: Box<dyn MagicAny>,
109        _persistence: TaskPersistence,
110    ) -> RawVc {
111        unreachable!()
112    }
113
114    fn trait_call(
115        &self,
116        _trait_type: &'static turbo_tasks::TraitMethod,
117        _this: RawVc,
118        _arg: Box<dyn MagicAny>,
119        _persistence: TaskPersistence,
120    ) -> RawVc {
121        unreachable!()
122    }
123
124    fn run(
125        &self,
126        _future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
127    ) -> Pin<
128        Box<dyn Future<Output = Result<(), turbo_tasks::backend::TurboTasksExecutionError>> + Send>,
129    > {
130        unreachable!()
131    }
132
133    fn run_once(
134        &self,
135        _future: std::pin::Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
136    ) -> Pin<
137        Box<dyn futures::Future<Output = Result<(), anyhow::Error>> + std::marker::Send + 'static>,
138    > {
139        unreachable!()
140    }
141
142    fn run_once_with_reason(
143        &self,
144        _reason: StaticOrArc<dyn InvalidationReason>,
145        _future: std::pin::Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
146    ) -> Pin<
147        Box<dyn futures::Future<Output = Result<(), anyhow::Error>> + std::marker::Send + 'static>,
148    > {
149        unreachable!()
150    }
151
152    fn start_once_process(
153        &self,
154        _future: std::pin::Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
155    ) {
156        unreachable!()
157    }
158}
159
160impl TurboTasksApi for VcStorage {
161    fn invalidate(&self, _task: TaskId) {
162        unreachable!()
163    }
164
165    fn invalidate_with_reason(
166        &self,
167        _task: TaskId,
168        _reason: turbo_tasks::util::StaticOrArc<dyn turbo_tasks::InvalidationReason>,
169    ) {
170        unreachable!()
171    }
172
173    fn invalidate_serialization(&self, _task: TaskId) {
174        // ignore
175    }
176
177    fn try_read_task_output(
178        &self,
179        id: TaskId,
180        _options: ReadOutputOptions,
181    ) -> Result<Result<RawVc, EventListener>> {
182        let tasks = self.tasks.lock().unwrap();
183        let i = *id - 1;
184        let task = tasks.get(i as usize).unwrap();
185        match task {
186            Task::Spawned(event) => Ok(Err(event.listen())),
187            Task::Finished(result) => match result {
188                Ok(vc) => Ok(Ok(*vc)),
189                Err(err) => Err(anyhow!(err.clone())),
190            },
191        }
192    }
193
194    fn try_read_task_cell(
195        &self,
196        task: TaskId,
197        index: CellId,
198        _options: ReadCellOptions,
199    ) -> Result<Result<TypedCellContent, EventListener>> {
200        let map = self.cells.lock().unwrap();
201        Ok(Ok(if let Some(cell) = map.get(&(task, index)) {
202            cell.clone()
203        } else {
204            Default::default()
205        }
206        .into_typed(index.type_id)))
207    }
208    fn try_read_own_task_cell(
209        &self,
210        current_task: TaskId,
211        index: CellId,
212        options: ReadCellOptions,
213    ) -> Result<TypedCellContent> {
214        self.read_own_task_cell(current_task, index, options)
215    }
216
217    fn try_read_local_output(
218        &self,
219        _execution_id: ExecutionId,
220        _local_task_id: LocalTaskId,
221    ) -> Result<Result<RawVc, EventListener>> {
222        unimplemented!()
223    }
224
225    fn emit_collectible(&self, _trait_type: turbo_tasks::TraitTypeId, _collectible: RawVc) {
226        unimplemented!()
227    }
228
229    fn unemit_collectible(
230        &self,
231        _trait_type: turbo_tasks::TraitTypeId,
232        _collectible: RawVc,
233        _count: u32,
234    ) {
235        unimplemented!()
236    }
237
238    fn unemit_collectibles(
239        &self,
240        _trait_type: turbo_tasks::TraitTypeId,
241        _collectibles: &TaskCollectiblesMap,
242    ) {
243        unimplemented!()
244    }
245
246    fn read_task_collectibles(&self, _task: TaskId, _trait_id: TraitTypeId) -> TaskCollectiblesMap {
247        unimplemented!()
248    }
249
250    fn read_own_task_cell(
251        &self,
252        task: TaskId,
253        index: CellId,
254        _options: ReadCellOptions,
255    ) -> Result<TypedCellContent> {
256        let map = self.cells.lock().unwrap();
257        Ok(if let Some(cell) = map.get(&(task, index)) {
258            cell.to_owned()
259        } else {
260            Default::default()
261        }
262        .into_typed(index.type_id))
263    }
264
265    fn update_own_task_cell(
266        &self,
267        task: TaskId,
268        index: CellId,
269        _is_serializable_cell_content: bool,
270        content: CellContent,
271        _updated_key_hashes: Option<SmallVec<[u64; 2]>>,
272        _verification_mode: VerificationMode,
273    ) {
274        let mut map = self.cells.lock().unwrap();
275        let cell = map.entry((task, index)).or_default();
276        *cell = content;
277    }
278
279    fn connect_task(&self, _task: TaskId) {
280        // no-op
281    }
282
283    fn mark_own_task_as_finished(&self, _task: TaskId) {
284        // no-op
285    }
286
287    fn mark_own_task_as_session_dependent(&self, _task: TaskId) {
288        // no-op
289    }
290
291    fn set_own_task_aggregation_number(&self, _task: TaskId, _aggregation_number: u32) {
292        // no-op
293    }
294
295    fn spawn_detached_for_testing(
296        &self,
297        _f: std::pin::Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
298    ) {
299        unimplemented!()
300    }
301
302    fn task_statistics(&self) -> &turbo_tasks::task_statistics::TaskStatisticsApi {
303        unimplemented!()
304    }
305
306    fn stop_and_wait(&self) -> std::pin::Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
307        Box::pin(async {})
308    }
309
310    /// Should not be called on the testing VcStorage. These methods are only implemented for
311    /// structs with access to a `MessageQueue` like `TurboTasks`.
312    fn subscribe_to_compilation_events(
313        &self,
314        _event_types: Option<Vec<String>>,
315    ) -> Receiver<Arc<dyn CompilationEvent>> {
316        unimplemented!()
317    }
318
319    /// Should not be called on the testing VcStorage. These methods are only implemented for
320    /// structs with access to a `MessageQueue` like `TurboTasks`.
321    fn send_compilation_event(&self, _event: Arc<dyn CompilationEvent>) {
322        unimplemented!()
323    }
324
325    fn is_tracking_dependencies(&self) -> bool {
326        false
327    }
328}
329
330impl VcStorage {
331    pub fn with<T>(f: impl Future<Output = T>) -> impl Future<Output = T> {
332        with_turbo_tasks_for_testing(
333            Arc::new_cyclic(|weak| VcStorage {
334                this: weak.clone(),
335                ..Default::default()
336            }),
337            TaskId::MAX,
338            ExecutionId::MIN,
339            f,
340        )
341    }
342}