Skip to main content

turbo_tasks_testing/
lib.rs

1//! Testing utilities and macros for turbo-tasks and applications based on it.
2
3pub mod retry;
4mod run;
5
6use std::{
7    future::Future,
8    mem::replace,
9    panic::AssertUnwindSafe,
10    pin::Pin,
11    sync::{Arc, Mutex, Weak},
12};
13
14use anyhow::{Result, anyhow, bail};
15use futures::FutureExt;
16use rustc_hash::FxHashMap;
17use smallvec::SmallVec;
18use tokio::sync::mpsc::Receiver;
19use turbo_tasks::{
20    CellId, DynTaskInputs, ExecutionId, InvalidationReason, LocalTaskId, RawVc, ReadCellOptions,
21    ReadOutputOptions, StackDynTaskInputs, TaskId, TaskPersistence, TraitTypeId, TurboTasksApi,
22    TurboTasksCallApi,
23    backend::{CellContent, TaskCollectiblesMap, TypedCellContent, VerificationMode},
24    event::{Event, EventListener},
25    message_queue::CompilationEvent,
26    test_helpers::with_turbo_tasks_for_testing,
27    util::{SharedError, StaticOrArc},
28};
29
30pub use crate::run::{
31    Registration, run, run_once, run_once_without_cache_check, run_with_tt, run_without_cache_check,
32};
33
34enum Task {
35    Spawned(Event),
36    Finished(Result<RawVc, SharedError>),
37}
38
39#[derive(Default)]
40pub struct VcStorage {
41    this: Weak<Self>,
42    cells: Mutex<FxHashMap<(TaskId, CellId), CellContent>>,
43    tasks: Mutex<Vec<Task>>,
44}
45
46impl VcStorage {
47    fn dynamic_call(
48        &self,
49        func: &'static turbo_tasks::macro_helpers::NativeFunction,
50        this_arg: Option<RawVc>,
51        arg: Box<dyn DynTaskInputs>,
52    ) -> RawVc {
53        let this = self.this.upgrade().unwrap();
54        let handle = tokio::runtime::Handle::current();
55        let future = func.execute(this_arg, &*arg);
56        let i = {
57            let mut tasks = self.tasks.lock().unwrap();
58            let i = tasks.len();
59            tasks.push(Task::Spawned(Event::new(move || {
60                move || format!("Task({i})::event")
61            })));
62            i
63        };
64        let task_id = TaskId::try_from(u32::try_from(i + 1).unwrap()).unwrap();
65        let execution_id = ExecutionId::try_from(u16::try_from(i + 1).unwrap()).unwrap();
66        handle.spawn(with_turbo_tasks_for_testing(
67            this.clone(),
68            task_id,
69            execution_id,
70            async move {
71                let result = AssertUnwindSafe(future).catch_unwind().await;
72
73                // Convert the unwind panic to an anyhow error that can be cloned.
74                let result = result
75                    .map_err(|any| match any.downcast::<String>() {
76                        Ok(owned) => anyhow!(owned),
77                        Err(any) => match any.downcast::<&'static str>() {
78                            Ok(str) => anyhow!(str),
79                            Err(_) => anyhow!("unknown panic"),
80                        },
81                    })
82                    .and_then(|r| r)
83                    .map_err(SharedError::new);
84
85                let mut tasks = this.tasks.lock().unwrap();
86                if let Task::Spawned(event) = replace(&mut tasks[i], Task::Finished(result)) {
87                    event.notify(usize::MAX);
88                }
89            },
90        ));
91        RawVc::TaskOutput(task_id)
92    }
93}
94
95impl TurboTasksCallApi for VcStorage {
96    fn dynamic_call(
97        &self,
98        func: &'static turbo_tasks::macro_helpers::NativeFunction,
99        this: Option<RawVc>,
100        arg: &mut dyn StackDynTaskInputs,
101        _persistence: TaskPersistence,
102    ) -> RawVc {
103        self.dynamic_call(func, this, arg.take_box())
104    }
105    fn native_call(
106        &self,
107        func: &'static turbo_tasks::macro_helpers::NativeFunction,
108        this: Option<RawVc>,
109        arg: &mut dyn StackDynTaskInputs,
110        _persistence: TaskPersistence,
111    ) -> RawVc {
112        self.dynamic_call(func, this, arg.take_box())
113    }
114
115    fn trait_call(
116        &self,
117        _trait_type: &'static turbo_tasks::TraitMethod,
118        _this: RawVc,
119        _arg: &mut dyn StackDynTaskInputs,
120        _persistence: TaskPersistence,
121    ) -> RawVc {
122        unreachable!()
123    }
124
125    fn run(
126        &self,
127        _future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
128    ) -> Pin<
129        Box<dyn Future<Output = Result<(), turbo_tasks::backend::TurboTasksExecutionError>> + Send>,
130    > {
131        unreachable!()
132    }
133
134    fn run_once(
135        &self,
136        _future: std::pin::Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
137    ) -> Pin<
138        Box<dyn futures::Future<Output = Result<(), anyhow::Error>> + std::marker::Send + 'static>,
139    > {
140        unreachable!()
141    }
142
143    fn run_once_with_reason(
144        &self,
145        _reason: StaticOrArc<dyn InvalidationReason>,
146        _future: std::pin::Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
147    ) -> Pin<
148        Box<dyn futures::Future<Output = Result<(), anyhow::Error>> + std::marker::Send + 'static>,
149    > {
150        unreachable!()
151    }
152
153    fn start_once_process(
154        &self,
155        _future: std::pin::Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
156    ) {
157        unreachable!()
158    }
159
160    /// Should not be called on the testing VcStorage. These methods are only implemented for
161    /// structs with access to a `MessageQueue` like `TurboTasks`.
162    fn send_compilation_event(&self, _event: Arc<dyn CompilationEvent>) {
163        unimplemented!()
164    }
165
166    fn get_task_name(&self, task: TaskId) -> String {
167        format!("Task({})", task)
168    }
169}
170
171impl TurboTasksApi for VcStorage {
172    fn invalidate(&self, _task: TaskId) {
173        unreachable!()
174    }
175
176    fn invalidate_with_reason(
177        &self,
178        _task: TaskId,
179        _reason: turbo_tasks::util::StaticOrArc<dyn turbo_tasks::InvalidationReason>,
180    ) {
181        unreachable!()
182    }
183
184    fn invalidate_serialization(&self, _task: TaskId) {
185        // ignore
186    }
187
188    fn try_read_task_output(
189        &self,
190        id: TaskId,
191        _options: ReadOutputOptions,
192    ) -> Result<Result<RawVc, EventListener>> {
193        let tasks = self.tasks.lock().unwrap();
194        let i = *id - 1;
195        let task = tasks.get(i as usize).unwrap();
196        match task {
197            Task::Spawned(event) => Ok(Err(event.listen())),
198            Task::Finished(result) => match result {
199                Ok(vc) => Ok(Ok(*vc)),
200                Err(err) => bail!(err.clone()),
201            },
202        }
203    }
204
205    fn try_read_task_cell(
206        &self,
207        task: TaskId,
208        index: CellId,
209        _options: ReadCellOptions,
210    ) -> Result<Result<TypedCellContent, EventListener>> {
211        let map = self.cells.lock().unwrap();
212        Ok(Ok(if let Some(cell) = map.get(&(task, index)) {
213            cell.clone()
214        } else {
215            Default::default()
216        }
217        .into_typed(index.type_id)))
218    }
219    fn try_read_own_task_cell(
220        &self,
221        current_task: TaskId,
222        index: CellId,
223        options: ReadCellOptions,
224    ) -> Result<TypedCellContent> {
225        self.read_own_task_cell(current_task, index, options)
226    }
227
228    fn try_read_local_output(
229        &self,
230        _execution_id: ExecutionId,
231        _local_task_id: LocalTaskId,
232    ) -> Result<Result<RawVc, EventListener>> {
233        unimplemented!()
234    }
235
236    fn emit_collectible(&self, _trait_type: turbo_tasks::TraitTypeId, _collectible: RawVc) {
237        unimplemented!()
238    }
239
240    fn unemit_collectible(
241        &self,
242        _trait_type: turbo_tasks::TraitTypeId,
243        _collectible: RawVc,
244        _count: u32,
245    ) {
246        unimplemented!()
247    }
248
249    fn unemit_collectibles(
250        &self,
251        _trait_type: turbo_tasks::TraitTypeId,
252        _collectibles: &TaskCollectiblesMap,
253    ) {
254        unimplemented!()
255    }
256
257    fn read_task_collectibles(&self, _task: TaskId, _trait_id: TraitTypeId) -> TaskCollectiblesMap {
258        unimplemented!()
259    }
260
261    fn read_own_task_cell(
262        &self,
263        task: TaskId,
264        index: CellId,
265        _options: ReadCellOptions,
266    ) -> Result<TypedCellContent> {
267        let map = self.cells.lock().unwrap();
268        Ok(if let Some(cell) = map.get(&(task, index)) {
269            cell.to_owned()
270        } else {
271            Default::default()
272        }
273        .into_typed(index.type_id))
274    }
275
276    fn update_own_task_cell(
277        &self,
278        task: TaskId,
279        index: CellId,
280        _is_serializable_cell_content: bool,
281        content: CellContent,
282        _updated_key_hashes: Option<SmallVec<[u64; 2]>>,
283        _content_hash: Option<[u8; 16]>,
284        _verification_mode: VerificationMode,
285    ) {
286        let mut map = self.cells.lock().unwrap();
287        let cell = map.entry((task, index)).or_default();
288        *cell = content;
289    }
290
291    fn connect_task(&self, _task: TaskId) {
292        // no-op
293    }
294
295    fn mark_own_task_as_finished(&self, _task: TaskId) {
296        // no-op
297    }
298
299    fn mark_own_task_as_session_dependent(&self, _task: TaskId) {
300        // no-op
301    }
302
303    fn spawn_detached_for_testing(
304        &self,
305        _f: std::pin::Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
306    ) {
307        unimplemented!()
308    }
309
310    fn task_statistics(&self) -> &turbo_tasks::task_statistics::TaskStatisticsApi {
311        unimplemented!()
312    }
313
314    fn stop_and_wait(&self) -> std::pin::Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
315        Box::pin(async {})
316    }
317
318    /// Should not be called on the testing VcStorage. These methods are only implemented for
319    /// structs with access to a `MessageQueue` like `TurboTasks`.
320    fn subscribe_to_compilation_events(
321        &self,
322        _event_types: Option<Vec<String>>,
323    ) -> Receiver<Arc<dyn CompilationEvent>> {
324        unimplemented!()
325    }
326
327    fn is_tracking_dependencies(&self) -> bool {
328        false
329    }
330}
331
332impl VcStorage {
333    pub fn with<T>(f: impl Future<Output = T>) -> impl Future<Output = T> {
334        with_turbo_tasks_for_testing(
335            Arc::new_cyclic(|weak| VcStorage {
336                this: weak.clone(),
337                ..Default::default()
338            }),
339            TaskId::MAX,
340            ExecutionId::MIN,
341            f,
342        )
343    }
344}