1use std::{
2 fmt::{Debug, Display},
3 future::Future,
4 pin::Pin,
5 task::Poll,
6};
7
8use anyhow::Result;
9use auto_hash_map::AutoSet;
10use bincode::{Decode, Encode};
11use serde::{Deserialize, Serialize};
12
13use crate::{
14 CollectiblesSource, ReadCellOptions, ReadConsistency, ReadOutputOptions, ResolvedVc, TaskId,
15 TaskPersistence, TraitTypeId, ValueTypeId, VcValueTrait,
16 backend::TypedCellContent,
17 event::EventListener,
18 id::{ExecutionId, LocalTaskId},
19 manager::{
20 ReadCellTracking, ReadTracking, read_local_output, read_task_output, with_turbo_tasks,
21 },
22 registry::{self, get_value_type},
23 turbo_tasks,
24};
25
26#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
27pub struct CellId {
28 pub type_id: ValueTypeId,
29 pub index: u32,
30}
31
32impl Display for CellId {
33 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
34 write!(
35 f,
36 "{}#{}",
37 registry::get_value_type(self.type_id).name,
38 self.index
39 )
40 }
41}
42
43#[derive(Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
53pub enum RawVc {
54 TaskOutput(TaskId),
57 TaskCell(TaskId, CellId),
62 LocalOutput(ExecutionId, LocalTaskId, TaskPersistence),
70}
71
72impl Debug for RawVc {
73 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
74 match self {
75 RawVc::TaskOutput(task_id) => f
76 .debug_tuple("RawVc::TaskOutput")
77 .field(&**task_id)
78 .finish(),
79 RawVc::TaskCell(task_id, cell_id) => f
80 .debug_tuple("RawVc::TaskCell")
81 .field(&**task_id)
82 .field(&cell_id.to_string())
83 .finish(),
84 RawVc::LocalOutput(execution_id, local_task_id, task_persistence) => f
85 .debug_tuple("RawVc::LocalOutput")
86 .field(&**execution_id)
87 .field(&**local_task_id)
88 .field(task_persistence)
89 .finish(),
90 }
91 }
92}
93
94impl Display for RawVc {
95 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
96 match self {
97 RawVc::TaskOutput(task_id) => write!(f, "output of task {}", **task_id),
98 RawVc::TaskCell(task_id, cell_id) => {
99 write!(f, "{} of task {}", cell_id, **task_id)
100 }
101 RawVc::LocalOutput(execution_id, local_task_id, task_persistence) => write!(
102 f,
103 "output of local task {} ({}, {})",
104 **local_task_id, **execution_id, task_persistence
105 ),
106 }
107 }
108}
109
110impl RawVc {
111 pub fn is_resolved(&self) -> bool {
112 match self {
113 RawVc::TaskOutput(..) => false,
114 RawVc::TaskCell(..) => true,
115 RawVc::LocalOutput(..) => false,
116 }
117 }
118
119 pub fn is_local(&self) -> bool {
120 match self {
121 RawVc::TaskOutput(..) => false,
122 RawVc::TaskCell(..) => false,
123 RawVc::LocalOutput(..) => true,
124 }
125 }
126
127 pub fn is_transient(&self) -> bool {
132 match self {
133 RawVc::TaskOutput(task) | RawVc::TaskCell(task, ..) => task.is_transient(),
134 RawVc::LocalOutput(_, _, persistence) => *persistence == TaskPersistence::Transient,
135 }
136 }
137
138 pub(crate) fn into_read(self, is_serializable_cell_content: bool) -> ReadRawVcFuture {
139 ReadRawVcFuture::new(self, Some(is_serializable_cell_content))
142 }
143
144 pub(crate) fn into_read_with_unknown_is_serializable_cell_content(self) -> ReadRawVcFuture {
145 ReadRawVcFuture::new(self, None)
148 }
149
150 pub(crate) async fn resolve(self) -> Result<RawVc> {
152 self.resolve_inner(ReadOutputOptions {
153 consistency: ReadConsistency::Eventual,
154 ..Default::default()
155 })
156 .await
157 }
158
159 pub(crate) async fn resolve_strongly_consistent(self) -> Result<RawVc> {
161 self.resolve_inner(ReadOutputOptions {
162 consistency: ReadConsistency::Strong,
163 ..Default::default()
164 })
165 .await
166 }
167
168 async fn resolve_inner(self, mut options: ReadOutputOptions) -> Result<RawVc> {
169 let tt = turbo_tasks();
170 let mut current = self;
171 loop {
172 match current {
173 RawVc::TaskOutput(task) => {
174 current = read_task_output(&*tt, task, options).await?;
175 options.consistency = ReadConsistency::Eventual;
179 }
180 RawVc::TaskCell(_, _) => return Ok(current),
181 RawVc::LocalOutput(execution_id, local_task_id, ..) => {
182 debug_assert_eq!(options.consistency, ReadConsistency::Eventual);
183 current = read_local_output(&*tt, execution_id, local_task_id).await?;
184 }
185 }
186 }
187 }
188
189 pub(crate) async fn to_non_local(self) -> Result<RawVc> {
192 Ok(match self {
193 RawVc::LocalOutput(execution_id, local_task_id, ..) => {
194 let tt = turbo_tasks();
195 let local_output = read_local_output(&*tt, execution_id, local_task_id).await?;
196 debug_assert!(
197 !matches!(local_output, RawVc::LocalOutput(_, _, _)),
198 "a LocalOutput cannot point at other LocalOutputs"
199 );
200 local_output
201 }
202 non_local => non_local,
203 })
204 }
205
206 pub(crate) fn connect(&self) {
207 let RawVc::TaskOutput(task_id) = self else {
208 panic!("RawVc::connect() must only be called on a RawVc::TaskOutput");
209 };
210 let tt = turbo_tasks();
211 tt.connect_task(*task_id);
212 }
213
214 pub fn try_get_task_id(&self) -> Option<TaskId> {
215 match self {
216 RawVc::TaskOutput(t) | RawVc::TaskCell(t, ..) => Some(*t),
217 RawVc::LocalOutput(..) => None,
218 }
219 }
220
221 pub fn try_get_type_id(&self) -> Option<ValueTypeId> {
222 match self {
223 RawVc::TaskCell(_, CellId { type_id, .. }) => Some(*type_id),
224 RawVc::TaskOutput(..) | RawVc::LocalOutput(..) => None,
225 }
226 }
227
228 pub(crate) fn resolved_has_trait(&self, trait_id: TraitTypeId) -> bool {
231 match self {
232 RawVc::TaskCell(_task_id, cell_id) => {
233 get_value_type(cell_id.type_id).has_trait(&trait_id)
234 }
235 _ => unreachable!("resolved_has_trait must be called with a RawVc::TaskCell"),
236 }
237 }
238
239 pub(crate) fn resolved_is_type(&self, type_id: ValueTypeId) -> bool {
242 match self {
243 RawVc::TaskCell(_task_id, cell_id) => cell_id.type_id == type_id,
244 _ => unreachable!("resolved_is_type must be called with a RawVc::TaskCell"),
245 }
246 }
247}
248
249impl CollectiblesSource for RawVc {
251 fn peek_collectibles<T: VcValueTrait + ?Sized>(self) -> AutoSet<ResolvedVc<T>> {
252 let RawVc::TaskOutput(task_id) = self else {
253 panic!(
254 "<RawVc as CollectiblesSource>::peek_collectibles() must only be called on a \
255 RawVc::TaskOutput"
256 );
257 };
258 let tt = turbo_tasks();
259 let map = tt.read_task_collectibles(task_id, T::get_trait_type_id());
260 map.into_iter()
261 .filter_map(|(raw, count)| (count > 0).then_some(raw.try_into().unwrap()))
262 .collect()
263 }
264
265 fn take_collectibles<T: VcValueTrait + ?Sized>(self) -> AutoSet<ResolvedVc<T>> {
266 let RawVc::TaskOutput(task_id) = self else {
267 panic!(
268 "<RawVc as CollectiblesSource>::take_collectibles() must only be called on a \
269 RawVc::TaskOutput"
270 );
271 };
272 let tt = turbo_tasks();
273 let map = tt.read_task_collectibles(task_id, T::get_trait_type_id());
274 tt.unemit_collectibles(T::get_trait_type_id(), &map);
275 map.into_iter()
276 .filter_map(|(raw, count)| (count > 0).then_some(raw.try_into().unwrap()))
277 .collect()
278 }
279
280 fn drop_collectibles<T: VcValueTrait + ?Sized>(self) {
281 let RawVc::TaskOutput(task_id) = self else {
282 panic!(
283 "<RawVc as CollectiblesSource>::drop_collectibles() must only be called on a \
284 RawVc::TaskOutput"
285 );
286 };
287 let tt = turbo_tasks();
288 let map = tt.read_task_collectibles(task_id, T::get_trait_type_id());
289 tt.unemit_collectibles(T::get_trait_type_id(), &map);
290 }
291}
292
293pub struct ReadRawVcFuture {
294 current: RawVc,
295 read_output_options: ReadOutputOptions,
296 read_cell_options: ReadCellOptions,
297 is_serializable_cell_content_unknown: bool,
298 listener: Option<EventListener>,
299}
300
301impl ReadRawVcFuture {
302 pub(crate) fn new(vc: RawVc, is_serializable_cell_content: Option<bool>) -> Self {
303 ReadRawVcFuture {
304 current: vc,
305 read_output_options: ReadOutputOptions::default(),
306 read_cell_options: ReadCellOptions {
307 is_serializable_cell_content: is_serializable_cell_content.unwrap_or(false),
308 ..Default::default()
309 },
310 is_serializable_cell_content_unknown: is_serializable_cell_content.is_none(),
311 listener: None,
312 }
313 }
314
315 pub fn strongly_consistent(mut self) -> Self {
317 self.read_output_options.consistency = ReadConsistency::Strong;
318 self
319 }
320
321 pub fn track_with_key(mut self, key: u64) -> Self {
323 self.read_output_options.tracking = ReadTracking::Tracked;
324 self.read_cell_options.tracking = ReadCellTracking::Tracked { key: Some(key) };
325 self
326 }
327
328 pub fn untracked(mut self) -> Self {
334 self.read_output_options.tracking = ReadTracking::TrackOnlyError;
335 self.read_cell_options.tracking = ReadCellTracking::TrackOnlyError;
336 self
337 }
338
339 pub fn final_read_hint(mut self) -> Self {
341 self.read_cell_options.final_read_hint = true;
342 self
343 }
344}
345
346impl Future for ReadRawVcFuture {
347 type Output = Result<TypedCellContent>;
348
349 fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
350 with_turbo_tasks(|tt| {
351 let this = unsafe { self.get_unchecked_mut() };
353 'outer: loop {
354 if let Some(listener) = &mut this.listener {
355 let listener = unsafe { Pin::new_unchecked(listener) };
357 if listener.poll(cx).is_pending() {
358 return Poll::Pending;
359 }
360 this.listener = None;
361 }
362 let mut listener = match this.current {
363 RawVc::TaskOutput(task) => {
364 let read_result = tt.try_read_task_output(task, this.read_output_options);
365 match read_result {
366 Ok(Ok(vc)) => {
367 this.read_output_options.consistency = ReadConsistency::Eventual;
371 this.current = vc;
372 continue 'outer;
373 }
374 Ok(Err(listener)) => listener,
375 Err(err) => return Poll::Ready(Err(err)),
376 }
377 }
378 RawVc::TaskCell(task, index) => {
379 if this.is_serializable_cell_content_unknown {
380 let value_type = registry::get_value_type(index.type_id);
381 this.read_cell_options.is_serializable_cell_content =
382 value_type.bincode.is_some();
383 }
384 let read_result =
385 tt.try_read_task_cell(task, index, this.read_cell_options);
386 match read_result {
387 Ok(Ok(content)) => {
388 return Poll::Ready(Ok(content));
390 }
391 Ok(Err(listener)) => listener,
392 Err(err) => return Poll::Ready(Err(err)),
393 }
394 }
395 RawVc::LocalOutput(execution_id, local_output_id, ..) => {
396 debug_assert_eq!(
397 this.read_output_options.consistency,
398 ReadConsistency::Eventual
399 );
400 let read_result = tt.try_read_local_output(execution_id, local_output_id);
401 match read_result {
402 Ok(Ok(vc)) => {
403 this.current = vc;
404 continue 'outer;
405 }
406 Ok(Err(listener)) => listener,
407 Err(err) => return Poll::Ready(Err(err)),
408 }
409 }
410 };
411 match unsafe { Pin::new_unchecked(&mut listener) }.poll(cx) {
413 Poll::Ready(_) => continue,
414 Poll::Pending => {
415 this.listener = Some(listener);
416 return Poll::Pending;
417 }
418 };
419 }
420 })
421 }
422}
423
424impl Unpin for ReadRawVcFuture {}