Skip to main content

turbo_tasks/
effect.rs

1use std::{
2    any::{Any, TypeId},
3    error::Error as StdError,
4    future::Future,
5    mem::replace,
6    panic,
7    pin::Pin,
8    sync::Arc,
9};
10
11use anyhow::Result;
12use auto_hash_map::AutoSet;
13use futures::{StreamExt, TryStreamExt};
14use parking_lot::Mutex;
15use rustc_hash::{FxHashMap, FxHashSet};
16use tokio::task_local;
17use tracing::Instrument;
18
19use crate::{
20    self as turbo_tasks, CollectiblesSource, NonLocalValue, ReadRef, ResolvedVc, TryJoinIterExt,
21    emit,
22    event::{Event, EventListener},
23    spawn,
24    trace::TraceRawVcs,
25};
26
/// Upper bound on how many effects a single `apply_effects` / `Effects::apply` call drives
/// concurrently (passed to `try_for_each_concurrent`).
const APPLY_EFFECTS_CONCURRENCY_LIMIT: usize = 1024;
28
/// A side effect that is registered with [`emit_effect`] and executed later, when
/// [`apply_effects`] or [`Effects::apply`] is called.
pub trait Effect: TraceRawVcs + NonLocalValue + Send + Sync + 'static {
    /// The structured error returned when applying the effect fails. See [`EffectError`] for why
    /// this is not simply `anyhow::Error`.
    type Error: EffectError;

    /// A function that is called once at the top level of the program's execution after everything
    /// has "settled".
    ///
    /// This function is executed outside of the turbo-tasks context, and therefore cannot read any
    /// `Vc`s or call any turbo-task functions. The effect can store [`ResolvedVc`]s (or any other
    /// `Vc` type), but should not read or resolve their contents.
    fn apply(&self) -> impl Future<Output = Result<(), Self::Error>> + Send;
}
40
41/// The error type that an effect can return. We use `dyn std::error::Error` (instead of
42/// [`anyhow::Error`] or [`SharedError`]) to encourage use of structured error types that can
43/// potentially be transformed into `Issue`s.
44///
45/// We can't require that the returned error implements `Issue`:
46/// - `Issue` uses `FileSystemPath`
47/// - `turbo-tasks-fs` returns effect errors that should be transformed into `Issue`s.
48/// - It logically doesn't make sense to define `Issue` in `turbo-tasks-fs`, `Issue` can't be
49///   defined in a base crate either because it would form a circular crate dependency.
50///
51/// So instead, we leave it up to the caller to figure out how to downcast these errors themselves.
52///
53/// [`SharedError`]: crate::util::SharedError
54pub trait EffectError: StdError + TraceRawVcs + NonLocalValue + Send + Sync + 'static {}
55impl<T> EffectError for T where T: StdError + TraceRawVcs + NonLocalValue + Send + Sync + 'static {}
56
57// Private wrapper trait to allow dynamic dispatch of an `Effect`. This is similar to the pattern
58// that the dynosaur crate uses: https://github.com/spastorino/dynosaur
59trait DynEffect: TraceRawVcs + NonLocalValue + Send + Sync + 'static {
60    fn dyn_apply<'a>(&'a self) -> DynEffectApplyFuture<'a>;
61}
62
63impl<T> DynEffect for T
64where
65    T: Effect,
66{
67    fn dyn_apply<'a>(&'a self) -> DynEffectApplyFuture<'a> {
68        Box::pin(async move {
69            self.apply()
70                .await
71                .map_err(|err| Arc::new(err) as Arc<dyn EffectError>)
72        })
73    }
74}
75
/// The boxed future returned by `DynEffect::dyn_apply`. It borrows the effect for `'a` and
/// resolves to the effect's result with the error type erased behind an `Arc`.
type DynEffectApplyFuture<'a> =
    Pin<Box<dyn Future<Output = Result<(), Arc<dyn EffectError>>> + Send + 'a>>;
78
/// A trait used to emit a task effect as a collectible. This trait has exactly one
/// implementation, `EffectInstance`, and no other implementation is allowed. The trait is private
/// to this module so that no other implementation can be added.
#[turbo_tasks::value_trait]
trait EffectCollectible {}
84
/// The execution state of a single effect, stored behind the mutex in `EffectInstance`.
#[derive(TraceRawVcs, NonLocalValue)]
enum EffectState {
    /// The effect has not been applied yet.
    NotStarted(Box<dyn DynEffect>),
    /// The `Effect` has already begun execution in another thread. The `DynEffect` is moved here so
    /// that `TraceRawVcs` works as expected. An alternative is that we could always run
    /// `TraceRawVcs` before starting execution and just store a `Vec` of `Vc`s here, but
    /// `TraceRawVcs` is potentially slow.
    ///
    /// The `Event` is what concurrent callers of `EffectInstance::apply` listen on while they wait
    /// for the running effect to finish.
    Started(Arc<dyn DynEffect>, Event),
    /// The effect has finished; holds the result that every caller of `EffectInstance::apply`
    /// receives.
    Finished(Result<(), Arc<dyn EffectError>>),

    /// Temporary placeholder used while moving from `NotStarted` to `Started`. Can only be
    /// observed if we panicked while constructing the `Started` state.
    Invalid,
}
98
/// The Effect instance collectible that is emitted for effects.
///
/// Wraps the type-erased effect together with its execution state, so that the effect runs at
/// most once even if `EffectInstance::apply` is called several times.
#[turbo_tasks::value(serialization = "none", cell = "new", eq = "manual")]
struct EffectInstance {
    // Internal mutability: It's important that if `EffectInstance::apply` is called multiple
    // times, the caller sees the same return value.
    #[turbo_tasks(debug_ignore)]
    inner: Mutex<EffectState>,
}
107
108impl EffectInstance {
109    fn new(effect: impl Effect) -> Self {
110        Self {
111            inner: Mutex::new(EffectState::NotStarted(
112                Box::new(effect) as Box<dyn DynEffect>
113            )),
114        }
115    }
116
117    async fn apply(&self) -> Result<()> {
118        loop {
119            enum State {
120                Started(EventListener),
121                NotStarted(Arc<dyn DynEffect>),
122            }
123            let state = {
124                let mut guard = self.inner.lock();
125                match &*guard {
126                    EffectState::Started(_, event) => {
127                        let listener = event.listen();
128                        State::Started(listener)
129                    }
130                    EffectState::Finished(result) => {
131                        return result.clone().map_err(Into::into);
132                    }
133                    EffectState::NotStarted(_) => {
134                        let EffectState::NotStarted(effect) =
135                            std::mem::replace(&mut *guard, EffectState::Invalid)
136                        else {
137                            unreachable!()
138                        };
139                        let effect: Arc<dyn DynEffect> = Arc::from(effect);
140                        *guard = EffectState::Started(
141                            effect.clone(),
142                            Event::new(|| || "Effect".to_string()),
143                        );
144                        State::NotStarted(effect)
145                    }
146                    EffectState::Invalid => unreachable!(),
147                }
148            };
149            match state {
150                State::Started(listener) => listener.await,
151                State::NotStarted(effect) => {
152                    // This spawn prevents the effect from running within a turbo_tasks context.
153                    // This is important because if we read a `Vc`, we want it to fail (panic). If
154                    // it didn't, we'd assign the dependency to the wrong task.
155                    let join_handle = spawn(ApplyEffectsContext::in_current_scope(async move {
156                        effect.dyn_apply().await
157                    }));
158                    let result = match join_handle.await {
159                        Err(err) => Err(err),
160                        Ok(()) => Ok(()),
161                    };
162                    let event = {
163                        let mut guard = self.inner.lock();
164                        let EffectState::Started(_, event) =
165                            replace(&mut *guard, EffectState::Finished(result.clone()))
166                        else {
167                            unreachable!();
168                        };
169                        event
170                    };
171                    event.notify(usize::MAX);
172                    return result.map_err(Into::into);
173                }
174            }
175        }
176    }
177}
178
// Marker impl: `EffectInstance` is the one and only implementor of `EffectCollectible`, which is
// what allows it to be emitted and collected as a collectible.
#[turbo_tasks::value_impl]
impl EffectCollectible for EffectInstance {}
181
182/// Emits an effect to be applied. The effect is executed once `apply_effects` is called.
183///
184/// The effect will only executed once. The effect is executed outside of the current task
185/// and can't read any Vcs. These need to be read before. ReadRefs can be passed into the effect.
186///
187/// Effects are executed in parallel, so they might need to use async locking to avoid problems.
188/// Order of execution of multiple effects is not defined. You must not use multiple conflicting
189/// effects to avoid non-deterministic behavior.
190pub fn emit_effect(effect: impl Effect) {
191    emit::<Box<dyn EffectCollectible>>(ResolvedVc::upcast(
192        EffectInstance::new(effect).resolved_cell(),
193    ));
194}
195
196/// Applies all effects that have been emitted by an operations.
197///
198/// Usually it's important that effects are strongly consistent, so one want to use `apply_effects`
199/// only on operations that have been strongly consistently read before.
200///
201/// The order of execution is not defined and effects are executed in parallel.
202///
203/// `apply_effects` must only be used in a "once" task. When used in a "root" task, a
204/// combination of `get_effects` and `Effects::apply` must be used.
205///
206/// # Example
207///
208/// ```rust
209/// let operation = some_turbo_tasks_function(args);
210/// let result = operation.strongly_consistent().await?;
211/// apply_effects(operation).await?;
212/// ```
213pub async fn apply_effects(source: impl CollectiblesSource) -> Result<()> {
214    let effects: AutoSet<ResolvedVc<Box<dyn EffectCollectible>>> = source.take_collectibles();
215    if effects.is_empty() {
216        return Ok(());
217    }
218    let span = tracing::info_span!("apply effects", count = effects.len());
219    APPLY_EFFECTS_CONTEXT
220        .scope(Default::default(), async move {
221            // Limit the concurrency of effects
222            futures::stream::iter(effects)
223                .map(Ok)
224                .try_for_each_concurrent(APPLY_EFFECTS_CONCURRENCY_LIMIT, async |effect| {
225                    let Some(effect) = ResolvedVc::try_downcast_type::<EffectInstance>(effect)
226                    else {
227                        panic!("Effect must only be implemented by EffectInstance");
228                    };
229                    effect.await?.apply().await
230                })
231                .await
232        })
233        .instrument(span)
234        .await
235}
236
237/// Capture effects from an turbo-tasks operation. Since this captures collectibles it might
238/// invalidate the current task when effects are changing or even temporary change.
239///
240/// Therefore it's important to wrap this in a strongly consistent read before applying the effects
241/// with `Effects::apply`.
242///
243/// # Example
244///
245/// ```rust
246/// async fn some_turbo_tasks_function_with_effects(args: Args) -> Result<ResultWithEffects> {
247///     let operation = some_turbo_tasks_function(args);
248///     let result = operation.strongly_consistent().await?;
249///     let effects = get_effects(operation).await?;
250///     Ok(ResultWithEffects { result, effects })
251/// }
252///
253/// let result_with_effects = some_turbo_tasks_function_with_effects(args).strongly_consistent().await?;
254/// result_with_effects.effects.apply().await?;
255/// ```
256pub async fn get_effects(source: impl CollectiblesSource) -> Result<Effects> {
257    let effects: AutoSet<ResolvedVc<Box<dyn EffectCollectible>>> = source.take_collectibles();
258    let effects = effects
259        .into_iter()
260        .map(|effect| async move {
261            if let Some(effect) = ResolvedVc::try_downcast_type::<EffectInstance>(effect) {
262                Ok(effect.await?)
263            } else {
264                panic!("Effect must only be implemented by EffectInstance");
265            }
266        })
267        .try_join()
268        .await?;
269    Ok(Effects { effects })
270}
271
/// Captured effects from an operation. This struct can be used to return effects from a
/// turbo-tasks function and apply them later with [`Effects::apply`].
#[derive(Default)]
#[turbo_tasks::value(shared, eq = "manual", serialization = "none")]
pub struct Effects {
    // The captured effect instances, as produced by `get_effects`.
    #[turbo_tasks(debug_ignore)]
    effects: Vec<ReadRef<EffectInstance>>,
}
280
281impl PartialEq for Effects {
282    fn eq(&self, other: &Self) -> bool {
283        if self.effects.len() != other.effects.len() {
284            return false;
285        }
286        let effect_ptrs = self
287            .effects
288            .iter()
289            .map(ReadRef::ptr)
290            .collect::<FxHashSet<_>>();
291        other
292            .effects
293            .iter()
294            .all(|e| effect_ptrs.contains(&ReadRef::ptr(e)))
295    }
296}
297
298impl Eq for Effects {}
299
300impl Effects {
301    /// Applies all effects that have been captured by this struct.
302    pub async fn apply(&self) -> Result<()> {
303        let span = tracing::info_span!("apply effects", count = self.effects.len());
304        APPLY_EFFECTS_CONTEXT
305            .scope(Default::default(), async move {
306                // Limit the concurrency of effects
307                futures::stream::iter(self.effects.iter())
308                    .map(Ok)
309                    .try_for_each_concurrent(APPLY_EFFECTS_CONCURRENCY_LIMIT, async |effect| {
310                        effect.apply().await
311                    })
312                    .await
313            })
314            .instrument(span)
315            .await
316    }
317}
318
task_local! {
    /// The context of the current effects application.
    ///
    /// It is scoped by `apply_effects` / `Effects::apply` for the duration of the call, and
    /// re-established for the spawned effect future via `ApplyEffectsContext::in_current_scope`.
    static APPLY_EFFECTS_CONTEXT: Arc<Mutex<ApplyEffectsContext>>;
}
323
/// Type-keyed storage that is shared by the effects of one `apply_effects` / `Effects::apply`
/// call. It holds at most one value per type.
#[derive(Default)]
pub struct ApplyEffectsContext {
    // Invariant: the value stored under `TypeId::of::<T>()` is always a boxed `T`. The unchecked
    // downcasts in the accessors rely on this.
    data: FxHashMap<TypeId, Box<dyn Any + Send + Sync>>,
}
328
329impl ApplyEffectsContext {
330    fn in_current_scope<F: Future>(f: F) -> impl Future<Output = F::Output> {
331        let current = Self::current();
332        APPLY_EFFECTS_CONTEXT.scope(current, f)
333    }
334
335    fn current() -> Arc<Mutex<Self>> {
336        APPLY_EFFECTS_CONTEXT
337            .try_with(|mutex| mutex.clone())
338            .expect("No effect context found")
339    }
340
341    fn with_context<T, F: FnOnce(&mut Self) -> T>(f: F) -> T {
342        APPLY_EFFECTS_CONTEXT
343            .try_with(|mutex| f(&mut mutex.lock()))
344            .expect("No effect context found")
345    }
346
347    pub fn set<T: Any + Send + Sync>(value: T) {
348        Self::with_context(|this| {
349            this.data.insert(TypeId::of::<T>(), Box::new(value));
350        })
351    }
352
353    pub fn with<T: Any + Send + Sync, R>(f: impl FnOnce(&mut T) -> R) -> Option<R> {
354        Self::with_context(|this| {
355            this.data
356                .get_mut(&TypeId::of::<T>())
357                .map(|value| {
358                    // Safety: the map is keyed by TypeId
359                    unsafe { value.downcast_unchecked_mut() }
360                })
361                .map(f)
362        })
363    }
364
365    pub fn with_or_insert_with<T: Any + Send + Sync, R>(
366        insert_with: impl FnOnce() -> T,
367        f: impl FnOnce(&mut T) -> R,
368    ) -> R {
369        Self::with_context(|this| {
370            let value = this.data.entry(TypeId::of::<T>()).or_insert_with(|| {
371                let value = insert_with();
372                Box::new(value)
373            });
374            f(
375                // Safety: the map is keyed by TypeId
376                unsafe { value.downcast_unchecked_mut() },
377            )
378        })
379    }
380}
381
#[cfg(test)]
mod tests {
    use crate::{CollectiblesSource, apply_effects, get_effects};

    /// Compile-time check that the futures returned by `apply_effects` and `get_effects` are
    /// `Send + Sync` whenever the source is. There is nothing to execute at runtime; the test
    /// passes as soon as the nested functions type-check.
    #[test]
    // The nested `check_*` functions are intentionally never called.
    #[allow(dead_code)]
    fn is_sync_and_send() {
        fn assert_send_sync<T: Send + Sync>(_: T) {}
        fn check_apply_effects<T: CollectiblesSource + Send + Sync>(source: T) {
            assert_send_sync(apply_effects(source));
        }
        fn check_get_effects<T: CollectiblesSource + Send + Sync>(source: T) {
            assert_send_sync(get_effects(source));
        }
    }
}