1pub mod retry;
4mod run;
5
6use std::{
7 future::Future,
8 mem::replace,
9 panic::AssertUnwindSafe,
10 pin::Pin,
11 sync::{Arc, Mutex, Weak},
12};
13
14use anyhow::{Result, anyhow};
15use futures::FutureExt;
16use rustc_hash::FxHashMap;
17use tokio::sync::mpsc::Receiver;
18use turbo_tasks::{
19 CellId, ExecutionId, InvalidationReason, LocalTaskId, MagicAny, RawVc, ReadCellOptions,
20 ReadOutputOptions, TaskId, TaskPersistence, TraitTypeId, TurboTasksApi, TurboTasksCallApi,
21 backend::{CellContent, TaskCollectiblesMap, TypedCellContent, VerificationMode},
22 event::{Event, EventListener},
23 message_queue::CompilationEvent,
24 test_helpers::with_turbo_tasks_for_testing,
25 util::{SharedError, StaticOrArc},
26};
27
28pub use crate::run::{
29 Registration, run, run_once, run_once_without_cache_check, run_with_tt, run_without_cache_check,
30};
31
/// State of one task slot stored in `VcStorage::tasks`.
enum Task {
    /// The task has been spawned but has not produced a result yet. Readers
    /// listen on this event; it is notified when the slot is replaced with
    /// `Finished`.
    Spawned(Event),
    /// The task has completed, either with its output `RawVc` or with the
    /// error (including a captured panic) it ended with.
    Finished(Result<RawVc, SharedError>),
}
36
/// In-memory storage implementing the `TurboTasksApi` / `TurboTasksCallApi`
/// traits in this file; cells and task results live in plain mutex-guarded
/// collections.
#[derive(Default)]
pub struct VcStorage {
    /// Weak self-reference, upgraded in `dynamic_call` so spawned tasks can
    /// hold an `Arc` to this storage. Set through `Arc::new_cyclic` in
    /// `VcStorage::with`.
    this: Weak<Self>,
    /// Cell contents keyed by the owning task and the cell id.
    cells: Mutex<FxHashMap<(TaskId, CellId), CellContent>>,
    /// One slot per task started through `dynamic_call`; the slot at index
    /// `i` belongs to the task whose id is `i + 1`.
    tasks: Mutex<Vec<Task>>,
}
43
44impl VcStorage {
45 fn dynamic_call(
46 &self,
47 func: &'static turbo_tasks::macro_helpers::NativeFunction,
48 this_arg: Option<RawVc>,
49 arg: Box<dyn MagicAny>,
50 ) -> RawVc {
51 let this = self.this.upgrade().unwrap();
52 let handle = tokio::runtime::Handle::current();
53 let future = func.execute(this_arg, &*arg);
54 let i = {
55 let mut tasks = self.tasks.lock().unwrap();
56 let i = tasks.len();
57 tasks.push(Task::Spawned(Event::new(move || {
58 move || format!("Task({i})::event")
59 })));
60 i
61 };
62 let task_id = TaskId::try_from(u32::try_from(i + 1).unwrap()).unwrap();
63 let execution_id = ExecutionId::try_from(u16::try_from(i + 1).unwrap()).unwrap();
64 handle.spawn(with_turbo_tasks_for_testing(
65 this.clone(),
66 task_id,
67 execution_id,
68 async move {
69 let result = AssertUnwindSafe(future).catch_unwind().await;
70
71 let result = result
73 .map_err(|any| match any.downcast::<String>() {
74 Ok(owned) => anyhow!(owned),
75 Err(any) => match any.downcast::<&'static str>() {
76 Ok(str) => anyhow!(str),
77 Err(_) => anyhow!("unknown panic"),
78 },
79 })
80 .and_then(|r| r)
81 .map_err(SharedError::new);
82
83 let mut tasks = this.tasks.lock().unwrap();
84 if let Task::Spawned(event) = replace(&mut tasks[i], Task::Finished(result)) {
85 event.notify(usize::MAX);
86 }
87 },
88 ));
89 RawVc::TaskOutput(task_id)
90 }
91}
92
93impl TurboTasksCallApi for VcStorage {
94 fn dynamic_call(
95 &self,
96 func: &'static turbo_tasks::macro_helpers::NativeFunction,
97 this: Option<RawVc>,
98 arg: Box<dyn MagicAny>,
99 _persistence: TaskPersistence,
100 ) -> RawVc {
101 self.dynamic_call(func, this, arg)
102 }
103 fn native_call(
104 &self,
105 _func: &'static turbo_tasks::macro_helpers::NativeFunction,
106 _this: Option<RawVc>,
107 _arg: Box<dyn MagicAny>,
108 _persistence: TaskPersistence,
109 ) -> RawVc {
110 unreachable!()
111 }
112
113 fn trait_call(
114 &self,
115 _trait_type: &'static turbo_tasks::TraitMethod,
116 _this: RawVc,
117 _arg: Box<dyn MagicAny>,
118 _persistence: TaskPersistence,
119 ) -> RawVc {
120 unreachable!()
121 }
122
123 fn run(
124 &self,
125 _future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
126 ) -> Pin<
127 Box<dyn Future<Output = Result<(), turbo_tasks::backend::TurboTasksExecutionError>> + Send>,
128 > {
129 unreachable!()
130 }
131
132 fn run_once(
133 &self,
134 _future: std::pin::Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
135 ) -> Pin<
136 Box<dyn futures::Future<Output = Result<(), anyhow::Error>> + std::marker::Send + 'static>,
137 > {
138 unreachable!()
139 }
140
141 fn run_once_with_reason(
142 &self,
143 _reason: StaticOrArc<dyn InvalidationReason>,
144 _future: std::pin::Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
145 ) -> Pin<
146 Box<dyn futures::Future<Output = Result<(), anyhow::Error>> + std::marker::Send + 'static>,
147 > {
148 unreachable!()
149 }
150
151 fn start_once_process(
152 &self,
153 _future: std::pin::Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
154 ) {
155 unreachable!()
156 }
157}
158
159impl TurboTasksApi for VcStorage {
160 fn invalidate(&self, _task: TaskId) {
161 unreachable!()
162 }
163
164 fn invalidate_with_reason(
165 &self,
166 _task: TaskId,
167 _reason: turbo_tasks::util::StaticOrArc<dyn turbo_tasks::InvalidationReason>,
168 ) {
169 unreachable!()
170 }
171
172 fn invalidate_serialization(&self, _task: TaskId) {
173 }
175
176 fn try_read_task_output(
177 &self,
178 id: TaskId,
179 _options: ReadOutputOptions,
180 ) -> Result<Result<RawVc, EventListener>> {
181 let tasks = self.tasks.lock().unwrap();
182 let i = *id - 1;
183 let task = tasks.get(i as usize).unwrap();
184 match task {
185 Task::Spawned(event) => Ok(Err(event.listen())),
186 Task::Finished(result) => match result {
187 Ok(vc) => Ok(Ok(*vc)),
188 Err(err) => Err(anyhow!(err.clone())),
189 },
190 }
191 }
192
193 fn try_read_task_cell(
194 &self,
195 task: TaskId,
196 index: CellId,
197 _options: ReadCellOptions,
198 ) -> Result<Result<TypedCellContent, EventListener>> {
199 let map = self.cells.lock().unwrap();
200 Ok(Ok(if let Some(cell) = map.get(&(task, index)) {
201 cell.clone()
202 } else {
203 Default::default()
204 }
205 .into_typed(index.type_id)))
206 }
207 fn try_read_own_task_cell(
208 &self,
209 current_task: TaskId,
210 index: CellId,
211 options: ReadCellOptions,
212 ) -> Result<TypedCellContent> {
213 self.read_own_task_cell(current_task, index, options)
214 }
215
216 fn try_read_local_output(
217 &self,
218 _execution_id: ExecutionId,
219 _local_task_id: LocalTaskId,
220 ) -> Result<Result<RawVc, EventListener>> {
221 unimplemented!()
222 }
223
224 fn emit_collectible(&self, _trait_type: turbo_tasks::TraitTypeId, _collectible: RawVc) {
225 unimplemented!()
226 }
227
228 fn unemit_collectible(
229 &self,
230 _trait_type: turbo_tasks::TraitTypeId,
231 _collectible: RawVc,
232 _count: u32,
233 ) {
234 unimplemented!()
235 }
236
237 fn unemit_collectibles(
238 &self,
239 _trait_type: turbo_tasks::TraitTypeId,
240 _collectibles: &TaskCollectiblesMap,
241 ) {
242 unimplemented!()
243 }
244
245 fn read_task_collectibles(&self, _task: TaskId, _trait_id: TraitTypeId) -> TaskCollectiblesMap {
246 unimplemented!()
247 }
248
249 fn read_own_task_cell(
250 &self,
251 task: TaskId,
252 index: CellId,
253 _options: ReadCellOptions,
254 ) -> Result<TypedCellContent> {
255 let map = self.cells.lock().unwrap();
256 Ok(if let Some(cell) = map.get(&(task, index)) {
257 cell.to_owned()
258 } else {
259 Default::default()
260 }
261 .into_typed(index.type_id))
262 }
263
264 fn update_own_task_cell(
265 &self,
266 task: TaskId,
267 index: CellId,
268 content: CellContent,
269 _verification_mode: VerificationMode,
270 ) {
271 let mut map = self.cells.lock().unwrap();
272 let cell = map.entry((task, index)).or_default();
273 *cell = content;
274 }
275
276 fn connect_task(&self, _task: TaskId) {
277 }
279
280 fn mark_own_task_as_finished(&self, _task: TaskId) {
281 }
283
284 fn mark_own_task_as_session_dependent(&self, _task: TaskId) {
285 }
287
288 fn set_own_task_aggregation_number(&self, _task: TaskId, _aggregation_number: u32) {
289 }
291
292 fn detached_for_testing(
293 &self,
294 _f: std::pin::Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
295 ) -> std::pin::Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>> {
296 unimplemented!()
297 }
298
299 fn task_statistics(&self) -> &turbo_tasks::task_statistics::TaskStatisticsApi {
300 unimplemented!()
301 }
302
303 fn stop_and_wait(&self) -> std::pin::Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
304 Box::pin(async {})
305 }
306
307 fn subscribe_to_compilation_events(
310 &self,
311 _event_types: Option<Vec<String>>,
312 ) -> Receiver<Arc<dyn CompilationEvent>> {
313 unimplemented!()
314 }
315
316 fn send_compilation_event(&self, _event: Arc<dyn CompilationEvent>) {
319 unimplemented!()
320 }
321
322 fn is_tracking_dependencies(&self) -> bool {
323 false
324 }
325}
326
327impl VcStorage {
328 pub fn with<T>(f: impl Future<Output = T>) -> impl Future<Output = T> {
329 with_turbo_tasks_for_testing(
330 Arc::new_cyclic(|weak| VcStorage {
331 this: weak.clone(),
332 ..Default::default()
333 }),
334 TaskId::MAX,
335 ExecutionId::MIN,
336 f,
337 )
338 }
339}