1use std::{
2 fmt::{Debug, Display},
3 future::Future,
4 pin::Pin,
5 task::Poll,
6};
7
8use anyhow::Result;
9use auto_hash_map::AutoSet;
10use bincode::{Decode, Encode};
11use serde::{Deserialize, Serialize};
12use thiserror::Error;
13
14use crate::{
15 CollectiblesSource, ReadCellOptions, ReadConsistency, ReadOutputOptions, ResolvedVc, TaskId,
16 TaskPersistence, TraitTypeId, ValueType, ValueTypeId, VcValueTrait,
17 backend::{CellContent, TypedCellContent},
18 event::EventListener,
19 id::{ExecutionId, LocalTaskId},
20 manager::{
21 ReadTracking, read_local_output, read_task_cell, read_task_output, with_turbo_tasks,
22 },
23 registry::{self, get_value_type},
24 turbo_tasks,
25};
26
27#[derive(Error, Debug)]
28pub enum ResolveTypeError {
29 #[error("no content in the cell")]
30 NoContent,
31 #[error("the content in the cell has no type")]
32 UntypedContent,
33 #[error("content is not available as task execution failed")]
34 TaskError { source: anyhow::Error },
35 #[error("reading the cell content failed")]
36 ReadError { source: anyhow::Error },
37}
38
39#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
40pub struct CellId {
41 pub type_id: ValueTypeId,
42 pub index: u32,
43}
44
45impl Display for CellId {
46 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
47 write!(
48 f,
49 "{}#{}",
50 registry::get_value_type(self.type_id).name,
51 self.index
52 )
53 }
54}
55
56#[derive(Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
66pub enum RawVc {
67 TaskOutput(TaskId),
70 TaskCell(TaskId, CellId),
75 LocalOutput(ExecutionId, LocalTaskId, TaskPersistence),
83}
84
85impl Debug for RawVc {
86 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
87 match self {
88 RawVc::TaskOutput(task_id) => f
89 .debug_tuple("RawVc::TaskOutput")
90 .field(&**task_id)
91 .finish(),
92 RawVc::TaskCell(task_id, cell_id) => f
93 .debug_tuple("RawVc::TaskCell")
94 .field(&**task_id)
95 .field(&cell_id.to_string())
96 .finish(),
97 RawVc::LocalOutput(execution_id, local_task_id, task_persistence) => f
98 .debug_tuple("RawVc::LocalOutput")
99 .field(&**execution_id)
100 .field(&**local_task_id)
101 .field(task_persistence)
102 .finish(),
103 }
104 }
105}
106
107impl Display for RawVc {
108 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
109 match self {
110 RawVc::TaskOutput(task_id) => write!(f, "output of task {}", **task_id),
111 RawVc::TaskCell(task_id, cell_id) => {
112 write!(f, "{} of task {}", cell_id, **task_id)
113 }
114 RawVc::LocalOutput(execution_id, local_task_id, task_persistence) => write!(
115 f,
116 "output of local task {} ({}, {})",
117 **local_task_id, **execution_id, task_persistence
118 ),
119 }
120 }
121}
122
123impl RawVc {
124 pub fn is_resolved(&self) -> bool {
125 match self {
126 RawVc::TaskOutput(..) => false,
127 RawVc::TaskCell(..) => true,
128 RawVc::LocalOutput(..) => false,
129 }
130 }
131
132 pub fn is_local(&self) -> bool {
133 match self {
134 RawVc::TaskOutput(..) => false,
135 RawVc::TaskCell(..) => false,
136 RawVc::LocalOutput(..) => true,
137 }
138 }
139
140 pub fn is_transient(&self) -> bool {
145 match self {
146 RawVc::TaskOutput(task) | RawVc::TaskCell(task, ..) => task.is_transient(),
147 RawVc::LocalOutput(_, _, persistence) => *persistence == TaskPersistence::Transient,
148 }
149 }
150
151 pub(crate) fn into_read(self, is_serializable_cell_content: bool) -> ReadRawVcFuture {
152 ReadRawVcFuture::new(self, Some(is_serializable_cell_content))
155 }
156
157 pub(crate) fn into_read_with_unknown_is_serializable_cell_content(self) -> ReadRawVcFuture {
158 ReadRawVcFuture::new(self, None)
161 }
162
163 pub(crate) async fn resolve_trait(
164 self,
165 trait_type: TraitTypeId,
166 ) -> Result<Option<RawVc>, ResolveTypeError> {
167 self.resolve_type_inner(|value_type_id| {
168 let value_type = get_value_type(value_type_id);
169 (value_type.has_trait(&trait_type), Some(value_type))
170 })
171 .await
172 }
173
174 pub(crate) async fn resolve_value(
175 self,
176 value_type: ValueTypeId,
177 ) -> Result<Option<RawVc>, ResolveTypeError> {
178 self.resolve_type_inner(|cell_value_type| (cell_value_type == value_type, None))
179 .await
180 }
181
182 async fn resolve_type_inner(
190 self,
191 conditional: impl FnOnce(ValueTypeId) -> (bool, Option<&'static ValueType>),
192 ) -> Result<Option<RawVc>, ResolveTypeError> {
193 let tt = turbo_tasks();
194 let mut current = self;
195 loop {
196 match current {
197 RawVc::TaskOutput(task) => {
198 current = read_task_output(&*tt, task, ReadOutputOptions::default())
199 .await
200 .map_err(|source| ResolveTypeError::TaskError { source })?;
201 }
202 RawVc::TaskCell(task, index) => {
203 let (ok, value_type) = conditional(index.type_id);
204 if !ok {
205 return Ok(None);
206 }
207 let value_type =
208 value_type.unwrap_or_else(|| registry::get_value_type(index.type_id));
209 let content = read_task_cell(
210 &*tt,
211 task,
212 index,
213 ReadCellOptions {
214 is_serializable_cell_content: value_type.bincode.is_some(),
215 final_read_hint: false,
216 tracking: ReadTracking::default(),
217 },
218 )
219 .await
220 .map_err(|source| ResolveTypeError::ReadError { source })?;
221 if let TypedCellContent(_, CellContent(Some(_))) = content {
222 return Ok(Some(RawVc::TaskCell(task, index)));
223 } else {
224 return Err(ResolveTypeError::NoContent);
225 }
226 }
227 RawVc::LocalOutput(execution_id, local_task_id, ..) => {
228 current = read_local_output(&*tt, execution_id, local_task_id)
229 .await
230 .map_err(|source| ResolveTypeError::TaskError { source })?;
231 }
232 }
233 }
234 }
235
236 pub(crate) async fn resolve(self) -> Result<RawVc> {
238 self.resolve_inner(ReadOutputOptions {
239 tracking: ReadTracking::default(),
240 consistency: ReadConsistency::Eventual,
241 })
242 .await
243 }
244
245 pub(crate) async fn resolve_strongly_consistent(self) -> Result<RawVc> {
247 self.resolve_inner(ReadOutputOptions {
248 tracking: ReadTracking::default(),
249 consistency: ReadConsistency::Strong,
250 })
251 .await
252 }
253
254 async fn resolve_inner(self, mut options: ReadOutputOptions) -> Result<RawVc> {
255 let tt = turbo_tasks();
256 let mut current = self;
257 loop {
258 match current {
259 RawVc::TaskOutput(task) => {
260 current = read_task_output(&*tt, task, options).await?;
261 options.consistency = ReadConsistency::Eventual;
265 }
266 RawVc::TaskCell(_, _) => return Ok(current),
267 RawVc::LocalOutput(execution_id, local_task_id, ..) => {
268 debug_assert_eq!(options.consistency, ReadConsistency::Eventual);
269 current = read_local_output(&*tt, execution_id, local_task_id).await?;
270 }
271 }
272 }
273 }
274
275 pub(crate) async fn to_non_local(self) -> Result<RawVc> {
278 Ok(match self {
279 RawVc::LocalOutput(execution_id, local_task_id, ..) => {
280 let tt = turbo_tasks();
281 let local_output = read_local_output(&*tt, execution_id, local_task_id).await?;
282 debug_assert!(
283 !matches!(local_output, RawVc::LocalOutput(_, _, _)),
284 "a LocalOutput cannot point at other LocalOutputs"
285 );
286 local_output
287 }
288 non_local => non_local,
289 })
290 }
291
292 pub(crate) fn connect(&self) {
293 let RawVc::TaskOutput(task_id) = self else {
294 panic!("RawVc::connect() must only be called on a RawVc::TaskOutput");
295 };
296 let tt = turbo_tasks();
297 tt.connect_task(*task_id);
298 }
299
300 pub fn try_get_task_id(&self) -> Option<TaskId> {
301 match self {
302 RawVc::TaskOutput(t) | RawVc::TaskCell(t, ..) => Some(*t),
303 RawVc::LocalOutput(..) => None,
304 }
305 }
306
307 pub fn try_get_type_id(&self) -> Option<ValueTypeId> {
308 match self {
309 RawVc::TaskCell(_, CellId { type_id, .. }) => Some(*type_id),
310 RawVc::TaskOutput(..) | RawVc::LocalOutput(..) => None,
311 }
312 }
313
314 pub(crate) fn resolved_has_trait(&self, trait_id: TraitTypeId) -> bool {
317 match self {
318 RawVc::TaskCell(_task_id, cell_id) => {
319 get_value_type(cell_id.type_id).has_trait(&trait_id)
320 }
321 _ => unreachable!("resolved_has_trait must be called with a RawVc::TaskCell"),
322 }
323 }
324
325 pub(crate) fn resolved_is_type(&self, type_id: ValueTypeId) -> bool {
328 match self {
329 RawVc::TaskCell(_task_id, cell_id) => cell_id.type_id == type_id,
330 _ => unreachable!("resolved_is_type must be called with a RawVc::TaskCell"),
331 }
332 }
333}
334
335impl CollectiblesSource for RawVc {
337 fn peek_collectibles<T: VcValueTrait + ?Sized>(self) -> AutoSet<ResolvedVc<T>> {
338 let RawVc::TaskOutput(task_id) = self else {
339 panic!(
340 "<RawVc as CollectiblesSource>::peek_collectibles() must only be called on a \
341 RawVc::TaskOutput"
342 );
343 };
344 let tt = turbo_tasks();
345 let map = tt.read_task_collectibles(task_id, T::get_trait_type_id());
346 map.into_iter()
347 .filter_map(|(raw, count)| (count > 0).then_some(raw.try_into().unwrap()))
348 .collect()
349 }
350
351 fn take_collectibles<T: VcValueTrait + ?Sized>(self) -> AutoSet<ResolvedVc<T>> {
352 let RawVc::TaskOutput(task_id) = self else {
353 panic!(
354 "<RawVc as CollectiblesSource>::take_collectibles() must only be called on a \
355 RawVc::TaskOutput"
356 );
357 };
358 let tt = turbo_tasks();
359 let map = tt.read_task_collectibles(task_id, T::get_trait_type_id());
360 tt.unemit_collectibles(T::get_trait_type_id(), &map);
361 map.into_iter()
362 .filter_map(|(raw, count)| (count > 0).then_some(raw.try_into().unwrap()))
363 .collect()
364 }
365
366 fn drop_collectibles<T: VcValueTrait + ?Sized>(self) {
367 let RawVc::TaskOutput(task_id) = self else {
368 panic!(
369 "<RawVc as CollectiblesSource>::drop_collectibles() must only be called on a \
370 RawVc::TaskOutput"
371 );
372 };
373 let tt = turbo_tasks();
374 let map = tt.read_task_collectibles(task_id, T::get_trait_type_id());
375 tt.unemit_collectibles(T::get_trait_type_id(), &map);
376 }
377}
378
379pub struct ReadRawVcFuture {
380 current: RawVc,
381 read_output_options: ReadOutputOptions,
382 read_cell_options: ReadCellOptions,
383 is_serializable_cell_content_unknown: bool,
384 listener: Option<EventListener>,
385}
386
387impl ReadRawVcFuture {
388 pub(crate) fn new(vc: RawVc, is_serializable_cell_content: Option<bool>) -> Self {
389 ReadRawVcFuture {
390 current: vc,
391 read_output_options: ReadOutputOptions::default(),
392 read_cell_options: ReadCellOptions {
393 is_serializable_cell_content: is_serializable_cell_content.unwrap_or(false),
394 ..Default::default()
395 },
396 is_serializable_cell_content_unknown: is_serializable_cell_content.is_none(),
397 listener: None,
398 }
399 }
400
401 pub fn strongly_consistent(mut self) -> Self {
402 self.read_output_options.consistency = ReadConsistency::Strong;
403 self
404 }
405
406 pub fn untracked(mut self) -> Self {
412 self.read_output_options.tracking = ReadTracking::TrackOnlyError;
413 self.read_cell_options.tracking = ReadTracking::TrackOnlyError;
414 self
415 }
416
417 pub fn untracked_including_errors(mut self) -> Self {
423 self.read_output_options.tracking = ReadTracking::Untracked;
424 self.read_cell_options.tracking = ReadTracking::Untracked;
425 self
426 }
427
428 pub fn final_read_hint(mut self) -> Self {
429 self.read_cell_options.final_read_hint = true;
430 self
431 }
432}
433
434impl Future for ReadRawVcFuture {
435 type Output = Result<TypedCellContent>;
436
437 fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
438 with_turbo_tasks(|tt| {
439 let this = unsafe { self.get_unchecked_mut() };
441 'outer: loop {
442 if let Some(listener) = &mut this.listener {
443 let listener = unsafe { Pin::new_unchecked(listener) };
445 if listener.poll(cx).is_pending() {
446 return Poll::Pending;
447 }
448 this.listener = None;
449 }
450 let mut listener = match this.current {
451 RawVc::TaskOutput(task) => {
452 let read_result = tt.try_read_task_output(task, this.read_output_options);
453 match read_result {
454 Ok(Ok(vc)) => {
455 this.read_output_options.consistency = ReadConsistency::Eventual;
459 this.current = vc;
460 continue 'outer;
461 }
462 Ok(Err(listener)) => listener,
463 Err(err) => return Poll::Ready(Err(err)),
464 }
465 }
466 RawVc::TaskCell(task, index) => {
467 if this.is_serializable_cell_content_unknown {
468 let value_type = registry::get_value_type(index.type_id);
469 this.read_cell_options.is_serializable_cell_content =
470 value_type.bincode.is_some();
471 }
472 let read_result =
473 tt.try_read_task_cell(task, index, this.read_cell_options);
474 match read_result {
475 Ok(Ok(content)) => {
476 return Poll::Ready(Ok(content));
478 }
479 Ok(Err(listener)) => listener,
480 Err(err) => return Poll::Ready(Err(err)),
481 }
482 }
483 RawVc::LocalOutput(execution_id, local_output_id, ..) => {
484 debug_assert_eq!(
485 this.read_output_options.consistency,
486 ReadConsistency::Eventual
487 );
488 let read_result = tt.try_read_local_output(execution_id, local_output_id);
489 match read_result {
490 Ok(Ok(vc)) => {
491 this.current = vc;
492 continue 'outer;
493 }
494 Ok(Err(listener)) => listener,
495 Err(err) => return Poll::Ready(Err(err)),
496 }
497 }
498 };
499 match unsafe { Pin::new_unchecked(&mut listener) }.poll(cx) {
501 Poll::Ready(_) => continue,
502 Poll::Pending => {
503 this.listener = Some(listener);
504 return Poll::Pending;
505 }
506 };
507 }
508 })
509 }
510}
511
512impl Unpin for ReadRawVcFuture {}