turbo_tasks/effect.rs
1use std::{
2 collections::hash_map,
3 error::Error as StdError,
4 future::Future,
5 mem::{forget, replace},
6 pin::Pin,
7 sync::{Arc, OnceLock},
8};
9
10use anyhow::Result;
11use futures::{StreamExt, TryStreamExt};
12use parking_lot::{Mutex, MutexGuard};
13use rustc_hash::{FxHashMap, FxHashSet};
14use tracing::Instrument;
15
16use crate::{
17 self as turbo_tasks, CollectiblesSource, NonLocalValue, ReadRef, ResolvedVc, TryJoinIterExt,
18 emit,
19 event::Event,
20 manager::{debug_assert_in_top_level_task, debug_assert_not_in_top_level_task},
21 trace::TraceRawVcs,
22};
23
/// Upper bound on the number of effects that [`Effects::apply`] drives concurrently.
const APPLY_EFFECTS_CONCURRENCY_LIMIT: usize = 1024;
25
/// A side effect that is emitted from a task with [`emit_effect`] and performed later by
/// [`Effects::apply`].
pub trait Effect: TraceRawVcs + NonLocalValue + Send + Sync + 'static {
    /// The error type that an effect can return. We use `dyn std::error::Error` (instead of
    /// [`anyhow::Error`] or [`SharedError`]) to encourage use of structured error types that can
    /// potentially be transformed into `Issue`s.
    ///
    /// We can't require that the returned error implements `Issue`:
    /// - `Issue` uses `FileSystemPath`
    /// - `turbo-tasks-fs` returns effect errors that should be transformed into `Issue`s.
    /// - It logically doesn't make sense to define `Issue` in `turbo-tasks-fs`, `Issue` can't be
    ///   defined in a base crate either because it would form a circular crate dependency.
    ///
    /// So instead, we leave it up to the caller to figure out how to downcast these errors
    /// themselves.
    ///
    /// [`SharedError`]: crate::util::SharedError
    type Error: EffectError;

    /// Unique key identifying this effect's target (e.g., absolute path bytes).
    ///
    /// [`Effects::apply`] groups effects by this key: effects with the same key and the same
    /// [`value_hash`][Effect::value_hash] are applied only once, while effects with the same key
    /// but different hashes are reported as a conflict.
    fn key(&self) -> Box<[u8]>;

    /// Extract the hash of the value part of this effect for comparison.
    ///
    /// The hash is compared with the hash recorded for the key's last application to decide
    /// whether the effect has to be applied again.
    fn value_hash(&self) -> u128;

    /// Returns a reference to the state storage.
    ///
    /// The per-key state of the last application is looked up (or created) in this storage.
    fn state_storage(&self) -> &EffectStateStorage;

    /// Perform the side effect (write file, create symlink, etc.).
    fn apply(&self) -> impl Future<Output = Result<(), Self::Error>> + Send;
}
55
56/// The error type that an effect can return. We use `dyn std::error::Error` (instead of
57/// [`anyhow::Error`] or [`SharedError`]) to encourage use of structured error types that can
58/// potentially be transformed into `Issue`s.
59///
60/// We can't require that the returned error implements `Issue`:
61/// - `Issue` uses `FileSystemPath`
62/// - `turbo-tasks-fs` returns effect errors that should be transformed into `Issue`s.
63/// - It logically doesn't make sense to define `Issue` in `turbo-tasks-fs`, `Issue` can't be
64/// defined in a base crate either because it would form a circular crate dependency.
65///
66/// So instead, we leave it up to the caller to figure out how to downcast these errors themselves.
67///
68/// [`SharedError`]: crate::util::SharedError
69pub trait EffectError: StdError + TraceRawVcs + NonLocalValue + Send + Sync + 'static {}
70impl<T> EffectError for T where T: StdError + TraceRawVcs + NonLocalValue + Send + Sync + 'static {}
71
/// Application state tracked per effect key.
enum EffectLastApplied {
    /// No application is recorded for the key. Newly created entries start in this state, and an
    /// entry is reset to it when an in-progress application is abandoned.
    Unapplied,
    /// An application for the key is currently running.
    InProgress {
        /// Notified when the state leaves `InProgress`, so that waiting appliers re-check it.
        write_event: Event,
    },
    /// The most recent application for the key ran to completion.
    Applied {
        /// The `value_hash` of the effect that was applied.
        value_hash: u128,
        /// The outcome of that application. It is handed out again when an effect with the same
        /// `value_hash` is applied later.
        result: Result<(), Arc<dyn EffectError>>,
    },
}
82
/// Per-key entry in the effect state storage. The entry is reference counted so that `Effects`
/// can keep a handle to it and skip the map lookup on later `apply()` calls.
type EffectStateEntry = Arc<Mutex<EffectLastApplied>>;
/// Shared state storage for tracking applied effects. Stored on the filesystem implementation
/// (e.g. DiskFileSystemInner).
#[derive(Default)]
pub struct EffectStateStorage {
    /// Maps an effect key (see [`Effect::key`]) to the state of its last application.
    effect_state: Mutex<FxHashMap<Box<[u8]>, EffectStateEntry>>,
}
91
92// Private wrapper trait to allow dynamic dispatch of an `Effect`. This is similar to the pattern
93// that the dynosaur crate uses: https://github.com/spastorino/dynosaur
94trait DynEffect: TraceRawVcs + NonLocalValue + Send + Sync + 'static {
95 fn key(&self) -> Box<[u8]>;
96 fn value_hash(&self) -> u128;
97 fn state_storage(&self) -> &EffectStateStorage;
98 fn dyn_apply<'a>(&'a self) -> DynEffectApplyFuture<'a>;
99}
100
101impl<T> DynEffect for T
102where
103 T: Effect,
104{
105 fn key(&self) -> Box<[u8]> {
106 Effect::key(self)
107 }
108
109 fn value_hash(&self) -> u128 {
110 Effect::value_hash(self)
111 }
112
113 fn state_storage(&self) -> &EffectStateStorage {
114 Effect::state_storage(self)
115 }
116
117 fn dyn_apply<'a>(&'a self) -> DynEffectApplyFuture<'a> {
118 Box::pin(async move { Effect::apply(self).await.map_err(|err| Arc::new(err) as _) })
119 }
120}
121
/// Boxed, `Send` future returned by `DynEffect::dyn_apply`. It may borrow the effect for `'a` and
/// resolves to the effect's result with the error erased to `Arc<dyn EffectError>`.
type DynEffectApplyFuture<'a> =
    Pin<Box<dyn Future<Output = Result<(), Arc<dyn EffectError>>> + Send + 'a>>;
124
/// A trait to emit a task effect as a collectible. This trait has exactly one implementation,
/// `EffectInstance`, and no other implementation is allowed. The trait is private to this module
/// so that no other implementation can be added.
#[turbo_tasks::value_trait]
trait EffectCollectible {}
130
/// The Effect instance collectible that is emitted for effects.
#[turbo_tasks::value(serialization = "skip", evict = "last", cell = "new", eq = "manual")]
struct EffectInstance {
    /// The emitted effect, type-erased behind the private `DynEffect` wrapper trait.
    #[turbo_tasks(debug_ignore)]
    inner: Box<dyn DynEffect>,
}
137
138impl EffectInstance {
139 fn new(effect: impl Effect) -> Self {
140 Self {
141 inner: Box::new(effect) as Box<dyn DynEffect>,
142 }
143 }
144}
145
// Marker implementation: lets an `EffectInstance` be emitted and collected as an
// `EffectCollectible`.
#[turbo_tasks::value_impl]
impl EffectCollectible for EffectInstance {}
148
149/// Emits an effect to be applied. The effect is executed once [`Effects::apply`] is called (see
150/// [`take_effects`]).
151///
152/// The effect will only executed once. The effect is executed outside of the current task
153/// and can't read any Vcs. These need to be read before. ReadRefs can be passed into the effect.
154///
155/// Effects are executed in parallel, so they might need to use async locking to avoid problems.
156/// Order of execution of multiple effects is not defined. You must not use multiple conflicting
157/// effects to avoid non-deterministic behavior.
158pub fn emit_effect(effect: impl Effect) {
159 emit::<Box<dyn EffectCollectible>>(ResolvedVc::upcast(
160 EffectInstance::new(effect).resolved_cell(),
161 ));
162}
163
164/// Capture effects. Call this from within a [turbo-tasks operation][crate::OperationVc].
165///
166/// Collectibles are read from `ResolvedVc`s, so this function, and the return value of this
167/// function should be applied with [`Effects::apply`].
168///
169/// It's important to wrap calls to this function in an [operation with a strongly consistent
170/// read][crate::OperationVc::read_strongly_consistent] before applying the effects outside of the
171/// operation at the top-level (e.g. in a `run_once` closure) with [`Effects::apply`].
172///
173/// # Example
174///
175/// ```rust
176/// # #![feature(arbitrary_self_types_pointers)]
177/// #
178/// # use anyhow::Result;
179/// # use turbo_tasks::{Effects, ReadRef, Vc, run_once, take_effects};
180/// #
181/// # async fn _wrapper() -> Result<()> {
182/// # type Example = ();
183/// # type Args = ();
184/// # let args = ();
185/// # #[turbo_tasks::function(operation)]
186/// # fn some_turbo_tasks_operation(_args: Args) {}
187/// #
188/// #[turbo_tasks::value(serialization = "skip", evict = "last")]
189/// struct OutputWithEffects {
190/// output: ReadRef<Example>,
191/// effects: Effects,
192/// }
193///
194/// // ensure the return value and the collectibles match by using a single operation for both
195/// #[turbo_tasks::function(operation)]
196/// async fn some_turbo_tasks_operation_with_effects(args: Args) -> Result<Vc<OutputWithEffects>> {
197/// let operation = some_turbo_tasks_operation(args);
198/// // we must first read the operation to populate the collectibles
199/// let output = operation.connect().await?;
200/// // read the effects from the collectibles
201/// let effects = take_effects(operation).await?;
202/// Ok(OutputWithEffects { output, effects }.cell())
203/// }
204///
205/// // every operation must be read with strong consistency at the top-level
206/// let result_with_effects = some_turbo_tasks_operation_with_effects(args)
207/// .read_strongly_consistent()
208/// .await?;
209///
210/// // apply the effects once outside of a turbo_tasks::function at the top-level (e.g. `run_once`)
211/// result_with_effects.effects.apply().await?;
212/// # Ok(())
213/// # }
214/// ```
215pub async fn take_effects(source: impl CollectiblesSource) -> Result<Effects> {
216 debug_assert_not_in_top_level_task("take_effects");
217 let effects = source
218 .take_collectibles::<Box<dyn EffectCollectible>>()
219 .into_iter()
220 .map(|effect| {
221 if let Some(effect) = ResolvedVc::try_downcast_type::<EffectInstance>(effect) {
222 effect
223 } else {
224 unreachable!("EffectCollectible must only be implemented by EffectInstance");
225 }
226 })
227 .try_join()
228 .await?;
229 Ok(Effects {
230 effects,
231 unique_indices: OnceLock::new(),
232 })
233}
234
/// Error returned by [`Effects::apply`] when two captured effects share a key but differ in their
/// value hash. Only the length of the key is recorded, not the key itself.
#[derive(thiserror::Error, Debug, TraceRawVcs, NonLocalValue)]
#[error("Conflicting effects for the same key (key length: {key_len} bytes)")]
struct ConflictingEffectError {
    /// Length in bytes of the key that the conflicting effects share.
    key_len: usize,
}
240
/// Cached result of grouping effects by key and dedup/conflict detection.
/// Each entry is (index into `effects`, per-key state entry).
/// The `EffectStateEntry` is resolved once and cached to avoid repeated map lookups on subsequent
/// `apply()` calls.
/// The error is reference counted so that the cached conflict can be returned from every
/// `apply()` call.
type UniqueEffectIndices = Result<Vec<(usize, EffectStateEntry)>, Arc<ConflictingEffectError>>;
246
/// Captured effects from an operation. This struct can be used to return Effects from a turbo-tasks
/// function and apply them later.
///
/// Equality is implemented manually and only looks at the identity (pointers) of the captured
/// effect instances; the `unique_indices` cache does not take part in it.
#[derive(Default)]
#[turbo_tasks::value(shared, eq = "manual", serialization = "skip", evict = "last")]
pub struct Effects {
    /// The effect instances captured by [`take_effects`].
    #[turbo_tasks(debug_ignore)]
    effects: Vec<ReadRef<EffectInstance>>,
    /// Cached `(index, state_entry)` pairs after grouping by key and dedup/conflict detection.
    /// Computed once on first `apply()` call; reused on subsequent calls to avoid repeated
    /// key allocations and map lookups.
    /// `Err` means a conflict was detected.
    #[turbo_tasks(debug_ignore, trace_ignore)]
    unique_indices: OnceLock<UniqueEffectIndices>,
}
261
262impl PartialEq for Effects {
263 fn eq(&self, other: &Self) -> bool {
264 if self.effects.len() != other.effects.len() {
265 return false;
266 }
267 let effect_ptrs = self
268 .effects
269 .iter()
270 .map(ReadRef::ptr)
271 .collect::<FxHashSet<_>>();
272 other
273 .effects
274 .iter()
275 .all(|e| effect_ptrs.contains(&ReadRef::ptr(e)))
276 }
277}
278
279impl Eq for Effects {}
280
impl Effects {
    /// Applies all effects that have been captured.
    ///
    /// On first call: groups effects by key, detects duplicates/conflicts, caches deduped indices.
    /// On subsequent calls: skips grouping (reuses cached indices), only runs per-key state checks.
    ///
    /// For every unique key, the effect is only executed when the key's state does not already
    /// record an application with the same value hash. If it does, the recorded result is
    /// returned as-is; this includes a recorded error, which is returned again without
    /// re-running the effect. While another application for the same key is in progress, this
    /// call waits for it and then checks the state again.
    ///
    /// `apply` must only be used in a "top-level" task (e.g. [`run_once`][crate::run_once]), after
    /// [`take_effects`] is called from an [operation read with strong
    /// consistency][crate::OperationVc::read_strongly_consistent].
    ///
    /// See [`take_effects`] for example usage.
    ///
    /// # Errors
    ///
    /// Returns an error when two captured effects share a key but have different value hashes
    /// (no effect is applied in that case), or when applying an effect fails.
    pub async fn apply(&self) -> Result<(), Arc<dyn EffectError>> {
        debug_assert_in_top_level_task(
            "Effects::apply must be called from a top-level task to avoid unintended \
             re-executions due to eventual consistency",
        );
        // Nothing to do; also avoids creating a tracing span for empty effect sets.
        if self.effects.is_empty() {
            return Ok(());
        }

        let span = tracing::info_span!("apply effects", count = self.effects.len());

        async {
            // Compute unique (index, state_entry) pairs once; reuse on later calls.
            // The EffectStateEntry is resolved from the state map on first call and cached
            // here, so subsequent apply() calls bypass the map lookup entirely.
            let unique_indices = self
                .unique_indices
                .get_or_init(|| {
                    // Maps each key to the index of the first effect seen with that key.
                    let mut by_key: FxHashMap<Box<[u8]>, usize> = FxHashMap::default();
                    for (idx, effect) in self.effects.iter().enumerate() {
                        match by_key.entry(effect.inner.key()) {
                            hash_map::Entry::Vacant(entry) => {
                                entry.insert(idx);
                            }
                            hash_map::Entry::Occupied(entry) => {
                                // Same key: an identical value hash is a harmless duplicate that
                                // is dropped, a different one is a conflict.
                                if self.effects[*entry.get()].inner.value_hash()
                                    != effect.inner.value_hash()
                                {
                                    return Err(Arc::new(ConflictingEffectError {
                                        key_len: entry.key().len(),
                                    }));
                                }
                            }
                        }
                    }

                    let mut indices = Vec::with_capacity(by_key.len());
                    for (key, effect_idx) in by_key {
                        let state_storage = self.effects[effect_idx].inner.state_storage();
                        // Look up or create the per-key state entry and cache the Arc directly.
                        let entry = state_storage
                            .effect_state
                            .lock()
                            .entry(key)
                            .or_insert_with(|| Arc::new(Mutex::new(EffectLastApplied::Unapplied)))
                            .clone();
                        indices.push((effect_idx, entry));
                    }
                    Ok(indices)
                })
                .as_ref()
                // A cached conflict is returned on every call; the clone only bumps the `Arc`
                // refcount before the error is converted to the erased error type.
                .map_err(|err| err.clone() as Arc<_>)?;

            // Apply effects using cached (index, state_entry) pairs.
            // Hot path: no map lookup — EffectStateEntry is cached in unique_indices.
            futures::stream::iter(unique_indices.iter())
                .map(Ok::<_, Arc<dyn EffectError>>)
                .try_for_each_concurrent(APPLY_EFFECTS_CONCURRENCY_LIMIT, async |(idx, entry)| {
                    let effect: &dyn DynEffect = &*self.effects[*idx].inner;

                    // If `effect.dyn_apply` panics or the apply future is dropped before
                    // completion, the guard's drop impl resets the per-key state to `Unapplied`
                    // and notifies other waiters via the `Event` it recovers from the previous
                    // `InProgress`, so they retry rather than deadlock or observe a stale
                    // "panic" cache entry.
                    struct EventGuard<'a> {
                        entry: &'a EffectStateEntry,
                    }
                    impl Drop for EventGuard<'_> {
                        fn drop(&mut self) {
                            let prev_state =
                                replace(&mut *self.entry.lock(), EffectLastApplied::Unapplied);
                            let EffectLastApplied::InProgress { write_event } = prev_state else {
                                unreachable!("EventGuard: prev_state must be InProgress");
                            };
                            write_event.notify(usize::MAX);
                        }
                    }

                    // Marks the entry as `InProgress` while the lock is still held. The lock
                    // guard is consumed and therefore released when this closure returns.
                    let begin_in_progress = |mut last_applied_guard: MutexGuard<'_, _>| {
                        *last_applied_guard = EffectLastApplied::InProgress {
                            write_event: Event::new(|| {
                                || "effect application in progress".to_string()
                            }),
                        };
                        EventGuard { entry }
                    };

                    // Loop until this call either owns the application (breaks with a guard) or
                    // can return a recorded result.
                    let event_guard = loop {
                        let listener;
                        // The lock guard lives only inside this block, so it is released before
                        // the `listener.await` below.
                        {
                            let last_applied_guard = entry.lock();
                            match &*last_applied_guard {
                                EffectLastApplied::Unapplied => {
                                    break begin_in_progress(last_applied_guard);
                                }
                                EffectLastApplied::Applied { value_hash, result } => {
                                    // Fast path: check if the stored value already matches
                                    if effect.value_hash() == *value_hash {
                                        // Hand out the recorded outcome (success or error)
                                        // without running the effect again.
                                        return result.clone();
                                    } else {
                                        break begin_in_progress(last_applied_guard);
                                    }
                                }
                                EffectLastApplied::InProgress { write_event } => {
                                    // `Event::listen` registers the listener immediately, so
                                    // notifications fired after we drop `last_applied_guard`
                                    // cannot be missed.
                                    listener = write_event.listen();
                                }
                            }
                        };
                        // We didn't break out of the loop: There's a concurrent in-progress
                        // application. Wait for it to finish and then read `last_applied` again.
                        listener.await;
                    };

                    // Apply the effect. InProgress is visible to concurrent readers, preventing
                    // stale fast-path matches.
                    let effect_result = effect.dyn_apply().await;

                    // Update the state, clear the EventGuard
                    let prev_state = replace(
                        &mut *entry.lock(),
                        EffectLastApplied::Applied {
                            value_hash: effect.value_hash(),
                            result: effect_result.clone(),
                        },
                    );
                    // don't run the `Drop` impl on `EventGuard`. We'll do the notification
                    // ourselves.
                    forget(event_guard);

                    let EffectLastApplied::InProgress { write_event } = prev_state else {
                        unreachable!("Effect applied: prev_state must be InProgress");
                    };
                    // Wake every waiter; each of them re-reads the state, which is now `Applied`.
                    write_event.notify(usize::MAX);

                    effect_result
                })
                .await
        }
        .instrument(span)
        .await
    }
}
438
#[cfg(test)]
mod tests {
    use crate::{CollectiblesSource, Effects, take_effects};

    /// Compile-time check that the futures returned by `Effects::apply` and `take_effects` are
    /// `Send`. The nested functions only have to type-check; they are never called.
    #[test]
    #[allow(dead_code)] // the nested check functions are intentionally never invoked
    fn is_send() {
        fn assert_send<T: Send>(_: T) {}
        fn check_take_effects<T: CollectiblesSource + Send + Sync>(t: T) {
            assert_send(take_effects(t));
        }
        fn check_effects_apply() {
            let effects = Effects::default();
            assert_send(effects.apply());
        }
    }
}