1use std::{
2 any::{Any, TypeId},
3 borrow::Cow,
4 future::Future,
5 mem::replace,
6 panic,
7 pin::Pin,
8 sync::Arc,
9};
10
11use anyhow::{Result, anyhow};
12use auto_hash_map::AutoSet;
13use futures::{StreamExt, TryStreamExt};
14use parking_lot::Mutex;
15use rustc_hash::{FxHashMap, FxHashSet};
16use tokio::task_local;
17use tracing::{Instrument, Span};
18
19use crate::{
20 self as turbo_tasks, CollectiblesSource, NonLocalValue, ReadRef, ResolvedVc, TryJoinIterExt,
21 debug::ValueDebugFormat,
22 emit,
23 event::{Event, EventListener},
24 manager::turbo_tasks_future_scope,
25 trace::TraceRawVcs,
26 util::SharedError,
27};
28
29const APPLY_EFFECTS_CONCURRENCY_LIMIT: usize = 1024;
30
31#[turbo_tasks::value_trait]
36trait Effect {}
37
38type EffectFuture = Pin<Box<dyn Future<Output = Result<()>> + Send + Sync + 'static>>;
41
42struct EffectInner {
44 future: EffectFuture,
45}
46
47enum EffectState {
48 NotStarted(EffectInner),
49 Started(Event),
50 Finished(Result<(), SharedError>),
51}
52
53#[turbo_tasks::value(serialization = "none", cell = "new", eq = "manual")]
55struct EffectInstance {
56 #[turbo_tasks(trace_ignore, debug_ignore)]
57 inner: Mutex<EffectState>,
58}
59
60impl EffectInstance {
61 fn new(future: impl Future<Output = Result<()>> + Send + Sync + 'static) -> Self {
62 Self {
63 inner: Mutex::new(EffectState::NotStarted(EffectInner {
64 future: Box::pin(future),
65 })),
66 }
67 }
68
69 async fn apply(&self) -> Result<()> {
70 loop {
71 enum State {
72 Started(EventListener),
73 NotStarted(EffectInner),
74 }
75 let state = {
76 let mut guard = self.inner.lock();
77 match &*guard {
78 EffectState::Started(event) => {
79 let listener = event.listen();
80 State::Started(listener)
81 }
82 EffectState::Finished(result) => {
83 return result.clone().map_err(Into::into);
84 }
85 EffectState::NotStarted(_) => {
86 let EffectState::NotStarted(inner) = std::mem::replace(
87 &mut *guard,
88 EffectState::Started(Event::new(|| "Effect".to_string())),
89 ) else {
90 unreachable!();
91 };
92 State::NotStarted(inner)
93 }
94 }
95 };
96 match state {
97 State::Started(listener) => {
98 listener.await;
99 }
100 State::NotStarted(EffectInner { future }) => {
101 let join_handle = tokio::spawn(ApplyEffectsContext::in_current_scope(
102 turbo_tasks_future_scope(turbo_tasks::turbo_tasks(), future)
103 .instrument(Span::current()),
104 ));
105 let result = match join_handle.await {
106 Ok(Err(err)) => Err(SharedError::new(err)),
107 Err(err) => {
108 let any = err.into_panic();
109 let panic = match any.downcast::<String>() {
110 Ok(owned) => Some(Cow::Owned(*owned)),
111 Err(any) => match any.downcast::<&'static str>() {
112 Ok(str) => Some(Cow::Borrowed(*str)),
113 Err(_) => None,
114 },
115 };
116 Err(SharedError::new(if let Some(panic) = panic {
117 anyhow!("Task effect panicked: {panic}")
118 } else {
119 anyhow!("Task effect panicked")
120 }))
121 }
122 Ok(Ok(())) => Ok(()),
123 };
124 let event = {
125 let mut guard = self.inner.lock();
126 let EffectState::Started(event) =
127 replace(&mut *guard, EffectState::Finished(result.clone()))
128 else {
129 unreachable!();
130 };
131 event
132 };
133 event.notify(usize::MAX);
134 return result.map_err(Into::into);
135 }
136 }
137 }
138 }
139}
140
141#[turbo_tasks::value_impl]
142impl Effect for EffectInstance {}
143
144pub fn effect(future: impl Future<Output = Result<()>> + Send + Sync + 'static) {
153 emit::<Box<dyn Effect>>(ResolvedVc::upcast(
154 EffectInstance::new(future).resolved_cell(),
155 ));
156}
157
158pub async fn apply_effects(source: impl CollectiblesSource) -> Result<()> {
176 let effects: AutoSet<ResolvedVc<Box<dyn Effect>>> = source.take_collectibles();
177 if effects.is_empty() {
178 return Ok(());
179 }
180 let span = tracing::info_span!("apply effects", count = effects.len());
181 APPLY_EFFECTS_CONTEXT
182 .scope(Default::default(), async move {
183 futures::stream::iter(effects)
185 .map(Ok)
186 .try_for_each_concurrent(APPLY_EFFECTS_CONCURRENCY_LIMIT, async |effect| {
187 let Some(effect) = ResolvedVc::try_downcast_type::<EffectInstance>(effect)
188 else {
189 panic!("Effect must only be implemented by EffectInstance");
190 };
191 effect.await?.apply().await
192 })
193 .await
194 })
195 .instrument(span)
196 .await
197}
198
199pub async fn get_effects(source: impl CollectiblesSource) -> Result<Effects> {
219 let effects: AutoSet<ResolvedVc<Box<dyn Effect>>> = source.take_collectibles();
220 let effects = effects
221 .into_iter()
222 .map(|effect| async move {
223 if let Some(effect) = ResolvedVc::try_downcast_type::<EffectInstance>(effect) {
224 Ok(effect.await?)
225 } else {
226 panic!("Effect must only be implemented by EffectInstance");
227 }
228 })
229 .try_join()
230 .await?;
231 Ok(Effects { effects })
232}
233
234#[derive(TraceRawVcs, Default, ValueDebugFormat, NonLocalValue)]
237pub struct Effects {
238 #[turbo_tasks(trace_ignore, debug_ignore)]
239 effects: Vec<ReadRef<EffectInstance>>,
240}
241
242impl PartialEq for Effects {
243 fn eq(&self, other: &Self) -> bool {
244 if self.effects.len() != other.effects.len() {
245 return false;
246 }
247 let effect_ptrs = self
248 .effects
249 .iter()
250 .map(ReadRef::ptr)
251 .collect::<FxHashSet<_>>();
252 other
253 .effects
254 .iter()
255 .all(|e| effect_ptrs.contains(&ReadRef::ptr(e)))
256 }
257}
258
259impl Eq for Effects {}
260
261impl Effects {
262 pub async fn apply(&self) -> Result<()> {
264 let span = tracing::info_span!("apply effects", count = self.effects.len());
265 APPLY_EFFECTS_CONTEXT
266 .scope(Default::default(), async move {
267 futures::stream::iter(self.effects.iter())
269 .map(Ok)
270 .try_for_each_concurrent(APPLY_EFFECTS_CONCURRENCY_LIMIT, async |effect| {
271 effect.apply().await
272 })
273 .await
274 })
275 .instrument(span)
276 .await
277 }
278}
279
280task_local! {
281 static APPLY_EFFECTS_CONTEXT: Arc<Mutex<ApplyEffectsContext>>;
283}
284
285#[derive(Default)]
286pub struct ApplyEffectsContext {
287 data: FxHashMap<TypeId, Box<dyn Any + Send + Sync>>,
288}
289
290impl ApplyEffectsContext {
291 fn in_current_scope<F: Future>(f: F) -> impl Future<Output = F::Output> {
292 let current = Self::current();
293 APPLY_EFFECTS_CONTEXT.scope(current, f)
294 }
295
296 fn current() -> Arc<Mutex<Self>> {
297 APPLY_EFFECTS_CONTEXT
298 .try_with(|mutex| mutex.clone())
299 .expect("No effect context found")
300 }
301
302 fn with_context<T, F: FnOnce(&mut Self) -> T>(f: F) -> T {
303 APPLY_EFFECTS_CONTEXT
304 .try_with(|mutex| f(&mut mutex.lock()))
305 .expect("No effect context found")
306 }
307
308 pub fn set<T: Any + Send + Sync>(value: T) {
309 Self::with_context(|this| {
310 this.data.insert(TypeId::of::<T>(), Box::new(value));
311 })
312 }
313
314 pub fn with<T: Any + Send + Sync, R>(f: impl FnOnce(&mut T) -> R) -> Option<R> {
315 Self::with_context(|this| {
316 this.data
317 .get_mut(&TypeId::of::<T>())
318 .map(|value| {
319 unsafe { value.downcast_mut_unchecked() }
321 })
322 .map(f)
323 })
324 }
325
326 pub fn with_or_insert_with<T: Any + Send + Sync, R>(
327 insert_with: impl FnOnce() -> T,
328 f: impl FnOnce(&mut T) -> R,
329 ) -> R {
330 Self::with_context(|this| {
331 let value = this.data.entry(TypeId::of::<T>()).or_insert_with(|| {
332 let value = insert_with();
333 Box::new(value)
334 });
335 f(
336 unsafe { value.downcast_mut_unchecked() },
338 )
339 })
340 }
341}
342
343#[cfg(test)]
344mod tests {
345 use crate::{CollectiblesSource, apply_effects, get_effects};
346
347 #[test]
348 #[allow(dead_code)]
349 fn is_sync_and_send() {
350 fn assert_sync<T: Sync + Send>(_: T) {}
351 fn check_apply_effects<T: CollectiblesSource + Send + Sync>(t: T) {
352 assert_sync(apply_effects(t));
353 }
354 fn check_get_effects<T: CollectiblesSource + Send + Sync>(t: T) {
355 assert_sync(get_effects(t));
356 }
357 }
358}