Skip to main content

turbo_tasks/
effect.rs

1use std::{
2    collections::hash_map,
3    error::Error as StdError,
4    future::Future,
5    mem::{forget, replace},
6    sync::Arc,
7};
8
9use anyhow::Result;
10use async_trait::async_trait;
11use futures::{StreamExt, TryStreamExt};
12use parking_lot::{Mutex, MutexGuard};
13use rustc_hash::FxHashMap;
14use tracing::Instrument;
15
16use crate::{
17    self as turbo_tasks, CollectiblesSource, NonLocalValue, OperationVc, ReadRef, ResolvedVc,
18    TryJoinIterExt, Upcast, VcRead, VcValueType, emit,
19    event::Event,
20    invalidation::{Invalidator, get_invalidator},
21    manager::{
22        debug_assert_in_top_level_task, debug_assert_not_in_top_level_task, mark_top_level_task,
23        unmark_top_level_task_may_leak_eventually_consistent_state, with_turbo_tasks,
24    },
25    spawn,
26    trace::TraceRawVcs,
27};
28
29const APPLY_EFFECTS_CONCURRENCY_LIMIT: usize = 1024;
30
31/// An IO Side effect to be computed by turbo tasks and then executed outside of turbo tasks.
32#[async_trait]
33#[turbo_tasks::value_trait]
34pub trait Effect {
35    /// Read any Vc data needed for `apply()` and return the [`CapturedEffect`] that performs it.
36    ///
37    /// An implementation may elect to elide capturing data if the `EffectStateStorage` state is
38    /// already up to date.
39    async fn capture(&self) -> Result<Box<dyn CapturedEffect>>;
40}
41
42pub trait EffectExt {
43    fn emit(self);
44}
45
46impl<T> EffectExt for ResolvedVc<T>
47where
48    T: Upcast<Box<dyn Effect>>,
49{
50    fn emit(self) {
51        emit::<Box<dyn Effect>>(ResolvedVc::upcast_non_strict(self));
52    }
53}
54
55/// Post-capture effect. Holds data needed to perform the actual side effect in a top level context.
56///
57/// `apply()` is responsible for coordinating with [`EffectStateStorage`] via
58/// [`EffectStateStorage::run_apply`] (which handles the per-key state machine, in-progress
59/// coordination, dedup-hit short-circuit, and panic recovery).
60#[async_trait]
61pub trait CapturedEffect: TraceRawVcs + NonLocalValue + Send + Sync + 'static {
62    /// Unique key identifying this effect's target (e.g., absolute path bytes).
63    fn key(&self) -> Box<[u8]>;
64
65    /// Extract the hash of the value part of this effect for comparison.
66    fn value_hash(&self) -> u128;
67
68    /// Perform the side effect
69    ///
70    /// Implementations typically dispatch into [`EffectStateStorage::run_apply`].
71    async fn apply(&self) -> Result<(), ApplyError>;
72}
73
74/// Outcome of [`CapturedEffect::apply`]. Distinguishes a side-effect failure (terminal) from a
75/// soft failure where the captured form had no content and storage state diverged between
76/// capture and apply (recoverable via [`Effects::apply`]'s invalidator path).
77#[derive(Debug)]
78pub enum ApplyError {
79    /// The side effect itself failed.
80    Failed(Arc<dyn EffectError>),
81    /// Capture short-circuited content materialization (observed `Applied { matching }` in
82    /// storage), but by apply time the storage state had diverged and we have no content to
83    /// re-apply. [`Effects::apply`] should invalidate the producing operation and return
84    /// [`EffectsError::Retry`].
85    Retry,
86}
87
88/// The error type that an effect can return. We use `dyn std::error::Error` (instead of
89/// [`anyhow::Error`] or [`SharedError`]) to encourage use of structured error types that can
90/// potentially be transformed into `Issue`s.
91///
92/// We can't require that the returned error implements `Issue`:
93/// - `Issue` uses `FileSystemPath`
94/// - `turbo-tasks-fs` returns effect errors that should be transformed into `Issue`s.
95/// - It logically doesn't make sense to define `Issue` in `turbo-tasks-fs`, `Issue` can't be
96///   defined in a base crate either because it would form a circular crate dependency.
97///
98/// So instead, we leave it up to the caller to figure out how to downcast these errors themselves.
99///
100/// [`SharedError`]: crate::util::SharedError
101pub trait EffectError: StdError + TraceRawVcs + NonLocalValue + Send + Sync + 'static {}
102impl<T> EffectError for T where T: StdError + TraceRawVcs + NonLocalValue + Send + Sync + 'static {}
103
104enum EffectLastApplied {
105    Unapplied,
106    InProgress {
107        write_event: Event,
108    },
109    Applied {
110        value_hash: u128,
111        result: Result<(), Arc<dyn EffectError>>,
112    },
113}
114
115/// Per-key entry in the effect state storage.
116type EffectStateEntry = Arc<Mutex<EffectLastApplied>>;
117/// Shared state storage for tracking applied effects. Stored on the filesystem implementation
118/// (e.g. DiskFileSystemInner).
119#[derive(Default)]
120pub struct EffectStateStorage {
121    effect_state: Mutex<FxHashMap<Box<[u8]>, EffectStateEntry>>,
122}
123
124impl EffectStateStorage {
125    /// Returns true if the per-key state holds `Applied { value_hash == target, result: Ok(()) }`.
126    ///
127    /// Intended for use by [`Effect::capture`] to elide content materialization when the apply
128    /// would dedup. Reading this from inside a turbo-tasks task is sound because
129    /// [`Effects::apply`] re-checks at apply time and fires the producing task's invalidator on
130    /// mismatch (via the [`ApplyError::Retry`] / [`EffectsError::Retry`] pathway).
131    pub fn matches_applied(&self, key: &[u8], target: u128) -> bool {
132        let entry = self.effect_state.lock().get(key).cloned();
133        let Some(entry) = entry else { return false };
134        matches!(
135            &*entry.lock(),
136            EffectLastApplied::Applied {
137                value_hash,
138                result: Ok(()),
139            } if *value_hash == target,
140        )
141    }
142
143    /// Look up or create the per-key state entry.
144    fn entry_for(&self, key: Box<[u8]>) -> EffectStateEntry {
145        self.effect_state
146            .lock()
147            .entry(key)
148            .or_insert_with(|| Arc::new(Mutex::new(EffectLastApplied::Unapplied)))
149            .clone()
150    }
151
152    /// Coordinate an apply for `(key, value_hash)` against the per-key state machine.
153    ///
154    /// Dedup hits (state already `Applied` with a matching hash) return the cached result without
155    /// running `body`. Otherwise `body` runs once under an `InProgress` guard and the result is
156    /// stored. A `None` `body` (capture elided content because storage matched, but it no longer
157    /// does) yields [`ApplyError::Retry`].
158    pub async fn run_apply<E, F, Fut>(
159        &self,
160        key: Box<[u8]>,
161        value_hash: u128,
162        body: Option<F>,
163    ) -> Result<(), ApplyError>
164    where
165        E: EffectError,
166        F: FnOnce() -> Fut + Send,
167        Fut: Future<Output = Result<(), E>> + Send,
168    {
169        let entry = self.entry_for(key);
170
171        // If `body` panics or the future is dropped before completion, the guard's drop impl
172        // resets the per-key state to `Unapplied` and notifies other waiters via the `Event` it
173        // recovers from the previous `InProgress`, so they retry rather than deadlock or observe
174        // a stale "panic" cache entry.
175        struct EventGuard<'a> {
176            entry: &'a EffectStateEntry,
177        }
178        impl Drop for EventGuard<'_> {
179            fn drop(&mut self) {
180                let prev_state = replace(&mut *self.entry.lock(), EffectLastApplied::Unapplied);
181                let EffectLastApplied::InProgress { write_event } = prev_state else {
182                    unreachable!("EventGuard: prev_state must be InProgress");
183                };
184                write_event.notify(usize::MAX);
185            }
186        }
187
188        let begin_in_progress = |mut last_applied_guard: MutexGuard<'_, _>| {
189            *last_applied_guard = EffectLastApplied::InProgress {
190                write_event: Event::new(|| || "effect application in progress".to_string()),
191            };
192            EventGuard { entry: &entry }
193        };
194
195        let event_guard = loop {
196            let listener;
197            {
198                let last_applied_guard = entry.lock();
199                match &*last_applied_guard {
200                    EffectLastApplied::Unapplied => {
201                        break begin_in_progress(last_applied_guard);
202                    }
203                    EffectLastApplied::Applied {
204                        value_hash: stored,
205                        result,
206                    } => {
207                        if value_hash == *stored {
208                            return result.clone().map_err(ApplyError::Failed);
209                        } else {
210                            break begin_in_progress(last_applied_guard);
211                        }
212                    }
213                    EffectLastApplied::InProgress { write_event } => {
214                        // Event::listen registers the listener immediately, so notifications
215                        // fired after we drop last_applied_guard cannot be missed.
216                        listener = write_event.listen();
217                    }
218                }
219            };
220            listener.await;
221        };
222
223        // We hold the InProgress guard. Either run the body, or — if we have no content to
224        // apply — release the guard (resetting state to Unapplied + waking waiters) and Retry.
225        let Some(body) = body else {
226            drop(event_guard);
227            return Err(ApplyError::Retry);
228        };
229
230        // Erase the body's concrete error type to `Arc<dyn EffectError>` so the cached result
231        // type is uniform across all callers of the same key.
232        let effect_result: Result<(), Arc<dyn EffectError>> = body()
233            .await
234            .map_err(|err| Arc::new(err) as Arc<dyn EffectError>);
235
236        let prev_state = replace(
237            &mut *entry.lock(),
238            EffectLastApplied::Applied {
239                value_hash,
240                result: effect_result.clone(),
241            },
242        );
243        forget(event_guard);
244
245        let EffectLastApplied::InProgress { write_event } = prev_state else {
246            unreachable!("Effect applied: prev_state must be InProgress");
247        };
248        write_event.notify(usize::MAX);
249
250        effect_result.map_err(ApplyError::Failed)
251    }
252}
253
254/// Capture effects. Call this from within a [turbo-tasks operation][crate::OperationVc].
255///
256/// Collectibles are read from `ResolvedVc`s, so this function, and the return value of this
257/// function should be applied with [`Effects::apply`].
258///
259/// It's important to wrap calls to this function in an [operation with a strongly consistent
260/// read][crate::OperationVc::read_strongly_consistent] before applying the effects outside of the
261/// operation at the top-level (e.g. in a `run_once` closure) with [`Effects::apply`].
262///
263/// # Example
264///
265/// ```rust
266/// # #![feature(arbitrary_self_types_pointers)]
267/// #
268/// # use anyhow::Result;
269/// # use turbo_tasks::{Effects, ReadRef, Vc, run_once, take_effects};
270/// #
271/// # async fn _wrapper() -> Result<()> {
272/// # type Example = ();
273/// # type Args = ();
274/// # let args = ();
275/// # #[turbo_tasks::function(operation)]
276/// # fn some_turbo_tasks_operation(_args: Args) {}
277/// #
278/// #[turbo_tasks::value(serialization = "skip")]
279/// struct OutputWithEffects {
280///     output: ReadRef<Example>,
281///     effects: Effects,
282/// }
283///
284/// // ensure the return value and the collectibles match by using a single operation for both
285/// #[turbo_tasks::function(operation)]
286/// async fn some_turbo_tasks_operation_with_effects(args: Args) -> Result<Vc<OutputWithEffects>> {
287///     let operation = some_turbo_tasks_operation(args);
288///     // we must first read the operation to populate the collectibles
289///     let output = operation.connect().await?;
290///     // read the effects from the collectibles
291///     let effects = take_effects(operation).await?;
292///     Ok(OutputWithEffects { output, effects }.cell())
293/// }
294///
295/// // every operation must be read with strong consistency at the top-level
296/// let result_with_effects = some_turbo_tasks_operation_with_effects(args)
297///     .read_strongly_consistent()
298///     .await?;
299///
300/// // apply the effects once outside of a turbo_tasks::function at the top-level (e.g. `run_once`)
301/// result_with_effects.effects.apply().await?;
302/// # Ok(())
303/// # }
304/// ```
305pub async fn take_effects(source: impl CollectiblesSource) -> Result<Effects> {
306    debug_assert_not_in_top_level_task("take_effects");
307    let effects = source.take_collectibles::<Box<dyn Effect>>();
308
309    let captured: Vec<Box<dyn CapturedEffect>> = effects
310        .into_iter()
311        .map(async |effect_vc| effect_vc.into_trait_ref().await?.capture().await)
312        .try_join()
313        .await?;
314
315    // detect duplicate keys
316    let unique_keys = build_unique_keys(&captured);
317
318    let invalidator = get_invalidator()
319        .expect("take_effects must be called from within a turbo-tasks task context");
320
321    Ok(Effects::new(captured, unique_keys, invalidator))
322}
323
324#[derive(thiserror::Error, Debug, TraceRawVcs, NonLocalValue)]
325#[error("Conflicting effects for the same key (key length: {key_len} bytes)")]
326struct ConflictingEffectError {
327    key_len: usize,
328}
329
330const MAX_KEYS_TO_DISPLAY: usize = 10;
331/// Error returned by [`Effects::apply`]. Callers should retry on `Retry`; everything else is
332/// terminal.
333#[derive(thiserror::Error, Debug, Clone)]
334pub enum EffectsError {
335    /// A side effect failed during apply. Holds the first error encountered.
336    #[error(transparent)]
337    Apply(Arc<dyn EffectError>),
338
339    #[error("conflicting effects for the same key (key length: {0} bytes)")]
340    Conflict(usize),
341
342    #[error(
343        "effect state diverged before apply for {}{}; producing task invalidated, retry required",
344        keys.iter().take(MAX_KEYS_TO_DISPLAY).cloned().collect::<Vec<_>>().join(", "),
345        if keys.len() > MAX_KEYS_TO_DISPLAY { format!(", ... ({} more)", keys.len() - MAX_KEYS_TO_DISPLAY) } else { String::new() }
346    )]
347    Retry { keys: Vec<String> },
348}
349
350impl From<Arc<dyn EffectError>> for EffectsError {
351    fn from(err: Arc<dyn EffectError>) -> Self {
352        EffectsError::Apply(err)
353    }
354}
355
356/// Dedup'd indices into the captured Vec — one entry per unique key. Computed eagerly in
357/// [`take_effects`] purely from the captured effects (no [`EffectStateStorage`] interaction);
358/// the apply-side state machine in [`EffectStateStorage::run_apply`] handles per-key hash dedup.
359type UniqueKeys = Result<Vec<usize>, Arc<ConflictingEffectError>>;
360
361/// Slice of captured effects, individually Arc'd. Each effect is `Arc<dyn CapturedEffect>`
362/// so callers can cheaply clone a Send handle out across `.await` boundaries without holding
363/// the outer mutex.
364type CapturedSlice = Arc<[Arc<dyn CapturedEffect>]>;
365
366/// Captured effects from an operation. This struct can be used to return Effects from a turbo-tasks
367/// function and apply them later.
368///
369/// # Cell semantics
370///
371/// `Effects` uses `cell = "new"`: every producer re-execution allocates a fresh cell value and
372/// the prior cell is dropped. Cell-level dedup of `Effects` is given up; per-key dedup at apply
373/// time is provided by [`EffectStateStorage`]'s state machine (see
374/// [`EffectStateStorage::run_apply`]), which short-circuits when storage already holds
375/// `Applied { value_hash }` matching the new hash.
376///
377/// `Effects::apply` is idempotent and safe to call multiple times on the same value — the state
378/// machine in `run_apply` ensures each underlying side effect runs at most once per stored
379/// `(key, value_hash)` pair across all callers.
380#[turbo_tasks::value(shared, eq = "manual", serialization = "skip", cell = "new")]
381pub struct Effects {
382    /// Pre-resolved effects awaiting application. Lives for the lifetime of the cell — released
383    /// when the producer reruns and `cell = "new"` overwrites the cell, which is when any
384    /// upstream `ReadRef` strong-count cascades are naturally released.
385    #[turbo_tasks(debug_ignore, trace_ignore)]
386    captured: CapturedSlice,
387    /// Captured at `take_effects` time. `None` for `Effects::empty()` (nothing to retry).
388    #[turbo_tasks(debug_ignore, trace_ignore)]
389    invalidator: Option<Invalidator>,
390    /// Unique key info computed eagerly in `take_effects`. Holds one index into `captured` per
391    /// unique key, or a `ConflictingEffectError` if two captured effects share a key with
392    /// different hashes. No [`EffectStateStorage`] interaction here — that is deferred to
393    /// `apply()`.
394    #[turbo_tasks(debug_ignore, trace_ignore)]
395    unique_keys: Arc<UniqueKeys>,
396}
397
398/// `PartialEq`/`Eq` are compat shims so containing structs (which derive `PartialEq`/`Eq` via
399/// `turbo_tasks::value`) can still embed `Effects`. The actual cell-update strategy for `Effects`
400/// itself is `cell = "new"` — see the doc-comment above — so this `PartialEq` is not consulted
401/// for `Effects` cells. We always return `false` to match `cell = "new"` semantics for the
402/// wrapper structs (they should also refresh on every producer run).
403impl PartialEq for Effects {
404    fn eq(&self, _other: &Self) -> bool {
405        false
406    }
407}
408impl Eq for Effects {}
409
410impl Effects {
411    /// A test-only placeholder `Effects` value with no effects (and no producer to invalidate).
412    #[cfg(test)]
413    fn empty() -> Self {
414        Self {
415            captured: Arc::from(Vec::new()),
416            invalidator: None,
417            unique_keys: Arc::new(Ok(Vec::new())),
418        }
419    }
420
421    fn new(
422        captured: Vec<Box<dyn CapturedEffect>>,
423        unique_keys: UniqueKeys,
424        invalidator: Invalidator,
425    ) -> Self {
426        // Convert Box<dyn> into Arc<dyn> per slot. Each Arc is independently Send/Sync.
427        let captured: CapturedSlice = captured
428            .into_iter()
429            .map(Arc::<dyn CapturedEffect>::from)
430            .collect();
431        Self {
432            captured,
433            invalidator: Some(invalidator),
434            unique_keys: Arc::new(unique_keys),
435        }
436    }
437
438    /// Applies all effects that have been captured.
439    ///
440    /// Dispatch goes through each captured effect's [`CapturedEffect::apply`] (via
441    /// [`EffectStateStorage::run_apply`]) which handles the per-key state machine, dedup hits,
442    /// in-progress coordination, and panic recovery. The dispatch is idempotent — calling
443    /// `apply()` multiple times on the same `Effects` value runs each underlying side effect at
444    /// most once per stored `(key, value_hash)` pair.
445    ///
446    /// If any captured effect signals [`ApplyError::Retry`] (its content was elided at capture
447    /// time and storage state diverged between capture and apply), the producing task is
448    /// invalidated and [`EffectsError::Retry`] is returned after the remaining keys finish.
449    /// Side-effect failures (`ApplyError::Failed`) propagate as [`EffectsError::Apply`]; the
450    /// first such error wins.
451    ///
452    /// `apply` must only be used in a "top-level" task (e.g. [`run_once`][crate::run_once]),
453    /// after [`take_effects`] is called from an [operation read with strong
454    /// consistency][crate::OperationVc::read_strongly_consistent].
455    ///
456    /// See [`take_effects`] for example usage.
457    ///
458    /// **Do not call this directly.** External callers must go through
459    /// [`read_strongly_consistent_and_apply_effects`] or
460    /// [`resolve_strongly_consistent_and_take_and_apply_effects`], which own the read+apply+retry
461    /// loop required to recover from [`EffectsError::Retry`]. Exposed publicly only as
462    /// [`Effects::apply_for_testing`] (`#[doc(hidden)]`) so integration tests can drive the apply
463    /// state machine directly.
464    async fn apply(&self) -> Result<(), EffectsError> {
465        debug_assert_in_top_level_task(
466            "Effects::apply must be called from a top-level task to avoid unintended \
467             re-executions due to eventual consistency",
468        );
469        let unique = match self.unique_keys.as_ref() {
470            Ok(unique) => unique.as_slice(),
471            Err(err) => return Err(EffectsError::Conflict(err.key_len)),
472        };
473        if unique.is_empty() {
474            return Ok(());
475        }
476
477        let span = tracing::info_span!("apply effects", count = unique.len());
478        let captured = &self.captured;
479
480        async {
481            // Collect the keys of any effects that signaled `Retry` across the parallel apply so
482            // we invalidate at most once at the end of the batch and can report which outputs
483            // forced the retry. `Apply` errors still take precedence — they fail-fast through the
484            // `try_for_each_concurrent`.
485            let retry_keys = Mutex::new(Vec::<String>::new());
486            let result: Result<(), EffectsError> = futures::stream::iter(unique.iter())
487                .map(Ok::<_, EffectsError>)
488                .try_for_each_concurrent(APPLY_EFFECTS_CONCURRENCY_LIMIT, async |idx| {
489                    // Run each apply on its own spawned task so that pending effects execute in
490                    // parallel rather than serially on this future (see #94140).
491                    let effect = captured[*idx].clone();
492                    match spawn(async move { effect.apply().await }).await {
493                        Ok(()) => Ok(()),
494                        Err(ApplyError::Failed(err)) => Err(EffectsError::Apply(err)),
495                        Err(ApplyError::Retry) => {
496                            let key = captured[*idx].key();
497                            retry_keys
498                                .lock()
499                                .push(String::from_utf8_lossy(&key).into_owned());
500                            Ok(())
501                        }
502                    }
503                })
504                .await;
505
506            match result {
507                Err(e) => Err(e),
508                Ok(()) => {
509                    let retry_keys = retry_keys.into_inner();
510                    if retry_keys.is_empty() {
511                        Ok(())
512                    } else {
513                        self.signal_retry(retry_keys)
514                    }
515                }
516            }
517        }
518        .instrument(span)
519        .await
520    }
521
522    /// Test-only public alias for [`Effects::apply`]. Lets integration tests in other crates drive
523    /// the per-key apply state machine directly (e.g. asserting dedup counts or the raw
524    /// [`EffectsError::Retry`] signal). Production code must use
525    /// [`read_strongly_consistent_and_apply_effects`] instead, which owns the retry loop.
526    #[doc(hidden)]
527    pub async fn apply_for_testing(&self) -> Result<(), EffectsError> {
528        self.apply().await
529    }
530
531    /// Invalidate the producing task (if any) and return [`EffectsError::Retry`] carrying the
532    /// `keys` that signaled [`ApplyError::Retry`] (their capture elided content materialization but
533    /// storage state diverged before apply).
534    fn signal_retry(&self, keys: Vec<String>) -> Result<(), EffectsError> {
535        if let Some(invalidator) = self.invalidator {
536            with_turbo_tasks(|tt| invalidator.invalidate(&**tt));
537        }
538        Err(EffectsError::Retry { keys })
539    }
540}
541
542/// Strongly-consistent read of `op`, then apply its effects, retrying the whole read+apply on
543/// [`EffectsError::Retry`].
544///
545/// `get_effects` extracts the [`Effects`] from the read value (for a wrapper struct this is
546/// `|v| &v.effects`; for an `OperationVc<Effects>` it is `|e| e`).
547///
548/// On [`EffectsError::Retry`] the producing operation has already been invalidated by
549/// [`Effects::apply`], so the next
550/// [`read_strongly_consistent`][OperationVc::read_strongly_consistent] re-runs the producer and
551/// yields a fresh [`Effects`] whose `capture()` re-materializes content. Retries are bounded to
552/// avoid livelock when two producers perpetually stomp the same key; after the first retry a
553/// warning is logged on each subsequent attempt, and on exhaustion the last `Retry` surfaces as an
554/// error.
555///
556/// This is one of two public entry points for applying effects (see also
557/// [`read_strongly_consistent_and_apply_effects_with`]) — [`Effects::apply`] is private so the
558/// retry contract cannot be bypassed.
559pub async fn read_strongly_consistent_and_apply_effects<T, F>(
560    op: OperationVc<T>,
561    get_effects: F,
562) -> Result<ReadRef<T>>
563where
564    T: VcValueType,
565    F: Fn(&<<T as VcValueType>::Read as VcRead<T>>::Target) -> &Effects,
566{
567    let mut attempts = 0usize;
568    loop {
569        let value = op.read_strongly_consistent().await?;
570        // Deref the `ReadRef<T>` to the read target (`T` for non-transparent types).
571        let effects = get_effects(&*value);
572        match effects.apply().await {
573            Ok(()) => return Ok(value),
574            Err(e) => handle_apply_retry(e, &mut attempts)?,
575        }
576    }
577}
578
579/// AVOID CALLING THIS UNLESS DEEPLY REQUIRED
580///
581/// Like [`read_strongly_consistent_and_apply_effects`], but the [`Effects`] directly accessed by
582/// calling [`take_effects`] on the supplied operation.
583///
584/// Unlike [`read_strongly_consistent_and_apply_effects`], this may be called from *inside* a
585/// turbo-tasks task (it owns the `mark`/`unmark` around `apply`). The consequence is that the
586/// effects may be re-applied if that enclosing task is invalidated — acceptable for lazily-created
587/// resources.
588pub async fn resolve_strongly_consistent_and_take_and_apply_effects<T>(
589    op: OperationVc<T>,
590) -> Result<ResolvedVc<T>>
591where
592    T: VcValueType,
593{
594    let mut attempts = 0usize;
595    loop {
596        let value = op.resolve().strongly_consistent().await?;
597        // Run the callback while *not* marked top-level so it can `take_effects` / read Vcs.
598
599        let effects = take_effects(op).await?;
600        // `Effects::apply` asserts it runs at the top-level. Mark only around the apply, then
601        // unmark so any further work (including the next loop iteration's read) is unaffected.
602        mark_top_level_task();
603        let result = effects.apply().await;
604        unmark_top_level_task_may_leak_eventually_consistent_state();
605        match result {
606            Ok(()) => return Ok(value),
607            Err(e) => handle_apply_retry(e, &mut attempts)?,
608        }
609    }
610}
611
612/// Shared retry-decision for the two `read_strongly_consistent_and_apply_effects*` helpers.
613///
614/// Returns `Ok(())` to signal the caller should retry the read+apply loop (bounded by
615/// `MAX_RETRIES`). Returns `Err` for terminal outcomes: a non-`Retry` error, or `Retry` after the
616/// retry budget is exhausted.
617fn handle_apply_retry(err: EffectsError, attempts: &mut usize) -> Result<()> {
618    const MAX_RETRIES: usize = 4; // chosen by a fair dice roll
619    match err {
620        EffectsError::Retry { keys } if *attempts < MAX_RETRIES => {
621            *attempts += 1;
622            // Warn on every retry after the first.
623            if *attempts > 1 {
624                tracing::warn!(
625                    attempts = *attempts,
626                    ?keys,
627                    "retrying effect application; this implies multiple routes are fighting to \
628                     write one of these files",
629                );
630            }
631            Ok(())
632        }
633        EffectsError::Retry { keys } => anyhow::bail!(
634            "gave up applying effects after {MAX_RETRIES} retries; repeated effect-state \
635             divergence on: {keys:?}. This implies multiple routes are fighting to write one of \
636             these files."
637        ),
638        e => Err(e.into()),
639    }
640}
641
642/// Build the deduped per-key indices into the captured slice. Detects per-key value-hash
643/// conflicts. This is the eager half of effect deduplication — it inspects only the captured
644/// effects themselves (no [`EffectStateStorage`] interaction) and is therefore safe to call
645/// from inside a turbo-tasks task in [`take_effects`].
646fn build_unique_keys(captured: &[Box<dyn CapturedEffect>]) -> UniqueKeys {
647    let mut by_key: FxHashMap<Box<[u8]>, usize> = FxHashMap::default();
648    for (idx, effect) in captured.iter().enumerate() {
649        match by_key.entry(effect.key()) {
650            hash_map::Entry::Vacant(entry) => {
651                entry.insert(idx);
652            }
653            hash_map::Entry::Occupied(entry) => {
654                if captured[*entry.get()].value_hash() != effect.value_hash() {
655                    return Err(Arc::new(ConflictingEffectError {
656                        key_len: entry.key().len(),
657                    }));
658                }
659            }
660        }
661    }
662
663    let mut keys: Vec<usize> = by_key.into_values().collect();
664    // Sort by idx so the order is deterministic — useful for stable tracing/logging.
665    keys.sort_unstable();
666    Ok(keys)
667}
668
669#[cfg(test)]
670mod tests {
671    use crate::{CollectiblesSource, Effects, take_effects};
672
673    #[test]
674    #[allow(dead_code)]
675    fn is_send() {
676        fn assert_send<T: Send>(_: T) {}
677        fn check_effects_apply() {
678            assert_send(Effects::empty().apply());
679        }
680        fn check_take_effects<T: CollectiblesSource + Send + Sync>(t: T) {
681            assert_send(take_effects(t));
682        }
683    }
684}