1use std::{
2 fmt::{Debug, Display},
3 future::Future,
4 pin::Pin,
5 task::Poll,
6};
7
8use anyhow::Result;
9use auto_hash_map::AutoSet;
10use serde::{Deserialize, Serialize};
11use thiserror::Error;
12
13use crate::{
14 CollectiblesSource, ReadCellOptions, ReadConsistency, ResolvedVc, TaskId, TaskPersistence,
15 TraitTypeId, ValueType, ValueTypeId, VcValueTrait,
16 backend::{CellContent, TypedCellContent},
17 event::EventListener,
18 id::{ExecutionId, LocalTaskId},
19 manager::{read_local_output, read_task_cell, read_task_output, with_turbo_tasks},
20 registry::{self, get_value_type},
21 turbo_tasks,
22};
23
/// Errors produced while resolving a [`RawVc`] to a cell and checking the type of its content
/// (see `RawVc::resolve_trait` and `RawVc::resolve_value`).
#[derive(Error, Debug)]
pub enum ResolveTypeError {
    /// The cell was read successfully but did not hold any content.
    #[error("no content in the cell")]
    NoContent,
    /// The cell content carries no type information.
    ///
    /// NOTE(review): this variant is not constructed anywhere in the visible part of this file.
    #[error("the content in the cell has no type")]
    UntypedContent,
    /// Reading a task output or a local output failed; `source` is the underlying error.
    #[error("content is not available as task execution failed")]
    TaskError { source: anyhow::Error },
    /// Reading the task cell itself failed; `source` is the underlying error.
    #[error("reading the cell content failed")]
    ReadError { source: anyhow::Error },
}
35
/// Identifies a cell by the type of value it holds plus a numeric index.
///
/// Its [`Display`] form is `<value type name>#<index>`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct CellId {
    /// The value type of the cell's content.
    pub type_id: ValueTypeId,
    /// Numeric index of the cell.
    ///
    /// NOTE(review): presumably distinguishes cells of the same type within one task — confirm.
    pub index: u32,
}
41
42impl Display for CellId {
43 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
44 write!(
45 f,
46 "{}#{}",
47 registry::get_value_type(self.type_id).name,
48 self.index
49 )
50 }
51}
52
/// An untyped handle to a value, in one of three forms.
///
/// Only [`RawVc::TaskCell`] counts as resolved (see `RawVc::is_resolved`); the other two variants
/// are followed by the `resolve*` methods until a cell is reached.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum RawVc {
    /// The output of the task with the given id. Reading it yields another `RawVc`.
    TaskOutput(TaskId),
    /// A specific cell of a task. The [`CellId`] carries the cell's [`ValueTypeId`], so type
    /// checks on this variant do not need to read the cell.
    TaskCell(TaskId, CellId),
    /// The output of a local task, addressed by execution id and local task id, together with
    /// the persistence mode consulted by `RawVc::is_transient`. Reading it yields another `RawVc`.
    LocalOutput(ExecutionId, LocalTaskId, TaskPersistence),
}
81
82impl Debug for RawVc {
83 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
84 match self {
85 RawVc::TaskOutput(task_id) => f
86 .debug_tuple("RawVc::TaskOutput")
87 .field(&**task_id)
88 .finish(),
89 RawVc::TaskCell(task_id, cell_id) => f
90 .debug_tuple("RawVc::TaskCell")
91 .field(&**task_id)
92 .field(&cell_id.to_string())
93 .finish(),
94 RawVc::LocalOutput(execution_id, local_task_id, task_persistence) => f
95 .debug_tuple("RawVc::LocalOutput")
96 .field(&**execution_id)
97 .field(&**local_task_id)
98 .field(task_persistence)
99 .finish(),
100 }
101 }
102}
103
104impl RawVc {
105 pub fn is_resolved(&self) -> bool {
106 match self {
107 RawVc::TaskOutput(..) => false,
108 RawVc::TaskCell(..) => true,
109 RawVc::LocalOutput(..) => false,
110 }
111 }
112
113 pub fn is_local(&self) -> bool {
114 match self {
115 RawVc::TaskOutput(..) => false,
116 RawVc::TaskCell(..) => false,
117 RawVc::LocalOutput(..) => true,
118 }
119 }
120
121 pub fn is_transient(&self) -> bool {
126 match self {
127 RawVc::TaskOutput(task) | RawVc::TaskCell(task, ..) => task.is_transient(),
128 RawVc::LocalOutput(_, _, persistence) => *persistence == TaskPersistence::Transient,
129 }
130 }
131
132 pub(crate) fn into_read(self) -> ReadRawVcFuture {
133 ReadRawVcFuture::new(self)
136 }
137
138 pub(crate) async fn resolve_trait(
139 self,
140 trait_type: TraitTypeId,
141 ) -> Result<Option<RawVc>, ResolveTypeError> {
142 self.resolve_type_inner(|value_type_id| {
143 let value_type = get_value_type(value_type_id);
144 (value_type.has_trait(&trait_type), Some(value_type))
145 })
146 .await
147 }
148
149 pub(crate) async fn resolve_value(
150 self,
151 value_type: ValueTypeId,
152 ) -> Result<Option<RawVc>, ResolveTypeError> {
153 self.resolve_type_inner(|cell_value_type| (cell_value_type == value_type, None))
154 .await
155 }
156
157 async fn resolve_type_inner(
165 self,
166 conditional: impl FnOnce(ValueTypeId) -> (bool, Option<&'static ValueType>),
167 ) -> Result<Option<RawVc>, ResolveTypeError> {
168 let tt = turbo_tasks();
169 tt.notify_scheduled_tasks();
170 let mut current = self;
171 loop {
172 match current {
173 RawVc::TaskOutput(task) => {
174 current = read_task_output(&*tt, task, ReadConsistency::Eventual)
175 .await
176 .map_err(|source| ResolveTypeError::TaskError { source })?;
177 }
178 RawVc::TaskCell(task, index) => {
179 let content = read_task_cell(&*tt, task, index, ReadCellOptions::default())
180 .await
181 .map_err(|source| ResolveTypeError::ReadError { source })?;
182 if let TypedCellContent(value_type, CellContent(Some(_))) = content {
183 return Ok(if conditional(value_type).0 {
184 Some(RawVc::TaskCell(task, index))
185 } else {
186 None
187 });
188 } else {
189 return Err(ResolveTypeError::NoContent);
190 }
191 }
192 RawVc::LocalOutput(execution_id, local_task_id, ..) => {
193 current = read_local_output(&*tt, execution_id, local_task_id)
194 .await
195 .map_err(|source| ResolveTypeError::TaskError { source })?;
196 }
197 }
198 }
199 }
200
201 pub(crate) async fn resolve(self) -> Result<RawVc> {
203 self.resolve_inner(ReadConsistency::Eventual).await
204 }
205
206 pub(crate) async fn resolve_strongly_consistent(self) -> Result<RawVc> {
208 self.resolve_inner(ReadConsistency::Strong).await
209 }
210
211 async fn resolve_inner(self, mut consistency: ReadConsistency) -> Result<RawVc> {
212 let tt = turbo_tasks();
213 let mut current = self;
214 let mut notified = false;
215 loop {
216 match current {
217 RawVc::TaskOutput(task) => {
218 if !notified {
219 tt.notify_scheduled_tasks();
220 notified = true;
221 }
222 current = read_task_output(&*tt, task, consistency).await?;
223 consistency = ReadConsistency::Eventual;
227 }
228 RawVc::TaskCell(_, _) => return Ok(current),
229 RawVc::LocalOutput(execution_id, local_task_id, ..) => {
230 debug_assert_eq!(consistency, ReadConsistency::Eventual);
231 current = read_local_output(&*tt, execution_id, local_task_id).await?;
232 }
233 }
234 }
235 }
236
237 pub(crate) async fn to_non_local(self) -> Result<RawVc> {
240 let tt = turbo_tasks();
241 let mut current = self;
242 loop {
243 match current {
244 RawVc::LocalOutput(execution_id, local_task_id, ..) => {
245 current = read_local_output(&*tt, execution_id, local_task_id).await?;
246 }
247 non_local => return Ok(non_local),
248 }
249 }
250 }
251
252 pub(crate) fn connect(&self) {
253 let RawVc::TaskOutput(task_id) = self else {
254 panic!("RawVc::connect() must only be called on a RawVc::TaskOutput");
255 };
256 let tt = turbo_tasks();
257 tt.connect_task(*task_id);
258 }
259
260 pub fn try_get_task_id(&self) -> Option<TaskId> {
261 match self {
262 RawVc::TaskOutput(t) | RawVc::TaskCell(t, ..) => Some(*t),
263 RawVc::LocalOutput(..) => None,
264 }
265 }
266
267 pub fn try_get_type_id(&self) -> Option<ValueTypeId> {
268 match self {
269 RawVc::TaskCell(_, CellId { type_id, .. }) => Some(*type_id),
270 RawVc::TaskOutput(..) | RawVc::LocalOutput(..) => None,
271 }
272 }
273
274 pub(crate) fn resolved_has_trait(&self, trait_id: TraitTypeId) -> bool {
277 match self {
278 RawVc::TaskCell(_task_id, cell_id) => {
279 get_value_type(cell_id.type_id).has_trait(&trait_id)
280 }
281 _ => unreachable!("resolved_has_trait must be called with a RawVc::TaskCell"),
282 }
283 }
284
285 pub(crate) fn resolved_is_type(&self, type_id: ValueTypeId) -> bool {
288 match self {
289 RawVc::TaskCell(_task_id, cell_id) => cell_id.type_id == type_id,
290 _ => unreachable!("resolved_is_type must be called with a RawVc::TaskCell"),
291 }
292 }
293}
294
295impl CollectiblesSource for RawVc {
297 fn peek_collectibles<T: VcValueTrait + ?Sized>(self) -> AutoSet<ResolvedVc<T>> {
298 let RawVc::TaskOutput(task_id) = self else {
299 panic!(
300 "<RawVc as CollectiblesSource>::peek_collectibles() must only be called on a \
301 RawVc::TaskOutput"
302 );
303 };
304 let tt = turbo_tasks();
305 tt.notify_scheduled_tasks();
306 let map = tt.read_task_collectibles(task_id, T::get_trait_type_id());
307 map.into_iter()
308 .filter_map(|(raw, count)| (count > 0).then_some(raw.try_into().unwrap()))
309 .collect()
310 }
311
312 fn take_collectibles<T: VcValueTrait + ?Sized>(self) -> AutoSet<ResolvedVc<T>> {
313 let RawVc::TaskOutput(task_id) = self else {
314 panic!(
315 "<RawVc as CollectiblesSource>::take_collectibles() must only be called on a \
316 RawVc::TaskOutput"
317 );
318 };
319 let tt = turbo_tasks();
320 tt.notify_scheduled_tasks();
321 let map = tt.read_task_collectibles(task_id, T::get_trait_type_id());
322 tt.unemit_collectibles(T::get_trait_type_id(), &map);
323 map.into_iter()
324 .filter_map(|(raw, count)| (count > 0).then_some(raw.try_into().unwrap()))
325 .collect()
326 }
327}
328
/// Future that follows a [`RawVc`] until it reaches a cell and yields that cell's
/// [`TypedCellContent`].
pub struct ReadRawVcFuture {
    /// Consistency used for the next task output read; reset to `Eventual` after the first
    /// successful task output read.
    consistency: ReadConsistency,
    /// The `RawVc` currently being read; replaced as outputs are followed.
    current: RawVc,
    /// When `true`, the `*_untracked` read methods are used for task outputs and cells.
    untracked: bool,
    /// Options passed to the cell read.
    read_cell_options: ReadCellOptions,
    /// Listener from a previous poll that returned `Poll::Pending`; polled first on the next
    /// poll.
    listener: Option<EventListener>,
}
336
337impl ReadRawVcFuture {
338 pub(crate) fn new(vc: RawVc) -> Self {
339 ReadRawVcFuture {
340 consistency: ReadConsistency::Eventual,
341 current: vc,
342 untracked: false,
343 read_cell_options: ReadCellOptions::default(),
344 listener: None,
345 }
346 }
347
348 pub fn strongly_consistent(mut self) -> Self {
349 self.consistency = ReadConsistency::Strong;
350 self
351 }
352
353 pub fn untracked(mut self) -> Self {
356 self.untracked = true;
357 self
358 }
359
360 pub fn final_read_hint(mut self) -> Self {
361 self.read_cell_options.final_read_hint = true;
362 self
363 }
364}
365
366impl Future for ReadRawVcFuture {
367 type Output = Result<TypedCellContent>;
368
369 fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
370 with_turbo_tasks(|tt| {
371 tt.notify_scheduled_tasks();
372 let this = unsafe { self.get_unchecked_mut() };
374 'outer: loop {
375 if let Some(listener) = &mut this.listener {
376 let listener = unsafe { Pin::new_unchecked(listener) };
378 if listener.poll(cx).is_pending() {
379 return Poll::Pending;
380 }
381 this.listener = None;
382 }
383 let mut listener = match this.current {
384 RawVc::TaskOutput(task) => {
385 let read_result = if this.untracked {
386 tt.try_read_task_output_untracked(task, this.consistency)
387 } else {
388 tt.try_read_task_output(task, this.consistency)
389 };
390 match read_result {
391 Ok(Ok(vc)) => {
392 this.consistency = ReadConsistency::Eventual;
396 this.current = vc;
397 continue 'outer;
398 }
399 Ok(Err(listener)) => listener,
400 Err(err) => return Poll::Ready(Err(err)),
401 }
402 }
403 RawVc::TaskCell(task, index) => {
404 let read_result = if this.untracked {
405 tt.try_read_task_cell_untracked(task, index, this.read_cell_options)
406 } else {
407 tt.try_read_task_cell(task, index, this.read_cell_options)
408 };
409 match read_result {
410 Ok(Ok(content)) => {
411 return Poll::Ready(Ok(content));
413 }
414 Ok(Err(listener)) => listener,
415 Err(err) => return Poll::Ready(Err(err)),
416 }
417 }
418 RawVc::LocalOutput(execution_id, local_output_id, ..) => {
419 debug_assert_eq!(this.consistency, ReadConsistency::Eventual);
420 let read_result = tt.try_read_local_output(execution_id, local_output_id);
421 match read_result {
422 Ok(Ok(vc)) => {
423 this.current = vc;
424 continue 'outer;
425 }
426 Ok(Err(listener)) => listener,
427 Err(err) => return Poll::Ready(Err(err)),
428 }
429 }
430 };
431 match unsafe { Pin::new_unchecked(&mut listener) }.poll(cx) {
433 Poll::Ready(_) => continue,
434 Poll::Pending => {
435 this.listener = Some(listener);
436 return Poll::Pending;
437 }
438 };
439 }
440 })
441 }
442}
443
// Explicitly marks `ReadRawVcFuture` as `Unpin`, independent of whether its fields are.
// NOTE(review): `poll` pins the stored `EventListener` with `Pin::new_unchecked`; confirm that
// moving this future between polls is sound for that listener.
impl Unpin for ReadRawVcFuture {}