turbo_tasks_testing/
lib.rs

1//! Testing utilities and macros for turbo-tasks and applications based on it.
2
3pub mod retry;
4mod run;
5
6use std::{
7    borrow::Cow,
8    future::Future,
9    mem::replace,
10    panic::AssertUnwindSafe,
11    sync::{Arc, Mutex, Weak},
12};
13
14use anyhow::{Result, anyhow};
15use futures::FutureExt;
16use rustc_hash::FxHashMap;
17use tokio::sync::mpsc::Receiver;
18use turbo_tasks::{
19    CellId, ExecutionId, InvalidationReason, LocalTaskId, MagicAny, RawVc, ReadCellOptions,
20    ReadConsistency, TaskId, TaskPersistence, TraitTypeId, TurboTasksApi, TurboTasksCallApi,
21    backend::{CellContent, TaskCollectiblesMap, TypedCellContent},
22    event::{Event, EventListener},
23    message_queue::CompilationEvent,
24    registry,
25    test_helpers::with_turbo_tasks_for_testing,
26    util::{SharedError, StaticOrArc},
27};
28
29pub use crate::run::{Registration, run, run_with_tt, run_without_cache_check};
30
/// State of a task started through `VcStorage::dynamic_call`.
enum Task {
    /// The task's future has been spawned but has not produced a result yet. The
    /// contained [`Event`] is notified when the entry is replaced by
    /// [`Task::Finished`], so readers can wait on it.
    Spawned(Event),
    /// The task's future has completed. Holds either the resulting [`RawVc`] or the
    /// error (including a panic converted to an error) as a [`SharedError`].
    Finished(Result<RawVc, SharedError>),
}
35
/// In-memory implementation of the turbo-tasks API traits for use in tests. Cell
/// contents and task results are kept in mutex-guarded collections.
#[derive(Default)]
pub struct VcStorage {
    /// Weak self-reference (filled in by `Arc::new_cyclic` in `VcStorage::with`) that
    /// lets `&self` methods obtain an owning `Arc` of the storage.
    this: Weak<Self>,
    /// Cell contents, keyed by the owning task and the cell id.
    cells: Mutex<FxHashMap<(TaskId, CellId), CellContent>>,
    /// One entry per task started by `dynamic_call`. The task with id `n` is stored at
    /// index `n - 1`.
    tasks: Mutex<Vec<Task>>,
}
42
43impl VcStorage {
44    fn dynamic_call(
45        &self,
46        func: turbo_tasks::FunctionId,
47        this_arg: Option<RawVc>,
48        arg: Box<dyn MagicAny>,
49    ) -> RawVc {
50        let this = self.this.upgrade().unwrap();
51        let handle = tokio::runtime::Handle::current();
52        let future = registry::get_function(func).execute(this_arg, &*arg);
53        let i = {
54            let mut tasks = self.tasks.lock().unwrap();
55            let i = tasks.len();
56            tasks.push(Task::Spawned(Event::new(move || {
57                format!("Task({i})::event")
58            })));
59            i
60        };
61        let task_id = TaskId::try_from(u32::try_from(i + 1).unwrap()).unwrap();
62        let execution_id = ExecutionId::try_from(u16::try_from(i + 1).unwrap()).unwrap();
63        handle.spawn(with_turbo_tasks_for_testing(
64            this.clone(),
65            task_id,
66            execution_id,
67            async move {
68                let result = AssertUnwindSafe(future).catch_unwind().await;
69
70                // Convert the unwind panic to an anyhow error that can be cloned.
71                let result = result
72                    .map_err(|any| match any.downcast::<String>() {
73                        Ok(owned) => anyhow!(owned),
74                        Err(any) => match any.downcast::<&'static str>() {
75                            Ok(str) => anyhow!(str),
76                            Err(_) => anyhow!("unknown panic"),
77                        },
78                    })
79                    .and_then(|r| r)
80                    .map_err(SharedError::new);
81
82                let mut tasks = this.tasks.lock().unwrap();
83                if let Task::Spawned(event) = replace(&mut tasks[i], Task::Finished(result)) {
84                    event.notify(usize::MAX);
85                }
86            },
87        ));
88        RawVc::TaskOutput(task_id)
89    }
90}
91
92impl TurboTasksCallApi for VcStorage {
93    fn dynamic_call(
94        &self,
95        func: turbo_tasks::FunctionId,
96        this: Option<RawVc>,
97        arg: Box<dyn MagicAny>,
98        _persistence: TaskPersistence,
99    ) -> RawVc {
100        self.dynamic_call(func, this, arg)
101    }
102    fn native_call(
103        &self,
104        _func: turbo_tasks::FunctionId,
105        _this: Option<RawVc>,
106        _arg: Box<dyn MagicAny>,
107        _persistence: TaskPersistence,
108    ) -> RawVc {
109        unreachable!()
110    }
111
112    fn trait_call(
113        &self,
114        _trait_type: turbo_tasks::TraitTypeId,
115        _trait_fn_name: Cow<'static, str>,
116        _this: RawVc,
117        _arg: Box<dyn MagicAny>,
118        _persistence: TaskPersistence,
119    ) -> RawVc {
120        unreachable!()
121    }
122
123    fn run_once(
124        &self,
125        _future: std::pin::Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
126    ) -> TaskId {
127        unreachable!()
128    }
129
130    fn run_once_with_reason(
131        &self,
132        _reason: StaticOrArc<dyn InvalidationReason>,
133        _future: std::pin::Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
134    ) -> TaskId {
135        unreachable!()
136    }
137
138    fn run_once_process(
139        &self,
140        _future: std::pin::Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
141    ) -> TaskId {
142        unreachable!()
143    }
144}
145
146impl TurboTasksApi for VcStorage {
147    fn pin(&self) -> Arc<dyn TurboTasksApi> {
148        self.this.upgrade().unwrap()
149    }
150
151    fn invalidate(&self, _task: TaskId) {
152        unreachable!()
153    }
154
155    fn invalidate_with_reason(
156        &self,
157        _task: TaskId,
158        _reason: turbo_tasks::util::StaticOrArc<dyn turbo_tasks::InvalidationReason>,
159    ) {
160        unreachable!()
161    }
162
163    fn invalidate_serialization(&self, _task: TaskId) {
164        // ingore
165    }
166
167    fn notify_scheduled_tasks(&self) {
168        // ignore
169    }
170
171    fn try_read_task_output(
172        &self,
173        id: TaskId,
174        _consistency: ReadConsistency,
175    ) -> Result<Result<RawVc, EventListener>> {
176        let tasks = self.tasks.lock().unwrap();
177        let i = *id - 1;
178        let task = tasks.get(i as usize).unwrap();
179        match task {
180            Task::Spawned(event) => Ok(Err(event.listen())),
181            Task::Finished(result) => match result {
182                Ok(vc) => Ok(Ok(*vc)),
183                Err(err) => Err(anyhow!(err.clone())),
184            },
185        }
186    }
187
188    fn try_read_task_output_untracked(
189        &self,
190        task: TaskId,
191        consistency: ReadConsistency,
192    ) -> Result<Result<RawVc, EventListener>> {
193        self.try_read_task_output(task, consistency)
194    }
195
196    fn try_read_task_cell(
197        &self,
198        task: TaskId,
199        index: CellId,
200        _options: ReadCellOptions,
201    ) -> Result<Result<TypedCellContent, EventListener>> {
202        let map = self.cells.lock().unwrap();
203        Ok(Ok(if let Some(cell) = map.get(&(task, index)) {
204            cell.clone()
205        } else {
206            Default::default()
207        }
208        .into_typed(index.type_id)))
209    }
210
211    fn try_read_task_cell_untracked(
212        &self,
213        task: TaskId,
214        index: CellId,
215        _options: ReadCellOptions,
216    ) -> Result<Result<TypedCellContent, EventListener>> {
217        let map = self.cells.lock().unwrap();
218        Ok(Ok(if let Some(cell) = map.get(&(task, index)) {
219            cell.to_owned()
220        } else {
221            Default::default()
222        }
223        .into_typed(index.type_id)))
224    }
225
226    fn try_read_own_task_cell_untracked(
227        &self,
228        current_task: TaskId,
229        index: CellId,
230        options: ReadCellOptions,
231    ) -> Result<TypedCellContent> {
232        self.read_own_task_cell(current_task, index, options)
233    }
234
235    fn try_read_local_output(
236        &self,
237        _execution_id: ExecutionId,
238        _local_task_id: LocalTaskId,
239    ) -> Result<Result<RawVc, EventListener>> {
240        unimplemented!()
241    }
242
243    fn emit_collectible(&self, _trait_type: turbo_tasks::TraitTypeId, _collectible: RawVc) {
244        unimplemented!()
245    }
246
247    fn unemit_collectible(
248        &self,
249        _trait_type: turbo_tasks::TraitTypeId,
250        _collectible: RawVc,
251        _count: u32,
252    ) {
253        unimplemented!()
254    }
255
256    fn unemit_collectibles(
257        &self,
258        _trait_type: turbo_tasks::TraitTypeId,
259        _collectibles: &TaskCollectiblesMap,
260    ) {
261        unimplemented!()
262    }
263
264    fn read_task_collectibles(&self, _task: TaskId, _trait_id: TraitTypeId) -> TaskCollectiblesMap {
265        unimplemented!()
266    }
267
268    fn read_own_task_cell(
269        &self,
270        task: TaskId,
271        index: CellId,
272        _options: ReadCellOptions,
273    ) -> Result<TypedCellContent> {
274        let map = self.cells.lock().unwrap();
275        Ok(if let Some(cell) = map.get(&(task, index)) {
276            cell.to_owned()
277        } else {
278            Default::default()
279        }
280        .into_typed(index.type_id))
281    }
282
283    fn update_own_task_cell(&self, task: TaskId, index: CellId, content: CellContent) {
284        let mut map = self.cells.lock().unwrap();
285        let cell = map.entry((task, index)).or_default();
286        *cell = content;
287    }
288
289    fn connect_task(&self, _task: TaskId) {
290        // no-op
291    }
292
293    fn mark_own_task_as_finished(&self, _task: TaskId) {
294        // no-op
295    }
296
297    fn mark_own_task_as_session_dependent(&self, _task: TaskId) {
298        // no-op
299    }
300
301    fn set_own_task_aggregation_number(&self, _task: TaskId, _aggregation_number: u32) {
302        // no-op
303    }
304
305    fn detached_for_testing(
306        &self,
307        _f: std::pin::Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
308    ) -> std::pin::Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>> {
309        unimplemented!()
310    }
311
312    fn task_statistics(&self) -> &turbo_tasks::task_statistics::TaskStatisticsApi {
313        unimplemented!()
314    }
315
316    fn stop_and_wait(&self) -> std::pin::Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
317        Box::pin(async {})
318    }
319
320    /// Should not be called on the testing VcStorage. These methods are only implemented for
321    /// structs with access to a `MessageQueue` like `TurboTasks`.
322    fn subscribe_to_compilation_events(
323        &self,
324        _event_types: Option<Vec<String>>,
325    ) -> Receiver<Arc<dyn CompilationEvent>> {
326        unimplemented!()
327    }
328
329    /// Should not be called on the testing VcStorage. These methods are only implemented for
330    /// structs with access to a `MessageQueue` like `TurboTasks`.
331    fn send_compilation_event(&self, _event: Arc<dyn CompilationEvent>) {
332        unimplemented!()
333    }
334}
335
336impl VcStorage {
337    pub fn with<T>(f: impl Future<Output = T>) -> impl Future<Output = T> {
338        with_turbo_tasks_for_testing(
339            Arc::new_cyclic(|weak| VcStorage {
340                this: weak.clone(),
341                ..Default::default()
342            }),
343            TaskId::MAX,
344            ExecutionId::MIN,
345            f,
346        )
347    }
348}