1use std::{
2 fmt::{Debug, Display},
3 future::Future,
4 pin::Pin,
5 task::Poll,
6};
7
8use anyhow::Result;
9use auto_hash_map::AutoSet;
10use bincode::{Decode, Encode};
11use serde::{Deserialize, Serialize};
12use thiserror::Error;
13
14use crate::{
15 CollectiblesSource, ReadCellOptions, ReadConsistency, ReadOutputOptions, ResolvedVc, TaskId,
16 TaskPersistence, TraitTypeId, ValueType, ValueTypeId, VcValueTrait,
17 backend::{CellContent, TypedCellContent},
18 event::EventListener,
19 id::{ExecutionId, LocalTaskId},
20 manager::{
21 ReadCellTracking, ReadTracking, read_local_output, read_task_cell, read_task_output,
22 with_turbo_tasks,
23 },
24 registry::{self, get_value_type},
25 turbo_tasks,
26};
27
28#[derive(Error, Debug)]
29pub enum ResolveTypeError {
30 #[error("no content in the cell")]
31 NoContent,
32 #[error("the content in the cell has no type")]
33 UntypedContent,
34 #[error("content is not available as task execution failed")]
35 TaskError { source: anyhow::Error },
36 #[error("reading the cell content failed")]
37 ReadError { source: anyhow::Error },
38}
39
40#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
41pub struct CellId {
42 pub type_id: ValueTypeId,
43 pub index: u32,
44}
45
46impl Display for CellId {
47 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
48 write!(
49 f,
50 "{}#{}",
51 registry::get_value_type(self.type_id).name,
52 self.index
53 )
54 }
55}
56
57#[derive(Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
67pub enum RawVc {
68 TaskOutput(TaskId),
71 TaskCell(TaskId, CellId),
76 LocalOutput(ExecutionId, LocalTaskId, TaskPersistence),
84}
85
86impl Debug for RawVc {
87 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
88 match self {
89 RawVc::TaskOutput(task_id) => f
90 .debug_tuple("RawVc::TaskOutput")
91 .field(&**task_id)
92 .finish(),
93 RawVc::TaskCell(task_id, cell_id) => f
94 .debug_tuple("RawVc::TaskCell")
95 .field(&**task_id)
96 .field(&cell_id.to_string())
97 .finish(),
98 RawVc::LocalOutput(execution_id, local_task_id, task_persistence) => f
99 .debug_tuple("RawVc::LocalOutput")
100 .field(&**execution_id)
101 .field(&**local_task_id)
102 .field(task_persistence)
103 .finish(),
104 }
105 }
106}
107
108impl Display for RawVc {
109 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
110 match self {
111 RawVc::TaskOutput(task_id) => write!(f, "output of task {}", **task_id),
112 RawVc::TaskCell(task_id, cell_id) => {
113 write!(f, "{} of task {}", cell_id, **task_id)
114 }
115 RawVc::LocalOutput(execution_id, local_task_id, task_persistence) => write!(
116 f,
117 "output of local task {} ({}, {})",
118 **local_task_id, **execution_id, task_persistence
119 ),
120 }
121 }
122}
123
124impl RawVc {
125 pub fn is_resolved(&self) -> bool {
126 match self {
127 RawVc::TaskOutput(..) => false,
128 RawVc::TaskCell(..) => true,
129 RawVc::LocalOutput(..) => false,
130 }
131 }
132
133 pub fn is_local(&self) -> bool {
134 match self {
135 RawVc::TaskOutput(..) => false,
136 RawVc::TaskCell(..) => false,
137 RawVc::LocalOutput(..) => true,
138 }
139 }
140
141 pub fn is_transient(&self) -> bool {
146 match self {
147 RawVc::TaskOutput(task) | RawVc::TaskCell(task, ..) => task.is_transient(),
148 RawVc::LocalOutput(_, _, persistence) => *persistence == TaskPersistence::Transient,
149 }
150 }
151
152 pub(crate) fn into_read(self, is_serializable_cell_content: bool) -> ReadRawVcFuture {
153 ReadRawVcFuture::new(self, Some(is_serializable_cell_content))
156 }
157
158 pub(crate) fn into_read_with_unknown_is_serializable_cell_content(self) -> ReadRawVcFuture {
159 ReadRawVcFuture::new(self, None)
162 }
163
164 pub(crate) async fn resolve_trait(
165 self,
166 trait_type: TraitTypeId,
167 ) -> Result<Option<RawVc>, ResolveTypeError> {
168 self.resolve_type_inner(|value_type_id| {
169 let value_type = get_value_type(value_type_id);
170 (value_type.has_trait(&trait_type), Some(value_type))
171 })
172 .await
173 }
174
175 pub(crate) async fn resolve_value(
176 self,
177 value_type: ValueTypeId,
178 ) -> Result<Option<RawVc>, ResolveTypeError> {
179 self.resolve_type_inner(|cell_value_type| (cell_value_type == value_type, None))
180 .await
181 }
182
183 async fn resolve_type_inner(
191 self,
192 conditional: impl FnOnce(ValueTypeId) -> (bool, Option<&'static ValueType>),
193 ) -> Result<Option<RawVc>, ResolveTypeError> {
194 let tt = turbo_tasks();
195 let mut current = self;
196 loop {
197 match current {
198 RawVc::TaskOutput(task) => {
199 current = read_task_output(&*tt, task, ReadOutputOptions::default())
200 .await
201 .map_err(|source| ResolveTypeError::TaskError { source })?;
202 }
203 RawVc::TaskCell(task, index) => {
204 let (ok, value_type) = conditional(index.type_id);
205 if !ok {
206 return Ok(None);
207 }
208 let value_type =
209 value_type.unwrap_or_else(|| registry::get_value_type(index.type_id));
210 let content = read_task_cell(
211 &*tt,
212 task,
213 index,
214 ReadCellOptions {
215 is_serializable_cell_content: value_type.bincode.is_some(),
216 ..Default::default()
217 },
218 )
219 .await
220 .map_err(|source| ResolveTypeError::ReadError { source })?;
221 if let TypedCellContent(_, CellContent(Some(_))) = content {
222 return Ok(Some(RawVc::TaskCell(task, index)));
223 } else {
224 return Err(ResolveTypeError::NoContent);
225 }
226 }
227 RawVc::LocalOutput(execution_id, local_task_id, ..) => {
228 current = read_local_output(&*tt, execution_id, local_task_id)
229 .await
230 .map_err(|source| ResolveTypeError::TaskError { source })?;
231 }
232 }
233 }
234 }
235
236 pub(crate) async fn resolve(self) -> Result<RawVc> {
238 self.resolve_inner(ReadOutputOptions {
239 consistency: ReadConsistency::Eventual,
240 ..Default::default()
241 })
242 .await
243 }
244
245 pub(crate) async fn resolve_strongly_consistent(self) -> Result<RawVc> {
247 self.resolve_inner(ReadOutputOptions {
248 consistency: ReadConsistency::Strong,
249 ..Default::default()
250 })
251 .await
252 }
253
254 async fn resolve_inner(self, mut options: ReadOutputOptions) -> Result<RawVc> {
255 let tt = turbo_tasks();
256 let mut current = self;
257 loop {
258 match current {
259 RawVc::TaskOutput(task) => {
260 current = read_task_output(&*tt, task, options).await?;
261 options.consistency = ReadConsistency::Eventual;
265 }
266 RawVc::TaskCell(_, _) => return Ok(current),
267 RawVc::LocalOutput(execution_id, local_task_id, ..) => {
268 debug_assert_eq!(options.consistency, ReadConsistency::Eventual);
269 current = read_local_output(&*tt, execution_id, local_task_id).await?;
270 }
271 }
272 }
273 }
274
275 pub(crate) async fn to_non_local(self) -> Result<RawVc> {
278 Ok(match self {
279 RawVc::LocalOutput(execution_id, local_task_id, ..) => {
280 let tt = turbo_tasks();
281 let local_output = read_local_output(&*tt, execution_id, local_task_id).await?;
282 debug_assert!(
283 !matches!(local_output, RawVc::LocalOutput(_, _, _)),
284 "a LocalOutput cannot point at other LocalOutputs"
285 );
286 local_output
287 }
288 non_local => non_local,
289 })
290 }
291
292 pub(crate) fn connect(&self) {
293 let RawVc::TaskOutput(task_id) = self else {
294 panic!("RawVc::connect() must only be called on a RawVc::TaskOutput");
295 };
296 let tt = turbo_tasks();
297 tt.connect_task(*task_id);
298 }
299
300 pub fn try_get_task_id(&self) -> Option<TaskId> {
301 match self {
302 RawVc::TaskOutput(t) | RawVc::TaskCell(t, ..) => Some(*t),
303 RawVc::LocalOutput(..) => None,
304 }
305 }
306
307 pub fn try_get_type_id(&self) -> Option<ValueTypeId> {
308 match self {
309 RawVc::TaskCell(_, CellId { type_id, .. }) => Some(*type_id),
310 RawVc::TaskOutput(..) | RawVc::LocalOutput(..) => None,
311 }
312 }
313
314 pub(crate) fn resolved_has_trait(&self, trait_id: TraitTypeId) -> bool {
317 match self {
318 RawVc::TaskCell(_task_id, cell_id) => {
319 get_value_type(cell_id.type_id).has_trait(&trait_id)
320 }
321 _ => unreachable!("resolved_has_trait must be called with a RawVc::TaskCell"),
322 }
323 }
324
325 pub(crate) fn resolved_is_type(&self, type_id: ValueTypeId) -> bool {
328 match self {
329 RawVc::TaskCell(_task_id, cell_id) => cell_id.type_id == type_id,
330 _ => unreachable!("resolved_is_type must be called with a RawVc::TaskCell"),
331 }
332 }
333}
334
335impl CollectiblesSource for RawVc {
337 fn peek_collectibles<T: VcValueTrait + ?Sized>(self) -> AutoSet<ResolvedVc<T>> {
338 let RawVc::TaskOutput(task_id) = self else {
339 panic!(
340 "<RawVc as CollectiblesSource>::peek_collectibles() must only be called on a \
341 RawVc::TaskOutput"
342 );
343 };
344 let tt = turbo_tasks();
345 let map = tt.read_task_collectibles(task_id, T::get_trait_type_id());
346 map.into_iter()
347 .filter_map(|(raw, count)| (count > 0).then_some(raw.try_into().unwrap()))
348 .collect()
349 }
350
351 fn take_collectibles<T: VcValueTrait + ?Sized>(self) -> AutoSet<ResolvedVc<T>> {
352 let RawVc::TaskOutput(task_id) = self else {
353 panic!(
354 "<RawVc as CollectiblesSource>::take_collectibles() must only be called on a \
355 RawVc::TaskOutput"
356 );
357 };
358 let tt = turbo_tasks();
359 let map = tt.read_task_collectibles(task_id, T::get_trait_type_id());
360 tt.unemit_collectibles(T::get_trait_type_id(), &map);
361 map.into_iter()
362 .filter_map(|(raw, count)| (count > 0).then_some(raw.try_into().unwrap()))
363 .collect()
364 }
365
366 fn drop_collectibles<T: VcValueTrait + ?Sized>(self) {
367 let RawVc::TaskOutput(task_id) = self else {
368 panic!(
369 "<RawVc as CollectiblesSource>::drop_collectibles() must only be called on a \
370 RawVc::TaskOutput"
371 );
372 };
373 let tt = turbo_tasks();
374 let map = tt.read_task_collectibles(task_id, T::get_trait_type_id());
375 tt.unemit_collectibles(T::get_trait_type_id(), &map);
376 }
377}
378
379pub struct ReadRawVcFuture {
380 current: RawVc,
381 read_output_options: ReadOutputOptions,
382 read_cell_options: ReadCellOptions,
383 is_serializable_cell_content_unknown: bool,
384 listener: Option<EventListener>,
385}
386
387impl ReadRawVcFuture {
388 pub(crate) fn new(vc: RawVc, is_serializable_cell_content: Option<bool>) -> Self {
389 ReadRawVcFuture {
390 current: vc,
391 read_output_options: ReadOutputOptions::default(),
392 read_cell_options: ReadCellOptions {
393 is_serializable_cell_content: is_serializable_cell_content.unwrap_or(false),
394 ..Default::default()
395 },
396 is_serializable_cell_content_unknown: is_serializable_cell_content.is_none(),
397 listener: None,
398 }
399 }
400
401 pub fn strongly_consistent(mut self) -> Self {
403 self.read_output_options.consistency = ReadConsistency::Strong;
404 self
405 }
406
407 pub fn track_with_key(mut self, key: u64) -> Self {
409 self.read_output_options.tracking = ReadTracking::Tracked;
410 self.read_cell_options.tracking = ReadCellTracking::Tracked { key: Some(key) };
411 self
412 }
413
414 pub fn untracked(mut self) -> Self {
420 self.read_output_options.tracking = ReadTracking::TrackOnlyError;
421 self.read_cell_options.tracking = ReadCellTracking::TrackOnlyError;
422 self
423 }
424
425 pub fn final_read_hint(mut self) -> Self {
427 self.read_cell_options.final_read_hint = true;
428 self
429 }
430}
431
432impl Future for ReadRawVcFuture {
433 type Output = Result<TypedCellContent>;
434
435 fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
436 with_turbo_tasks(|tt| {
437 let this = unsafe { self.get_unchecked_mut() };
439 'outer: loop {
440 if let Some(listener) = &mut this.listener {
441 let listener = unsafe { Pin::new_unchecked(listener) };
443 if listener.poll(cx).is_pending() {
444 return Poll::Pending;
445 }
446 this.listener = None;
447 }
448 let mut listener = match this.current {
449 RawVc::TaskOutput(task) => {
450 let read_result = tt.try_read_task_output(task, this.read_output_options);
451 match read_result {
452 Ok(Ok(vc)) => {
453 this.read_output_options.consistency = ReadConsistency::Eventual;
457 this.current = vc;
458 continue 'outer;
459 }
460 Ok(Err(listener)) => listener,
461 Err(err) => return Poll::Ready(Err(err)),
462 }
463 }
464 RawVc::TaskCell(task, index) => {
465 if this.is_serializable_cell_content_unknown {
466 let value_type = registry::get_value_type(index.type_id);
467 this.read_cell_options.is_serializable_cell_content =
468 value_type.bincode.is_some();
469 }
470 let read_result =
471 tt.try_read_task_cell(task, index, this.read_cell_options);
472 match read_result {
473 Ok(Ok(content)) => {
474 return Poll::Ready(Ok(content));
476 }
477 Ok(Err(listener)) => listener,
478 Err(err) => return Poll::Ready(Err(err)),
479 }
480 }
481 RawVc::LocalOutput(execution_id, local_output_id, ..) => {
482 debug_assert_eq!(
483 this.read_output_options.consistency,
484 ReadConsistency::Eventual
485 );
486 let read_result = tt.try_read_local_output(execution_id, local_output_id);
487 match read_result {
488 Ok(Ok(vc)) => {
489 this.current = vc;
490 continue 'outer;
491 }
492 Ok(Err(listener)) => listener,
493 Err(err) => return Poll::Ready(Err(err)),
494 }
495 }
496 };
497 match unsafe { Pin::new_unchecked(&mut listener) }.poll(cx) {
499 Poll::Ready(_) => continue,
500 Poll::Pending => {
501 this.listener = Some(listener);
502 return Poll::Pending;
503 }
504 };
505 }
506 })
507 }
508}
509
510impl Unpin for ReadRawVcFuture {}