turbo_tasks/effect.rs
1use std::{
2 collections::hash_map,
3 error::Error as StdError,
4 future::Future,
5 mem::{forget, replace},
6 sync::Arc,
7};
8
9use anyhow::Result;
10use async_trait::async_trait;
11use futures::{StreamExt, TryStreamExt};
12use parking_lot::{Mutex, MutexGuard};
13use rustc_hash::FxHashMap;
14use tracing::Instrument;
15
16use crate::{
17 self as turbo_tasks, CollectiblesSource, NonLocalValue, OperationVc, ReadRef, ResolvedVc,
18 TryJoinIterExt, Upcast, VcRead, VcValueType, emit,
19 event::Event,
20 invalidation::{Invalidator, get_invalidator},
21 manager::{
22 debug_assert_in_top_level_task, debug_assert_not_in_top_level_task, mark_top_level_task,
23 unmark_top_level_task_may_leak_eventually_consistent_state, with_turbo_tasks,
24 },
25 spawn,
26 trace::TraceRawVcs,
27};
28
29const APPLY_EFFECTS_CONCURRENCY_LIMIT: usize = 1024;
30
31/// An IO Side effect to be computed by turbo tasks and then executed outside of turbo tasks.
32#[async_trait]
33#[turbo_tasks::value_trait]
34pub trait Effect {
35 /// Read any Vc data needed for `apply()` and return the [`CapturedEffect`] that performs it.
36 ///
37 /// An implementation may elect to elide capturing data if the `EffectStateStorage` state is
38 /// already up to date.
39 async fn capture(&self) -> Result<Box<dyn CapturedEffect>>;
40}
41
42pub trait EffectExt {
43 fn emit(self);
44}
45
46impl<T> EffectExt for ResolvedVc<T>
47where
48 T: Upcast<Box<dyn Effect>>,
49{
50 fn emit(self) {
51 emit::<Box<dyn Effect>>(ResolvedVc::upcast_non_strict(self));
52 }
53}
54
55/// Post-capture effect. Holds data needed to perform the actual side effect in a top level context.
56///
57/// `apply()` is responsible for coordinating with [`EffectStateStorage`] via
58/// [`EffectStateStorage::run_apply`] (which handles the per-key state machine, in-progress
59/// coordination, dedup-hit short-circuit, and panic recovery).
60#[async_trait]
61pub trait CapturedEffect: TraceRawVcs + NonLocalValue + Send + Sync + 'static {
62 /// Unique key identifying this effect's target (e.g., absolute path bytes).
63 fn key(&self) -> Box<[u8]>;
64
65 /// Extract the hash of the value part of this effect for comparison.
66 fn value_hash(&self) -> u128;
67
68 /// Perform the side effect
69 ///
70 /// Implementations typically dispatch into [`EffectStateStorage::run_apply`].
71 async fn apply(&self) -> Result<(), ApplyError>;
72}
73
74/// Outcome of [`CapturedEffect::apply`]. Distinguishes a side-effect failure (terminal) from a
75/// soft failure where the captured form had no content and storage state diverged between
76/// capture and apply (recoverable via [`Effects::apply`]'s invalidator path).
77#[derive(Debug)]
78pub enum ApplyError {
79 /// The side effect itself failed.
80 Failed(Arc<dyn EffectError>),
81 /// Capture short-circuited content materialization (observed `Applied { matching }` in
82 /// storage), but by apply time the storage state had diverged and we have no content to
83 /// re-apply. [`Effects::apply`] should invalidate the producing operation and return
84 /// [`EffectsError::Retry`].
85 Retry,
86}
87
88/// The error type that an effect can return. We use `dyn std::error::Error` (instead of
89/// [`anyhow::Error`] or [`SharedError`]) to encourage use of structured error types that can
90/// potentially be transformed into `Issue`s.
91///
92/// We can't require that the returned error implements `Issue`:
93/// - `Issue` uses `FileSystemPath`
94/// - `turbo-tasks-fs` returns effect errors that should be transformed into `Issue`s.
95/// - It logically doesn't make sense to define `Issue` in `turbo-tasks-fs`, `Issue` can't be
96/// defined in a base crate either because it would form a circular crate dependency.
97///
98/// So instead, we leave it up to the caller to figure out how to downcast these errors themselves.
99///
100/// [`SharedError`]: crate::util::SharedError
101pub trait EffectError: StdError + TraceRawVcs + NonLocalValue + Send + Sync + 'static {}
102impl<T> EffectError for T where T: StdError + TraceRawVcs + NonLocalValue + Send + Sync + 'static {}
103
104enum EffectLastApplied {
105 Unapplied,
106 InProgress {
107 write_event: Event,
108 },
109 Applied {
110 value_hash: u128,
111 result: Result<(), Arc<dyn EffectError>>,
112 },
113}
114
115/// Per-key entry in the effect state storage.
116type EffectStateEntry = Arc<Mutex<EffectLastApplied>>;
117/// Shared state storage for tracking applied effects. Stored on the filesystem implementation
118/// (e.g. DiskFileSystemInner).
119#[derive(Default)]
120pub struct EffectStateStorage {
121 effect_state: Mutex<FxHashMap<Box<[u8]>, EffectStateEntry>>,
122}
123
124impl EffectStateStorage {
125 /// Returns true if the per-key state holds `Applied { value_hash == target, result: Ok(()) }`.
126 ///
127 /// Intended for use by [`Effect::capture`] to elide content materialization when the apply
128 /// would dedup. Reading this from inside a turbo-tasks task is sound because
129 /// [`Effects::apply`] re-checks at apply time and fires the producing task's invalidator on
130 /// mismatch (via the [`ApplyError::Retry`] / [`EffectsError::Retry`] pathway).
131 pub fn matches_applied(&self, key: &[u8], target: u128) -> bool {
132 let entry = self.effect_state.lock().get(key).cloned();
133 let Some(entry) = entry else { return false };
134 matches!(
135 &*entry.lock(),
136 EffectLastApplied::Applied {
137 value_hash,
138 result: Ok(()),
139 } if *value_hash == target,
140 )
141 }
142
143 /// Look up or create the per-key state entry.
144 fn entry_for(&self, key: Box<[u8]>) -> EffectStateEntry {
145 self.effect_state
146 .lock()
147 .entry(key)
148 .or_insert_with(|| Arc::new(Mutex::new(EffectLastApplied::Unapplied)))
149 .clone()
150 }
151
152 /// Coordinate an apply for `(key, value_hash)` against the per-key state machine.
153 ///
154 /// Dedup hits (state already `Applied` with a matching hash) return the cached result without
155 /// running `body`. Otherwise `body` runs once under an `InProgress` guard and the result is
156 /// stored. A `None` `body` (capture elided content because storage matched, but it no longer
157 /// does) yields [`ApplyError::Retry`].
158 pub async fn run_apply<E, F, Fut>(
159 &self,
160 key: Box<[u8]>,
161 value_hash: u128,
162 body: Option<F>,
163 ) -> Result<(), ApplyError>
164 where
165 E: EffectError,
166 F: FnOnce() -> Fut + Send,
167 Fut: Future<Output = Result<(), E>> + Send,
168 {
169 let entry = self.entry_for(key);
170
171 // If `body` panics or the future is dropped before completion, the guard's drop impl
172 // resets the per-key state to `Unapplied` and notifies other waiters via the `Event` it
173 // recovers from the previous `InProgress`, so they retry rather than deadlock or observe
174 // a stale "panic" cache entry.
175 struct EventGuard<'a> {
176 entry: &'a EffectStateEntry,
177 }
178 impl Drop for EventGuard<'_> {
179 fn drop(&mut self) {
180 let prev_state = replace(&mut *self.entry.lock(), EffectLastApplied::Unapplied);
181 let EffectLastApplied::InProgress { write_event } = prev_state else {
182 unreachable!("EventGuard: prev_state must be InProgress");
183 };
184 write_event.notify(usize::MAX);
185 }
186 }
187
188 let begin_in_progress = |mut last_applied_guard: MutexGuard<'_, _>| {
189 *last_applied_guard = EffectLastApplied::InProgress {
190 write_event: Event::new(|| || "effect application in progress".to_string()),
191 };
192 EventGuard { entry: &entry }
193 };
194
195 let event_guard = loop {
196 let listener;
197 {
198 let last_applied_guard = entry.lock();
199 match &*last_applied_guard {
200 EffectLastApplied::Unapplied => {
201 break begin_in_progress(last_applied_guard);
202 }
203 EffectLastApplied::Applied {
204 value_hash: stored,
205 result,
206 } => {
207 if value_hash == *stored {
208 return result.clone().map_err(ApplyError::Failed);
209 } else {
210 break begin_in_progress(last_applied_guard);
211 }
212 }
213 EffectLastApplied::InProgress { write_event } => {
214 // Event::listen registers the listener immediately, so notifications
215 // fired after we drop last_applied_guard cannot be missed.
216 listener = write_event.listen();
217 }
218 }
219 };
220 listener.await;
221 };
222
223 // We hold the InProgress guard. Either run the body, or — if we have no content to
224 // apply — release the guard (resetting state to Unapplied + waking waiters) and Retry.
225 let Some(body) = body else {
226 drop(event_guard);
227 return Err(ApplyError::Retry);
228 };
229
230 // Erase the body's concrete error type to `Arc<dyn EffectError>` so the cached result
231 // type is uniform across all callers of the same key.
232 let effect_result: Result<(), Arc<dyn EffectError>> = body()
233 .await
234 .map_err(|err| Arc::new(err) as Arc<dyn EffectError>);
235
236 let prev_state = replace(
237 &mut *entry.lock(),
238 EffectLastApplied::Applied {
239 value_hash,
240 result: effect_result.clone(),
241 },
242 );
243 forget(event_guard);
244
245 let EffectLastApplied::InProgress { write_event } = prev_state else {
246 unreachable!("Effect applied: prev_state must be InProgress");
247 };
248 write_event.notify(usize::MAX);
249
250 effect_result.map_err(ApplyError::Failed)
251 }
252}
253
254/// Capture effects. Call this from within a [turbo-tasks operation][crate::OperationVc].
255///
256/// Collectibles are read from `ResolvedVc`s, so this function, and the return value of this
257/// function should be applied with [`Effects::apply`].
258///
259/// It's important to wrap calls to this function in an [operation with a strongly consistent
260/// read][crate::OperationVc::read_strongly_consistent] before applying the effects outside of the
261/// operation at the top-level (e.g. in a `run_once` closure) with [`Effects::apply`].
262///
263/// # Example
264///
265/// ```rust
266/// # #![feature(arbitrary_self_types_pointers)]
267/// #
268/// # use anyhow::Result;
269/// # use turbo_tasks::{Effects, ReadRef, Vc, run_once, take_effects};
270/// #
271/// # async fn _wrapper() -> Result<()> {
272/// # type Example = ();
273/// # type Args = ();
274/// # let args = ();
275/// # #[turbo_tasks::function(operation)]
276/// # fn some_turbo_tasks_operation(_args: Args) {}
277/// #
278/// #[turbo_tasks::value(serialization = "skip")]
279/// struct OutputWithEffects {
280/// output: ReadRef<Example>,
281/// effects: Effects,
282/// }
283///
284/// // ensure the return value and the collectibles match by using a single operation for both
285/// #[turbo_tasks::function(operation)]
286/// async fn some_turbo_tasks_operation_with_effects(args: Args) -> Result<Vc<OutputWithEffects>> {
287/// let operation = some_turbo_tasks_operation(args);
288/// // we must first read the operation to populate the collectibles
289/// let output = operation.connect().await?;
290/// // read the effects from the collectibles
291/// let effects = take_effects(operation).await?;
292/// Ok(OutputWithEffects { output, effects }.cell())
293/// }
294///
295/// // every operation must be read with strong consistency at the top-level
296/// let result_with_effects = some_turbo_tasks_operation_with_effects(args)
297/// .read_strongly_consistent()
298/// .await?;
299///
300/// // apply the effects once outside of a turbo_tasks::function at the top-level (e.g. `run_once`)
301/// result_with_effects.effects.apply().await?;
302/// # Ok(())
303/// # }
304/// ```
305pub async fn take_effects(source: impl CollectiblesSource) -> Result<Effects> {
306 debug_assert_not_in_top_level_task("take_effects");
307 let effects = source.take_collectibles::<Box<dyn Effect>>();
308
309 let captured: Vec<Box<dyn CapturedEffect>> = effects
310 .into_iter()
311 .map(async |effect_vc| effect_vc.into_trait_ref().await?.capture().await)
312 .try_join()
313 .await?;
314
315 // detect duplicate keys
316 let unique_keys = build_unique_keys(&captured);
317
318 let invalidator = get_invalidator()
319 .expect("take_effects must be called from within a turbo-tasks task context");
320
321 Ok(Effects::new(captured, unique_keys, invalidator))
322}
323
324#[derive(thiserror::Error, Debug, TraceRawVcs, NonLocalValue)]
325#[error("Conflicting effects for the same key (key length: {key_len} bytes)")]
326struct ConflictingEffectError {
327 key_len: usize,
328}
329
330const MAX_KEYS_TO_DISPLAY: usize = 10;
331/// Error returned by [`Effects::apply`]. Callers should retry on `Retry`; everything else is
332/// terminal.
333#[derive(thiserror::Error, Debug, Clone)]
334pub enum EffectsError {
335 /// A side effect failed during apply. Holds the first error encountered.
336 #[error(transparent)]
337 Apply(Arc<dyn EffectError>),
338
339 #[error("conflicting effects for the same key (key length: {0} bytes)")]
340 Conflict(usize),
341
342 #[error(
343 "effect state diverged before apply for {}{}; producing task invalidated, retry required",
344 keys.iter().take(MAX_KEYS_TO_DISPLAY).cloned().collect::<Vec<_>>().join(", "),
345 if keys.len() > MAX_KEYS_TO_DISPLAY { format!(", ... ({} more)", keys.len() - MAX_KEYS_TO_DISPLAY) } else { String::new() }
346 )]
347 Retry { keys: Vec<String> },
348}
349
350impl From<Arc<dyn EffectError>> for EffectsError {
351 fn from(err: Arc<dyn EffectError>) -> Self {
352 EffectsError::Apply(err)
353 }
354}
355
356/// Dedup'd indices into the captured Vec — one entry per unique key. Computed eagerly in
357/// [`take_effects`] purely from the captured effects (no [`EffectStateStorage`] interaction);
358/// the apply-side state machine in [`EffectStateStorage::run_apply`] handles per-key hash dedup.
359type UniqueKeys = Result<Vec<usize>, Arc<ConflictingEffectError>>;
360
361/// Slice of captured effects, individually Arc'd. Each effect is `Arc<dyn CapturedEffect>`
362/// so callers can cheaply clone a Send handle out across `.await` boundaries without holding
363/// the outer mutex.
364type CapturedSlice = Arc<[Arc<dyn CapturedEffect>]>;
365
366/// Captured effects from an operation. This struct can be used to return Effects from a turbo-tasks
367/// function and apply them later.
368///
369/// # Cell semantics
370///
371/// `Effects` uses `cell = "new"`: every producer re-execution allocates a fresh cell value and
372/// the prior cell is dropped. Cell-level dedup of `Effects` is given up; per-key dedup at apply
373/// time is provided by [`EffectStateStorage`]'s state machine (see
374/// [`EffectStateStorage::run_apply`]), which short-circuits when storage already holds
375/// `Applied { value_hash }` matching the new hash.
376///
377/// `Effects::apply` is idempotent and safe to call multiple times on the same value — the state
378/// machine in `run_apply` ensures each underlying side effect runs at most once per stored
379/// `(key, value_hash)` pair across all callers.
380#[turbo_tasks::value(shared, eq = "manual", serialization = "skip", cell = "new")]
381pub struct Effects {
382 /// Pre-resolved effects awaiting application. Lives for the lifetime of the cell — released
383 /// when the producer reruns and `cell = "new"` overwrites the cell, which is when any
384 /// upstream `ReadRef` strong-count cascades are naturally released.
385 #[turbo_tasks(debug_ignore, trace_ignore)]
386 captured: CapturedSlice,
387 /// Captured at `take_effects` time. `None` for `Effects::empty()` (nothing to retry).
388 #[turbo_tasks(debug_ignore, trace_ignore)]
389 invalidator: Option<Invalidator>,
390 /// Unique key info computed eagerly in `take_effects`. Holds one index into `captured` per
391 /// unique key, or a `ConflictingEffectError` if two captured effects share a key with
392 /// different hashes. No [`EffectStateStorage`] interaction here — that is deferred to
393 /// `apply()`.
394 #[turbo_tasks(debug_ignore, trace_ignore)]
395 unique_keys: Arc<UniqueKeys>,
396}
397
398/// `PartialEq`/`Eq` are compat shims so containing structs (which derive `PartialEq`/`Eq` via
399/// `turbo_tasks::value`) can still embed `Effects`. The actual cell-update strategy for `Effects`
400/// itself is `cell = "new"` — see the doc-comment above — so this `PartialEq` is not consulted
401/// for `Effects` cells. We always return `false` to match `cell = "new"` semantics for the
402/// wrapper structs (they should also refresh on every producer run).
403impl PartialEq for Effects {
404 fn eq(&self, _other: &Self) -> bool {
405 false
406 }
407}
408impl Eq for Effects {}
409
410impl Effects {
411 /// A test-only placeholder `Effects` value with no effects (and no producer to invalidate).
412 #[cfg(test)]
413 fn empty() -> Self {
414 Self {
415 captured: Arc::from(Vec::new()),
416 invalidator: None,
417 unique_keys: Arc::new(Ok(Vec::new())),
418 }
419 }
420
421 fn new(
422 captured: Vec<Box<dyn CapturedEffect>>,
423 unique_keys: UniqueKeys,
424 invalidator: Invalidator,
425 ) -> Self {
426 // Convert Box<dyn> into Arc<dyn> per slot. Each Arc is independently Send/Sync.
427 let captured: CapturedSlice = captured
428 .into_iter()
429 .map(Arc::<dyn CapturedEffect>::from)
430 .collect();
431 Self {
432 captured,
433 invalidator: Some(invalidator),
434 unique_keys: Arc::new(unique_keys),
435 }
436 }
437
438 /// Applies all effects that have been captured.
439 ///
440 /// Dispatch goes through each captured effect's [`CapturedEffect::apply`] (via
441 /// [`EffectStateStorage::run_apply`]) which handles the per-key state machine, dedup hits,
442 /// in-progress coordination, and panic recovery. The dispatch is idempotent — calling
443 /// `apply()` multiple times on the same `Effects` value runs each underlying side effect at
444 /// most once per stored `(key, value_hash)` pair.
445 ///
446 /// If any captured effect signals [`ApplyError::Retry`] (its content was elided at capture
447 /// time and storage state diverged between capture and apply), the producing task is
448 /// invalidated and [`EffectsError::Retry`] is returned after the remaining keys finish.
449 /// Side-effect failures (`ApplyError::Failed`) propagate as [`EffectsError::Apply`]; the
450 /// first such error wins.
451 ///
452 /// `apply` must only be used in a "top-level" task (e.g. [`run_once`][crate::run_once]),
453 /// after [`take_effects`] is called from an [operation read with strong
454 /// consistency][crate::OperationVc::read_strongly_consistent].
455 ///
456 /// See [`take_effects`] for example usage.
457 ///
458 /// **Do not call this directly.** External callers must go through
459 /// [`read_strongly_consistent_and_apply_effects`] or
460 /// [`resolve_strongly_consistent_and_take_and_apply_effects`], which own the read+apply+retry
461 /// loop required to recover from [`EffectsError::Retry`]. Exposed publicly only as
462 /// [`Effects::apply_for_testing`] (`#[doc(hidden)]`) so integration tests can drive the apply
463 /// state machine directly.
464 async fn apply(&self) -> Result<(), EffectsError> {
465 debug_assert_in_top_level_task(
466 "Effects::apply must be called from a top-level task to avoid unintended \
467 re-executions due to eventual consistency",
468 );
469 let unique = match self.unique_keys.as_ref() {
470 Ok(unique) => unique.as_slice(),
471 Err(err) => return Err(EffectsError::Conflict(err.key_len)),
472 };
473 if unique.is_empty() {
474 return Ok(());
475 }
476
477 let span = tracing::info_span!("apply effects", count = unique.len());
478 let captured = &self.captured;
479
480 async {
481 // Collect the keys of any effects that signaled `Retry` across the parallel apply so
482 // we invalidate at most once at the end of the batch and can report which outputs
483 // forced the retry. `Apply` errors still take precedence — they fail-fast through the
484 // `try_for_each_concurrent`.
485 let retry_keys = Mutex::new(Vec::<String>::new());
486 let result: Result<(), EffectsError> = futures::stream::iter(unique.iter())
487 .map(Ok::<_, EffectsError>)
488 .try_for_each_concurrent(APPLY_EFFECTS_CONCURRENCY_LIMIT, async |idx| {
489 // Run each apply on its own spawned task so that pending effects execute in
490 // parallel rather than serially on this future (see #94140).
491 let effect = captured[*idx].clone();
492 match spawn(async move { effect.apply().await }).await {
493 Ok(()) => Ok(()),
494 Err(ApplyError::Failed(err)) => Err(EffectsError::Apply(err)),
495 Err(ApplyError::Retry) => {
496 let key = captured[*idx].key();
497 retry_keys
498 .lock()
499 .push(String::from_utf8_lossy(&key).into_owned());
500 Ok(())
501 }
502 }
503 })
504 .await;
505
506 match result {
507 Err(e) => Err(e),
508 Ok(()) => {
509 let retry_keys = retry_keys.into_inner();
510 if retry_keys.is_empty() {
511 Ok(())
512 } else {
513 self.signal_retry(retry_keys)
514 }
515 }
516 }
517 }
518 .instrument(span)
519 .await
520 }
521
522 /// Test-only public alias for [`Effects::apply`]. Lets integration tests in other crates drive
523 /// the per-key apply state machine directly (e.g. asserting dedup counts or the raw
524 /// [`EffectsError::Retry`] signal). Production code must use
525 /// [`read_strongly_consistent_and_apply_effects`] instead, which owns the retry loop.
526 #[doc(hidden)]
527 pub async fn apply_for_testing(&self) -> Result<(), EffectsError> {
528 self.apply().await
529 }
530
531 /// Invalidate the producing task (if any) and return [`EffectsError::Retry`] carrying the
532 /// `keys` that signaled [`ApplyError::Retry`] (their capture elided content materialization but
533 /// storage state diverged before apply).
534 fn signal_retry(&self, keys: Vec<String>) -> Result<(), EffectsError> {
535 if let Some(invalidator) = self.invalidator {
536 with_turbo_tasks(|tt| invalidator.invalidate(&**tt));
537 }
538 Err(EffectsError::Retry { keys })
539 }
540}
541
542/// Strongly-consistent read of `op`, then apply its effects, retrying the whole read+apply on
543/// [`EffectsError::Retry`].
544///
545/// `get_effects` extracts the [`Effects`] from the read value (for a wrapper struct this is
546/// `|v| &v.effects`; for an `OperationVc<Effects>` it is `|e| e`).
547///
548/// On [`EffectsError::Retry`] the producing operation has already been invalidated by
549/// [`Effects::apply`], so the next
550/// [`read_strongly_consistent`][OperationVc::read_strongly_consistent] re-runs the producer and
551/// yields a fresh [`Effects`] whose `capture()` re-materializes content. Retries are bounded to
552/// avoid livelock when two producers perpetually stomp the same key; after the first retry a
553/// warning is logged on each subsequent attempt, and on exhaustion the last `Retry` surfaces as an
554/// error.
555///
556/// This is one of two public entry points for applying effects (see also
557/// [`read_strongly_consistent_and_apply_effects_with`]) — [`Effects::apply`] is private so the
558/// retry contract cannot be bypassed.
559pub async fn read_strongly_consistent_and_apply_effects<T, F>(
560 op: OperationVc<T>,
561 get_effects: F,
562) -> Result<ReadRef<T>>
563where
564 T: VcValueType,
565 F: Fn(&<<T as VcValueType>::Read as VcRead<T>>::Target) -> &Effects,
566{
567 let mut attempts = 0usize;
568 loop {
569 let value = op.read_strongly_consistent().await?;
570 // Deref the `ReadRef<T>` to the read target (`T` for non-transparent types).
571 let effects = get_effects(&*value);
572 match effects.apply().await {
573 Ok(()) => return Ok(value),
574 Err(e) => handle_apply_retry(e, &mut attempts)?,
575 }
576 }
577}
578
579/// AVOID CALLING THIS UNLESS DEEPLY REQUIRED
580///
581/// Like [`read_strongly_consistent_and_apply_effects`], but the [`Effects`] directly accessed by
582/// calling [`take_effects`] on the supplied operation.
583///
584/// Unlike [`read_strongly_consistent_and_apply_effects`], this may be called from *inside* a
585/// turbo-tasks task (it owns the `mark`/`unmark` around `apply`). The consequence is that the
586/// effects may be re-applied if that enclosing task is invalidated — acceptable for lazily-created
587/// resources.
588pub async fn resolve_strongly_consistent_and_take_and_apply_effects<T>(
589 op: OperationVc<T>,
590) -> Result<ResolvedVc<T>>
591where
592 T: VcValueType,
593{
594 let mut attempts = 0usize;
595 loop {
596 let value = op.resolve().strongly_consistent().await?;
597 // Run the callback while *not* marked top-level so it can `take_effects` / read Vcs.
598
599 let effects = take_effects(op).await?;
600 // `Effects::apply` asserts it runs at the top-level. Mark only around the apply, then
601 // unmark so any further work (including the next loop iteration's read) is unaffected.
602 mark_top_level_task();
603 let result = effects.apply().await;
604 unmark_top_level_task_may_leak_eventually_consistent_state();
605 match result {
606 Ok(()) => return Ok(value),
607 Err(e) => handle_apply_retry(e, &mut attempts)?,
608 }
609 }
610}
611
612/// Shared retry-decision for the two `read_strongly_consistent_and_apply_effects*` helpers.
613///
614/// Returns `Ok(())` to signal the caller should retry the read+apply loop (bounded by
615/// `MAX_RETRIES`). Returns `Err` for terminal outcomes: a non-`Retry` error, or `Retry` after the
616/// retry budget is exhausted.
617fn handle_apply_retry(err: EffectsError, attempts: &mut usize) -> Result<()> {
618 const MAX_RETRIES: usize = 4; // chosen by a fair dice roll
619 match err {
620 EffectsError::Retry { keys } if *attempts < MAX_RETRIES => {
621 *attempts += 1;
622 // Warn on every retry after the first.
623 if *attempts > 1 {
624 tracing::warn!(
625 attempts = *attempts,
626 ?keys,
627 "retrying effect application; this implies multiple routes are fighting to \
628 write one of these files",
629 );
630 }
631 Ok(())
632 }
633 EffectsError::Retry { keys } => anyhow::bail!(
634 "gave up applying effects after {MAX_RETRIES} retries; repeated effect-state \
635 divergence on: {keys:?}. This implies multiple routes are fighting to write one of \
636 these files."
637 ),
638 e => Err(e.into()),
639 }
640}
641
642/// Build the deduped per-key indices into the captured slice. Detects per-key value-hash
643/// conflicts. This is the eager half of effect deduplication — it inspects only the captured
644/// effects themselves (no [`EffectStateStorage`] interaction) and is therefore safe to call
645/// from inside a turbo-tasks task in [`take_effects`].
646fn build_unique_keys(captured: &[Box<dyn CapturedEffect>]) -> UniqueKeys {
647 let mut by_key: FxHashMap<Box<[u8]>, usize> = FxHashMap::default();
648 for (idx, effect) in captured.iter().enumerate() {
649 match by_key.entry(effect.key()) {
650 hash_map::Entry::Vacant(entry) => {
651 entry.insert(idx);
652 }
653 hash_map::Entry::Occupied(entry) => {
654 if captured[*entry.get()].value_hash() != effect.value_hash() {
655 return Err(Arc::new(ConflictingEffectError {
656 key_len: entry.key().len(),
657 }));
658 }
659 }
660 }
661 }
662
663 let mut keys: Vec<usize> = by_key.into_values().collect();
664 // Sort by idx so the order is deterministic — useful for stable tracing/logging.
665 keys.sort_unstable();
666 Ok(keys)
667}
668
669#[cfg(test)]
670mod tests {
671 use crate::{CollectiblesSource, Effects, take_effects};
672
673 #[test]
674 #[allow(dead_code)]
675 fn is_send() {
676 fn assert_send<T: Send>(_: T) {}
677 fn check_effects_apply() {
678 assert_send(Effects::empty().apply());
679 }
680 fn check_take_effects<T: CollectiblesSource + Send + Sync>(t: T) {
681 assert_send(take_effects(t));
682 }
683 }
684}