1use std::{fmt::Display, future::Future, pin::Pin, task::Poll};
2
3use anyhow::Result;
4use auto_hash_map::AutoSet;
5use serde::{Deserialize, Serialize};
6use thiserror::Error;
7
8use crate::{
9 CollectiblesSource, ReadCellOptions, ReadConsistency, ResolvedVc, TaskId, TaskPersistence,
10 TraitTypeId, ValueType, ValueTypeId, VcValueTrait,
11 backend::{CellContent, TypedCellContent},
12 event::EventListener,
13 id::{ExecutionId, LocalTaskId},
14 manager::{read_local_output, read_task_cell, read_task_output, with_turbo_tasks},
15 registry::{self, get_value_type},
16 turbo_tasks,
17};
18
19#[derive(Error, Debug)]
20pub enum ResolveTypeError {
21 #[error("no content in the cell")]
22 NoContent,
23 #[error("the content in the cell has no type")]
24 UntypedContent,
25 #[error("content is not available as task execution failed")]
26 TaskError { source: anyhow::Error },
27 #[error("reading the cell content failed")]
28 ReadError { source: anyhow::Error },
29}
30
31#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
32pub struct CellId {
33 pub type_id: ValueTypeId,
34 pub index: u32,
35}
36
37impl Display for CellId {
38 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39 write!(
40 f,
41 "{}#{}",
42 registry::get_value_type(self.type_id).name,
43 self.index
44 )
45 }
46}
47
48#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
58pub enum RawVc {
59 TaskOutput(TaskId),
62 TaskCell(TaskId, CellId),
67 LocalOutput(ExecutionId, LocalTaskId, TaskPersistence),
75}
76
77impl RawVc {
78 pub fn is_resolved(&self) -> bool {
79 match self {
80 RawVc::TaskOutput(..) => false,
81 RawVc::TaskCell(..) => true,
82 RawVc::LocalOutput(..) => false,
83 }
84 }
85
86 pub fn is_local(&self) -> bool {
87 match self {
88 RawVc::TaskOutput(..) => false,
89 RawVc::TaskCell(..) => false,
90 RawVc::LocalOutput(..) => true,
91 }
92 }
93
94 pub fn is_transient(&self) -> bool {
99 match self {
100 RawVc::TaskOutput(task) | RawVc::TaskCell(task, ..) => task.is_transient(),
101 RawVc::LocalOutput(_, _, persistence) => *persistence == TaskPersistence::Transient,
102 }
103 }
104
105 pub(crate) fn into_read(self) -> ReadRawVcFuture {
106 ReadRawVcFuture::new(self)
109 }
110
111 pub(crate) async fn resolve_trait(
112 self,
113 trait_type: TraitTypeId,
114 ) -> Result<Option<RawVc>, ResolveTypeError> {
115 self.resolve_type_inner(|value_type_id| {
116 let value_type = get_value_type(value_type_id);
117 (value_type.has_trait(&trait_type), Some(value_type))
118 })
119 .await
120 }
121
122 pub(crate) async fn resolve_value(
123 self,
124 value_type: ValueTypeId,
125 ) -> Result<Option<RawVc>, ResolveTypeError> {
126 self.resolve_type_inner(|cell_value_type| (cell_value_type == value_type, None))
127 .await
128 }
129
130 async fn resolve_type_inner(
138 self,
139 conditional: impl FnOnce(ValueTypeId) -> (bool, Option<&'static ValueType>),
140 ) -> Result<Option<RawVc>, ResolveTypeError> {
141 let tt = turbo_tasks();
142 tt.notify_scheduled_tasks();
143 let mut current = self;
144 loop {
145 match current {
146 RawVc::TaskOutput(task) => {
147 current = read_task_output(&*tt, task, ReadConsistency::Eventual)
148 .await
149 .map_err(|source| ResolveTypeError::TaskError { source })?;
150 }
151 RawVc::TaskCell(task, index) => {
152 let content = read_task_cell(&*tt, task, index, ReadCellOptions::default())
153 .await
154 .map_err(|source| ResolveTypeError::ReadError { source })?;
155 if let TypedCellContent(value_type, CellContent(Some(_))) = content {
156 return Ok(if conditional(value_type).0 {
157 Some(RawVc::TaskCell(task, index))
158 } else {
159 None
160 });
161 } else {
162 return Err(ResolveTypeError::NoContent);
163 }
164 }
165 RawVc::LocalOutput(execution_id, local_task_id, ..) => {
166 current = read_local_output(&*tt, execution_id, local_task_id)
167 .await
168 .map_err(|source| ResolveTypeError::TaskError { source })?;
169 }
170 }
171 }
172 }
173
174 pub(crate) async fn resolve(self) -> Result<RawVc> {
176 self.resolve_inner(ReadConsistency::Eventual).await
177 }
178
179 pub(crate) async fn resolve_strongly_consistent(self) -> Result<RawVc> {
181 self.resolve_inner(ReadConsistency::Strong).await
182 }
183
184 async fn resolve_inner(self, mut consistency: ReadConsistency) -> Result<RawVc> {
185 let tt = turbo_tasks();
186 let mut current = self;
187 let mut notified = false;
188 loop {
189 match current {
190 RawVc::TaskOutput(task) => {
191 if !notified {
192 tt.notify_scheduled_tasks();
193 notified = true;
194 }
195 current = read_task_output(&*tt, task, consistency).await?;
196 consistency = ReadConsistency::Eventual;
200 }
201 RawVc::TaskCell(_, _) => return Ok(current),
202 RawVc::LocalOutput(execution_id, local_task_id, ..) => {
203 debug_assert_eq!(consistency, ReadConsistency::Eventual);
204 current = read_local_output(&*tt, execution_id, local_task_id).await?;
205 }
206 }
207 }
208 }
209
210 pub(crate) async fn to_non_local(self) -> Result<RawVc> {
213 let tt = turbo_tasks();
214 let mut current = self;
215 loop {
216 match current {
217 RawVc::LocalOutput(execution_id, local_task_id, ..) => {
218 current = read_local_output(&*tt, execution_id, local_task_id).await?;
219 }
220 non_local => return Ok(non_local),
221 }
222 }
223 }
224
225 pub(crate) fn connect(&self) {
226 let RawVc::TaskOutput(task_id) = self else {
227 panic!("RawVc::connect() must only be called on a RawVc::TaskOutput");
228 };
229 let tt = turbo_tasks();
230 tt.connect_task(*task_id);
231 }
232
233 pub fn try_get_task_id(&self) -> Option<TaskId> {
234 match self {
235 RawVc::TaskOutput(t) | RawVc::TaskCell(t, ..) => Some(*t),
236 RawVc::LocalOutput(..) => None,
237 }
238 }
239
240 pub fn try_get_type_id(&self) -> Option<ValueTypeId> {
241 match self {
242 RawVc::TaskCell(_, CellId { type_id, .. }) => Some(*type_id),
243 RawVc::TaskOutput(..) | RawVc::LocalOutput(..) => None,
244 }
245 }
246
247 pub(crate) fn resolved_has_trait(&self, trait_id: TraitTypeId) -> bool {
250 match self {
251 RawVc::TaskCell(_task_id, cell_id) => {
252 get_value_type(cell_id.type_id).has_trait(&trait_id)
253 }
254 _ => unreachable!("resolved_has_trait must be called with a RawVc::TaskCell"),
255 }
256 }
257
258 pub(crate) fn resolved_is_type(&self, type_id: ValueTypeId) -> bool {
261 match self {
262 RawVc::TaskCell(_task_id, cell_id) => cell_id.type_id == type_id,
263 _ => unreachable!("resolved_is_type must be called with a RawVc::TaskCell"),
264 }
265 }
266}
267
268impl CollectiblesSource for RawVc {
270 fn peek_collectibles<T: VcValueTrait + ?Sized>(self) -> AutoSet<ResolvedVc<T>> {
271 let RawVc::TaskOutput(task_id) = self else {
272 panic!(
273 "<RawVc as CollectiblesSource>::peek_collectibles() must only be called on a \
274 RawVc::TaskOutput"
275 );
276 };
277 let tt = turbo_tasks();
278 tt.notify_scheduled_tasks();
279 let map = tt.read_task_collectibles(task_id, T::get_trait_type_id());
280 map.into_iter()
281 .filter_map(|(raw, count)| (count > 0).then_some(raw.try_into().unwrap()))
282 .collect()
283 }
284
285 fn take_collectibles<T: VcValueTrait + ?Sized>(self) -> AutoSet<ResolvedVc<T>> {
286 let RawVc::TaskOutput(task_id) = self else {
287 panic!(
288 "<RawVc as CollectiblesSource>::take_collectibles() must only be called on a \
289 RawVc::TaskOutput"
290 );
291 };
292 let tt = turbo_tasks();
293 tt.notify_scheduled_tasks();
294 let map = tt.read_task_collectibles(task_id, T::get_trait_type_id());
295 tt.unemit_collectibles(T::get_trait_type_id(), &map);
296 map.into_iter()
297 .filter_map(|(raw, count)| (count > 0).then_some(raw.try_into().unwrap()))
298 .collect()
299 }
300}
301
302pub struct ReadRawVcFuture {
303 consistency: ReadConsistency,
304 current: RawVc,
305 untracked: bool,
306 read_cell_options: ReadCellOptions,
307 listener: Option<EventListener>,
308}
309
310impl ReadRawVcFuture {
311 pub(crate) fn new(vc: RawVc) -> Self {
312 ReadRawVcFuture {
313 consistency: ReadConsistency::Eventual,
314 current: vc,
315 untracked: false,
316 read_cell_options: ReadCellOptions::default(),
317 listener: None,
318 }
319 }
320
321 pub fn strongly_consistent(mut self) -> Self {
322 self.consistency = ReadConsistency::Strong;
323 self
324 }
325
326 pub fn untracked(mut self) -> Self {
329 self.untracked = true;
330 self
331 }
332
333 pub fn final_read_hint(mut self) -> Self {
334 self.read_cell_options.final_read_hint = true;
335 self
336 }
337}
338
339impl Future for ReadRawVcFuture {
340 type Output = Result<TypedCellContent>;
341
342 fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
343 with_turbo_tasks(|tt| {
344 tt.notify_scheduled_tasks();
345 let this = unsafe { self.get_unchecked_mut() };
347 'outer: loop {
348 if let Some(listener) = &mut this.listener {
349 let listener = unsafe { Pin::new_unchecked(listener) };
351 if listener.poll(cx).is_pending() {
352 return Poll::Pending;
353 }
354 this.listener = None;
355 }
356 let mut listener = match this.current {
357 RawVc::TaskOutput(task) => {
358 let read_result = if this.untracked {
359 tt.try_read_task_output_untracked(task, this.consistency)
360 } else {
361 tt.try_read_task_output(task, this.consistency)
362 };
363 match read_result {
364 Ok(Ok(vc)) => {
365 this.consistency = ReadConsistency::Eventual;
369 this.current = vc;
370 continue 'outer;
371 }
372 Ok(Err(listener)) => listener,
373 Err(err) => return Poll::Ready(Err(err)),
374 }
375 }
376 RawVc::TaskCell(task, index) => {
377 let read_result = if this.untracked {
378 tt.try_read_task_cell_untracked(task, index, this.read_cell_options)
379 } else {
380 tt.try_read_task_cell(task, index, this.read_cell_options)
381 };
382 match read_result {
383 Ok(Ok(content)) => {
384 return Poll::Ready(Ok(content));
386 }
387 Ok(Err(listener)) => listener,
388 Err(err) => return Poll::Ready(Err(err)),
389 }
390 }
391 RawVc::LocalOutput(execution_id, local_output_id, ..) => {
392 debug_assert_eq!(this.consistency, ReadConsistency::Eventual);
393 let read_result = tt.try_read_local_output(execution_id, local_output_id);
394 match read_result {
395 Ok(Ok(vc)) => {
396 this.current = vc;
397 continue 'outer;
398 }
399 Ok(Err(listener)) => listener,
400 Err(err) => return Poll::Ready(Err(err)),
401 }
402 }
403 };
404 match unsafe { Pin::new_unchecked(&mut listener) }.poll(cx) {
406 Poll::Ready(_) => continue,
407 Poll::Pending => {
408 this.listener = Some(listener);
409 return Poll::Pending;
410 }
411 };
412 }
413 })
414 }
415}
416
417impl Unpin for ReadRawVcFuture {}