1use std::{
2 fmt::{Debug, Display},
3 future::Future,
4 pin::Pin,
5 sync::Arc,
6 task::{Poll, ready},
7};
8
9use anyhow::Result;
10use auto_hash_map::AutoSet;
11use bincode::{Decode, Encode};
12use serde::{Deserialize, Serialize};
13
14use crate::{
15 CollectiblesSource, ReadCellOptions, ReadConsistency, ReadOutputOptions, ResolvedVc, TaskId,
16 TaskPersistence, TraitTypeId, ValueTypeId, VcValueTrait,
17 backend::TypedCellContent,
18 event::EventListener,
19 id::{ExecutionId, LocalTaskId},
20 manager::{
21 ReadCellTracking, ReadTracking, SUPPRESS_EVENTUAL_CONSISTENCY_TOP_LEVEL_TASK_CHECK,
22 TurboTasksApi, read_local_output, with_turbo_tasks,
23 },
24 registry::get_value_type,
25 turbo_tasks,
26};
27
28#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
29pub struct CellId {
30 pub type_id: ValueTypeId,
31 pub index: u32,
32}
33
34impl Display for CellId {
35 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36 write!(f, "{}#{}", get_value_type(self.type_id).ty.name, self.index)
37 }
38}
39
40#[derive(Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
51pub enum RawVc {
52 TaskOutput(TaskId),
55 TaskCell(TaskId, CellId),
60 LocalOutput(ExecutionId, LocalTaskId, TaskPersistence),
70}
71
72impl Debug for RawVc {
73 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
74 match self {
75 RawVc::TaskOutput(task_id) => f
76 .debug_tuple("RawVc::TaskOutput")
77 .field(&**task_id)
78 .finish(),
79 RawVc::TaskCell(task_id, cell_id) => f
80 .debug_tuple("RawVc::TaskCell")
81 .field(&**task_id)
82 .field(&cell_id.to_string())
83 .finish(),
84 RawVc::LocalOutput(execution_id, local_task_id, task_persistence) => f
85 .debug_tuple("RawVc::LocalOutput")
86 .field(&**execution_id)
87 .field(&**local_task_id)
88 .field(task_persistence)
89 .finish(),
90 }
91 }
92}
93
94impl Display for RawVc {
95 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
96 match self {
97 RawVc::TaskOutput(task_id) => write!(f, "output of task {}", **task_id),
98 RawVc::TaskCell(task_id, cell_id) => {
99 write!(f, "{} of task {}", cell_id, **task_id)
100 }
101 RawVc::LocalOutput(execution_id, local_task_id, task_persistence) => write!(
102 f,
103 "output of local task {} ({}, {})",
104 **local_task_id, **execution_id, task_persistence
105 ),
106 }
107 }
108}
109
110impl RawVc {
111 pub fn is_resolved(&self) -> bool {
112 match self {
113 RawVc::TaskOutput(..) => false,
114 RawVc::TaskCell(..) => true,
115 RawVc::LocalOutput(..) => false,
116 }
117 }
118
119 pub fn is_local(&self) -> bool {
120 match self {
121 RawVc::TaskOutput(..) => false,
122 RawVc::TaskCell(..) => false,
123 RawVc::LocalOutput(..) => true,
124 }
125 }
126
127 pub fn is_transient(&self) -> bool {
132 match self {
133 RawVc::TaskOutput(task) | RawVc::TaskCell(task, ..) => task.is_transient(),
134 RawVc::LocalOutput(_, _, persistence) => *persistence == TaskPersistence::Transient,
135 }
136 }
137
138 pub(crate) fn into_read(self) -> ReadRawVcFuture {
139 ReadRawVcFuture::new(self)
142 }
143
144 pub(crate) fn resolve(self) -> ResolveRawVcFuture {
146 ResolveRawVcFuture::new(self)
147 }
148
149 pub async fn to_non_local(self) -> Result<RawVc> {
152 Ok(match self {
153 RawVc::LocalOutput(execution_id, local_task_id, ..) => {
154 let tt = turbo_tasks();
155 let local_output = read_local_output(&*tt, execution_id, local_task_id).await?;
156 debug_assert!(
157 !matches!(local_output, RawVc::LocalOutput(_, _, _)),
158 "a LocalOutput cannot point at other LocalOutputs"
159 );
160 local_output
161 }
162 non_local => non_local,
163 })
164 }
165
166 pub(crate) fn to_non_local_unchecked_sync(self, tt: &dyn TurboTasksApi) -> Result<RawVc> {
172 Ok(match self {
173 RawVc::LocalOutput(execution_id, local_task_id, ..) => {
174 let local_output = match tt.try_read_local_output(execution_id, local_task_id)? {
175 Ok(raw_vc) => raw_vc,
176 Err(_event_listener) => unreachable!("local output is not ready yet"),
177 };
178 debug_assert!(
179 !matches!(local_output, RawVc::LocalOutput(_, _, _)),
180 "a LocalOutput cannot point at other LocalOutputs"
181 );
182 local_output
183 }
184 non_local => non_local,
185 })
186 }
187
188 pub(crate) fn connect(&self) {
189 let RawVc::TaskOutput(task_id) = self else {
190 panic!("RawVc::connect() must only be called on a RawVc::TaskOutput");
191 };
192 let tt = turbo_tasks();
193 tt.connect_task(*task_id);
194 }
195
196 pub fn try_get_task_id(&self) -> Option<TaskId> {
197 match self {
198 RawVc::TaskOutput(t) | RawVc::TaskCell(t, ..) => Some(*t),
199 RawVc::LocalOutput(..) => None,
200 }
201 }
202
203 pub fn try_get_type_id(&self) -> Option<ValueTypeId> {
204 match self {
205 RawVc::TaskCell(_, CellId { type_id, .. }) => Some(*type_id),
206 RawVc::TaskOutput(..) | RawVc::LocalOutput(..) => None,
207 }
208 }
209
210 pub(crate) fn resolved_has_trait(&self, trait_id: TraitTypeId) -> bool {
213 match self {
214 RawVc::TaskCell(_task_id, cell_id) => {
215 get_value_type(cell_id.type_id).has_trait(&trait_id)
216 }
217 _ => unreachable!("resolved_has_trait must be called with a RawVc::TaskCell"),
218 }
219 }
220
221 pub(crate) fn resolved_is_type(&self, type_id: ValueTypeId) -> bool {
224 match self {
225 RawVc::TaskCell(_task_id, cell_id) => cell_id.type_id == type_id,
226 _ => unreachable!("resolved_is_type must be called with a RawVc::TaskCell"),
227 }
228 }
229}
230
231impl CollectiblesSource for RawVc {
233 fn peek_collectibles<T: VcValueTrait + ?Sized>(self) -> AutoSet<ResolvedVc<T>> {
234 let RawVc::TaskOutput(task_id) = self else {
235 panic!(
236 "<RawVc as CollectiblesSource>::peek_collectibles() must only be called on a \
237 RawVc::TaskOutput"
238 );
239 };
240 let tt = turbo_tasks();
241 let map = tt.read_task_collectibles(task_id, T::get_trait_type_id());
242 map.into_iter()
243 .filter_map(|(raw, count)| (count > 0).then_some(raw.try_into().unwrap()))
244 .collect()
245 }
246
247 fn take_collectibles<T: VcValueTrait + ?Sized>(self) -> AutoSet<ResolvedVc<T>> {
248 let RawVc::TaskOutput(task_id) = self else {
249 panic!(
250 "<RawVc as CollectiblesSource>::take_collectibles() must only be called on a \
251 RawVc::TaskOutput"
252 );
253 };
254 let tt = turbo_tasks();
255 let map = tt.read_task_collectibles(task_id, T::get_trait_type_id());
256 tt.unemit_collectibles(T::get_trait_type_id(), &map);
257 map.into_iter()
258 .filter_map(|(raw, count)| (count > 0).then_some(raw.try_into().unwrap()))
259 .collect()
260 }
261
262 fn drop_collectibles<T: VcValueTrait + ?Sized>(self) {
263 let RawVc::TaskOutput(task_id) = self else {
264 panic!(
265 "<RawVc as CollectiblesSource>::drop_collectibles() must only be called on a \
266 RawVc::TaskOutput"
267 );
268 };
269 let tt = turbo_tasks();
270 let map = tt.read_task_collectibles(task_id, T::get_trait_type_id());
271 tt.unemit_collectibles(T::get_trait_type_id(), &map);
272 }
273}
274
275fn poll_listener(
278 listener: &mut Option<EventListener>,
279 cx: &mut std::task::Context<'_>,
280) -> Poll<()> {
281 if let Some(l) = listener {
282 ready!(Pin::new(l).poll(cx));
283 *listener = None;
284 }
285 Poll::Ready(())
286}
287
288fn suppress_top_level_task_check<R>(strongly_consistent: bool, f: impl FnOnce() -> R) -> R {
295 if cfg!(debug_assertions) && strongly_consistent {
296 SUPPRESS_EVENTUAL_CONSISTENCY_TOP_LEVEL_TASK_CHECK.sync_scope(true, f)
298 } else {
299 f()
300 }
301}
302
303#[must_use]
304pub struct ResolveRawVcFuture {
305 current: RawVc,
306 read_output_options: ReadOutputOptions,
307 strongly_consistent: bool,
310 listener: Option<EventListener>,
311}
312
313impl ResolveRawVcFuture {
314 fn new(vc: RawVc) -> Self {
315 ResolveRawVcFuture {
316 current: vc,
317 read_output_options: ReadOutputOptions::default(),
318 strongly_consistent: false,
319 listener: None,
320 }
321 }
322
323 pub fn strongly_consistent(mut self) -> Self {
324 self.strongly_consistent = true;
325 self.read_output_options.consistency = ReadConsistency::Strong;
326 self
327 }
328
329 pub(crate) fn track_with_key(mut self) -> Self {
332 self.read_output_options.tracking = ReadTracking::Tracked;
333 self
334 }
335
336 pub(crate) fn untracked(mut self) -> Self {
339 self.read_output_options.tracking = ReadTracking::TrackOnlyError;
340 self
341 }
342}
343
344impl Future for ResolveRawVcFuture {
345 type Output = Result<RawVc>;
346
347 #[inline(never)]
348 fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
349 let this = unsafe { self.get_unchecked_mut() };
351
352 let poll_fn = |tt: &Arc<dyn TurboTasksApi>| -> Poll<Self::Output> {
353 'outer: loop {
354 ready!(poll_listener(&mut this.listener, cx));
355 let listener = match this.current {
356 RawVc::TaskOutput(task) => {
357 let read_result = tt.try_read_task_output(task, this.read_output_options);
358 match read_result {
359 Ok(Ok(vc)) => {
360 this.read_output_options.consistency = ReadConsistency::Eventual;
369 this.current = vc;
370 continue 'outer;
371 }
372 Ok(Err(listener)) => listener,
373 Err(err) => return Poll::Ready(Err(err)),
374 }
375 }
376 RawVc::TaskCell(_, _) => return Poll::Ready(Ok(this.current)),
377 RawVc::LocalOutput(execution_id, local_task_id, ..) => {
378 debug_assert_eq!(
379 this.read_output_options.consistency,
380 ReadConsistency::Eventual
381 );
382 let read_result = tt.try_read_local_output(execution_id, local_task_id);
383 match read_result {
384 Ok(Ok(vc)) => {
385 this.current = vc;
386 continue 'outer;
387 }
388 Ok(Err(listener)) => listener,
389 Err(err) => return Poll::Ready(Err(err)),
390 }
391 }
392 };
393 this.listener = Some(listener);
394 }
395 };
396
397 suppress_top_level_task_check(this.strongly_consistent, || with_turbo_tasks(poll_fn))
404 }
405}
406
407impl Unpin for ResolveRawVcFuture {}
408
409#[must_use]
410pub struct ReadRawVcFuture {
411 read_cell_options: ReadCellOptions,
414 strongly_consistent: bool,
418 state: ReadRawVcState,
419}
420
421enum ReadRawVcState {
424 Resolving(ResolveRawVcFuture),
426 Reading {
428 task: TaskId,
429 index: CellId,
430 listener: Option<EventListener>,
431 },
432}
433
434impl ReadRawVcFuture {
435 pub(crate) fn new(vc: RawVc) -> Self {
436 ReadRawVcFuture {
437 read_cell_options: ReadCellOptions::default(),
438 strongly_consistent: false,
439 state: ReadRawVcState::Resolving(ResolveRawVcFuture::new(vc)),
440 }
441 }
442
443 fn map_resolve(mut self, f: impl FnOnce(ResolveRawVcFuture) -> ResolveRawVcFuture) -> Self {
444 match self.state {
445 ReadRawVcState::Resolving(resolve) => {
446 self.state = ReadRawVcState::Resolving(f(resolve));
447 }
448 ReadRawVcState::Reading { .. } => {
449 unreachable!("builder methods are only called before polling");
450 }
451 }
452 self
453 }
454
455 pub fn strongly_consistent(mut self) -> Self {
457 self.strongly_consistent = true;
458 self.map_resolve(|r| r.strongly_consistent())
459 }
460
461 pub fn track_with_key(mut self, key: u64) -> Self {
463 self.read_cell_options.tracking = ReadCellTracking::Tracked { key: Some(key) };
464 self.map_resolve(|r| r.track_with_key())
465 }
466
467 pub fn untracked(mut self) -> Self {
473 self.read_cell_options.tracking = ReadCellTracking::TrackOnlyError;
474 self.map_resolve(|r| r.untracked())
475 }
476
477 pub fn final_read_hint(mut self) -> Self {
479 self.read_cell_options.final_read_hint = true;
480 self
481 }
482}
483
484impl Future for ReadRawVcFuture {
485 type Output = Result<TypedCellContent>;
486
487 #[inline(never)]
488 fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
489 let this = unsafe { self.get_unchecked_mut() };
491
492 if let ReadRawVcState::Resolving(resolve) = &mut this.state {
497 match ready!(Pin::new(resolve).poll(cx)) {
498 Err(err) => return Poll::Ready(Err(err)),
499 Ok(RawVc::TaskCell(task, index)) => {
500 this.state = ReadRawVcState::Reading {
501 task,
502 index,
503 listener: None,
504 };
505 }
506 Ok(_) => unreachable!("ResolveRawVcFuture always resolves to a TaskCell"),
507 }
508 }
509
510 let ReadRawVcState::Reading {
512 task,
513 index,
514 listener,
515 } = &mut this.state
516 else {
517 unreachable!("phase 1 transitioned to Reading above");
518 };
519 let task = *task;
520 let index = *index;
521 let read_cell_options = this.read_cell_options;
522
523 let poll_fn = |tt: &Arc<dyn TurboTasksApi>| -> Poll<Self::Output> {
524 loop {
525 ready!(poll_listener(listener, cx));
526 let new_listener = match tt.try_read_task_cell(task, index, read_cell_options) {
527 Ok(Ok(content)) => return Poll::Ready(Ok(content)),
528 Ok(Err(l)) => l,
529 Err(err) => return Poll::Ready(Err(err)),
530 };
531 *listener = Some(new_listener);
532 }
533 };
534
535 suppress_top_level_task_check(this.strongly_consistent, || with_turbo_tasks(poll_fn))
540 }
541}
542
543impl Unpin for ReadRawVcFuture {}