Skip to main content

turbo_tasks/
effect.rs

1use std::{
2    collections::hash_map,
3    error::Error as StdError,
4    future::Future,
5    mem::{forget, replace},
6    pin::Pin,
7    sync::{Arc, OnceLock},
8};
9
10use anyhow::Result;
11use futures::{StreamExt, TryStreamExt};
12use parking_lot::{Mutex, MutexGuard};
13use rustc_hash::{FxHashMap, FxHashSet};
14use tracing::Instrument;
15
16use crate::{
17    self as turbo_tasks, CollectiblesSource, NonLocalValue, ReadRef, ResolvedVc, TryJoinIterExt,
18    emit,
19    event::Event,
20    manager::{debug_assert_in_top_level_task, debug_assert_not_in_top_level_task},
21    trace::TraceRawVcs,
22};
23
/// Maximum number of effects that a single [`Effects::apply`] call drives concurrently.
const APPLY_EFFECTS_CONCURRENCY_LIMIT: usize = 1024;
25
/// A side effect that is emitted from a task with [`emit_effect`] and performed later by
/// [`Effects::apply`].
///
/// Effects are grouped by [`key`](Effect::key). [`value_hash`](Effect::value_hash) is used both
/// to detect conflicting effects that share a key and to skip re-applying a value that was
/// already applied for that key.
pub trait Effect: TraceRawVcs + NonLocalValue + Send + Sync + 'static {
    /// The error type that an effect can return. We use `dyn std::error::Error` (instead of
    /// [`anyhow::Error`] or [`SharedError`]) to encourage use of structured error types that can
    /// potentially be transformed into `Issue`s.
    ///
    /// We can't require that the returned error implements `Issue`:
    /// - `Issue` uses `FileSystemPath`
    /// - `turbo-tasks-fs` returns effect errors that should be transformed into `Issue`s.
    /// - It logically doesn't make sense to define `Issue` in `turbo-tasks-fs`, `Issue` can't be
    ///   defined in a base crate either because it would form a circular crate dependency.
    ///
    /// So instead, we leave it up to the caller to figure out how to downcast these errors
    /// themselves.
    ///
    /// [`SharedError`]: crate::util::SharedError
    type Error: EffectError;

    /// Unique key identifying this effect's target (e.g., absolute path bytes).
    ///
    /// The key is also the lookup key into the [`EffectStateStorage`] returned by
    /// [`state_storage`](Effect::state_storage).
    fn key(&self) -> Box<[u8]>;

    /// Extract the hash of the value part of this effect for comparison.
    ///
    /// [`Effects::apply`] reports a conflict when two effects share a key but differ in this
    /// hash, and skips the effect when the hash equals the one recorded by the last application
    /// for the same key.
    fn value_hash(&self) -> u128;

    /// Returns a reference to the state storage that records, per key, what was last applied.
    fn state_storage(&self) -> &EffectStateStorage;

    /// Perform the side effect (write file, create symlink, etc.).
    ///
    /// The returned future must be `Send`.
    fn apply(&self) -> impl Future<Output = Result<(), Self::Error>> + Send;
}
55
/// The bound that every error returned by an effect must satisfy. We use
/// `dyn std::error::Error` (instead of [`anyhow::Error`] or [`SharedError`]) to encourage use of
/// structured error types that can potentially be transformed into `Issue`s.
///
/// We can't require that the returned error implements `Issue`:
/// - `Issue` uses `FileSystemPath`
/// - `turbo-tasks-fs` returns effect errors that should be transformed into `Issue`s.
/// - It logically doesn't make sense to define `Issue` in `turbo-tasks-fs`, `Issue` can't be
///   defined in a base crate either because it would form a circular crate dependency.
///
/// So instead, we leave it up to the caller to figure out how to downcast these errors themselves.
///
/// [`SharedError`]: crate::util::SharedError
pub trait EffectError: StdError + TraceRawVcs + NonLocalValue + Send + Sync + 'static {}
// Blanket implementation: any type that satisfies the supertraits is an `EffectError`, so
// implementors never need to write an explicit `impl`.
impl<T> EffectError for T where T: StdError + TraceRawVcs + NonLocalValue + Send + Sync + 'static {}
71
/// State of the most recent application of an effect for one key.
enum EffectLastApplied {
    /// Nothing is recorded for this key. This is the initial state, and also the state an
    /// in-progress application is reset to when it is abandoned before completing.
    Unapplied,
    /// An application for this key is currently running.
    InProgress {
        /// Notified once the running application finishes or is abandoned, so that waiters can
        /// re-read the state.
        write_event: Event,
    },
    /// An application for this key has completed.
    Applied {
        /// [`Effect::value_hash`] of the effect that was applied.
        value_hash: u128,
        /// Outcome of that application; returned again for later effects with the same hash.
        result: Result<(), Arc<dyn EffectError>>,
    },
}
82
/// Per-key entry in the effect state storage. Shared via `Arc` so that [`Effects`] can cache the
/// entry and skip the map lookup on later `apply()` calls.
type EffectStateEntry = Arc<Mutex<EffectLastApplied>>;
/// Shared state storage for tracking applied effects. Stored on the filesystem implementation
/// (e.g. DiskFileSystemInner).
#[derive(Default)]
pub struct EffectStateStorage {
    /// Maps an effect key (see [`Effect::key`]) to the state of its last application.
    effect_state: Mutex<FxHashMap<Box<[u8]>, EffectStateEntry>>,
}
91
// Private wrapper trait to allow dynamic dispatch of an `Effect`. This is similar to the pattern
// that the dynosaur crate uses: https://github.com/spastorino/dynosaur
//
// `Effect::apply` returns `impl Future` and `Effect` has an associated error type; this trait
// replaces both with a boxed future that yields a type-erased error.
trait DynEffect: TraceRawVcs + NonLocalValue + Send + Sync + 'static {
    /// See [`Effect::key`].
    fn key(&self) -> Box<[u8]>;
    /// See [`Effect::value_hash`].
    fn value_hash(&self) -> u128;
    /// See [`Effect::state_storage`].
    fn state_storage(&self) -> &EffectStateStorage;
    /// Boxed counterpart of [`Effect::apply`]; the error is wrapped in an
    /// `Arc<dyn EffectError>`.
    fn dyn_apply<'a>(&'a self) -> DynEffectApplyFuture<'a>;
}
100
101impl<T> DynEffect for T
102where
103    T: Effect,
104{
105    fn key(&self) -> Box<[u8]> {
106        Effect::key(self)
107    }
108
109    fn value_hash(&self) -> u128 {
110        Effect::value_hash(self)
111    }
112
113    fn state_storage(&self) -> &EffectStateStorage {
114        Effect::state_storage(self)
115    }
116
117    fn dyn_apply<'a>(&'a self) -> DynEffectApplyFuture<'a> {
118        Box::pin(async move { Effect::apply(self).await.map_err(|err| Arc::new(err) as _) })
119    }
120}
121
/// Boxed, `Send` future returned by `DynEffect::dyn_apply`. It borrows the effect for `'a` and
/// resolves to the effect's result with the error type erased.
type DynEffectApplyFuture<'a> =
    Pin<Box<dyn Future<Output = Result<(), Arc<dyn EffectError>>> + Send + 'a>>;
124
/// A trait to emit a task effect as a collectible. This trait has exactly one implementation,
/// `EffectInstance`, and no other implementation is allowed. The trait is private to this module
/// so that no other implementation can be added.
#[turbo_tasks::value_trait]
trait EffectCollectible {}
130
/// The Effect instance collectible that is emitted for effects.
///
/// NOTE(review): `cell = "new"` and `eq = "manual"` presumably give every emitted instance its
/// own cell without value-based deduplication — confirm against the `turbo_tasks::value` macro
/// documentation.
#[turbo_tasks::value(serialization = "skip", evict = "last", cell = "new", eq = "manual")]
struct EffectInstance {
    /// The type-erased effect. It is boxed behind `DynEffect` so that different concrete
    /// [`Effect`] types can be stored in the same collectible type.
    #[turbo_tasks(debug_ignore)]
    inner: Box<dyn DynEffect>,
}
137
138impl EffectInstance {
139    fn new(effect: impl Effect) -> Self {
140        Self {
141            inner: Box::new(effect) as Box<dyn DynEffect>,
142        }
143    }
144}
145
// The one and only implementation of `EffectCollectible`; `take_effects` relies on this when it
// downcasts collectibles back to `EffectInstance`.
#[turbo_tasks::value_impl]
impl EffectCollectible for EffectInstance {}
148
149/// Emits an effect to be applied. The effect is executed once [`Effects::apply`] is called (see
150/// [`take_effects`]).
151///
152/// The effect will only executed once. The effect is executed outside of the current task
153/// and can't read any Vcs. These need to be read before. ReadRefs can be passed into the effect.
154///
155/// Effects are executed in parallel, so they might need to use async locking to avoid problems.
156/// Order of execution of multiple effects is not defined. You must not use multiple conflicting
157/// effects to avoid non-deterministic behavior.
158pub fn emit_effect(effect: impl Effect) {
159    emit::<Box<dyn EffectCollectible>>(ResolvedVc::upcast(
160        EffectInstance::new(effect).resolved_cell(),
161    ));
162}
163
164/// Capture effects. Call this from within a [turbo-tasks operation][crate::OperationVc].
165///
166/// Collectibles are read from `ResolvedVc`s, so this function, and the return value of this
167/// function should be applied with [`Effects::apply`].
168///
169/// It's important to wrap calls to this function in an [operation with a strongly consistent
170/// read][crate::OperationVc::read_strongly_consistent] before applying the effects outside of the
171/// operation at the top-level (e.g. in a `run_once` closure) with [`Effects::apply`].
172///
173/// # Example
174///
175/// ```rust
176/// # #![feature(arbitrary_self_types_pointers)]
177/// #
178/// # use anyhow::Result;
179/// # use turbo_tasks::{Effects, ReadRef, Vc, run_once, take_effects};
180/// #
181/// # async fn _wrapper() -> Result<()> {
182/// # type Example = ();
183/// # type Args = ();
184/// # let args = ();
185/// # #[turbo_tasks::function(operation)]
186/// # fn some_turbo_tasks_operation(_args: Args) {}
187/// #
188/// #[turbo_tasks::value(serialization = "skip", evict = "last")]
189/// struct OutputWithEffects {
190///     output: ReadRef<Example>,
191///     effects: Effects,
192/// }
193///
194/// // ensure the return value and the collectibles match by using a single operation for both
195/// #[turbo_tasks::function(operation)]
196/// async fn some_turbo_tasks_operation_with_effects(args: Args) -> Result<Vc<OutputWithEffects>> {
197///     let operation = some_turbo_tasks_operation(args);
198///     // we must first read the operation to populate the collectibles
199///     let output = operation.connect().await?;
200///     // read the effects from the collectibles
201///     let effects = take_effects(operation).await?;
202///     Ok(OutputWithEffects { output, effects }.cell())
203/// }
204///
205/// // every operation must be read with strong consistency at the top-level
206/// let result_with_effects = some_turbo_tasks_operation_with_effects(args)
207///     .read_strongly_consistent()
208///     .await?;
209///
210/// // apply the effects once outside of a turbo_tasks::function at the top-level (e.g. `run_once`)
211/// result_with_effects.effects.apply().await?;
212/// # Ok(())
213/// # }
214/// ```
215pub async fn take_effects(source: impl CollectiblesSource) -> Result<Effects> {
216    debug_assert_not_in_top_level_task("take_effects");
217    let effects = source
218        .take_collectibles::<Box<dyn EffectCollectible>>()
219        .into_iter()
220        .map(|effect| {
221            if let Some(effect) = ResolvedVc::try_downcast_type::<EffectInstance>(effect) {
222                effect
223            } else {
224                unreachable!("EffectCollectible must only be implemented by EffectInstance");
225            }
226        })
227        .try_join()
228        .await?;
229    Ok(Effects {
230        effects,
231        unique_indices: OnceLock::new(),
232    })
233}
234
/// Error returned by [`Effects::apply`] when two captured effects share a key but have different
/// value hashes.
#[derive(thiserror::Error, Debug, TraceRawVcs, NonLocalValue)]
#[error("Conflicting effects for the same key (key length: {key_len} bytes)")]
struct ConflictingEffectError {
    /// Length in bytes of the conflicting key; the key bytes themselves are not stored.
    key_len: usize,
}
240
/// Cached result of grouping effects by key and dedup/conflict detection.
/// Each entry is (index into `effects`, per-key state entry); there is one entry per distinct
/// key.
/// The `EffectStateEntry` is resolved once and cached to avoid repeated map lookups on subsequent
/// `apply()` calls.
/// The `Err` case holds the conflict that was detected while grouping.
type UniqueEffectIndices = Result<Vec<(usize, EffectStateEntry)>, Arc<ConflictingEffectError>>;
246
/// Captured effects from an operation. This struct can be used to return Effects from a turbo-tasks
/// function and apply them later.
///
/// Values are created by [`take_effects`] (or empty via `Default`) and consumed by
/// [`Effects::apply`].
#[derive(Default)]
#[turbo_tasks::value(shared, eq = "manual", serialization = "skip", evict = "last")]
pub struct Effects {
    /// The captured effect instances, in the order they were collected.
    #[turbo_tasks(debug_ignore)]
    effects: Vec<ReadRef<EffectInstance>>,
    /// Cached `(index, state_entry)` pairs after grouping by key and dedup/conflict detection.
    /// Computed once on first `apply()` call; reused on subsequent calls to avoid repeated
    /// key allocations and map lookups.
    /// `Err` means a conflict was detected.
    #[turbo_tasks(debug_ignore, trace_ignore)]
    unique_indices: OnceLock<UniqueEffectIndices>,
}
261
262impl PartialEq for Effects {
263    fn eq(&self, other: &Self) -> bool {
264        if self.effects.len() != other.effects.len() {
265            return false;
266        }
267        let effect_ptrs = self
268            .effects
269            .iter()
270            .map(ReadRef::ptr)
271            .collect::<FxHashSet<_>>();
272        other
273            .effects
274            .iter()
275            .all(|e| effect_ptrs.contains(&ReadRef::ptr(e)))
276    }
277}
278
279impl Eq for Effects {}
280
impl Effects {
    /// Applies all effects that have been captured.
    ///
    /// On first call: groups effects by key, detects duplicates/conflicts, caches deduped indices.
    /// On subsequent calls: skips grouping (reuses cached indices), only runs per-key state checks.
    ///
    /// `apply` must only be used in a "top-level" task (e.g. [`run_once`][crate::run_once]), after
    /// [`take_effects`] is called from an [operation read with strong
    /// consistency][crate::OperationVc::read_strongly_consistent].
    ///
    /// See [`take_effects`] for example usage.
    ///
    /// # Errors
    ///
    /// - Returns a `ConflictingEffectError` when two captured effects share a key but have
    ///   different value hashes. The conflict is cached, so every later call on the same value
    ///   returns it again.
    /// - Otherwise returns an error produced by one of the effects. If the per-key state already
    ///   records an application with the same value hash, the recorded result (including a
    ///   recorded error) is returned without running the effect again.
    pub async fn apply(&self) -> Result<(), Arc<dyn EffectError>> {
        debug_assert_in_top_level_task(
            "Effects::apply must be called from a top-level task to avoid unintended \
             re-executions due to eventual consistency",
        );
        // Nothing to do; skip creating the span and the grouping work.
        if self.effects.is_empty() {
            return Ok(());
        }

        let span = tracing::info_span!("apply effects", count = self.effects.len());

        async {
            // Compute unique (index, state_entry) pairs once; reuse on later calls.
            // The EffectStateEntry is resolved from the state map on first call and cached
            // here, so subsequent apply() calls bypass the map lookup entirely.
            let unique_indices = self
                .unique_indices
                .get_or_init(|| {
                    // Maps each key to the index of the first effect that uses it.
                    let mut by_key: FxHashMap<Box<[u8]>, usize> = FxHashMap::default();
                    for (idx, effect) in self.effects.iter().enumerate() {
                        match by_key.entry(effect.inner.key()) {
                            hash_map::Entry::Vacant(entry) => {
                                entry.insert(idx);
                            }
                            hash_map::Entry::Occupied(entry) => {
                                // Same key seen before: an identical value hash is a harmless
                                // duplicate and is dropped; a different one is a conflict.
                                if self.effects[*entry.get()].inner.value_hash()
                                    != effect.inner.value_hash()
                                {
                                    return Err(Arc::new(ConflictingEffectError {
                                        key_len: entry.key().len(),
                                    }));
                                }
                            }
                        }
                    }

                    // Note: `by_key` is a hash map, so the order of `indices` is unspecified.
                    let mut indices = Vec::with_capacity(by_key.len());
                    for (key, effect_idx) in by_key {
                        // The storage is taken from the first effect that used this key.
                        let state_storage = self.effects[effect_idx].inner.state_storage();
                        // Look up or create the per-key state entry and cache the Arc directly.
                        let entry = state_storage
                            .effect_state
                            .lock()
                            .entry(key)
                            .or_insert_with(|| Arc::new(Mutex::new(EffectLastApplied::Unapplied)))
                            .clone();
                        indices.push((effect_idx, entry));
                    }
                    Ok(indices)
                })
                .as_ref()
                .map_err(|err| err.clone() as Arc<_>)?;

            // Apply effects using cached (index, state_entry) pairs.
            // Hot path: no map lookup — EffectStateEntry is cached in unique_indices.
            futures::stream::iter(unique_indices.iter())
                .map(Ok::<_, Arc<dyn EffectError>>)
                .try_for_each_concurrent(APPLY_EFFECTS_CONCURRENCY_LIMIT, async |(idx, entry)| {
                    let effect: &dyn DynEffect = &*self.effects[*idx].inner;

                    // If `effect.dyn_apply` panics or the apply future is dropped before
                    // completion, the guard's drop impl resets the per-key state to `Unapplied`
                    // and notifies other waiters via the `Event` it recovers from the previous
                    // `InProgress`, so they retry rather than deadlock or observe a stale
                    // "panic" cache entry.
                    struct EventGuard<'a> {
                        entry: &'a EffectStateEntry,
                    }
                    impl Drop for EventGuard<'_> {
                        fn drop(&mut self) {
                            let prev_state =
                                replace(&mut *self.entry.lock(), EffectLastApplied::Unapplied);
                            // A guard only exists while its creator has the state set to
                            // `InProgress`, so any other state here is a bug.
                            let EffectLastApplied::InProgress { write_event } = prev_state else {
                                unreachable!("EventGuard: prev_state must be InProgress");
                            };
                            write_event.notify(usize::MAX);
                        }
                    }

                    // Marks the entry as `InProgress` while the caller's lock guard is still
                    // held, then releases the lock (the guard is dropped at the end of the
                    // closure) and hands back the `EventGuard` that owns the cleanup.
                    let begin_in_progress = |mut last_applied_guard: MutexGuard<'_, _>| {
                        *last_applied_guard = EffectLastApplied::InProgress {
                            write_event: Event::new(|| {
                                || "effect application in progress".to_string()
                            }),
                        };
                        EventGuard { entry }
                    };

                    // Loop until we either claim the entry (break with an `EventGuard`) or
                    // can return a recorded result for the same value hash.
                    let event_guard = loop {
                        let listener;
                        {
                            // The lock is only held inside this block, never across an await.
                            let last_applied_guard = entry.lock();
                            match &*last_applied_guard {
                                EffectLastApplied::Unapplied => {
                                    break begin_in_progress(last_applied_guard);
                                }
                                EffectLastApplied::Applied { value_hash, result } => {
                                    // Fast path: check if the stored value already matches
                                    if effect.value_hash() == *value_hash {
                                        return result.clone();
                                    } else {
                                        break begin_in_progress(last_applied_guard);
                                    }
                                }
                                EffectLastApplied::InProgress { write_event } => {
                                    // `Event::listen` registers the listener immediately, so
                                    // notifications fired after we drop `last_applied_guard`
                                    // cannot be missed.
                                    listener = write_event.listen();
                                }
                            }
                        };
                        // We didn't break out of the loop: There's a concurrent in-progress
                        // application. Wait for it to finish and then read `last_applied` again.
                        listener.await;
                    };

                    // Apply the effect. InProgress is visible to concurrent readers, preventing
                    // stale fast-path matches.
                    let effect_result = effect.dyn_apply().await;

                    // Update the state, clear the EventGuard
                    let prev_state = replace(
                        &mut *entry.lock(),
                        EffectLastApplied::Applied {
                            value_hash: effect.value_hash(),
                            result: effect_result.clone(),
                        },
                    );
                    // don't run the `Drop` impl on `EventGuard`. We'll do the notification
                    // ourselves.
                    forget(event_guard);

                    let EffectLastApplied::InProgress { write_event } = prev_state else {
                        unreachable!("Effect applied: prev_state must be InProgress");
                    };
                    // Wake every waiter; they re-read the entry and now observe `Applied`.
                    write_event.notify(usize::MAX);

                    effect_result
                })
                .await
        }
        .instrument(span)
        .await
    }
}
438
#[cfg(test)]
mod tests {
    use crate::{CollectiblesSource, Effects, take_effects};

    /// Compile-time check that the futures returned by `Effects::apply` and `take_effects` are
    /// `Send`. The inner functions are never called; the test passes as soon as they type-check.
    #[test]
    // The inner helper functions exist only to be type-checked and are intentionally unused.
    #[allow(dead_code)]
    fn is_send() {
        fn assert_send<T: Send>(_: T) {}
        fn check_effects_apply() {
            assert_send(
                Effects {
                    effects: Vec::new(),
                    unique_indices: Default::default(),
                }
                .apply(),
            );
        }
        fn check_take_effects<T: CollectiblesSource + Send + Sync>(t: T) {
            assert_send(take_effects(t));
        }
    }
}