1use std::{
2 fmt::{Debug, Display},
3 future::Future,
4 pin::Pin,
5 sync::Arc,
6 task::Poll,
7};
8
9use anyhow::Result;
10use auto_hash_map::AutoSet;
11use bincode::{Decode, Encode};
12use serde::{Deserialize, Serialize};
13
14use crate::{
15 CollectiblesSource, ReadCellOptions, ReadConsistency, ReadOutputOptions, ResolvedVc, TaskId,
16 TaskPersistence, TraitTypeId, ValueTypeId, VcValueTrait,
17 backend::TypedCellContent,
18 event::EventListener,
19 id::{ExecutionId, LocalTaskId},
20 manager::{
21 ReadCellTracking, ReadTracking, SUPPRESS_EVENTUAL_CONSISTENCY_TOP_LEVEL_TASK_CHECK,
22 TurboTasksApi, read_local_output, read_task_output, with_turbo_tasks,
23 },
24 registry::{self, get_value_type},
25 turbo_tasks,
26};
27
/// Identifies a cell by the value type it holds and a numeric index.
///
/// The `Display` implementation renders it as `<value type name>#<index>`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
pub struct CellId {
    /// Id of the value type; looked up in the registry when the id is formatted.
    pub type_id: ValueTypeId,
    /// Numeric index of the cell.
    /// NOTE(review): presumably counted per value type within one task — confirm.
    pub index: u32,
}
33
34impl Display for CellId {
35 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36 write!(
37 f,
38 "{}#{}",
39 registry::get_value_type(self.type_id).name,
40 self.index
41 )
42 }
43}
44
/// An untyped handle to a value that lives in a task output, a task cell, or
/// the output of a local task.
///
/// Only [`RawVc::TaskCell`] is considered resolved (see `RawVc::is_resolved`);
/// the other variants are followed by reading the referenced output, which
/// yields another `RawVc`.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
pub enum RawVc {
    /// The output of the task with the given id. Not resolved and not local.
    TaskOutput(TaskId),
    /// A specific cell of the given task. This is the only resolved form.
    TaskCell(TaskId, CellId),
    /// The output of a local task, addressed by execution id and local task
    /// id. The stored `TaskPersistence` decides whether the handle counts as
    /// transient.
    LocalOutput(ExecutionId, LocalTaskId, TaskPersistence),
}
73
74impl Debug for RawVc {
75 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
76 match self {
77 RawVc::TaskOutput(task_id) => f
78 .debug_tuple("RawVc::TaskOutput")
79 .field(&**task_id)
80 .finish(),
81 RawVc::TaskCell(task_id, cell_id) => f
82 .debug_tuple("RawVc::TaskCell")
83 .field(&**task_id)
84 .field(&cell_id.to_string())
85 .finish(),
86 RawVc::LocalOutput(execution_id, local_task_id, task_persistence) => f
87 .debug_tuple("RawVc::LocalOutput")
88 .field(&**execution_id)
89 .field(&**local_task_id)
90 .field(task_persistence)
91 .finish(),
92 }
93 }
94}
95
96impl Display for RawVc {
97 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
98 match self {
99 RawVc::TaskOutput(task_id) => write!(f, "output of task {}", **task_id),
100 RawVc::TaskCell(task_id, cell_id) => {
101 write!(f, "{} of task {}", cell_id, **task_id)
102 }
103 RawVc::LocalOutput(execution_id, local_task_id, task_persistence) => write!(
104 f,
105 "output of local task {} ({}, {})",
106 **local_task_id, **execution_id, task_persistence
107 ),
108 }
109 }
110}
111
112impl RawVc {
113 pub fn is_resolved(&self) -> bool {
114 match self {
115 RawVc::TaskOutput(..) => false,
116 RawVc::TaskCell(..) => true,
117 RawVc::LocalOutput(..) => false,
118 }
119 }
120
121 pub fn is_local(&self) -> bool {
122 match self {
123 RawVc::TaskOutput(..) => false,
124 RawVc::TaskCell(..) => false,
125 RawVc::LocalOutput(..) => true,
126 }
127 }
128
129 pub fn is_transient(&self) -> bool {
134 match self {
135 RawVc::TaskOutput(task) | RawVc::TaskCell(task, ..) => task.is_transient(),
136 RawVc::LocalOutput(_, _, persistence) => *persistence == TaskPersistence::Transient,
137 }
138 }
139
140 pub(crate) fn into_read(self, is_serializable_cell_content: bool) -> ReadRawVcFuture {
141 ReadRawVcFuture::new(self, Some(is_serializable_cell_content))
144 }
145
146 pub(crate) fn into_read_with_unknown_is_serializable_cell_content(self) -> ReadRawVcFuture {
147 ReadRawVcFuture::new(self, None)
150 }
151
152 pub(crate) async fn resolve(self) -> Result<RawVc> {
154 self.resolve_inner(ReadOutputOptions {
155 consistency: ReadConsistency::Eventual,
156 ..Default::default()
157 })
158 .await
159 }
160
161 pub(crate) async fn resolve_strongly_consistent(self) -> Result<RawVc> {
163 SuppressTopLevelTaskCheckFuture {
164 inner: self.resolve_inner(ReadOutputOptions {
165 consistency: ReadConsistency::Strong,
166 ..Default::default()
167 }),
168 }
169 .await
170 }
171
172 async fn resolve_inner(self, mut options: ReadOutputOptions) -> Result<RawVc> {
173 let tt = turbo_tasks();
174 let mut current = self;
175 loop {
176 match current {
177 RawVc::TaskOutput(task) => {
178 current = read_task_output(&*tt, task, options).await?;
179 options.consistency = ReadConsistency::Eventual;
183 }
184 RawVc::TaskCell(_, _) => return Ok(current),
185 RawVc::LocalOutput(execution_id, local_task_id, ..) => {
186 debug_assert_eq!(options.consistency, ReadConsistency::Eventual);
187 current = read_local_output(&*tt, execution_id, local_task_id).await?;
188 }
189 }
190 }
191 }
192
193 pub(crate) async fn to_non_local(self) -> Result<RawVc> {
196 Ok(match self {
197 RawVc::LocalOutput(execution_id, local_task_id, ..) => {
198 let tt = turbo_tasks();
199 let local_output = read_local_output(&*tt, execution_id, local_task_id).await?;
200 debug_assert!(
201 !matches!(local_output, RawVc::LocalOutput(_, _, _)),
202 "a LocalOutput cannot point at other LocalOutputs"
203 );
204 local_output
205 }
206 non_local => non_local,
207 })
208 }
209
210 pub(crate) fn connect(&self) {
211 let RawVc::TaskOutput(task_id) = self else {
212 panic!("RawVc::connect() must only be called on a RawVc::TaskOutput");
213 };
214 let tt = turbo_tasks();
215 tt.connect_task(*task_id);
216 }
217
218 pub fn try_get_task_id(&self) -> Option<TaskId> {
219 match self {
220 RawVc::TaskOutput(t) | RawVc::TaskCell(t, ..) => Some(*t),
221 RawVc::LocalOutput(..) => None,
222 }
223 }
224
225 pub fn try_get_type_id(&self) -> Option<ValueTypeId> {
226 match self {
227 RawVc::TaskCell(_, CellId { type_id, .. }) => Some(*type_id),
228 RawVc::TaskOutput(..) | RawVc::LocalOutput(..) => None,
229 }
230 }
231
232 pub(crate) fn resolved_has_trait(&self, trait_id: TraitTypeId) -> bool {
235 match self {
236 RawVc::TaskCell(_task_id, cell_id) => {
237 get_value_type(cell_id.type_id).has_trait(&trait_id)
238 }
239 _ => unreachable!("resolved_has_trait must be called with a RawVc::TaskCell"),
240 }
241 }
242
243 pub(crate) fn resolved_is_type(&self, type_id: ValueTypeId) -> bool {
246 match self {
247 RawVc::TaskCell(_task_id, cell_id) => cell_id.type_id == type_id,
248 _ => unreachable!("resolved_is_type must be called with a RawVc::TaskCell"),
249 }
250 }
251}
252
253impl CollectiblesSource for RawVc {
255 fn peek_collectibles<T: VcValueTrait + ?Sized>(self) -> AutoSet<ResolvedVc<T>> {
256 let RawVc::TaskOutput(task_id) = self else {
257 panic!(
258 "<RawVc as CollectiblesSource>::peek_collectibles() must only be called on a \
259 RawVc::TaskOutput"
260 );
261 };
262 let tt = turbo_tasks();
263 let map = tt.read_task_collectibles(task_id, T::get_trait_type_id());
264 map.into_iter()
265 .filter_map(|(raw, count)| (count > 0).then_some(raw.try_into().unwrap()))
266 .collect()
267 }
268
269 fn take_collectibles<T: VcValueTrait + ?Sized>(self) -> AutoSet<ResolvedVc<T>> {
270 let RawVc::TaskOutput(task_id) = self else {
271 panic!(
272 "<RawVc as CollectiblesSource>::take_collectibles() must only be called on a \
273 RawVc::TaskOutput"
274 );
275 };
276 let tt = turbo_tasks();
277 let map = tt.read_task_collectibles(task_id, T::get_trait_type_id());
278 tt.unemit_collectibles(T::get_trait_type_id(), &map);
279 map.into_iter()
280 .filter_map(|(raw, count)| (count > 0).then_some(raw.try_into().unwrap()))
281 .collect()
282 }
283
284 fn drop_collectibles<T: VcValueTrait + ?Sized>(self) {
285 let RawVc::TaskOutput(task_id) = self else {
286 panic!(
287 "<RawVc as CollectiblesSource>::drop_collectibles() must only be called on a \
288 RawVc::TaskOutput"
289 );
290 };
291 let tt = turbo_tasks();
292 let map = tt.read_task_collectibles(task_id, T::get_trait_type_id());
293 tt.unemit_collectibles(T::get_trait_type_id(), &map);
294 }
295}
296
/// Wrapper future that, in builds with debug assertions, polls `inner` while
/// `SUPPRESS_EVENTUAL_CONSISTENCY_TOP_LEVEL_TASK_CHECK` is set to `true`.
/// In other builds it simply forwards the poll.
struct SuppressTopLevelTaskCheckFuture<F> {
    /// The wrapped future; only accessed through a pin projection in `poll`.
    inner: F,
}
304
305impl<F: Future> Future for SuppressTopLevelTaskCheckFuture<F> {
306 type Output = F::Output;
307
308 fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
309 let inner = unsafe { self.map_unchecked_mut(|this| &mut this.inner) };
311 if cfg!(debug_assertions) {
312 SUPPRESS_EVENTUAL_CONSISTENCY_TOP_LEVEL_TASK_CHECK.sync_scope(true, || inner.poll(cx))
313 } else {
314 inner.poll(cx)
315 }
316 }
317}
318
/// Future that reads a [`RawVc`] down to the content of the cell it
/// ultimately refers to.
///
/// It is created with `ReadRawVcFuture::new` and configured through the
/// builder-style methods before being awaited.
pub struct ReadRawVcFuture {
    /// The handle to read next; replaced whenever an output read yields
    /// another `RawVc`.
    current: RawVc,
    /// Options passed to task-output reads.
    read_output_options: ReadOutputOptions,
    /// Options passed to the task-cell read.
    read_cell_options: ReadCellOptions,
    /// When set, `is_serializable_cell_content` is derived from the value
    /// type registry right before the cell is read.
    is_serializable_cell_content_unknown: bool,
    /// Set by `strongly_consistent()`; with debug assertions enabled, polling
    /// then runs with the top-level task check suppressed.
    strongly_consistent: bool,
    /// Listener handed back by a read that could not complete yet; it is
    /// polled first on the next `poll`.
    listener: Option<EventListener>,
}
329
330impl ReadRawVcFuture {
331 pub(crate) fn new(vc: RawVc, is_serializable_cell_content: Option<bool>) -> Self {
332 ReadRawVcFuture {
333 current: vc,
334 read_output_options: ReadOutputOptions::default(),
335 read_cell_options: ReadCellOptions {
336 is_serializable_cell_content: is_serializable_cell_content.unwrap_or(false),
337 ..Default::default()
338 },
339 is_serializable_cell_content_unknown: is_serializable_cell_content.is_none(),
340 strongly_consistent: false,
341 listener: None,
342 }
343 }
344
345 pub fn strongly_consistent(mut self) -> Self {
347 self.read_output_options.consistency = ReadConsistency::Strong;
348 self.strongly_consistent = true;
349 self
350 }
351
352 pub fn track_with_key(mut self, key: u64) -> Self {
354 self.read_output_options.tracking = ReadTracking::Tracked;
355 self.read_cell_options.tracking = ReadCellTracking::Tracked { key: Some(key) };
356 self
357 }
358
359 pub fn untracked(mut self) -> Self {
365 self.read_output_options.tracking = ReadTracking::TrackOnlyError;
366 self.read_cell_options.tracking = ReadCellTracking::TrackOnlyError;
367 self
368 }
369
370 pub fn final_read_hint(mut self) -> Self {
372 self.read_cell_options.final_read_hint = true;
373 self
374 }
375}
376
377impl Future for ReadRawVcFuture {
378 type Output = Result<TypedCellContent>;
379
380 fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
381 let this = unsafe { self.get_unchecked_mut() };
383
384 let poll_fn = |tt: &Arc<dyn TurboTasksApi>| -> Poll<Self::Output> {
386 'outer: loop {
387 if let Some(listener) = &mut this.listener {
388 let listener = unsafe { Pin::new_unchecked(listener) };
390 if listener.poll(cx).is_pending() {
391 return Poll::Pending;
392 }
393 this.listener = None;
394 }
395 let mut listener = match this.current {
396 RawVc::TaskOutput(task) => {
397 let read_result = tt.try_read_task_output(task, this.read_output_options);
398 match read_result {
399 Ok(Ok(vc)) => {
400 this.read_output_options.consistency = ReadConsistency::Eventual;
409 this.current = vc;
410 continue 'outer;
411 }
412 Ok(Err(listener)) => listener,
413 Err(err) => return Poll::Ready(Err(err)),
414 }
415 }
416 RawVc::TaskCell(task, index) => {
417 if this.is_serializable_cell_content_unknown {
418 let value_type = registry::get_value_type(index.type_id);
419 this.read_cell_options.is_serializable_cell_content =
420 value_type.bincode.is_some();
421 }
422 let read_result =
423 tt.try_read_task_cell(task, index, this.read_cell_options);
424 match read_result {
425 Ok(Ok(content)) => {
426 return Poll::Ready(Ok(content));
428 }
429 Ok(Err(listener)) => listener,
430 Err(err) => return Poll::Ready(Err(err)),
431 }
432 }
433 RawVc::LocalOutput(execution_id, local_output_id, ..) => {
434 debug_assert_eq!(
435 this.read_output_options.consistency,
436 ReadConsistency::Eventual
437 );
438 let read_result = tt.try_read_local_output(execution_id, local_output_id);
439 match read_result {
440 Ok(Ok(vc)) => {
441 this.current = vc;
442 continue 'outer;
443 }
444 Ok(Err(listener)) => listener,
445 Err(err) => return Poll::Ready(Err(err)),
446 }
447 }
448 };
449 match unsafe { Pin::new_unchecked(&mut listener) }.poll(cx) {
451 Poll::Ready(_) => continue,
452 Poll::Pending => {
453 this.listener = Some(listener);
454 return Poll::Pending;
455 }
456 };
457 }
458 };
459
460 fn suppress_top_level_task_check<R>(strongly_consistent: bool, f: impl FnOnce() -> R) -> R {
461 if cfg!(debug_assertions) && strongly_consistent {
462 SUPPRESS_EVENTUAL_CONSISTENCY_TOP_LEVEL_TASK_CHECK.sync_scope(true, f)
464 } else {
465 f()
466 }
467 }
468
469 suppress_top_level_task_check(this.strongly_consistent, || with_turbo_tasks(poll_fn))
476 }
477}
478
// Explicitly marks `ReadRawVcFuture` as `Unpin`, so it may be moved even after
// it has been pinned and polled.
// NOTE(review): the `Future` impl pins the stored `EventListener` with
// `Pin::new_unchecked`; confirm that `EventListener` is itself safe to move.
impl Unpin for ReadRawVcFuture {}