turbo_tasks_backend/backend/
storage.rs

1use std::{
2    hash::Hash,
3    ops::{Deref, DerefMut},
4    sync::{Arc, atomic::AtomicBool},
5    thread::available_parallelism,
6};
7
8use bitfield::bitfield;
9use rayon::iter::{IndexedParallelIterator, IntoParallelRefIterator, ParallelIterator};
10use smallvec::SmallVec;
11use turbo_tasks::{FxDashMap, TaskId};
12
13use crate::{
14    backend::dynamic_storage::DynamicStorage,
15    data::{
16        AggregationNumber, CachedDataItem, CachedDataItemKey, CachedDataItemType,
17        CachedDataItemValue, CachedDataItemValueRef, CachedDataItemValueRefMut, OutputValue,
18    },
19    data_storage::{AutoMapStorage, OptionStorage},
20    utils::dash_map_multi::{RefMut, get_multiple_mut},
21};
22
/// Selects which part of a task's cached data an operation targets.
///
/// `Meta` and `Data` are tracked independently (e.g. separate restored/modified flags in
/// `InnerStorageState`); `All` stands for both at once.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TaskDataCategory {
    Meta,
    Data,
    All,
}

impl TaskDataCategory {
    /// Converts a single category into its [`SpecificTaskDataCategory`].
    ///
    /// # Panics
    ///
    /// Panics when called on [`TaskDataCategory::All`], which does not name a single category.
    pub fn into_specific(self) -> SpecificTaskDataCategory {
        match self {
            TaskDataCategory::Meta => SpecificTaskDataCategory::Meta,
            TaskDataCategory::Data => SpecificTaskDataCategory::Data,
            TaskDataCategory::All => {
                unreachable!("TaskDataCategory::All has no single specific category")
            }
        }
    }
}

/// A [`TaskDataCategory`] that is guaranteed not to be `All`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum SpecificTaskDataCategory {
    Meta,
    Data,
}

/// Iterating a category yields the individual categories it covers: `All` yields `Meta` then
/// `Data`; `Meta` and `Data` yield themselves.
impl IntoIterator for TaskDataCategory {
    type Item = TaskDataCategory;

    type IntoIter = TaskDataCategoryIterator;

    fn into_iter(self) -> Self::IntoIter {
        match self {
            TaskDataCategory::Meta => TaskDataCategoryIterator::Meta,
            TaskDataCategory::Data => TaskDataCategoryIterator::Data,
            TaskDataCategory::All => TaskDataCategoryIterator::All,
        }
    }
}

/// Iterator returned by `TaskDataCategory::into_iter`.
///
/// The variant is the remaining state: `All` still has `Meta` and `Data` to yield, `Meta` or
/// `Data` has exactly that one item left, and `None` is exhausted.
#[derive(Debug, Clone)]
pub enum TaskDataCategoryIterator {
    All,
    Meta,
    Data,
    None,
}

impl Iterator for TaskDataCategoryIterator {
    type Item = TaskDataCategory;

    fn next(&mut self) -> Option<Self::Item> {
        match self {
            TaskDataCategoryIterator::All => {
                *self = TaskDataCategoryIterator::Data;
                Some(TaskDataCategory::Meta)
            }
            TaskDataCategoryIterator::Meta => {
                *self = TaskDataCategoryIterator::None;
                Some(TaskDataCategory::Meta)
            }
            TaskDataCategoryIterator::Data => {
                *self = TaskDataCategoryIterator::None;
                Some(TaskDataCategory::Data)
            }
            TaskDataCategoryIterator::None => None,
        }
    }

    /// The remaining count is always known exactly, which lets `collect` and friends
    /// preallocate.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = match self {
            TaskDataCategoryIterator::All => 2,
            TaskDataCategoryIterator::Meta | TaskDataCategoryIterator::Data => 1,
            TaskDataCategoryIterator::None => 0,
        };
        (remaining, Some(remaining))
    }
}

impl ExactSizeIterator for TaskDataCategoryIterator {}

// Once `None` is reached the iterator stays there, so it is fused.
impl std::iter::FusedIterator for TaskDataCategoryIterator {}
88
bitfield! {
    // Note: Due to alignment in InnerStorage it doesn't matter if this struct is 1 or 4 bytes.
    // Per-task state flags packed into a `u32`, one bit each (bits 0-5 are used). Each line
    // below is `getter, setter: bit`; `Default` starts with every flag cleared.
    #[derive(Clone, Default)]
    pub struct InnerStorageState(u32);
    impl Debug;
    /// The meta category has been restored. NOTE(review): presumably restored from persistent
    /// storage — confirm against the callers of `InnerStorageState::set_restored`.
    pub meta_restored, set_meta_restored: 0;
    /// Same as `meta_restored`, for the data category.
    pub data_restored, set_data_restored: 1;
    /// Item was modified before snapshot mode was entered.
    pub meta_modified, set_meta_modified: 2;
    /// Same as `meta_modified`, for the data category.
    pub data_modified, set_data_modified: 3;
    /// Item was modified after snapshot mode was entered. A snapshot was taken.
    pub meta_snapshot, set_meta_snapshot: 4;
    /// Same as `meta_snapshot`, for the data category.
    pub data_snapshot, set_data_snapshot: 5;
}
103
104impl InnerStorageState {
105    pub fn set_restored(&mut self, category: TaskDataCategory) {
106        match category {
107            TaskDataCategory::Meta => {
108                self.set_meta_restored(true);
109            }
110            TaskDataCategory::Data => {
111                self.set_data_restored(true);
112            }
113            TaskDataCategory::All => {
114                self.set_meta_restored(true);
115                self.set_data_restored(true);
116            }
117        }
118    }
119
120    pub fn is_restored(&self, category: TaskDataCategory) -> bool {
121        match category {
122            TaskDataCategory::Meta => self.meta_restored(),
123            TaskDataCategory::Data => self.data_restored(),
124            TaskDataCategory::All => self.meta_restored() && self.data_restored(),
125        }
126    }
127
128    pub fn any_snapshot(&self) -> bool {
129        self.meta_snapshot() || self.data_snapshot()
130    }
131
132    pub fn any_modified(&self) -> bool {
133        self.meta_modified() || self.data_modified()
134    }
135}
136
137pub struct InnerStorageSnapshot {
138    aggregation_number: OptionStorage<AggregationNumber>,
139    output_dependent: AutoMapStorage<TaskId, ()>,
140    output: OptionStorage<OutputValue>,
141    upper: AutoMapStorage<TaskId, u32>,
142    dynamic: DynamicStorage,
143    pub meta_modified: bool,
144    pub data_modified: bool,
145}
146
147impl From<&InnerStorage> for InnerStorageSnapshot {
148    fn from(inner: &InnerStorage) -> Self {
149        Self {
150            aggregation_number: inner.aggregation_number.clone(),
151            output_dependent: inner.output_dependent.clone(),
152            output: inner.output.clone(),
153            upper: inner.upper.clone(),
154            dynamic: inner.dynamic.snapshot_for_persisting(),
155            meta_modified: inner.state.meta_modified(),
156            data_modified: inner.state.data_modified(),
157        }
158    }
159}
160
161impl InnerStorageSnapshot {
162    pub fn iter_all(
163        &self,
164    ) -> impl Iterator<Item = (CachedDataItemKey, CachedDataItemValueRef<'_>)> {
165        use crate::data_storage::Storage;
166        self.dynamic
167            .iter_all()
168            .chain(self.aggregation_number.iter().map(|(_, value)| {
169                (
170                    CachedDataItemKey::AggregationNumber {},
171                    CachedDataItemValueRef::AggregationNumber { value },
172                )
173            }))
174            .chain(self.output.iter().map(|(_, value)| {
175                (
176                    CachedDataItemKey::Output {},
177                    CachedDataItemValueRef::Output { value },
178                )
179            }))
180            .chain(self.upper.iter().map(|(k, value)| {
181                (
182                    CachedDataItemKey::Upper { task: *k },
183                    CachedDataItemValueRef::Upper { value },
184                )
185            }))
186            .chain(self.output_dependent.iter().map(|(k, value)| {
187                (
188                    CachedDataItemKey::OutputDependent { task: *k },
189                    CachedDataItemValueRef::OutputDependent { value },
190                )
191            }))
192    }
193
194    pub fn len(&self) -> usize {
195        use crate::data_storage::Storage;
196        self.dynamic.len()
197            + self.aggregation_number.len()
198            + self.output.len()
199            + self.upper.len()
200            + self.output_dependent.len()
201    }
202}
203
204#[derive(Debug, Clone)]
205pub struct InnerStorage {
206    aggregation_number: OptionStorage<AggregationNumber>,
207    output_dependent: AutoMapStorage<TaskId, ()>,
208    output: OptionStorage<OutputValue>,
209    upper: AutoMapStorage<TaskId, u32>,
210    dynamic: DynamicStorage,
211    state: InnerStorageState,
212}
213
214impl InnerStorage {
215    fn new() -> Self {
216        Self {
217            aggregation_number: Default::default(),
218            output_dependent: Default::default(),
219            output: Default::default(),
220            upper: Default::default(),
221            dynamic: DynamicStorage::new(),
222            state: InnerStorageState::default(),
223        }
224    }
225
226    pub fn state(&self) -> &InnerStorageState {
227        &self.state
228    }
229
230    pub fn state_mut(&mut self) -> &mut InnerStorageState {
231        &mut self.state
232    }
233}
234
/// Internal helper for `generate_inner_storage!`; not meant to be invoked directly.
///
/// A leading keyword selects the rule family: `CachedDataItem:`, `CachedDataItemKey:`,
/// `CachedDataItemType:`, `update:`, `get_mut_or_insert_with:`, `extract_if:` or `iter:`.
/// Each family then receives a list of `Tag [key_field] => field,` entries. For every entry it
/// emits an `if let` that matches the item/key/type against `Tag` and, on a match, forwards to
/// the dedicated storage field and `return`s from the enclosing function. Entries without a key
/// field use `()` (or `&()`) as the key. Input that matches no entry falls through to whatever
/// code follows the invocation.
///
/// `return_value:` rules convert a field method's result according to a mode token (`none`,
/// `option_value`, `option_ref`, `option_ref_mut`). `input_value:` unwraps a
/// `CachedDataItemValue` into the field's value type.
#[macro_export]
macro_rules! generate_inner_storage_internal {
    // Matching on CachedDataItem with a $value
    // The item is taken by value; `$value` names the binding for the variant's value and is
    // usually referenced from `$args` (e.g. `add(value)`).
    // Entry with a key field: the key is passed by value, followed by `$args`.
    (CachedDataItem: $self:ident, $item:ident, $value:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident $key_field:ident => $field:ident,) => {
        if let CachedDataItem::$tag { $key_field, $value } = $item {
            let result = $self.$field.$fn($key_field, $($args)*);
            return $crate::generate_inner_storage_internal!(return_value: result, $return_ty: $tag $key_field => $field);
        }
    };
    // Entry without a key field: `()` is passed as the key.
    (CachedDataItem: $self:ident, $item:ident, $value:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident => $field:ident,) => {
        if let CachedDataItem::$tag { $value } = $item {
            let result = $self.$field.$fn((), $($args)*);
            return $crate::generate_inner_storage_internal!(return_value: result, $return_ty: $tag => $field);
        }
    };
    // Several entries: expand the first one, then recurse on the rest.
    (CachedDataItem: $self:ident, $item:ident, $value:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
        $crate::generate_inner_storage_internal!(CachedDataItem: $self, $item, $value, $return_ty, $fn($($args)*): $tag $($key_field)? => $field,);
        $crate::generate_inner_storage_internal!(CachedDataItem: $self, $item, $value, $return_ty, $fn($($args)*): $($config)+)
    };
    // Matching on CachedDataItemKey without a $value
    // In the callers in this file `$item` is a `&CachedDataItemKey`, so the key field binds by
    // reference; key-less entries pass `&()`.
    (CachedDataItemKey: $self:ident, $item:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident $key_field:ident => $field:ident,) => {
        if let CachedDataItemKey::$tag { $key_field } = $item {
            let result = $self.$field.$fn($key_field, $($args)*);
            return $crate::generate_inner_storage_internal!(return_value: result, $return_ty: $tag $key_field => $field);
        }
    };
    (CachedDataItemKey: $self:ident, $item:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident => $field:ident,) => {
        if let CachedDataItemKey::$tag { } = $item {
            let result = $self.$field.$fn(&(), $($args)*);
            return $crate::generate_inner_storage_internal!(return_value: result, $return_ty: $tag => $field);
        }
    };
    // Several entries: expand the first one, then recurse on the rest.
    (CachedDataItemKey: $self:ident, $item:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
        $crate::generate_inner_storage_internal!(CachedDataItemKey: $self, $item, $return_ty, $fn($($args)*): $tag $($key_field)? => $field,);
        $crate::generate_inner_storage_internal!(CachedDataItemKey: $self, $item, $return_ty, $fn($($args)*): $($config)+)
    };
    // Matching on CachedDataItemType without a $value
    // The field method receives only `$args`, with no key. An optional key field is accepted
    // but only forwarded to the return-value conversion.
    (CachedDataItemType: $self:ident, $item:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident $($key_field:ident)? => $field:ident,) => {
        if let CachedDataItemType::$tag = $item {
            let result = $self.$field.$fn($($args)*);
            return $crate::generate_inner_storage_internal!(return_value: result, $return_ty: $tag $($key_field)? => $field);
        }
    };
    (CachedDataItemType: $self:ident, $item:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
        $crate::generate_inner_storage_internal!(CachedDataItemType: $self, $item, $return_ty, $fn($($args)*): $tag $($key_field)? => $field,);
        $crate::generate_inner_storage_internal!(CachedDataItemType: $self, $item, $return_ty, $fn($($args)*): $($config)+)
    };

    // fn update
    // Wraps the field's current value (if any) into `CachedDataItemValue::$tag`, runs the caller's
    // closure and unwraps the result again. The closure must return the same variant (or
    // `None`); any other variant hits `unreachable!()`.
    (update: $self:ident, $key:ident, $update:ident: $tag:ident $key_field:ident => $field:ident,) => {
        if let CachedDataItemKey::$tag { $key_field } = $key {
            $self.$field.update($key_field, |old| {
                let old = old.map(|old| CachedDataItemValue::$tag { value: old });
                let new = $update(old);
                new.map(|new| if let CachedDataItemValue::$tag { value } = new {
                    value
                } else {
                    unreachable!()
                })
            });
            return;
        }
    };
    (update: $self:ident, $key:ident, $update:ident: $tag:ident => $field:ident,) => {
        if let CachedDataItemKey::$tag { } = $key {
            $self.$field.update((), |old| {
                let old = old.map(|old| CachedDataItemValue::$tag { value: old });
                let new = $update(old);
                new.map(|new| if let CachedDataItemValue::$tag { value } = new {
                    value
                } else {
                    unreachable!()
                })
            });
            return;
        }
    };
    (update: $self:ident, $key:ident, $update:ident: $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
        $crate::generate_inner_storage_internal!(update: $self, $key, $update: $tag $($key_field)? => $field,);
        $crate::generate_inner_storage_internal!(update: $self, $key, $update: $($config)+)
    };

    // fn get_mut_or_insert_with
    // The caller's `$insert_with` must produce the matching `CachedDataItemValue::$tag` variant;
    // any other variant hits `unreachable!()`.
    (get_mut_or_insert_with: $self:ident, $key:ident, $insert_with:ident: $tag:ident $key_field:ident => $field:ident,) => {
        if let CachedDataItemKey::$tag { $key_field } = $key {
            let value = $self.$field.get_mut_or_insert_with($key_field, || {
                let value = $insert_with();
                if let CachedDataItemValue::$tag { value } = value {
                    value
                } else {
                    unreachable!()
                }
            });
            return CachedDataItemValueRefMut::$tag { value };
        }
    };
    (get_mut_or_insert_with: $self:ident, $key:ident, $insert_with:ident: $tag:ident => $field:ident,) => {
        if let CachedDataItemKey::$tag { } = $key {
            let value = $self.$field.get_mut_or_insert_with((), || {
                let value = $insert_with();
                if let CachedDataItemValue::$tag { value } = value {
                    value
                } else {
                    unreachable!()
                }
            });
            return CachedDataItemValueRefMut::$tag { value };
        }
    };
    (get_mut_or_insert_with: $self:ident, $key:ident, $insert_with:ident: $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
        $crate::generate_inner_storage_internal!(get_mut_or_insert_with: $self, $key, $insert_with: $tag $($key_field)? => $field,);
        $crate::generate_inner_storage_internal!(get_mut_or_insert_with: $self, $key, $insert_with: $($config)+)
    };

    // fn extract_if
    // The predicate sees each entry re-wrapped as a key / value-ref pair. Extracted entries are
    // re-wrapped into `CachedDataItem`. The iterator goes into the `InnerStorageIter` variant
    // named after the tag, so every branch shares one return type.
    (extract_if: $self:ident, $ty:ident, $f:ident: $tag:ident $key_field:ident => $field:ident,) => {
        if let CachedDataItemType::$tag = $ty {
            let iter = $self.$field.extract_if(move |key, value| {
                $f(CachedDataItemKey::$tag { $key_field: *key }, CachedDataItemValueRef::$tag { value })
            }).map(|($key_field, value)| CachedDataItem::$tag { $key_field, value });
            return InnerStorageIter::$tag(iter);
        }
    };
    (extract_if: $self:ident, $ty:ident, $f:ident: $tag:ident => $field:ident,) => {
        if let CachedDataItemType::$tag = $ty {
            let iter = $self.$field.extract_if(move |_, value| {
                $f(CachedDataItemKey::$tag { }, CachedDataItemValueRef::$tag { value })
            }).map(|(_, value)| CachedDataItem::$tag { value });
            return InnerStorageIter::$tag(iter);
        }
    };
    (extract_if: $self:ident, $ty:ident, $f:ident: $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
        $crate::generate_inner_storage_internal!(extract_if: $self, $ty, $f: $tag $($key_field)? => $field,);
        $crate::generate_inner_storage_internal!(extract_if: $self, $ty, $f: $($config)+)
    };

    // fn iter
    // Yields key / value-ref pairs, wrapped in the `InnerStorageIter` variant named after the tag.
    (iter: $self:ident, $ty:ident: $tag:ident $key_field:ident => $field:ident,) => {
        if let CachedDataItemType::$tag = $ty {
            let iter = $self.$field.iter().map(|($key_field, value)| (CachedDataItemKey::$tag { $key_field: *$key_field }, CachedDataItemValueRef::$tag { value }));
            return InnerStorageIter::$tag(iter);
        }
    };
    (iter: $self:ident, $ty:ident: $tag:ident => $field:ident,) => {
        if let CachedDataItemType::$tag = $ty {
            let iter = $self.$field.iter().map(|(_, value)| (CachedDataItemKey::$tag { }, CachedDataItemValueRef::$tag { value }));
            return InnerStorageIter::$tag(iter);
        }
    };
    (iter: $self:ident, $ty:ident: $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
        $crate::generate_inner_storage_internal!(iter: $self, $ty: $tag $($key_field)? => $field,);
        $crate::generate_inner_storage_internal!(iter: $self, $ty: $($config)+)
    };


    // Return value handling
    // `none` passes the field result through unchanged. The `option_*` modes map a
    // `Some(value)` into the matching `CachedDataItemValue` / `...Ref` / `...RefMut` variant.
    (return_value: $result:ident, none: $($more:tt)*) => {
        $result
    };
    (return_value: $result:ident, option_value: $tag:ident $($more:tt)*) => {
        $result.map(|value| CachedDataItemValue::$tag { value })
    };
    (return_value: $result:ident, option_ref: $tag:ident $($more:tt)*) => {
        $result.map(|value| CachedDataItemValueRef::$tag { value })
    };
    (return_value: $result:ident, option_ref_mut: $tag:ident $($more:tt)*) => {
        $result.map(|value| CachedDataItemValueRefMut::$tag { value })
    };

    // Input value handling
    // Unwraps `Option<CachedDataItemValue>` into the field's value type; a mismatching variant
    // hits `unreachable!()`. NOTE(review): no visible caller in this file uses this rule.
    (input_value: $input:ident, option_value: $tag:ident $($more:tt)*) => {
        $input.map(|value| {
            if let CachedDataItemValue::$tag { value } = value {
                value
            } else {
                unreachable!()
            }
        })
    };

}
416
/// Generates the item-access methods of `InnerStorage`.
///
/// Every generated method first offers the request to the dedicated fields listed in
/// `$config` (through `generate_inner_storage_internal!`, which returns early on a match), and
/// otherwise falls back to `self.dynamic`. The `use crate::data_storage::Storage;` in each
/// method brings the storage trait's methods into scope for both paths.
macro_rules! generate_inner_storage {
    ($($config:tt)*) => {
        impl InnerStorage {
            /// Adds `item` to the storage responsible for its variant and returns that
            /// storage's `add` result. NOTE(review): presumably `true` when the item was newly
            /// added — confirm against `Storage::add`.
            pub fn add(&mut self, item: CachedDataItem) -> bool {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(CachedDataItem: self, item, value, none, add(value): $($config)*);
                self.dynamic.add(item)
            }

            /// Inserts `item` and returns the value reported by the underlying storage's
            /// `insert`, re-wrapped as a `CachedDataItemValue` (presumably the replaced value).
            pub fn insert(&mut self, item: CachedDataItem) -> Option<CachedDataItemValue> {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(CachedDataItem: self, item, value, option_value, insert(value): $($config)*);
                self.dynamic.insert(item)
            }

            /// Removes the entry for `key`, returning its value if one was stored.
            pub fn remove(&mut self, key: &CachedDataItemKey) -> Option<CachedDataItemValue> {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(CachedDataItemKey: self, key, option_value, remove(): $($config)*);
                self.dynamic.remove(key)
            }

            /// Number of stored items of type `ty` (the dedicated field's `len()`, or
            /// `DynamicStorage::count`).
            pub fn count(&self, ty: CachedDataItemType) -> usize {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(CachedDataItemType: self, ty, none, len(): $($config)*);
                self.dynamic.count(ty)
            }

            /// Shared reference to the value stored for `key`, if any.
            pub fn get(&self, key: &CachedDataItemKey) -> Option<CachedDataItemValueRef<'_>> {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(CachedDataItemKey: self, key, option_ref, get(): $($config)*);
                self.dynamic.get(key)
            }

            /// Whether a value is stored for `key`.
            pub fn contains_key(&self, key: &CachedDataItemKey) -> bool {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(CachedDataItemKey: self, key, none, contains_key(): $($config)*);
                self.dynamic.contains_key(key)
            }

            /// Mutable reference to the value stored for `key`, if any.
            pub fn get_mut(&mut self, key: &CachedDataItemKey) -> Option<CachedDataItemValueRefMut<'_>> {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(CachedDataItemKey: self, key, option_ref_mut, get_mut(): $($config)*);
                self.dynamic.get_mut(key)
            }

            /// Forwards `shrink_to_fit` to the storage holding items of type `ty`.
            pub fn shrink_to_fit(&mut self, ty: CachedDataItemType) {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(CachedDataItemType: self, ty, none, shrink_to_fit(): $($config)*);
                self.dynamic.shrink_to_fit(ty)
            }

            /// Calls `update` with the current value for `key` (if any) and stores what it
            /// returns. For dedicated fields the returned value must be the same variant as
            /// `key`, otherwise the generated code panics via `unreachable!()`.
            pub fn update(
                &mut self,
                key: CachedDataItemKey,
                update: impl FnOnce(Option<CachedDataItemValue>) -> Option<CachedDataItemValue>,
            ) {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(update: self, key, update: $($config)*);
                self.dynamic.update(key, update)
            }

            /// Returns an iterator over the items of type `ty` that the underlying storage's
            /// `extract_if` selects using `f`. NOTE(review): presumably removes matching items
            /// as it is consumed — confirm against `Storage::extract_if`. Every branch is
            /// wrapped in `InnerStorageIter` so they share one return type.
            pub fn extract_if<'l, F>(
                &'l mut self,
                ty: CachedDataItemType,
                mut f: F,
            ) -> impl Iterator<Item = CachedDataItem> + use<'l, F>
            where
                F: for<'a> FnMut(CachedDataItemKey, CachedDataItemValueRef<'a>) -> bool + 'l,
            {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(extract_if: self, ty, f: $($config)*);
                InnerStorageIter::Dynamic(self.dynamic.extract_if(ty, f))
            }

            /// Mutable reference to the value for `key`, first inserting the value produced by
            /// `f` if none is stored. For dedicated fields, `f` must return the same variant as
            /// `key`, otherwise the generated code panics via `unreachable!()`.
            pub fn get_mut_or_insert_with(
                &mut self,
                key: CachedDataItemKey,
                f: impl FnOnce() -> CachedDataItemValue,
            ) -> CachedDataItemValueRefMut<'_>
            {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(get_mut_or_insert_with: self, key, f: $($config)*);
                self.dynamic.get_mut_or_insert_with(key, f)
            }

            /// Iterates over the `(key, value)` pairs of type `ty`.
            pub fn iter(
                &self,
                ty: CachedDataItemType,
            ) -> impl Iterator<Item = (CachedDataItemKey, CachedDataItemValueRef<'_>)>
            {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(iter: self, ty: $($config)*);
                InnerStorageIter::Dynamic(self.dynamic.iter(ty))
            }

        }
    };
}
515
// Item types with dedicated `InnerStorage` fields. Entry syntax:
// `Variant [key_field] => struct_field,`, where `key_field` is omitted for key-less variants.
// The trailing comma after the last entry is required by the macro's rules. Each `Variant`
// must also exist as an `InnerStorageIter` variant. When adding an entry, also add the field to
// `InnerStorage` and `InnerStorageSnapshot` and to the hand-written `iter_all` / `len` methods.
generate_inner_storage!(
    AggregationNumber => aggregation_number,
    OutputDependent task => output_dependent,
    Output => output,
    Upper task => upper,
);
522
/// Iterator that dispatches to one of several concrete iterator types.
///
/// There is one variant per dedicated `InnerStorage` field plus `Dynamic` for the dynamic
/// storage. This lets methods with several branches return a single `impl Iterator` type. All
/// variants must yield the same item type.
enum InnerStorageIter<A, B, C, D, E> {
    AggregationNumber(A),
    OutputDependent(B),
    Output(C),
    Upper(D),
    Dynamic(E),
}

impl<T, A, B, C, D, E> Iterator for InnerStorageIter<A, B, C, D, E>
where
    A: Iterator<Item = T>,
    B: Iterator<Item = T>,
    C: Iterator<Item = T>,
    D: Iterator<Item = T>,
    E: Iterator<Item = T>,
{
    type Item = T;

    fn next(&mut self) -> Option<Self::Item> {
        match self {
            InnerStorageIter::AggregationNumber(iter) => iter.next(),
            InnerStorageIter::OutputDependent(iter) => iter.next(),
            InnerStorageIter::Output(iter) => iter.next(),
            InnerStorageIter::Upper(iter) => iter.next(),
            InnerStorageIter::Dynamic(iter) => iter.next(),
        }
    }

    /// Forwards the wrapped iterator's bounds. Without this the default `(0, None)` would hide
    /// them, so `collect` and `extend` could not preallocate.
    fn size_hint(&self) -> (usize, Option<usize>) {
        match self {
            InnerStorageIter::AggregationNumber(iter) => iter.size_hint(),
            InnerStorageIter::OutputDependent(iter) => iter.size_hint(),
            InnerStorageIter::Output(iter) => iter.size_hint(),
            InnerStorageIter::Upper(iter) => iter.size_hint(),
            InnerStorageIter::Dynamic(iter) => iter.size_hint(),
        }
    }
}
551
552impl InnerStorage {
553    pub fn iter_all(
554        &self,
555    ) -> impl Iterator<Item = (CachedDataItemKey, CachedDataItemValueRef<'_>)> {
556        use crate::data_storage::Storage;
557        self.dynamic
558            .iter_all()
559            .chain(self.aggregation_number.iter().map(|(_, value)| {
560                (
561                    CachedDataItemKey::AggregationNumber {},
562                    CachedDataItemValueRef::AggregationNumber { value },
563                )
564            }))
565            .chain(self.output.iter().map(|(_, value)| {
566                (
567                    CachedDataItemKey::Output {},
568                    CachedDataItemValueRef::Output { value },
569                )
570            }))
571            .chain(self.upper.iter().map(|(k, value)| {
572                (
573                    CachedDataItemKey::Upper { task: *k },
574                    CachedDataItemValueRef::Upper { value },
575                )
576            }))
577            .chain(self.output_dependent.iter().map(|(k, value)| {
578                (
579                    CachedDataItemKey::OutputDependent { task: *k },
580                    CachedDataItemValueRef::OutputDependent { value },
581                )
582            }))
583    }
584
585    pub fn len(&self) -> usize {
586        use crate::data_storage::Storage;
587        self.dynamic.len()
588            + self.aggregation_number.len()
589            + self.output.len()
590            + self.upper.len()
591            + self.output_dependent.len()
592    }
593}
594
595enum ModifiedState {
596    /// It was modified before snapshot mode was entered, but it was not accessed during snapshot
597    /// mode.
598    Modified,
599    /// Snapshot(Some):
600    /// It was modified before snapshot mode was entered and it was accessed again during snapshot
601    /// mode. A copy of the version of the item when snapshot mode was entered is stored here.
602    /// Snapshot(None):
603    /// It was not modified before snapshot mode was entered, but it was accessed during snapshot
604    /// mode. Or the snapshot was already taken out by the snapshot operation.
605    Snapshot(Option<Box<InnerStorageSnapshot>>),
606}
607
608pub struct Storage {
609    snapshot_mode: AtomicBool,
610    modified: FxDashMap<TaskId, ModifiedState>,
611    map: FxDashMap<TaskId, Box<InnerStorage>>,
612}
613
614impl Storage {
615    pub fn new(small_preallocation: bool) -> Self {
616        let map_capacity: usize = if small_preallocation {
617            1024
618        } else {
619            1024 * 1024
620        };
621        let modified_capacity: usize = if small_preallocation { 0 } else { 1024 };
622        let shard_factor: usize = if small_preallocation { 4 } else { 64 };
623
624        let shard_amount =
625            (available_parallelism().map_or(4, |v| v.get()) * shard_factor).next_power_of_two();
626
627        Self {
628            snapshot_mode: AtomicBool::new(false),
629            modified: FxDashMap::with_capacity_and_hasher_and_shard_amount(
630                modified_capacity,
631                Default::default(),
632                shard_amount,
633            ),
634            map: FxDashMap::with_capacity_and_hasher_and_shard_amount(
635                map_capacity,
636                Default::default(),
637                shard_amount,
638            ),
639        }
640    }
641
642    /// Processes every modified item (resp. a snapshot of it) with the given functions and returns
643    /// the results. Ends snapshot mode afterwards.
644    /// preprocess is potentially called within a lock, so it should be fast.
645    /// process is called outside of locks, so it could do more expensive operations.
646    pub fn take_snapshot<
647        'l,
648        T,
649        R,
650        PP: for<'a> Fn(TaskId, &'a InnerStorage) -> T + Sync,
651        P: Fn(TaskId, T) -> R + Sync,
652        PS: Fn(TaskId, Box<InnerStorageSnapshot>) -> R + Sync,
653    >(
654        &'l self,
655        preprocess: &'l PP,
656        process: &'l P,
657        process_snapshot: &'l PS,
658    ) -> Vec<SnapshotShard<'l, PP, P, PS>> {
659        if !self.snapshot_mode() {
660            self.start_snapshot();
661        }
662
663        let guard = Arc::new(SnapshotGuard { storage: self });
664
665        // The number of shards is much larger than the number of threads, so the effect of the
666        // locks held is negligible.
667        self.modified
668            .shards()
669            .par_iter()
670            .with_max_len(1)
671            .map(|shard| {
672                let mut direct_snapshots: Vec<(TaskId, Box<InnerStorageSnapshot>)> = Vec::new();
673                let mut modified: SmallVec<[TaskId; 4]> = SmallVec::new();
674                {
675                    // Take the snapshots from the modified map
676                    let guard = shard.write();
677                    // Safety: guard must outlive the iterator.
678                    for bucket in unsafe { guard.iter() } {
679                        // Safety: the guard guarantees that the bucket is not removed and the ptr
680                        // is valid.
681                        let (key, shared_value) = unsafe { bucket.as_mut() };
682                        let modified_state = shared_value.get_mut();
683                        match modified_state {
684                            ModifiedState::Modified => {
685                                modified.push(*key);
686                            }
687                            ModifiedState::Snapshot(snapshot) => {
688                                if let Some(snapshot) = snapshot.take() {
689                                    direct_snapshots.push((*key, snapshot));
690                                }
691                            }
692                        }
693                    }
694                    // Safety: guard must outlive the iterator.
695                    drop(guard);
696                }
697
698                SnapshotShard {
699                    direct_snapshots,
700                    modified,
701                    storage: self,
702                    guard: Some(guard.clone()),
703                    process,
704                    preprocess,
705                    process_snapshot,
706                }
707            })
708            .collect::<Vec<_>>()
709    }
710
711    /// Start snapshot mode.
712    pub fn start_snapshot(&self) {
713        self.snapshot_mode
714            .store(true, std::sync::atomic::Ordering::Release);
715    }
716
    /// End snapshot mode.
    ///
    /// Invoked from `SnapshotGuard::drop`. Works in phases, whose order matters because other
    /// threads may call `track_modification` concurrently:
    ///
    /// 1. While still in snapshot mode, every `ModifiedState::Modified` entry is removed from
    ///    `modified` and both modified flags of those tasks are cleared. Those tasks are
    ///    considered unmodified from now on (presumably because the snapshot has persisted them
    ///    — confirm against the snapshot caller).
    /// 2. Snapshot mode is left, so new accesses are recorded as `Modified`.
    /// 3. Every remaining `ModifiedState::Snapshot` entry (tasks accessed during the snapshot)
    ///    is turned into `Modified`, and the per-category snapshot flags are converted into
    ///    modified flags. These tasks therefore stay modified for the next snapshot.
    ///
    /// Note that the modified flags of tasks handled in phase 3 are not cleared here.
    fn end_snapshot(&self) {
        // We are still in snapshot mode, so all accessed items would be stored as snapshot.
        // This means we can start by removing all modified items.
        let mut removed_modified = Vec::new();
        self.modified.retain(|key, inner| {
            if matches!(inner, ModifiedState::Modified) {
                removed_modified.push(*key);
                false
            } else {
                true
            }
        });

        // We also need to unset all the modified flags.
        for key in removed_modified {
            if let Some(mut inner) = self.map.get_mut(&key) {
                let state = inner.state_mut();
                state.set_data_modified(false);
                state.set_meta_modified(false);
            }
        }

        // Now modified only contains snapshots.
        // We leave snapshot mode. Any access would be stored as modified and not as snapshot.
        self.snapshot_mode
            .store(false, std::sync::atomic::Ordering::Release);

        // We can change all the snapshots to modified now.
        let mut removed_snapshots = Vec::new();
        for mut item in self.modified.iter_mut() {
            match item.value() {
                ModifiedState::Snapshot(_) => {
                    removed_snapshots.push(*item.key());
                    // Any stored snapshot copy is dropped here; the task itself stays
                    // registered as modified.
                    *item.value_mut() = ModifiedState::Modified;
                }
                ModifiedState::Modified => {
                    // This means it was concurrently modified.
                    // It's already in the correct state.
                }
            }
        }

        // And update the flags: a category that was touched during the snapshot becomes a
        // modified category.
        for key in removed_snapshots {
            if let Some(mut inner) = self.map.get_mut(&key) {
                let state = inner.state_mut();
                if state.meta_snapshot() {
                    state.set_meta_snapshot(false);
                    state.set_meta_modified(true);
                }
                if state.data_snapshot() {
                    state.set_data_snapshot(false);
                    state.set_data_modified(true);
                }
            }
        }

        // Remove excessive capacity in modified
        self.modified.shrink_to_fit();
    }
781
782    fn snapshot_mode(&self) -> bool {
783        self.snapshot_mode
784            .load(std::sync::atomic::Ordering::Acquire)
785    }
786
787    pub fn access_mut(&self, key: TaskId) -> StorageWriteGuard<'_> {
788        let inner = match self.map.entry(key) {
789            dashmap::mapref::entry::Entry::Occupied(e) => e.into_ref(),
790            dashmap::mapref::entry::Entry::Vacant(e) => e.insert(Box::new(InnerStorage::new())),
791        };
792        StorageWriteGuard {
793            storage: self,
794            inner: inner.into(),
795        }
796    }
797
798    pub fn access_pair_mut(
799        &self,
800        key1: TaskId,
801        key2: TaskId,
802    ) -> (StorageWriteGuard<'_>, StorageWriteGuard<'_>) {
803        let (a, b) = get_multiple_mut(&self.map, key1, key2, || Box::new(InnerStorage::new()));
804        (
805            StorageWriteGuard {
806                storage: self,
807                inner: a,
808            },
809            StorageWriteGuard {
810                storage: self,
811                inner: b,
812            },
813        )
814    }
815}
816
/// Write access to a single task's [`InnerStorage`] held in a [`Storage`].
///
/// Dereferences (mutably) to the task's `InnerStorage`. It keeps a reference to the owning
/// `Storage` so that [`StorageWriteGuard::track_modification`] can consult snapshot mode and
/// register the task in the storage's `modified` map.
pub struct StorageWriteGuard<'a> {
    // The storage this guard was obtained from.
    storage: &'a Storage,
    // Mutable reference into `Storage::map` for this task.
    inner: RefMut<'a, TaskId, Box<InnerStorage>>,
}
821
822impl StorageWriteGuard<'_> {
823    /// Tracks mutation of this task
824    pub fn track_modification(&mut self, category: SpecificTaskDataCategory) {
825        let state = self.inner.state();
826        let snapshot = match category {
827            SpecificTaskDataCategory::Meta => state.meta_snapshot(),
828            SpecificTaskDataCategory::Data => state.data_snapshot(),
829        };
830        if !snapshot {
831            let modified = match category {
832                SpecificTaskDataCategory::Meta => state.meta_modified(),
833                SpecificTaskDataCategory::Data => state.data_modified(),
834            };
835            match (self.storage.snapshot_mode(), modified) {
836                (false, false) => {
837                    // Not in snapshot mode and item is unmodified
838                    if !state.any_snapshot() && !state.any_modified() {
839                        self.storage
840                            .modified
841                            .insert(*self.inner.key(), ModifiedState::Modified);
842                    }
843                    let state = self.inner.state_mut();
844                    match category {
845                        SpecificTaskDataCategory::Meta => state.set_meta_modified(true),
846                        SpecificTaskDataCategory::Data => state.set_data_modified(true),
847                    }
848                }
849                (false, true) => {
850                    // Not in snapshot mode and item is already modified
851                    // Do nothing
852                }
853                (true, false) => {
854                    // In snapshot mode and item is unmodified (so it's not part of the snapshot)
855                    if !state.any_snapshot() {
856                        self.storage
857                            .modified
858                            .insert(*self.inner.key(), ModifiedState::Snapshot(None));
859                    }
860                    let state = self.inner.state_mut();
861                    match category {
862                        SpecificTaskDataCategory::Meta => state.set_meta_snapshot(true),
863                        SpecificTaskDataCategory::Data => state.set_data_snapshot(true),
864                    }
865                }
866                (true, true) => {
867                    // In snapshot mode and item is modified (so it's part of the snapshot)
868                    // We need to store the original version that is part of the snapshot
869                    if !state.any_snapshot() {
870                        self.storage.modified.insert(
871                            *self.inner.key(),
872                            ModifiedState::Snapshot(Some(Box::new((&**self.inner).into()))),
873                        );
874                    }
875                    let state = self.inner.state_mut();
876                    match category {
877                        SpecificTaskDataCategory::Meta => state.set_meta_snapshot(true),
878                        SpecificTaskDataCategory::Data => state.set_data_snapshot(true),
879                    }
880                }
881            }
882        }
883    }
884}
885
886impl Deref for StorageWriteGuard<'_> {
887    type Target = InnerStorage;
888
889    fn deref(&self) -> &Self::Target {
890        &self.inner
891    }
892}
893
894impl DerefMut for StorageWriteGuard<'_> {
895    fn deref_mut(&mut self) -> &mut Self::Target {
896        &mut self.inner
897    }
898}
899
/// Expands to `$task.count(CachedDataItemType::$key)`.
///
/// Presumably this is the number of items of that type stored in the task.
macro_rules! count {
    ($task:ident, $key:ident) => {
        $task.count($crate::data::CachedDataItemType::$key)
    };
}
903
/// Looks up `CachedDataItemKey::$key $input` in `$task`.
///
/// Evaluates to `Some(value)` when the returned value is the matching
/// `CachedDataItemValueRef::$key` variant, otherwise `None`. The short form `get!(task, Key)`
/// uses an empty key body `{}`.
macro_rules! get {
    ($task:ident, $key:ident $input:tt) => {{
        #[allow(unused_imports)]
        use $crate::backend::operation::TaskGuard;
        match $task.get(&$crate::data::CachedDataItemKey::$key $input) {
            Some($crate::data::CachedDataItemValueRef::$key { value }) => Some(value),
            _ => None,
        }
    }};
    ($task:ident, $key:ident) => {
        $crate::backend::storage::get!($task, $key {})
    };
}
920
/// Mutable counterpart of `get!`.
///
/// Evaluates to `Some(value)` when `$task.get_mut` returns the matching
/// `CachedDataItemValueRefMut::$key` variant, otherwise `None`. The macro references
/// `allow_mut_access::$key`, so it only compiles for item types that define that marker
/// (presumably those that permit mutable access).
macro_rules! get_mut {
    ($task:ident, $key:ident $input:tt) => {{
        #[allow(unused_imports)]
        use $crate::backend::operation::TaskGuard;
        match $task.get_mut(&$crate::data::CachedDataItemKey::$key $input) {
            Some($crate::data::CachedDataItemValueRefMut::$key { value }) => {
                let () = $crate::data::allow_mut_access::$key;
                Some(value)
            }
            _ => None,
        }
    }};
    ($task:ident, $key:ident) => {
        $crate::backend::storage::get_mut!($task, $key {})
    };
}
938
/// Returns a mutable reference to the value under `CachedDataItemKey::$key $input`.
///
/// If no value is present, one is inserted, built by calling `$f()` and wrapping the result in
/// `CachedDataItemValue::$key`. Like `get_mut!`, it requires the `allow_mut_access::$key`
/// marker to exist.
macro_rules! get_mut_or_insert_with {
    ($task:ident, $key:ident $input:tt, $f:expr) => {{
        #[allow(unused_imports)]
        use $crate::backend::operation::TaskGuard;
        let () = $crate::data::allow_mut_access::$key;
        let make_value = $f;
        match $task.get_mut_or_insert_with(
            $crate::data::CachedDataItemKey::$key $input,
            move || $crate::data::CachedDataItemValue::$key { value: make_value() },
        ) {
            $crate::data::CachedDataItemValueRefMut::$key { value } => value,
            _ => unreachable!(),
        }
    }};
    ($task:ident, $key:ident, $f:expr) => {
        $crate::backend::storage::get_mut_or_insert_with!($task, $key {}, $f)
    };
}
956
/// Creates an iterator over all [`CachedDataItemKey::$key`][crate::data::CachedDataItemKey]s in
/// `$task` matching the given `$key_pattern`, optional `$value_pattern`, and optional `if $cond`.
///
/// Each element in the iterator is determined by `$iter_item`, which may use fields extracted by
/// `$key_pattern` or `$value_pattern`.
///
/// Two forms are accepted (tried in this order):
/// - `iter_many!(task, Key { .. } [if cond] => item)` matches on the key only.
/// - `iter_many!(task, Key { .. } value_pattern [if cond] => item)` additionally matches the
///   item's value (the `value` field of `CachedDataItemValueRef::$key`) against
///   `value_pattern`.
///
/// Items whose key or value do not match, or for which `cond` is false, are skipped.
macro_rules! iter_many {
    ($task:ident, $key:ident $key_pattern:tt $(if $cond:expr)? => $iter_item:expr) => {{
        #[allow(unused_imports)]
        use $crate::backend::operation::TaskGuard;
        $task
            .iter($crate::data::CachedDataItemType::$key)
            .filter_map(|(key, _)| match key {
                $crate::data::CachedDataItemKey::$key $key_pattern $(if $cond)? => Some(
                    $iter_item
                ),
                _ => None,
            })
    }};
    ($task:ident, $key:ident $input:tt $value_pattern:tt $(if $cond:expr)? => $iter_item:expr) => {{
        #[allow(unused_imports)]
        use $crate::backend::operation::TaskGuard;
        $task
            .iter($crate::data::CachedDataItemType::$key)
            .filter_map(|(key, value)| match (key, value) {
                (
                    $crate::data::CachedDataItemKey::$key $input,
                    $crate::data::CachedDataItemValueRef::$key { value: $value_pattern }
                ) $(if $cond)? => Some($iter_item),
                _ => None,
            })
    }};
}
989
/// A thin wrapper around [`iter_many`] that calls [`Iterator::collect`].
///
/// All arguments are forwarded verbatim to `iter_many!`, so both of its forms are accepted.
///
/// Note that the return type of [`Iterator::collect`] may be ambiguous in certain contexts, so
/// using this macro may require explicit type annotations on variables.
macro_rules! get_many {
    ($($args:tt)*) => {
        $crate::backend::storage::iter_many!($($args)*).collect()
    };
}
999
/// Updates the item stored under `CachedDataItemKey::$key $input` in `$task`.
///
/// `$update` is called with `Some(value)` when the current value is the matching
/// `CachedDataItemValue::$key` variant, and `None` otherwise. It returns the new inner value or
/// `None`. Its result is rewrapped into `CachedDataItemValue::$key` and handed back to
/// `$task.update`. What a `None` result means (presumably removal) is decided by that method.
macro_rules! update {
    ($task:ident, $key:ident $input:tt, $update:expr) => {{
        #[allow(unused_imports)]
        use $crate::backend::operation::TaskGuard;
        #[allow(unused_mut)]
        let mut updater = $update;
        $task.update($crate::data::CachedDataItemKey::$key $input, |old| {
            let current = match old {
                Some($crate::data::CachedDataItemValue::$key { value }) => Some(value),
                _ => None,
            };
            let next = updater(current);
            next.map(|value| $crate::data::CachedDataItemValue::$key { value })
        })
    }};
    ($task:ident, $key:ident, $update:expr) => {
        $crate::backend::storage::update!($task, $key {}, $update)
    };
}
1021
/// Adds `$update` to the counter stored under `CachedDataItemKey::$key $input`. With a leading
/// `-`, it subtracts `$update` instead. A missing item counts as `0`.
///
/// When the result is `0`, the update closure returns `None` to `update!` (presumably removing
/// the item).
///
/// Evaluates to `true` when the counter crossed the positive boundary, i.e. it went from `<= 0`
/// to `> 0` or from `> 0` to `<= 0`. Otherwise it evaluates to `false`.
///
/// `$update` is evaluated exactly once.
macro_rules! update_count {
    ($task:ident, $key:ident $input:tt, -$update:expr) => {{
        let update = $update;
        let mut state_change = false;
        $crate::backend::storage::update!($task, $key $input, |old: Option<_>| {
            #[allow(unused_comparisons, reason = "type of update might be unsigned, where update < 0 is always false")]
            if let Some(old) = old {
                let new = old - update;
                state_change = old <= 0 && new > 0 || old > 0 && new <= 0;
                (new != 0).then_some(new)
            } else {
                // Missing item == 0, so the new value is `-update` and it crossed into the
                // positive range iff `-update > 0`.
                state_change = update < 0;
                (update != 0).then_some(-update)
            }
        });
        state_change
    }};
    ($task:ident, $key:ident $input:tt, $update:expr) => {
        // `match` binds `$update` once, without requiring a surrounding block.
        match $update {
            update => {
                let mut state_change = false;
                $crate::backend::storage::update!($task, $key $input, |old: Option<_>| {
                    if let Some(old) = old {
                        let new = old + update;
                        state_change = old <= 0 && new > 0 || old > 0 && new <= 0;
                        (new != 0).then_some(new)
                    } else {
                        // Missing item == 0, so the new value is `update`.
                        state_change = update > 0;
                        (update != 0).then_some(update)
                    }
                });
                state_change
            }
        }
    };
    // The `-` arms must precede the plain arms: `$update:expr` would also match `-x`.
    ($task:ident, $key:ident, -$update:expr) => {
        $crate::backend::storage::update_count!($task, $key {}, -$update)
    };
    ($task:ident, $key:ident, $update:expr) => {
        $crate::backend::storage::update_count!($task, $key {}, $update)
    };
}
1063
/// Removes the item stored under `CachedDataItemKey::$key $input` from `$task`.
///
/// Evaluates to `Some(value)` when the removed value was the matching
/// `CachedDataItemValue::$key` variant, otherwise `None`.
macro_rules! remove {
    ($task:ident, $key:ident $input:tt) => {{
        #[allow(unused_imports)]
        use $crate::backend::operation::TaskGuard;
        match $task.remove(&$crate::data::CachedDataItemKey::$key $input) {
            Some($crate::data::CachedDataItemValue::$key { value }) => Some(value),
            _ => None,
        }
    }};
    ($task:ident, $key:ident) => {
        $crate::backend::storage::remove!($task, $key {})
    };
}
1080
1081pub(crate) use count;
1082pub(crate) use get;
1083pub(crate) use get_many;
1084pub(crate) use get_mut;
1085pub(crate) use get_mut_or_insert_with;
1086pub(crate) use iter_many;
1087pub(crate) use remove;
1088pub(crate) use update;
1089pub(crate) use update_count;
1090
/// Ends snapshot mode (via `Storage::end_snapshot`) when dropped.
///
/// Snapshot shards hold it as `Option<Arc<SnapshotGuard>>` and release their handle once they
/// are exhausted.
pub struct SnapshotGuard<'l> {
    storage: &'l Storage,
}
1094
1095impl Drop for SnapshotGuard<'_> {
1096    fn drop(&mut self) {
1097        self.storage.end_snapshot();
1098    }
1099}
1100
/// One shard of a storage snapshot, consumed as an [`Iterator`] that yields one processed
/// result per task.
pub struct SnapshotShard<'l, PP, P, PS> {
    // Snapshot copies that are already available; yielded first via `process_snapshot`.
    direct_snapshots: Vec<(TaskId, Box<InnerStorageSnapshot>)>,
    // Tasks to read from live storage, or from a snapshot copy if one was taken meanwhile.
    modified: SmallVec<[TaskId; 4]>,
    storage: &'l Storage,
    // Set to `None` once the shard is exhausted; dropping the last handle ends snapshot mode.
    guard: Option<Arc<SnapshotGuard<'l>>>,
    // Turns the output of `preprocess` into the shard's item.
    process: &'l P,
    // Runs on the live `InnerStorage` while its map reference is held.
    preprocess: &'l PP,
    // Turns a stored snapshot copy into the shard's item.
    process_snapshot: &'l PS,
}
1110
/// Yields the pre-collected snapshot copies first, then the tasks in `modified`.
///
/// A task without any snapshot flag is read from live storage: `preprocess` runs while the map
/// reference is held, then `process` runs after it is released. A task with a snapshot flag
/// uses the copy stored in its `ModifiedState::Snapshot` entry, and is skipped if that entry
/// holds no copy.
impl<'l, T, R, PP, P, PS> Iterator for SnapshotShard<'l, PP, P, PS>
where
    PP: for<'a> Fn(TaskId, &'a InnerStorage) -> T + Sync,
    P: Fn(TaskId, T) -> R + Sync,
    PS: Fn(TaskId, Box<InnerStorageSnapshot>) -> R + Sync,
{
    type Item = R;

    fn next(&mut self) -> Option<Self::Item> {
        if let Some((task_id, snapshot)) = self.direct_snapshots.pop() {
            return Some((self.process_snapshot)(task_id, snapshot));
        }
        while let Some(task_id) = self.modified.pop() {
            // Assumes tasks listed in `modified` are never removed from the map — TODO confirm.
            let inner = self.storage.map.get(&task_id).unwrap();
            let state = inner.state();
            if !state.any_snapshot() {
                // Untouched since the snapshot started: read the live data while still holding
                // the map reference, and release it before the (possibly expensive) `process`.
                let preprocessed = (self.preprocess)(task_id, &inner);
                drop(inner);
                return Some((self.process)(task_id, preprocessed));
            } else {
                // Release the map reference before locking the `modified` map entry.
                drop(inner);
                let maybe_snapshot = {
                    let mut modified_state = self.storage.modified.get_mut(&task_id).unwrap();
                    let ModifiedState::Snapshot(snapshot) = &mut *modified_state else {
                        unreachable!("The snapshot bit was set, so it must be in Snapshot state");
                    };
                    snapshot.take()
                };
                // `None` means no original version was stored for this task; skip it.
                if let Some(snapshot) = maybe_snapshot {
                    return Some((self.process_snapshot)(task_id, snapshot));
                }
            }
        }
        // Exhausted: release our handle on the guard (see `SnapshotGuard`'s `Drop`).
        self.guard = None;
        None
    }
}