1use std::{
2 fmt::{Debug, Display},
3 future::Future,
4 pin::Pin,
5 sync::Arc,
6 task::Poll,
7};
8
9use anyhow::Result;
10use auto_hash_map::AutoSet;
11use bincode::{Decode, Encode};
12use serde::{Deserialize, Serialize};
13
14use crate::{
15 CollectiblesSource, ReadCellOptions, ReadConsistency, ReadOutputOptions, ResolvedVc, TaskId,
16 TaskPersistence, TraitTypeId, ValueTypeId, VcValueTrait,
17 backend::TypedCellContent,
18 event::EventListener,
19 id::{ExecutionId, LocalTaskId},
20 manager::{
21 ReadCellTracking, ReadTracking, SUPPRESS_EVENTUAL_CONSISTENCY_TOP_LEVEL_TASK_CHECK,
22 TurboTasksApi, read_local_output, read_task_output, with_turbo_tasks,
23 },
24 registry::{self, get_value_type},
25 turbo_tasks,
26};
27
28#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
29pub struct CellId {
30 pub type_id: ValueTypeId,
31 pub index: u32,
32}
33
34impl Display for CellId {
35 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36 write!(
37 f,
38 "{}#{}",
39 registry::get_value_type(self.type_id).ty.name,
40 self.index
41 )
42 }
43}
44
45#[derive(Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
56pub enum RawVc {
57 TaskOutput(TaskId),
60 TaskCell(TaskId, CellId),
65 LocalOutput(ExecutionId, LocalTaskId, TaskPersistence),
75}
76
77impl Debug for RawVc {
78 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79 match self {
80 RawVc::TaskOutput(task_id) => f
81 .debug_tuple("RawVc::TaskOutput")
82 .field(&**task_id)
83 .finish(),
84 RawVc::TaskCell(task_id, cell_id) => f
85 .debug_tuple("RawVc::TaskCell")
86 .field(&**task_id)
87 .field(&cell_id.to_string())
88 .finish(),
89 RawVc::LocalOutput(execution_id, local_task_id, task_persistence) => f
90 .debug_tuple("RawVc::LocalOutput")
91 .field(&**execution_id)
92 .field(&**local_task_id)
93 .field(task_persistence)
94 .finish(),
95 }
96 }
97}
98
99impl Display for RawVc {
100 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
101 match self {
102 RawVc::TaskOutput(task_id) => write!(f, "output of task {}", **task_id),
103 RawVc::TaskCell(task_id, cell_id) => {
104 write!(f, "{} of task {}", cell_id, **task_id)
105 }
106 RawVc::LocalOutput(execution_id, local_task_id, task_persistence) => write!(
107 f,
108 "output of local task {} ({}, {})",
109 **local_task_id, **execution_id, task_persistence
110 ),
111 }
112 }
113}
114
115impl RawVc {
116 pub fn is_resolved(&self) -> bool {
117 match self {
118 RawVc::TaskOutput(..) => false,
119 RawVc::TaskCell(..) => true,
120 RawVc::LocalOutput(..) => false,
121 }
122 }
123
124 pub fn is_local(&self) -> bool {
125 match self {
126 RawVc::TaskOutput(..) => false,
127 RawVc::TaskCell(..) => false,
128 RawVc::LocalOutput(..) => true,
129 }
130 }
131
132 pub fn is_transient(&self) -> bool {
137 match self {
138 RawVc::TaskOutput(task) | RawVc::TaskCell(task, ..) => task.is_transient(),
139 RawVc::LocalOutput(_, _, persistence) => *persistence == TaskPersistence::Transient,
140 }
141 }
142
143 pub(crate) fn into_read(self, is_serializable_cell_content: bool) -> ReadRawVcFuture {
144 ReadRawVcFuture::new(self, Some(is_serializable_cell_content))
147 }
148
149 pub(crate) fn into_read_with_unknown_is_serializable_cell_content(self) -> ReadRawVcFuture {
150 ReadRawVcFuture::new(self, None)
153 }
154
155 pub(crate) async fn resolve(self) -> Result<RawVc> {
157 self.resolve_inner(ReadOutputOptions {
158 consistency: ReadConsistency::Eventual,
159 ..Default::default()
160 })
161 .await
162 }
163
164 pub(crate) async fn resolve_strongly_consistent(self) -> Result<RawVc> {
166 SuppressTopLevelTaskCheckFuture {
167 inner: self.resolve_inner(ReadOutputOptions {
168 consistency: ReadConsistency::Strong,
169 ..Default::default()
170 }),
171 }
172 .await
173 }
174
175 async fn resolve_inner(self, mut options: ReadOutputOptions) -> Result<RawVc> {
176 let tt = turbo_tasks();
177 let mut current = self;
178 loop {
179 match current {
180 RawVc::TaskOutput(task) => {
181 current = read_task_output(&*tt, task, options).await?;
182 options.consistency = ReadConsistency::Eventual;
186 }
187 RawVc::TaskCell(_, _) => return Ok(current),
188 RawVc::LocalOutput(execution_id, local_task_id, ..) => {
189 debug_assert_eq!(options.consistency, ReadConsistency::Eventual);
190 current = read_local_output(&*tt, execution_id, local_task_id).await?;
191 }
192 }
193 }
194 }
195
196 pub(crate) async fn to_non_local(self) -> Result<RawVc> {
199 Ok(match self {
200 RawVc::LocalOutput(execution_id, local_task_id, ..) => {
201 let tt = turbo_tasks();
202 let local_output = read_local_output(&*tt, execution_id, local_task_id).await?;
203 debug_assert!(
204 !matches!(local_output, RawVc::LocalOutput(_, _, _)),
205 "a LocalOutput cannot point at other LocalOutputs"
206 );
207 local_output
208 }
209 non_local => non_local,
210 })
211 }
212
213 pub(crate) fn connect(&self) {
214 let RawVc::TaskOutput(task_id) = self else {
215 panic!("RawVc::connect() must only be called on a RawVc::TaskOutput");
216 };
217 let tt = turbo_tasks();
218 tt.connect_task(*task_id);
219 }
220
221 pub fn try_get_task_id(&self) -> Option<TaskId> {
222 match self {
223 RawVc::TaskOutput(t) | RawVc::TaskCell(t, ..) => Some(*t),
224 RawVc::LocalOutput(..) => None,
225 }
226 }
227
228 pub fn try_get_type_id(&self) -> Option<ValueTypeId> {
229 match self {
230 RawVc::TaskCell(_, CellId { type_id, .. }) => Some(*type_id),
231 RawVc::TaskOutput(..) | RawVc::LocalOutput(..) => None,
232 }
233 }
234
235 pub(crate) fn resolved_has_trait(&self, trait_id: TraitTypeId) -> bool {
238 match self {
239 RawVc::TaskCell(_task_id, cell_id) => {
240 get_value_type(cell_id.type_id).has_trait(&trait_id)
241 }
242 _ => unreachable!("resolved_has_trait must be called with a RawVc::TaskCell"),
243 }
244 }
245
246 pub(crate) fn resolved_is_type(&self, type_id: ValueTypeId) -> bool {
249 match self {
250 RawVc::TaskCell(_task_id, cell_id) => cell_id.type_id == type_id,
251 _ => unreachable!("resolved_is_type must be called with a RawVc::TaskCell"),
252 }
253 }
254}
255
256impl CollectiblesSource for RawVc {
258 fn peek_collectibles<T: VcValueTrait + ?Sized>(self) -> AutoSet<ResolvedVc<T>> {
259 let RawVc::TaskOutput(task_id) = self else {
260 panic!(
261 "<RawVc as CollectiblesSource>::peek_collectibles() must only be called on a \
262 RawVc::TaskOutput"
263 );
264 };
265 let tt = turbo_tasks();
266 let map = tt.read_task_collectibles(task_id, T::get_trait_type_id());
267 map.into_iter()
268 .filter_map(|(raw, count)| (count > 0).then_some(raw.try_into().unwrap()))
269 .collect()
270 }
271
272 fn take_collectibles<T: VcValueTrait + ?Sized>(self) -> AutoSet<ResolvedVc<T>> {
273 let RawVc::TaskOutput(task_id) = self else {
274 panic!(
275 "<RawVc as CollectiblesSource>::take_collectibles() must only be called on a \
276 RawVc::TaskOutput"
277 );
278 };
279 let tt = turbo_tasks();
280 let map = tt.read_task_collectibles(task_id, T::get_trait_type_id());
281 tt.unemit_collectibles(T::get_trait_type_id(), &map);
282 map.into_iter()
283 .filter_map(|(raw, count)| (count > 0).then_some(raw.try_into().unwrap()))
284 .collect()
285 }
286
287 fn drop_collectibles<T: VcValueTrait + ?Sized>(self) {
288 let RawVc::TaskOutput(task_id) = self else {
289 panic!(
290 "<RawVc as CollectiblesSource>::drop_collectibles() must only be called on a \
291 RawVc::TaskOutput"
292 );
293 };
294 let tt = turbo_tasks();
295 let map = tt.read_task_collectibles(task_id, T::get_trait_type_id());
296 tt.unemit_collectibles(T::get_trait_type_id(), &map);
297 }
298}
299
300struct SuppressTopLevelTaskCheckFuture<F> {
305 inner: F,
306}
307
308impl<F: Future> Future for SuppressTopLevelTaskCheckFuture<F> {
309 type Output = F::Output;
310
311 fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
312 let inner = unsafe { self.map_unchecked_mut(|this| &mut this.inner) };
314 if cfg!(debug_assertions) {
315 SUPPRESS_EVENTUAL_CONSISTENCY_TOP_LEVEL_TASK_CHECK.sync_scope(true, || inner.poll(cx))
316 } else {
317 inner.poll(cx)
318 }
319 }
320}
321
322#[must_use]
323pub struct ReadRawVcFuture {
324 current: RawVc,
325 read_output_options: ReadOutputOptions,
326 read_cell_options: ReadCellOptions,
327 is_serializable_cell_content_unknown: bool,
328 strongly_consistent: bool,
331 listener: Option<EventListener>,
332}
333
334impl ReadRawVcFuture {
335 pub(crate) fn new(vc: RawVc, is_serializable_cell_content: Option<bool>) -> Self {
336 ReadRawVcFuture {
337 current: vc,
338 read_output_options: ReadOutputOptions::default(),
339 read_cell_options: ReadCellOptions {
340 is_serializable_cell_content: is_serializable_cell_content.unwrap_or(false),
341 ..Default::default()
342 },
343 is_serializable_cell_content_unknown: is_serializable_cell_content.is_none(),
344 strongly_consistent: false,
345 listener: None,
346 }
347 }
348
349 pub fn strongly_consistent(mut self) -> Self {
351 self.read_output_options.consistency = ReadConsistency::Strong;
352 self.strongly_consistent = true;
353 self
354 }
355
356 pub fn track_with_key(mut self, key: u64) -> Self {
358 self.read_output_options.tracking = ReadTracking::Tracked;
359 self.read_cell_options.tracking = ReadCellTracking::Tracked { key: Some(key) };
360 self
361 }
362
363 pub fn untracked(mut self) -> Self {
369 self.read_output_options.tracking = ReadTracking::TrackOnlyError;
370 self.read_cell_options.tracking = ReadCellTracking::TrackOnlyError;
371 self
372 }
373
374 pub fn final_read_hint(mut self) -> Self {
376 self.read_cell_options.final_read_hint = true;
377 self
378 }
379}
380
381impl Future for ReadRawVcFuture {
382 type Output = Result<TypedCellContent>;
383
384 fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
385 let this = unsafe { self.get_unchecked_mut() };
387
388 let poll_fn = |tt: &Arc<dyn TurboTasksApi>| -> Poll<Self::Output> {
390 'outer: loop {
391 if let Some(listener) = &mut this.listener {
392 let listener = unsafe { Pin::new_unchecked(listener) };
394 if listener.poll(cx).is_pending() {
395 return Poll::Pending;
396 }
397 this.listener = None;
398 }
399 let mut listener = match this.current {
400 RawVc::TaskOutput(task) => {
401 let read_result = tt.try_read_task_output(task, this.read_output_options);
402 match read_result {
403 Ok(Ok(vc)) => {
404 this.read_output_options.consistency = ReadConsistency::Eventual;
413 this.current = vc;
414 continue 'outer;
415 }
416 Ok(Err(listener)) => listener,
417 Err(err) => return Poll::Ready(Err(err)),
418 }
419 }
420 RawVc::TaskCell(task, index) => {
421 if this.is_serializable_cell_content_unknown {
422 let value_type = registry::get_value_type(index.type_id);
423 this.read_cell_options.is_serializable_cell_content =
424 value_type.bincode.is_some();
425 }
426 let read_result =
427 tt.try_read_task_cell(task, index, this.read_cell_options);
428 match read_result {
429 Ok(Ok(content)) => {
430 return Poll::Ready(Ok(content));
432 }
433 Ok(Err(listener)) => listener,
434 Err(err) => return Poll::Ready(Err(err)),
435 }
436 }
437 RawVc::LocalOutput(execution_id, local_output_id, ..) => {
438 debug_assert_eq!(
439 this.read_output_options.consistency,
440 ReadConsistency::Eventual
441 );
442 let read_result = tt.try_read_local_output(execution_id, local_output_id);
443 match read_result {
444 Ok(Ok(vc)) => {
445 this.current = vc;
446 continue 'outer;
447 }
448 Ok(Err(listener)) => listener,
449 Err(err) => return Poll::Ready(Err(err)),
450 }
451 }
452 };
453 match unsafe { Pin::new_unchecked(&mut listener) }.poll(cx) {
455 Poll::Ready(_) => continue,
456 Poll::Pending => {
457 this.listener = Some(listener);
458 return Poll::Pending;
459 }
460 };
461 }
462 };
463
464 fn suppress_top_level_task_check<R>(strongly_consistent: bool, f: impl FnOnce() -> R) -> R {
465 if cfg!(debug_assertions) && strongly_consistent {
466 SUPPRESS_EVENTUAL_CONSISTENCY_TOP_LEVEL_TASK_CHECK.sync_scope(true, f)
468 } else {
469 f()
470 }
471 }
472
473 suppress_top_level_task_check(this.strongly_consistent, || with_turbo_tasks(poll_fn))
480 }
481}
482
483impl Unpin for ReadRawVcFuture {}