Skip to main content

turbo_tasks_testing/
lib.rs

//! Testing utilities and macros for turbo-tasks and applications based on it.
2
3pub mod retry;
4mod run;
5
6use std::{
7    future::Future,
8    mem::replace,
9    panic::AssertUnwindSafe,
10    pin::Pin,
11    sync::{Arc, Mutex, Weak},
12};
13
14use anyhow::{Result, anyhow, bail};
15use futures::FutureExt;
16use rustc_hash::FxHashMap;
17use smallvec::SmallVec;
18use tokio::sync::mpsc::Receiver;
19use turbo_tasks::{
20    CellId, DynTaskInputs, ExecutionId, InvalidationReason, LocalTaskId, RawVc, ReadCellOptions,
21    ReadOutputOptions, StackDynTaskInputs, TaskId, TaskPersistence, TraitTypeId, TurboTasksApi,
22    TurboTasksCallApi,
23    backend::{CellContent, TaskCollectiblesMap, TypedCellContent, VerificationMode},
24    event::{Event, EventListener},
25    message_queue::CompilationEvent,
26    test_helpers::with_turbo_tasks_for_testing,
27    util::{SharedError, StaticOrArc},
28};
29
30pub use crate::run::{
31    Registration, run, run_once, run_once_without_cache_check, run_with_tt, run_without_cache_check,
32};
33
/// State of one task slot in [`VcStorage`].
///
/// A slot starts out as `Spawned` and is swapped to `Finished` by the spawned task once the
/// function's future has resolved.
enum Task {
    /// The task is still running. Readers obtain a listener from this event; the event is
    /// notified when the slot is replaced by `Finished`.
    Spawned(Event),
    /// The task has completed, either with its output or with a cloneable error (which also
    /// covers panics caught while executing the task).
    Finished(Result<RawVc, SharedError>),
}
38
/// In-memory stand-in for the turbo-tasks backend, used by tests.
///
/// It keeps cell contents and task results in plain mutex-protected collections.
#[derive(Default)]
pub struct VcStorage {
    /// Weak self-reference, populated through `Arc::new_cyclic` in `VcStorage::with` and
    /// upgraded whenever a new task needs an owning handle to the storage.
    this: Weak<Self>,
    /// Cell contents keyed by the owning task and the cell id.
    cells: Mutex<FxHashMap<(TaskId, CellId), CellContent>>,
    /// One slot per spawned task. The slot at index `i` belongs to the task whose id is `i + 1`.
    tasks: Mutex<Vec<Task>>,
}
45
46impl VcStorage {
47    fn dynamic_call(
48        &self,
49        func: &'static turbo_tasks::macro_helpers::NativeFunction,
50        this_arg: Option<RawVc>,
51        arg: Box<dyn DynTaskInputs>,
52    ) -> RawVc {
53        let this = self.this.upgrade().unwrap();
54        let handle = tokio::runtime::Handle::current();
55        let future = func.execute(this_arg, &*arg);
56        let i = {
57            let mut tasks = self.tasks.lock().unwrap();
58            let i = tasks.len();
59            tasks.push(Task::Spawned(Event::new(move || {
60                move || format!("Task({i})::event")
61            })));
62            i
63        };
64        let task_id = TaskId::try_from(u32::try_from(i + 1).unwrap()).unwrap();
65        let execution_id = ExecutionId::try_from(u16::try_from(i + 1).unwrap()).unwrap();
66        handle.spawn(with_turbo_tasks_for_testing(
67            this.clone(),
68            task_id,
69            execution_id,
70            async move {
71                let result = AssertUnwindSafe(future).catch_unwind().await;
72
73                // Convert the unwind panic to an anyhow error that can be cloned.
74                let result = result
75                    .map_err(|any| match any.downcast::<String>() {
76                        Ok(owned) => anyhow!(owned),
77                        Err(any) => match any.downcast::<&'static str>() {
78                            Ok(str) => anyhow!(str),
79                            Err(_) => anyhow!("unknown panic"),
80                        },
81                    })
82                    .and_then(|r| r)
83                    .map_err(SharedError::new);
84
85                let mut tasks = this.tasks.lock().unwrap();
86                if let Task::Spawned(event) = replace(&mut tasks[i], Task::Finished(result)) {
87                    event.notify(usize::MAX);
88                }
89            },
90        ));
91        RawVc::TaskOutput(task_id)
92    }
93}
94
95impl TurboTasksCallApi for VcStorage {
96    fn dynamic_call(
97        &self,
98        func: &'static turbo_tasks::macro_helpers::NativeFunction,
99        this: Option<RawVc>,
100        arg: &mut dyn StackDynTaskInputs,
101        _persistence: TaskPersistence,
102    ) -> RawVc {
103        self.dynamic_call(func, this, arg.take_box())
104    }
105    fn native_call(
106        &self,
107        func: &'static turbo_tasks::macro_helpers::NativeFunction,
108        this: Option<RawVc>,
109        arg: &mut dyn StackDynTaskInputs,
110        _persistence: TaskPersistence,
111    ) -> RawVc {
112        self.dynamic_call(func, this, arg.take_box())
113    }
114
115    fn trait_call(
116        &self,
117        _trait_type: &'static turbo_tasks::TraitMethod,
118        _this: RawVc,
119        _arg: &mut dyn StackDynTaskInputs,
120        _persistence: TaskPersistence,
121    ) -> RawVc {
122        unreachable!()
123    }
124
125    fn run(
126        &self,
127        _future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
128    ) -> Pin<
129        Box<dyn Future<Output = Result<(), turbo_tasks::backend::TurboTasksExecutionError>> + Send>,
130    > {
131        unreachable!()
132    }
133
134    fn run_once(
135        &self,
136        _future: std::pin::Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
137    ) -> Pin<
138        Box<dyn futures::Future<Output = Result<(), anyhow::Error>> + std::marker::Send + 'static>,
139    > {
140        unreachable!()
141    }
142
143    fn run_once_with_reason(
144        &self,
145        _reason: StaticOrArc<dyn InvalidationReason>,
146        _future: std::pin::Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
147    ) -> Pin<
148        Box<dyn futures::Future<Output = Result<(), anyhow::Error>> + std::marker::Send + 'static>,
149    > {
150        unreachable!()
151    }
152
153    fn start_once_process(
154        &self,
155        _future: std::pin::Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
156    ) {
157        unreachable!()
158    }
159
160    /// Should not be called on the testing VcStorage. These methods are only implemented for
161    /// structs with access to a `MessageQueue` like `TurboTasks`.
162    fn send_compilation_event(&self, _event: Arc<dyn CompilationEvent>) {
163        unimplemented!()
164    }
165
166    fn get_task_name(&self, task: TaskId) -> String {
167        format!("Task({})", task)
168    }
169}
170
171impl TurboTasksApi for VcStorage {
172    fn invalidate(&self, _task: TaskId) {
173        unreachable!()
174    }
175
176    fn invalidate_with_reason(
177        &self,
178        _task: TaskId,
179        _reason: turbo_tasks::util::StaticOrArc<dyn turbo_tasks::InvalidationReason>,
180    ) {
181        unreachable!()
182    }
183
184    fn invalidate_serialization(&self, _task: TaskId) {
185        // ignore
186    }
187
188    fn try_read_task_output(
189        &self,
190        id: TaskId,
191        _options: ReadOutputOptions,
192    ) -> Result<Result<RawVc, EventListener>> {
193        let tasks = self.tasks.lock().unwrap();
194        let i = *id - 1;
195        let task = tasks.get(i as usize).unwrap();
196        match task {
197            Task::Spawned(event) => Ok(Err(event.listen())),
198            Task::Finished(result) => match result {
199                Ok(vc) => Ok(Ok(*vc)),
200                Err(err) => bail!(err.clone()),
201            },
202        }
203    }
204
205    fn try_read_task_cell(
206        &self,
207        task: TaskId,
208        index: CellId,
209        _options: ReadCellOptions,
210    ) -> Result<Result<TypedCellContent, EventListener>> {
211        let map = self.cells.lock().unwrap();
212        Ok(Ok(if let Some(cell) = map.get(&(task, index)) {
213            cell.clone()
214        } else {
215            Default::default()
216        }
217        .into_typed(index.type_id)))
218    }
219    fn try_read_own_task_cell(
220        &self,
221        current_task: TaskId,
222        index: CellId,
223    ) -> Result<TypedCellContent> {
224        self.read_own_task_cell(current_task, index)
225    }
226
227    fn try_read_local_output(
228        &self,
229        _execution_id: ExecutionId,
230        _local_task_id: LocalTaskId,
231    ) -> Result<Result<RawVc, EventListener>> {
232        unimplemented!()
233    }
234
235    fn emit_collectible(&self, _trait_type: turbo_tasks::TraitTypeId, _collectible: RawVc) {
236        unimplemented!()
237    }
238
239    fn unemit_collectible(
240        &self,
241        _trait_type: turbo_tasks::TraitTypeId,
242        _collectible: RawVc,
243        _count: u32,
244    ) {
245        unimplemented!()
246    }
247
248    fn unemit_collectibles(
249        &self,
250        _trait_type: turbo_tasks::TraitTypeId,
251        _collectibles: &TaskCollectiblesMap,
252    ) {
253        unimplemented!()
254    }
255
256    fn read_task_collectibles(&self, _task: TaskId, _trait_id: TraitTypeId) -> TaskCollectiblesMap {
257        unimplemented!()
258    }
259
260    fn read_own_task_cell(&self, task: TaskId, index: CellId) -> Result<TypedCellContent> {
261        let map = self.cells.lock().unwrap();
262        Ok(if let Some(cell) = map.get(&(task, index)) {
263            cell.to_owned()
264        } else {
265            Default::default()
266        }
267        .into_typed(index.type_id))
268    }
269
270    fn update_own_task_cell(
271        &self,
272        task: TaskId,
273        index: CellId,
274        content: CellContent,
275        _updated_key_hashes: Option<SmallVec<[u64; 2]>>,
276        _content_hash: Option<[u8; 16]>,
277        _verification_mode: VerificationMode,
278    ) {
279        let mut map = self.cells.lock().unwrap();
280        let cell = map.entry((task, index)).or_default();
281        *cell = content;
282    }
283
284    fn connect_task(&self, _task: TaskId) {
285        // no-op
286    }
287
288    fn mark_own_task_as_finished(&self, _task: TaskId) {
289        // no-op
290    }
291
292    fn mark_own_task_as_session_dependent(&self, _task: TaskId) {
293        // no-op
294    }
295
296    fn spawn_detached_for_testing(
297        &self,
298        _f: std::pin::Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
299    ) {
300        unimplemented!()
301    }
302
303    fn task_statistics(&self) -> &turbo_tasks::task_statistics::TaskStatisticsApi {
304        unimplemented!()
305    }
306
307    fn stop_and_wait(&self) -> std::pin::Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
308        Box::pin(async {})
309    }
310
311    /// Should not be called on the testing VcStorage. These methods are only implemented for
312    /// structs with access to a `MessageQueue` like `TurboTasks`.
313    fn subscribe_to_compilation_events(
314        &self,
315        _event_types: Option<Vec<String>>,
316    ) -> Receiver<Arc<dyn CompilationEvent>> {
317        unimplemented!()
318    }
319
320    fn is_tracking_dependencies(&self) -> bool {
321        false
322    }
323}
324
325impl VcStorage {
326    pub fn with<T>(f: impl Future<Output = T>) -> impl Future<Output = T> {
327        with_turbo_tasks_for_testing(
328            Arc::new_cyclic(|weak| VcStorage {
329                this: weak.clone(),
330                ..Default::default()
331            }),
332            TaskId::MAX,
333            ExecutionId::MIN,
334            f,
335        )
336    }
337}