turbo_tasks_testing/
lib.rs

1//! Testing utilities and macros for turbo-tasks and applications based on it.
2
3pub mod retry;
4mod run;
5
6use std::{
7    future::Future,
8    mem::replace,
9    panic::AssertUnwindSafe,
10    pin::Pin,
11    sync::{Arc, Mutex, Weak},
12};
13
14use anyhow::{Result, anyhow};
15use futures::FutureExt;
16use rustc_hash::FxHashMap;
17use tokio::sync::mpsc::Receiver;
18use turbo_tasks::{
19    CellId, ExecutionId, InvalidationReason, LocalTaskId, MagicAny, RawVc, ReadCellOptions,
20    ReadOutputOptions, TaskId, TaskPersistence, TraitTypeId, TurboTasksApi, TurboTasksCallApi,
21    backend::{CellContent, TaskCollectiblesMap, TypedCellContent, VerificationMode},
22    event::{Event, EventListener},
23    message_queue::CompilationEvent,
24    test_helpers::with_turbo_tasks_for_testing,
25    util::{SharedError, StaticOrArc},
26};
27
28pub use crate::run::{
29    Registration, run, run_once, run_once_without_cache_check, run_with_tt, run_without_cache_check,
30};
31
/// State of a task that was started through [`VcStorage`]'s `dynamic_call`.
enum Task {
    /// The task has been spawned but has not stored a result yet. The event is notified when
    /// the entry is replaced by [`Task::Finished`], and readers listen on it in the meantime.
    Spawned(Event),
    /// The task ran to completion: either the `RawVc` it produced, or a cloneable error
    /// (including errors converted from a panic inside the task).
    Finished(Result<RawVc, SharedError>),
}
36
/// In-memory implementation of the turbo-tasks API traits used by tests.
///
/// Cells live in a map keyed by `(TaskId, CellId)` and spawned tasks are tracked in a vector.
/// Instances are created through [`VcStorage::with`].
#[derive(Default)]
pub struct VcStorage {
    /// Weak self-reference, upgraded in `dynamic_call` so spawned tasks can hold an owning
    /// handle to this storage.
    this: Weak<Self>,
    /// Cell contents, keyed by the owning task and the cell id.
    cells: Mutex<FxHashMap<(TaskId, CellId), CellContent>>,
    /// Tasks in spawn order; the task with id `n` is stored at index `n - 1`.
    tasks: Mutex<Vec<Task>>,
}
43
44impl VcStorage {
45    fn dynamic_call(
46        &self,
47        func: &'static turbo_tasks::macro_helpers::NativeFunction,
48        this_arg: Option<RawVc>,
49        arg: Box<dyn MagicAny>,
50    ) -> RawVc {
51        let this = self.this.upgrade().unwrap();
52        let handle = tokio::runtime::Handle::current();
53        let future = func.execute(this_arg, &*arg);
54        let i = {
55            let mut tasks = self.tasks.lock().unwrap();
56            let i = tasks.len();
57            tasks.push(Task::Spawned(Event::new(move || {
58                move || format!("Task({i})::event")
59            })));
60            i
61        };
62        let task_id = TaskId::try_from(u32::try_from(i + 1).unwrap()).unwrap();
63        let execution_id = ExecutionId::try_from(u16::try_from(i + 1).unwrap()).unwrap();
64        handle.spawn(with_turbo_tasks_for_testing(
65            this.clone(),
66            task_id,
67            execution_id,
68            async move {
69                let result = AssertUnwindSafe(future).catch_unwind().await;
70
71                // Convert the unwind panic to an anyhow error that can be cloned.
72                let result = result
73                    .map_err(|any| match any.downcast::<String>() {
74                        Ok(owned) => anyhow!(owned),
75                        Err(any) => match any.downcast::<&'static str>() {
76                            Ok(str) => anyhow!(str),
77                            Err(_) => anyhow!("unknown panic"),
78                        },
79                    })
80                    .and_then(|r| r)
81                    .map_err(SharedError::new);
82
83                let mut tasks = this.tasks.lock().unwrap();
84                if let Task::Spawned(event) = replace(&mut tasks[i], Task::Finished(result)) {
85                    event.notify(usize::MAX);
86                }
87            },
88        ));
89        RawVc::TaskOutput(task_id)
90    }
91}
92
93impl TurboTasksCallApi for VcStorage {
94    fn dynamic_call(
95        &self,
96        func: &'static turbo_tasks::macro_helpers::NativeFunction,
97        this: Option<RawVc>,
98        arg: Box<dyn MagicAny>,
99        _persistence: TaskPersistence,
100    ) -> RawVc {
101        self.dynamic_call(func, this, arg)
102    }
103    fn native_call(
104        &self,
105        _func: &'static turbo_tasks::macro_helpers::NativeFunction,
106        _this: Option<RawVc>,
107        _arg: Box<dyn MagicAny>,
108        _persistence: TaskPersistence,
109    ) -> RawVc {
110        unreachable!()
111    }
112
113    fn trait_call(
114        &self,
115        _trait_type: &'static turbo_tasks::TraitMethod,
116        _this: RawVc,
117        _arg: Box<dyn MagicAny>,
118        _persistence: TaskPersistence,
119    ) -> RawVc {
120        unreachable!()
121    }
122
123    fn run(
124        &self,
125        _future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
126    ) -> Pin<
127        Box<dyn Future<Output = Result<(), turbo_tasks::backend::TurboTasksExecutionError>> + Send>,
128    > {
129        unreachable!()
130    }
131
132    fn run_once(
133        &self,
134        _future: std::pin::Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
135    ) -> Pin<
136        Box<dyn futures::Future<Output = Result<(), anyhow::Error>> + std::marker::Send + 'static>,
137    > {
138        unreachable!()
139    }
140
141    fn run_once_with_reason(
142        &self,
143        _reason: StaticOrArc<dyn InvalidationReason>,
144        _future: std::pin::Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
145    ) -> Pin<
146        Box<dyn futures::Future<Output = Result<(), anyhow::Error>> + std::marker::Send + 'static>,
147    > {
148        unreachable!()
149    }
150
151    fn start_once_process(
152        &self,
153        _future: std::pin::Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
154    ) {
155        unreachable!()
156    }
157}
158
159impl TurboTasksApi for VcStorage {
160    fn invalidate(&self, _task: TaskId) {
161        unreachable!()
162    }
163
164    fn invalidate_with_reason(
165        &self,
166        _task: TaskId,
167        _reason: turbo_tasks::util::StaticOrArc<dyn turbo_tasks::InvalidationReason>,
168    ) {
169        unreachable!()
170    }
171
172    fn invalidate_serialization(&self, _task: TaskId) {
173        // ignore
174    }
175
176    fn try_read_task_output(
177        &self,
178        id: TaskId,
179        _options: ReadOutputOptions,
180    ) -> Result<Result<RawVc, EventListener>> {
181        let tasks = self.tasks.lock().unwrap();
182        let i = *id - 1;
183        let task = tasks.get(i as usize).unwrap();
184        match task {
185            Task::Spawned(event) => Ok(Err(event.listen())),
186            Task::Finished(result) => match result {
187                Ok(vc) => Ok(Ok(*vc)),
188                Err(err) => Err(anyhow!(err.clone())),
189            },
190        }
191    }
192
193    fn try_read_task_cell(
194        &self,
195        task: TaskId,
196        index: CellId,
197        _options: ReadCellOptions,
198    ) -> Result<Result<TypedCellContent, EventListener>> {
199        let map = self.cells.lock().unwrap();
200        Ok(Ok(if let Some(cell) = map.get(&(task, index)) {
201            cell.clone()
202        } else {
203            Default::default()
204        }
205        .into_typed(index.type_id)))
206    }
207    fn try_read_own_task_cell(
208        &self,
209        current_task: TaskId,
210        index: CellId,
211        options: ReadCellOptions,
212    ) -> Result<TypedCellContent> {
213        self.read_own_task_cell(current_task, index, options)
214    }
215
216    fn try_read_local_output(
217        &self,
218        _execution_id: ExecutionId,
219        _local_task_id: LocalTaskId,
220    ) -> Result<Result<RawVc, EventListener>> {
221        unimplemented!()
222    }
223
224    fn emit_collectible(&self, _trait_type: turbo_tasks::TraitTypeId, _collectible: RawVc) {
225        unimplemented!()
226    }
227
228    fn unemit_collectible(
229        &self,
230        _trait_type: turbo_tasks::TraitTypeId,
231        _collectible: RawVc,
232        _count: u32,
233    ) {
234        unimplemented!()
235    }
236
237    fn unemit_collectibles(
238        &self,
239        _trait_type: turbo_tasks::TraitTypeId,
240        _collectibles: &TaskCollectiblesMap,
241    ) {
242        unimplemented!()
243    }
244
245    fn read_task_collectibles(&self, _task: TaskId, _trait_id: TraitTypeId) -> TaskCollectiblesMap {
246        unimplemented!()
247    }
248
249    fn read_own_task_cell(
250        &self,
251        task: TaskId,
252        index: CellId,
253        _options: ReadCellOptions,
254    ) -> Result<TypedCellContent> {
255        let map = self.cells.lock().unwrap();
256        Ok(if let Some(cell) = map.get(&(task, index)) {
257            cell.to_owned()
258        } else {
259            Default::default()
260        }
261        .into_typed(index.type_id))
262    }
263
264    fn update_own_task_cell(
265        &self,
266        task: TaskId,
267        index: CellId,
268        content: CellContent,
269        _verification_mode: VerificationMode,
270    ) {
271        let mut map = self.cells.lock().unwrap();
272        let cell = map.entry((task, index)).or_default();
273        *cell = content;
274    }
275
276    fn connect_task(&self, _task: TaskId) {
277        // no-op
278    }
279
280    fn mark_own_task_as_finished(&self, _task: TaskId) {
281        // no-op
282    }
283
284    fn mark_own_task_as_session_dependent(&self, _task: TaskId) {
285        // no-op
286    }
287
288    fn set_own_task_aggregation_number(&self, _task: TaskId, _aggregation_number: u32) {
289        // no-op
290    }
291
292    fn detached_for_testing(
293        &self,
294        _f: std::pin::Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
295    ) -> std::pin::Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>> {
296        unimplemented!()
297    }
298
299    fn task_statistics(&self) -> &turbo_tasks::task_statistics::TaskStatisticsApi {
300        unimplemented!()
301    }
302
303    fn stop_and_wait(&self) -> std::pin::Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
304        Box::pin(async {})
305    }
306
307    /// Should not be called on the testing VcStorage. These methods are only implemented for
308    /// structs with access to a `MessageQueue` like `TurboTasks`.
309    fn subscribe_to_compilation_events(
310        &self,
311        _event_types: Option<Vec<String>>,
312    ) -> Receiver<Arc<dyn CompilationEvent>> {
313        unimplemented!()
314    }
315
316    /// Should not be called on the testing VcStorage. These methods are only implemented for
317    /// structs with access to a `MessageQueue` like `TurboTasks`.
318    fn send_compilation_event(&self, _event: Arc<dyn CompilationEvent>) {
319        unimplemented!()
320    }
321
322    fn is_tracking_dependencies(&self) -> bool {
323        false
324    }
325}
326
327impl VcStorage {
328    pub fn with<T>(f: impl Future<Output = T>) -> impl Future<Output = T> {
329        with_turbo_tasks_for_testing(
330            Arc::new_cyclic(|weak| VcStorage {
331                this: weak.clone(),
332                ..Default::default()
333            }),
334            TaskId::MAX,
335            ExecutionId::MIN,
336            f,
337        )
338    }
339}