turbo_tasks/
effect.rs

1use std::{
2    any::{Any, TypeId},
3    borrow::Cow,
4    future::Future,
5    mem::replace,
6    panic,
7    pin::Pin,
8    sync::Arc,
9};
10
11use anyhow::{Result, anyhow};
12use auto_hash_map::AutoSet;
13use futures::{StreamExt, TryStreamExt};
14use parking_lot::Mutex;
15use rustc_hash::{FxHashMap, FxHashSet};
16use tokio::task_local;
17use tracing::{Instrument, Span};
18
19use crate::{
20    self as turbo_tasks, CollectiblesSource, NonLocalValue, ReadRef, ResolvedVc, TryJoinIterExt,
21    debug::ValueDebugFormat,
22    emit,
23    event::{Event, EventListener},
24    manager::turbo_tasks_future_scope,
25    trace::TraceRawVcs,
26    util::SharedError,
27};
28
/// Upper bound on the number of effects that are applied concurrently by a single
/// `apply_effects` / `Effects::apply` call.
const APPLY_EFFECTS_CONCURRENCY_LIMIT: usize = 1024;
30
31/// A trait to emit a task effect as collectible. This trait only has one
32/// implementation, `EffectInstance` and no other implementation is allowed.
33/// The trait is private to this module so that no other implementation can be
34/// added.
35#[turbo_tasks::value_trait]
36trait Effect {}
37
38/// A future that represents the effect of a task. The future is executed when
39/// the effect is applied.
40type EffectFuture = Pin<Box<dyn Future<Output = Result<()>> + Send + Sync + 'static>>;
41
42/// The inner state of an effect instance if it has not been applied yet.
43struct EffectInner {
44    future: EffectFuture,
45}
46
47enum EffectState {
48    NotStarted(EffectInner),
49    Started(Event),
50    Finished(Result<(), SharedError>),
51}
52
53/// The Effect instance collectible that is emitted for effects.
54#[turbo_tasks::value(serialization = "none", cell = "new", eq = "manual")]
55struct EffectInstance {
56    #[turbo_tasks(trace_ignore, debug_ignore)]
57    inner: Mutex<EffectState>,
58}
59
60impl EffectInstance {
61    fn new(future: impl Future<Output = Result<()>> + Send + Sync + 'static) -> Self {
62        Self {
63            inner: Mutex::new(EffectState::NotStarted(EffectInner {
64                future: Box::pin(future),
65            })),
66        }
67    }
68
69    async fn apply(&self) -> Result<()> {
70        loop {
71            enum State {
72                Started(EventListener),
73                NotStarted(EffectInner),
74            }
75            let state = {
76                let mut guard = self.inner.lock();
77                match &*guard {
78                    EffectState::Started(event) => {
79                        let listener = event.listen();
80                        State::Started(listener)
81                    }
82                    EffectState::Finished(result) => {
83                        return result.clone().map_err(Into::into);
84                    }
85                    EffectState::NotStarted(_) => {
86                        let EffectState::NotStarted(inner) = std::mem::replace(
87                            &mut *guard,
88                            EffectState::Started(Event::new(|| "Effect".to_string())),
89                        ) else {
90                            unreachable!();
91                        };
92                        State::NotStarted(inner)
93                    }
94                }
95            };
96            match state {
97                State::Started(listener) => {
98                    listener.await;
99                }
100                State::NotStarted(EffectInner { future }) => {
101                    let join_handle = tokio::spawn(ApplyEffectsContext::in_current_scope(
102                        turbo_tasks_future_scope(turbo_tasks::turbo_tasks(), future)
103                            .instrument(Span::current()),
104                    ));
105                    let result = match join_handle.await {
106                        Ok(Err(err)) => Err(SharedError::new(err)),
107                        Err(err) => {
108                            let any = err.into_panic();
109                            let panic = match any.downcast::<String>() {
110                                Ok(owned) => Some(Cow::Owned(*owned)),
111                                Err(any) => match any.downcast::<&'static str>() {
112                                    Ok(str) => Some(Cow::Borrowed(*str)),
113                                    Err(_) => None,
114                                },
115                            };
116                            Err(SharedError::new(if let Some(panic) = panic {
117                                anyhow!("Task effect panicked: {panic}")
118                            } else {
119                                anyhow!("Task effect panicked")
120                            }))
121                        }
122                        Ok(Ok(())) => Ok(()),
123                    };
124                    let event = {
125                        let mut guard = self.inner.lock();
126                        let EffectState::Started(event) =
127                            replace(&mut *guard, EffectState::Finished(result.clone()))
128                        else {
129                            unreachable!();
130                        };
131                        event
132                    };
133                    event.notify(usize::MAX);
134                    return result.map_err(Into::into);
135                }
136            }
137        }
138    }
139}
140
141#[turbo_tasks::value_impl]
142impl Effect for EffectInstance {}
143
144/// Schedules an effect to be applied. The passed future is executed once `apply_effects` is called.
145///
146/// The effect will only executed once. The passed future is executed outside of the current task
147/// and can't read any Vcs. These need to be read before. ReadRefs can be passed into the future.
148///
149/// Effects are executed in parallel, so they might need to use async locking to avoid problems.
150/// Order of execution of multiple effects is not defined. You must not use mutliple conflicting
151/// effects to avoid non-deterministic behavior.
152pub fn effect(future: impl Future<Output = Result<()>> + Send + Sync + 'static) {
153    emit::<Box<dyn Effect>>(ResolvedVc::upcast(
154        EffectInstance::new(future).resolved_cell(),
155    ));
156}
157
158/// Applies all effects that have been emitted by an operations.
159///
160/// Usually it's important that effects are strongly consistent, so one want to use `apply_effects`
161/// only on operations that have been strongly consistently read before.
162///
163/// The order of execution is not defined and effects are executed in parallel.
164///
165/// `apply_effects` must only be used in a "once" task. When used in a "root" task, a
166/// combination of `get_effects` and `Effects::apply` must be used.
167///
168/// # Example
169///
170/// ```rust
171/// let operation = some_turbo_tasks_function(args);
172/// let result = operation.strongly_consistent().await?;
173/// apply_effects(operation).await?;
174/// ```
175pub async fn apply_effects(source: impl CollectiblesSource) -> Result<()> {
176    let effects: AutoSet<ResolvedVc<Box<dyn Effect>>> = source.take_collectibles();
177    if effects.is_empty() {
178        return Ok(());
179    }
180    let span = tracing::info_span!("apply effects", count = effects.len());
181    APPLY_EFFECTS_CONTEXT
182        .scope(Default::default(), async move {
183            // Limit the concurrency of effects
184            futures::stream::iter(effects)
185                .map(Ok)
186                .try_for_each_concurrent(APPLY_EFFECTS_CONCURRENCY_LIMIT, async |effect| {
187                    let Some(effect) = ResolvedVc::try_downcast_type::<EffectInstance>(effect)
188                    else {
189                        panic!("Effect must only be implemented by EffectInstance");
190                    };
191                    effect.await?.apply().await
192                })
193                .await
194        })
195        .instrument(span)
196        .await
197}
198
199/// Capture effects from an turbo-tasks operation. Since this captures collectibles it might
200/// invalidate the current task when effects are changing or even temporary change.
201///
202/// Therefore it's important to wrap this in a strongly consistent read before applying the effects
203/// with `Effects::apply`.
204///
205/// # Example
206///
207/// ```rust
208/// async fn some_turbo_tasks_function_with_effects(args: Args) -> Result<ResultWithEffects> {
209///     let operation = some_turbo_tasks_function(args);
210///     let result = operation.strongly_consistent().await?;
211///     let effects = get_effects(operation).await?;
212///     Ok(ResultWithEffects { result, effects })
213/// }
214///
215/// let result_with_effects = some_turbo_tasks_function_with_effects(args).strongly_consistent().await?;
216/// result_with_effects.effects.apply().await?;
217/// ```
218pub async fn get_effects(source: impl CollectiblesSource) -> Result<Effects> {
219    let effects: AutoSet<ResolvedVc<Box<dyn Effect>>> = source.take_collectibles();
220    let effects = effects
221        .into_iter()
222        .map(|effect| async move {
223            if let Some(effect) = ResolvedVc::try_downcast_type::<EffectInstance>(effect) {
224                Ok(effect.await?)
225            } else {
226                panic!("Effect must only be implemented by EffectInstance");
227            }
228        })
229        .try_join()
230        .await?;
231    Ok(Effects { effects })
232}
233
234/// Captured effects from an operation. This struct can be used to return Effects from a turbo-tasks
235/// function and apply them later.
236#[derive(TraceRawVcs, Default, ValueDebugFormat, NonLocalValue)]
237pub struct Effects {
238    #[turbo_tasks(trace_ignore, debug_ignore)]
239    effects: Vec<ReadRef<EffectInstance>>,
240}
241
242impl PartialEq for Effects {
243    fn eq(&self, other: &Self) -> bool {
244        if self.effects.len() != other.effects.len() {
245            return false;
246        }
247        let effect_ptrs = self
248            .effects
249            .iter()
250            .map(ReadRef::ptr)
251            .collect::<FxHashSet<_>>();
252        other
253            .effects
254            .iter()
255            .all(|e| effect_ptrs.contains(&ReadRef::ptr(e)))
256    }
257}
258
259impl Eq for Effects {}
260
261impl Effects {
262    /// Applies all effects that have been captured by this struct.
263    pub async fn apply(&self) -> Result<()> {
264        let span = tracing::info_span!("apply effects", count = self.effects.len());
265        APPLY_EFFECTS_CONTEXT
266            .scope(Default::default(), async move {
267                // Limit the concurrency of effects
268                futures::stream::iter(self.effects.iter())
269                    .map(Ok)
270                    .try_for_each_concurrent(APPLY_EFFECTS_CONCURRENCY_LIMIT, async |effect| {
271                        effect.apply().await
272                    })
273                    .await
274            })
275            .instrument(span)
276            .await
277    }
278}
279
280task_local! {
281    /// The context of the current effects application.
282    static APPLY_EFFECTS_CONTEXT: Arc<Mutex<ApplyEffectsContext>>;
283}
284
285#[derive(Default)]
286pub struct ApplyEffectsContext {
287    data: FxHashMap<TypeId, Box<dyn Any + Send + Sync>>,
288}
289
290impl ApplyEffectsContext {
291    fn in_current_scope<F: Future>(f: F) -> impl Future<Output = F::Output> {
292        let current = Self::current();
293        APPLY_EFFECTS_CONTEXT.scope(current, f)
294    }
295
296    fn current() -> Arc<Mutex<Self>> {
297        APPLY_EFFECTS_CONTEXT
298            .try_with(|mutex| mutex.clone())
299            .expect("No effect context found")
300    }
301
302    fn with_context<T, F: FnOnce(&mut Self) -> T>(f: F) -> T {
303        APPLY_EFFECTS_CONTEXT
304            .try_with(|mutex| f(&mut mutex.lock()))
305            .expect("No effect context found")
306    }
307
308    pub fn set<T: Any + Send + Sync>(value: T) {
309        Self::with_context(|this| {
310            this.data.insert(TypeId::of::<T>(), Box::new(value));
311        })
312    }
313
314    pub fn with<T: Any + Send + Sync, R>(f: impl FnOnce(&mut T) -> R) -> Option<R> {
315        Self::with_context(|this| {
316            this.data
317                .get_mut(&TypeId::of::<T>())
318                .map(|value| {
319                    // Safety: the map is keyed by TypeId
320                    unsafe { value.downcast_mut_unchecked() }
321                })
322                .map(f)
323        })
324    }
325
326    pub fn with_or_insert_with<T: Any + Send + Sync, R>(
327        insert_with: impl FnOnce() -> T,
328        f: impl FnOnce(&mut T) -> R,
329    ) -> R {
330        Self::with_context(|this| {
331            let value = this.data.entry(TypeId::of::<T>()).or_insert_with(|| {
332                let value = insert_with();
333                Box::new(value)
334            });
335            f(
336                // Safety: the map is keyed by TypeId
337                unsafe { value.downcast_mut_unchecked() },
338            )
339        })
340    }
341}
342
#[cfg(test)]
mod tests {
    use crate::{CollectiblesSource, apply_effects, get_effects};

    /// Compile-time check that the futures returned by `apply_effects` and `get_effects` are
    /// `Send + Sync`. The nested helpers only need to type-check; nothing runs at test time.
    #[test]
    #[allow(dead_code)] // the nested helpers are type-checked only and never called
    fn is_sync_and_send() {
        fn require_send_sync<T: Sync + Send>(_: T) {}
        fn apply_effects_is_send_sync<S: CollectiblesSource + Send + Sync>(source: S) {
            require_send_sync(apply_effects(source));
        }
        fn get_effects_is_send_sync<S: CollectiblesSource + Send + Sync>(source: S) {
            require_send_sync(get_effects(source));
        }
    }
}