1pub mod retry;
4mod run;
5
6use std::{
7 future::Future,
8 mem::replace,
9 panic::AssertUnwindSafe,
10 pin::Pin,
11 sync::{Arc, Mutex, Weak},
12};
13
14use anyhow::{Result, anyhow};
15use futures::FutureExt;
16use rustc_hash::FxHashMap;
17use smallvec::SmallVec;
18use tokio::sync::mpsc::Receiver;
19use turbo_tasks::{
20 CellId, ExecutionId, InvalidationReason, LocalTaskId, MagicAny, RawVc, ReadCellOptions,
21 ReadOutputOptions, TaskId, TaskPersistence, TraitTypeId, TurboTasksApi, TurboTasksCallApi,
22 backend::{CellContent, TaskCollectiblesMap, TypedCellContent, VerificationMode},
23 event::{Event, EventListener},
24 message_queue::CompilationEvent,
25 test_helpers::with_turbo_tasks_for_testing,
26 util::{SharedError, StaticOrArc},
27};
28
29pub use crate::run::{
30 Registration, run, run_once, run_once_without_cache_check, run_with_tt, run_without_cache_check,
31};
32
33enum Task {
34 Spawned(Event),
35 Finished(Result<RawVc, SharedError>),
36}
37
38#[derive(Default)]
39pub struct VcStorage {
40 this: Weak<Self>,
41 cells: Mutex<FxHashMap<(TaskId, CellId), CellContent>>,
42 tasks: Mutex<Vec<Task>>,
43}
44
45impl VcStorage {
46 fn dynamic_call(
47 &self,
48 func: &'static turbo_tasks::macro_helpers::NativeFunction,
49 this_arg: Option<RawVc>,
50 arg: Box<dyn MagicAny>,
51 ) -> RawVc {
52 let this = self.this.upgrade().unwrap();
53 let handle = tokio::runtime::Handle::current();
54 let future = func.execute(this_arg, &*arg);
55 let i = {
56 let mut tasks = self.tasks.lock().unwrap();
57 let i = tasks.len();
58 tasks.push(Task::Spawned(Event::new(move || {
59 move || format!("Task({i})::event")
60 })));
61 i
62 };
63 let task_id = TaskId::try_from(u32::try_from(i + 1).unwrap()).unwrap();
64 let execution_id = ExecutionId::try_from(u16::try_from(i + 1).unwrap()).unwrap();
65 handle.spawn(with_turbo_tasks_for_testing(
66 this.clone(),
67 task_id,
68 execution_id,
69 async move {
70 let result = AssertUnwindSafe(future).catch_unwind().await;
71
72 let result = result
74 .map_err(|any| match any.downcast::<String>() {
75 Ok(owned) => anyhow!(owned),
76 Err(any) => match any.downcast::<&'static str>() {
77 Ok(str) => anyhow!(str),
78 Err(_) => anyhow!("unknown panic"),
79 },
80 })
81 .and_then(|r| r)
82 .map_err(SharedError::new);
83
84 let mut tasks = this.tasks.lock().unwrap();
85 if let Task::Spawned(event) = replace(&mut tasks[i], Task::Finished(result)) {
86 event.notify(usize::MAX);
87 }
88 },
89 ));
90 RawVc::TaskOutput(task_id)
91 }
92}
93
94impl TurboTasksCallApi for VcStorage {
95 fn dynamic_call(
96 &self,
97 func: &'static turbo_tasks::macro_helpers::NativeFunction,
98 this: Option<RawVc>,
99 arg: Box<dyn MagicAny>,
100 _persistence: TaskPersistence,
101 ) -> RawVc {
102 self.dynamic_call(func, this, arg)
103 }
104 fn native_call(
105 &self,
106 _func: &'static turbo_tasks::macro_helpers::NativeFunction,
107 _this: Option<RawVc>,
108 _arg: Box<dyn MagicAny>,
109 _persistence: TaskPersistence,
110 ) -> RawVc {
111 unreachable!()
112 }
113
114 fn trait_call(
115 &self,
116 _trait_type: &'static turbo_tasks::TraitMethod,
117 _this: RawVc,
118 _arg: Box<dyn MagicAny>,
119 _persistence: TaskPersistence,
120 ) -> RawVc {
121 unreachable!()
122 }
123
124 fn run(
125 &self,
126 _future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
127 ) -> Pin<
128 Box<dyn Future<Output = Result<(), turbo_tasks::backend::TurboTasksExecutionError>> + Send>,
129 > {
130 unreachable!()
131 }
132
133 fn run_once(
134 &self,
135 _future: std::pin::Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
136 ) -> Pin<
137 Box<dyn futures::Future<Output = Result<(), anyhow::Error>> + std::marker::Send + 'static>,
138 > {
139 unreachable!()
140 }
141
142 fn run_once_with_reason(
143 &self,
144 _reason: StaticOrArc<dyn InvalidationReason>,
145 _future: std::pin::Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
146 ) -> Pin<
147 Box<dyn futures::Future<Output = Result<(), anyhow::Error>> + std::marker::Send + 'static>,
148 > {
149 unreachable!()
150 }
151
152 fn start_once_process(
153 &self,
154 _future: std::pin::Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
155 ) {
156 unreachable!()
157 }
158}
159
160impl TurboTasksApi for VcStorage {
161 fn invalidate(&self, _task: TaskId) {
162 unreachable!()
163 }
164
165 fn invalidate_with_reason(
166 &self,
167 _task: TaskId,
168 _reason: turbo_tasks::util::StaticOrArc<dyn turbo_tasks::InvalidationReason>,
169 ) {
170 unreachable!()
171 }
172
173 fn invalidate_serialization(&self, _task: TaskId) {
174 }
176
177 fn try_read_task_output(
178 &self,
179 id: TaskId,
180 _options: ReadOutputOptions,
181 ) -> Result<Result<RawVc, EventListener>> {
182 let tasks = self.tasks.lock().unwrap();
183 let i = *id - 1;
184 let task = tasks.get(i as usize).unwrap();
185 match task {
186 Task::Spawned(event) => Ok(Err(event.listen())),
187 Task::Finished(result) => match result {
188 Ok(vc) => Ok(Ok(*vc)),
189 Err(err) => Err(anyhow!(err.clone())),
190 },
191 }
192 }
193
194 fn try_read_task_cell(
195 &self,
196 task: TaskId,
197 index: CellId,
198 _options: ReadCellOptions,
199 ) -> Result<Result<TypedCellContent, EventListener>> {
200 let map = self.cells.lock().unwrap();
201 Ok(Ok(if let Some(cell) = map.get(&(task, index)) {
202 cell.clone()
203 } else {
204 Default::default()
205 }
206 .into_typed(index.type_id)))
207 }
208 fn try_read_own_task_cell(
209 &self,
210 current_task: TaskId,
211 index: CellId,
212 options: ReadCellOptions,
213 ) -> Result<TypedCellContent> {
214 self.read_own_task_cell(current_task, index, options)
215 }
216
217 fn try_read_local_output(
218 &self,
219 _execution_id: ExecutionId,
220 _local_task_id: LocalTaskId,
221 ) -> Result<Result<RawVc, EventListener>> {
222 unimplemented!()
223 }
224
225 fn emit_collectible(&self, _trait_type: turbo_tasks::TraitTypeId, _collectible: RawVc) {
226 unimplemented!()
227 }
228
229 fn unemit_collectible(
230 &self,
231 _trait_type: turbo_tasks::TraitTypeId,
232 _collectible: RawVc,
233 _count: u32,
234 ) {
235 unimplemented!()
236 }
237
238 fn unemit_collectibles(
239 &self,
240 _trait_type: turbo_tasks::TraitTypeId,
241 _collectibles: &TaskCollectiblesMap,
242 ) {
243 unimplemented!()
244 }
245
246 fn read_task_collectibles(&self, _task: TaskId, _trait_id: TraitTypeId) -> TaskCollectiblesMap {
247 unimplemented!()
248 }
249
250 fn read_own_task_cell(
251 &self,
252 task: TaskId,
253 index: CellId,
254 _options: ReadCellOptions,
255 ) -> Result<TypedCellContent> {
256 let map = self.cells.lock().unwrap();
257 Ok(if let Some(cell) = map.get(&(task, index)) {
258 cell.to_owned()
259 } else {
260 Default::default()
261 }
262 .into_typed(index.type_id))
263 }
264
265 fn update_own_task_cell(
266 &self,
267 task: TaskId,
268 index: CellId,
269 _is_serializable_cell_content: bool,
270 content: CellContent,
271 _updated_key_hashes: Option<SmallVec<[u64; 2]>>,
272 _verification_mode: VerificationMode,
273 ) {
274 let mut map = self.cells.lock().unwrap();
275 let cell = map.entry((task, index)).or_default();
276 *cell = content;
277 }
278
279 fn connect_task(&self, _task: TaskId) {
280 }
282
283 fn mark_own_task_as_finished(&self, _task: TaskId) {
284 }
286
287 fn mark_own_task_as_session_dependent(&self, _task: TaskId) {
288 }
290
291 fn set_own_task_aggregation_number(&self, _task: TaskId, _aggregation_number: u32) {
292 }
294
295 fn spawn_detached_for_testing(
296 &self,
297 _f: std::pin::Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
298 ) {
299 unimplemented!()
300 }
301
302 fn task_statistics(&self) -> &turbo_tasks::task_statistics::TaskStatisticsApi {
303 unimplemented!()
304 }
305
306 fn stop_and_wait(&self) -> std::pin::Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
307 Box::pin(async {})
308 }
309
310 fn subscribe_to_compilation_events(
313 &self,
314 _event_types: Option<Vec<String>>,
315 ) -> Receiver<Arc<dyn CompilationEvent>> {
316 unimplemented!()
317 }
318
319 fn send_compilation_event(&self, _event: Arc<dyn CompilationEvent>) {
322 unimplemented!()
323 }
324
325 fn is_tracking_dependencies(&self) -> bool {
326 false
327 }
328}
329
330impl VcStorage {
331 pub fn with<T>(f: impl Future<Output = T>) -> impl Future<Output = T> {
332 with_turbo_tasks_for_testing(
333 Arc::new_cyclic(|weak| VcStorage {
334 this: weak.clone(),
335 ..Default::default()
336 }),
337 TaskId::MAX,
338 ExecutionId::MIN,
339 f,
340 )
341 }
342}