1use std::{
2 fmt::{Debug, Display},
3 future::Future,
4 pin::Pin,
5 task::Poll,
6};
7
8use anyhow::Result;
9use auto_hash_map::AutoSet;
10use serde::{Deserialize, Serialize};
11use thiserror::Error;
12
13use crate::{
14 CollectiblesSource, ReadCellOptions, ReadConsistency, ReadOutputOptions, ResolvedVc, TaskId,
15 TaskPersistence, TraitTypeId, ValueType, ValueTypeId, VcValueTrait,
16 backend::{CellContent, TypedCellContent},
17 event::EventListener,
18 id::{ExecutionId, LocalTaskId},
19 manager::{
20 ReadTracking, read_local_output, read_task_cell, read_task_output, with_turbo_tasks,
21 },
22 registry::{self, get_value_type},
23 turbo_tasks,
24};
25
/// Errors produced while resolving a [`RawVc`] to a cell of a particular value type or trait.
#[derive(Error, Debug)]
pub enum ResolveTypeError {
    /// The cell was read successfully but did not hold any content.
    #[error("no content in the cell")]
    NoContent,
    /// The content of the cell carries no type information.
    // NOTE(review): nothing in the visible part of this file constructs this variant — confirm
    // that it is still used elsewhere.
    #[error("the content in the cell has no type")]
    UntypedContent,
    /// Reading a task output or a local output failed; `source` is the underlying error.
    #[error("content is not available as task execution failed")]
    TaskError { source: anyhow::Error },
    /// Reading the cell itself failed; `source` is the underlying error.
    #[error("reading the cell content failed")]
    ReadError { source: anyhow::Error },
}
37
/// Identifies a cell by the type of the value it holds plus a numeric index.
///
/// The `Display` implementation renders it as `<value type name>#<index>`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct CellId {
    /// Id of the registered value type stored in the cell.
    pub type_id: ValueTypeId,
    /// Numeric index of the cell.
    // NOTE(review): presumably counted per task and per value type — confirm against the backend.
    pub index: u32,
}
43
44impl Display for CellId {
45 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46 write!(
47 f,
48 "{}#{}",
49 registry::get_value_type(self.type_id).name,
50 self.index
51 )
52 }
53}
54
/// A reference, without a compile-time value type, to something produced by a task.
///
/// A `RawVc` names either the output of a task (regular or local) or one specific cell of a
/// task. Only the [`RawVc::TaskCell`] form counts as resolved (see [`RawVc::is_resolved`]); the
/// two output forms are followed by the `resolve*` methods until a cell is reached.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum RawVc {
    /// The output of the task with the given id. Neither resolved nor local.
    TaskOutput(TaskId),
    /// A specific cell, identified by [`CellId`], of the given task. This is the resolved form.
    TaskCell(TaskId, CellId),
    /// The output of a local task, addressed by execution id and local task id. The
    /// [`TaskPersistence`] determines whether [`RawVc::is_transient`] reports `true`.
    LocalOutput(ExecutionId, LocalTaskId, TaskPersistence),
}
83
84impl Debug for RawVc {
85 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86 match self {
87 RawVc::TaskOutput(task_id) => f
88 .debug_tuple("RawVc::TaskOutput")
89 .field(&**task_id)
90 .finish(),
91 RawVc::TaskCell(task_id, cell_id) => f
92 .debug_tuple("RawVc::TaskCell")
93 .field(&**task_id)
94 .field(&cell_id.to_string())
95 .finish(),
96 RawVc::LocalOutput(execution_id, local_task_id, task_persistence) => f
97 .debug_tuple("RawVc::LocalOutput")
98 .field(&**execution_id)
99 .field(&**local_task_id)
100 .field(task_persistence)
101 .finish(),
102 }
103 }
104}
105
106impl RawVc {
107 pub fn is_resolved(&self) -> bool {
108 match self {
109 RawVc::TaskOutput(..) => false,
110 RawVc::TaskCell(..) => true,
111 RawVc::LocalOutput(..) => false,
112 }
113 }
114
115 pub fn is_local(&self) -> bool {
116 match self {
117 RawVc::TaskOutput(..) => false,
118 RawVc::TaskCell(..) => false,
119 RawVc::LocalOutput(..) => true,
120 }
121 }
122
123 pub fn is_transient(&self) -> bool {
128 match self {
129 RawVc::TaskOutput(task) | RawVc::TaskCell(task, ..) => task.is_transient(),
130 RawVc::LocalOutput(_, _, persistence) => *persistence == TaskPersistence::Transient,
131 }
132 }
133
134 pub(crate) fn into_read(self) -> ReadRawVcFuture {
135 ReadRawVcFuture::new(self)
138 }
139
140 pub(crate) async fn resolve_trait(
141 self,
142 trait_type: TraitTypeId,
143 ) -> Result<Option<RawVc>, ResolveTypeError> {
144 self.resolve_type_inner(|value_type_id| {
145 let value_type = get_value_type(value_type_id);
146 (value_type.has_trait(&trait_type), Some(value_type))
147 })
148 .await
149 }
150
151 pub(crate) async fn resolve_value(
152 self,
153 value_type: ValueTypeId,
154 ) -> Result<Option<RawVc>, ResolveTypeError> {
155 self.resolve_type_inner(|cell_value_type| (cell_value_type == value_type, None))
156 .await
157 }
158
159 async fn resolve_type_inner(
167 self,
168 conditional: impl FnOnce(ValueTypeId) -> (bool, Option<&'static ValueType>),
169 ) -> Result<Option<RawVc>, ResolveTypeError> {
170 let tt = turbo_tasks();
171 let mut current = self;
172 loop {
173 match current {
174 RawVc::TaskOutput(task) => {
175 current = read_task_output(&*tt, task, ReadOutputOptions::default())
176 .await
177 .map_err(|source| ResolveTypeError::TaskError { source })?;
178 }
179 RawVc::TaskCell(task, index) => {
180 let content = read_task_cell(&*tt, task, index, ReadCellOptions::default())
181 .await
182 .map_err(|source| ResolveTypeError::ReadError { source })?;
183 if let TypedCellContent(value_type, CellContent(Some(_))) = content {
184 return Ok(if conditional(value_type).0 {
185 Some(RawVc::TaskCell(task, index))
186 } else {
187 None
188 });
189 } else {
190 return Err(ResolveTypeError::NoContent);
191 }
192 }
193 RawVc::LocalOutput(execution_id, local_task_id, ..) => {
194 current = read_local_output(&*tt, execution_id, local_task_id)
195 .await
196 .map_err(|source| ResolveTypeError::TaskError { source })?;
197 }
198 }
199 }
200 }
201
202 pub(crate) async fn resolve(self) -> Result<RawVc> {
204 self.resolve_inner(ReadOutputOptions {
205 tracking: ReadTracking::default(),
206 consistency: ReadConsistency::Eventual,
207 })
208 .await
209 }
210
211 pub(crate) async fn resolve_strongly_consistent(self) -> Result<RawVc> {
213 self.resolve_inner(ReadOutputOptions {
214 tracking: ReadTracking::default(),
215 consistency: ReadConsistency::Strong,
216 })
217 .await
218 }
219
220 async fn resolve_inner(self, mut options: ReadOutputOptions) -> Result<RawVc> {
221 let tt = turbo_tasks();
222 let mut current = self;
223 loop {
224 match current {
225 RawVc::TaskOutput(task) => {
226 current = read_task_output(&*tt, task, options).await?;
227 options.consistency = ReadConsistency::Eventual;
231 }
232 RawVc::TaskCell(_, _) => return Ok(current),
233 RawVc::LocalOutput(execution_id, local_task_id, ..) => {
234 debug_assert_eq!(options.consistency, ReadConsistency::Eventual);
235 current = read_local_output(&*tt, execution_id, local_task_id).await?;
236 }
237 }
238 }
239 }
240
241 pub(crate) async fn to_non_local(self) -> Result<RawVc> {
244 let tt = turbo_tasks();
245 let mut current = self;
246 loop {
247 match current {
248 RawVc::LocalOutput(execution_id, local_task_id, ..) => {
249 current = read_local_output(&*tt, execution_id, local_task_id).await?;
250 }
251 non_local => return Ok(non_local),
252 }
253 }
254 }
255
256 pub(crate) fn connect(&self) {
257 let RawVc::TaskOutput(task_id) = self else {
258 panic!("RawVc::connect() must only be called on a RawVc::TaskOutput");
259 };
260 let tt = turbo_tasks();
261 tt.connect_task(*task_id);
262 }
263
264 pub fn try_get_task_id(&self) -> Option<TaskId> {
265 match self {
266 RawVc::TaskOutput(t) | RawVc::TaskCell(t, ..) => Some(*t),
267 RawVc::LocalOutput(..) => None,
268 }
269 }
270
271 pub fn try_get_type_id(&self) -> Option<ValueTypeId> {
272 match self {
273 RawVc::TaskCell(_, CellId { type_id, .. }) => Some(*type_id),
274 RawVc::TaskOutput(..) | RawVc::LocalOutput(..) => None,
275 }
276 }
277
278 pub(crate) fn resolved_has_trait(&self, trait_id: TraitTypeId) -> bool {
281 match self {
282 RawVc::TaskCell(_task_id, cell_id) => {
283 get_value_type(cell_id.type_id).has_trait(&trait_id)
284 }
285 _ => unreachable!("resolved_has_trait must be called with a RawVc::TaskCell"),
286 }
287 }
288
289 pub(crate) fn resolved_is_type(&self, type_id: ValueTypeId) -> bool {
292 match self {
293 RawVc::TaskCell(_task_id, cell_id) => cell_id.type_id == type_id,
294 _ => unreachable!("resolved_is_type must be called with a RawVc::TaskCell"),
295 }
296 }
297}
298
299impl CollectiblesSource for RawVc {
301 fn peek_collectibles<T: VcValueTrait + ?Sized>(self) -> AutoSet<ResolvedVc<T>> {
302 let RawVc::TaskOutput(task_id) = self else {
303 panic!(
304 "<RawVc as CollectiblesSource>::peek_collectibles() must only be called on a \
305 RawVc::TaskOutput"
306 );
307 };
308 let tt = turbo_tasks();
309 let map = tt.read_task_collectibles(task_id, T::get_trait_type_id());
310 map.into_iter()
311 .filter_map(|(raw, count)| (count > 0).then_some(raw.try_into().unwrap()))
312 .collect()
313 }
314
315 fn take_collectibles<T: VcValueTrait + ?Sized>(self) -> AutoSet<ResolvedVc<T>> {
316 let RawVc::TaskOutput(task_id) = self else {
317 panic!(
318 "<RawVc as CollectiblesSource>::take_collectibles() must only be called on a \
319 RawVc::TaskOutput"
320 );
321 };
322 let tt = turbo_tasks();
323 let map = tt.read_task_collectibles(task_id, T::get_trait_type_id());
324 tt.unemit_collectibles(T::get_trait_type_id(), &map);
325 map.into_iter()
326 .filter_map(|(raw, count)| (count > 0).then_some(raw.try_into().unwrap()))
327 .collect()
328 }
329
330 fn drop_collectibles<T: VcValueTrait + ?Sized>(self) {
331 let RawVc::TaskOutput(task_id) = self else {
332 panic!(
333 "<RawVc as CollectiblesSource>::drop_collectibles() must only be called on a \
334 RawVc::TaskOutput"
335 );
336 };
337 let tt = turbo_tasks();
338 let map = tt.read_task_collectibles(task_id, T::get_trait_type_id());
339 tt.unemit_collectibles(T::get_trait_type_id(), &map);
340 }
341}
342
/// Future that follows a [`RawVc`] to a cell and yields that cell's [`TypedCellContent`].
///
/// Created through [`RawVc::into_read`]; the builder-style methods adjust the read options
/// before the future is polled.
pub struct ReadRawVcFuture {
    /// The reference currently being followed. Replaced whenever an output read resolves to
    /// another `RawVc`.
    current: RawVc,
    /// Options passed to task output reads.
    read_output_options: ReadOutputOptions,
    /// Options passed to the cell read.
    read_cell_options: ReadCellOptions,
    /// Listener kept from a previous `poll` in which a read was not ready and the listener
    /// itself returned `Pending`; it is polled first on the next `poll`.
    listener: Option<EventListener>,
}
349
350impl ReadRawVcFuture {
351 pub(crate) fn new(vc: RawVc) -> Self {
352 ReadRawVcFuture {
353 current: vc,
354 read_output_options: ReadOutputOptions::default(),
355 read_cell_options: ReadCellOptions::default(),
356 listener: None,
357 }
358 }
359
360 pub fn strongly_consistent(mut self) -> Self {
361 self.read_output_options.consistency = ReadConsistency::Strong;
362 self
363 }
364
365 pub fn untracked(mut self) -> Self {
371 self.read_output_options.tracking = ReadTracking::TrackOnlyError;
372 self.read_cell_options.tracking = ReadTracking::TrackOnlyError;
373 self
374 }
375
376 pub fn untracked_including_errors(mut self) -> Self {
382 self.read_output_options.tracking = ReadTracking::Untracked;
383 self.read_cell_options.tracking = ReadTracking::Untracked;
384 self
385 }
386
387 pub fn final_read_hint(mut self) -> Self {
388 self.read_cell_options.final_read_hint = true;
389 self
390 }
391}
392
393impl Future for ReadRawVcFuture {
394 type Output = Result<TypedCellContent>;
395
396 fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
397 with_turbo_tasks(|tt| {
398 let this = unsafe { self.get_unchecked_mut() };
400 'outer: loop {
401 if let Some(listener) = &mut this.listener {
402 let listener = unsafe { Pin::new_unchecked(listener) };
404 if listener.poll(cx).is_pending() {
405 return Poll::Pending;
406 }
407 this.listener = None;
408 }
409 let mut listener = match this.current {
410 RawVc::TaskOutput(task) => {
411 let read_result = tt.try_read_task_output(task, this.read_output_options);
412 match read_result {
413 Ok(Ok(vc)) => {
414 this.read_output_options.consistency = ReadConsistency::Eventual;
418 this.current = vc;
419 continue 'outer;
420 }
421 Ok(Err(listener)) => listener,
422 Err(err) => return Poll::Ready(Err(err)),
423 }
424 }
425 RawVc::TaskCell(task, index) => {
426 let read_result =
427 tt.try_read_task_cell(task, index, this.read_cell_options);
428 match read_result {
429 Ok(Ok(content)) => {
430 return Poll::Ready(Ok(content));
432 }
433 Ok(Err(listener)) => listener,
434 Err(err) => return Poll::Ready(Err(err)),
435 }
436 }
437 RawVc::LocalOutput(execution_id, local_output_id, ..) => {
438 debug_assert_eq!(
439 this.read_output_options.consistency,
440 ReadConsistency::Eventual
441 );
442 let read_result = tt.try_read_local_output(execution_id, local_output_id);
443 match read_result {
444 Ok(Ok(vc)) => {
445 this.current = vc;
446 continue 'outer;
447 }
448 Ok(Err(listener)) => listener,
449 Err(err) => return Poll::Ready(Err(err)),
450 }
451 }
452 };
453 match unsafe { Pin::new_unchecked(&mut listener) }.poll(cx) {
455 Poll::Ready(_) => continue,
456 Poll::Pending => {
457 this.listener = Some(listener);
458 return Poll::Pending;
459 }
460 };
461 }
462 })
463 }
464}
465
// Explicitly declares the future `Unpin`, i.e. it may be moved after having been pinned.
// NOTE(review): `poll` pins the inner `EventListener` with `Pin::new_unchecked`; confirm that
// `EventListener` does not need a stable address, otherwise this impl is not justified.
impl Unpin for ReadRawVcFuture {}