// turbo_tasks_backend/backend/storage.rs

1use std::{
2    hash::Hash,
3    ops::{Deref, DerefMut},
4    sync::{Arc, atomic::AtomicBool},
5};
6
7use bitfield::bitfield;
8use smallvec::SmallVec;
9use turbo_tasks::{FxDashMap, TaskId, parallel};
10
11use crate::{
12    backend::dynamic_storage::DynamicStorage,
13    data::{
14        AggregationNumber, CachedDataItem, CachedDataItemKey, CachedDataItemType,
15        CachedDataItemValue, CachedDataItemValueRef, CachedDataItemValueRefMut, OutputValue,
16    },
17    data_storage::{AutoMapStorage, OptionStorage},
18    utils::{
19        dash_map_drop_contents::drop_contents,
20        dash_map_multi::{RefMut, get_multiple_mut},
21    },
22};
23
/// Names the portion of a task's stored data an operation is concerned with.
///
/// `Meta` and `Data` each denote one category; `All` denotes both. Iterating a value (see the
/// `IntoIterator` impl below) expands it into the single categories it covers.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TaskDataCategory {
    Meta,
    Data,
    All,
}

impl TaskDataCategory {
    /// Converts a single category into its [`SpecificTaskDataCategory`] counterpart.
    ///
    /// # Panics
    ///
    /// Panics via `unreachable!()` when `self` is [`TaskDataCategory::All`]; callers must only
    /// pass a single category.
    pub fn into_specific(self) -> SpecificTaskDataCategory {
        let specific = match self {
            Self::Meta => Some(SpecificTaskDataCategory::Meta),
            Self::Data => Some(SpecificTaskDataCategory::Data),
            Self::All => None,
        };
        specific.unwrap_or_else(|| unreachable!())
    }
}

/// A single data category; unlike [`TaskDataCategory`] it has no "both" variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum SpecificTaskDataCategory {
    Meta,
    Data,
}

impl IntoIterator for TaskDataCategory {
    type Item = TaskDataCategory;

    type IntoIter = TaskDataCategoryIterator;

    /// Yields each single category covered by `self`: `Meta` and `Data` yield themselves once,
    /// `All` yields `Meta` followed by `Data`.
    fn into_iter(self) -> Self::IntoIter {
        match self {
            Self::All => TaskDataCategoryIterator::All,
            Self::Meta => TaskDataCategoryIterator::Meta,
            Self::Data => TaskDataCategoryIterator::Data,
        }
    }
}

/// Iterator over the single categories of a [`TaskDataCategory`].
///
/// Each variant describes what is still left to yield.
pub enum TaskDataCategoryIterator {
    /// `Meta` and then `Data` remain.
    All,
    /// Only `Meta` remains.
    Meta,
    /// Only `Data` remains.
    Data,
    /// Nothing remains; `next` keeps returning `None`.
    None,
}

impl Iterator for TaskDataCategoryIterator {
    type Item = TaskDataCategory;

    fn next(&mut self) -> Option<Self::Item> {
        // Pick the category to emit now and the state that follows it.
        let (current, remaining) = match self {
            Self::All => (TaskDataCategory::Meta, Self::Data),
            Self::Meta => (TaskDataCategory::Meta, Self::None),
            Self::Data => (TaskDataCategory::Data, Self::None),
            Self::None => return None,
        };
        *self = remaining;
        Some(current)
    }
}
89
bitfield! {
    // Note: Due to alignment in InnerStorage it doesn't matter if this struct is 1 or 4 bytes.
    // Packed per-task flags. `Default` yields 0, i.e. every flag cleared.
    #[derive(Clone, Default)]
    pub struct InnerStorageState(u32);
    impl Debug;
    /// Meta category has been restored (presumably loaded from the persistent store — confirm).
    pub meta_restored, set_meta_restored: 0;
    /// Data category has been restored (presumably loaded from the persistent store — confirm).
    pub data_restored, set_data_restored: 1;
    /// Item was modified before snapshot mode was entered.
    /// `Storage::end_snapshot` clears it for items that were only modified, and sets it for
    /// items whose `meta_snapshot` flag was set.
    pub meta_modified, set_meta_modified: 2;
    /// Data-category counterpart of `meta_modified`.
    pub data_modified, set_data_modified: 3;
    /// Item was modified after snapshot mode was entered. A snapshot was taken.
    /// `Storage::end_snapshot` turns this flag back into `meta_modified`.
    pub meta_snapshot, set_meta_snapshot: 4;
    /// Data-category counterpart of `meta_snapshot`.
    pub data_snapshot, set_data_snapshot: 5;
    /// Prefetched dependencies
    pub prefetched, set_prefetched: 6;
}
106
107impl InnerStorageState {
108    pub fn set_restored(&mut self, category: TaskDataCategory) {
109        match category {
110            TaskDataCategory::Meta => {
111                self.set_meta_restored(true);
112            }
113            TaskDataCategory::Data => {
114                self.set_data_restored(true);
115            }
116            TaskDataCategory::All => {
117                self.set_meta_restored(true);
118                self.set_data_restored(true);
119            }
120        }
121    }
122
123    pub fn is_restored(&self, category: TaskDataCategory) -> bool {
124        match category {
125            TaskDataCategory::Meta => self.meta_restored(),
126            TaskDataCategory::Data => self.data_restored(),
127            TaskDataCategory::All => self.meta_restored() && self.data_restored(),
128        }
129    }
130
131    pub fn any_snapshot(&self) -> bool {
132        self.meta_snapshot() || self.data_snapshot()
133    }
134
135    pub fn any_modified(&self) -> bool {
136        self.meta_modified() || self.data_modified()
137    }
138}
139
140pub struct InnerStorageSnapshot {
141    aggregation_number: OptionStorage<AggregationNumber>,
142    output_dependent: AutoMapStorage<TaskId, ()>,
143    output: OptionStorage<OutputValue>,
144    upper: AutoMapStorage<TaskId, u32>,
145    dynamic: DynamicStorage,
146    pub meta_modified: bool,
147    pub data_modified: bool,
148}
149
150impl From<&InnerStorage> for InnerStorageSnapshot {
151    fn from(inner: &InnerStorage) -> Self {
152        Self {
153            aggregation_number: inner.aggregation_number.clone(),
154            output_dependent: inner.output_dependent.clone(),
155            output: inner.output.clone(),
156            upper: inner.upper.clone(),
157            dynamic: inner.dynamic.snapshot_for_persisting(),
158            meta_modified: inner.state.meta_modified(),
159            data_modified: inner.state.data_modified(),
160        }
161    }
162}
163
164impl InnerStorageSnapshot {
165    pub fn iter_all(
166        &self,
167    ) -> impl Iterator<Item = (CachedDataItemKey, CachedDataItemValueRef<'_>)> {
168        use crate::data_storage::Storage;
169        self.dynamic
170            .iter_all()
171            .chain(self.aggregation_number.iter().map(|(_, value)| {
172                (
173                    CachedDataItemKey::AggregationNumber {},
174                    CachedDataItemValueRef::AggregationNumber { value },
175                )
176            }))
177            .chain(self.output.iter().map(|(_, value)| {
178                (
179                    CachedDataItemKey::Output {},
180                    CachedDataItemValueRef::Output { value },
181                )
182            }))
183            .chain(self.upper.iter().map(|(k, value)| {
184                (
185                    CachedDataItemKey::Upper { task: *k },
186                    CachedDataItemValueRef::Upper { value },
187                )
188            }))
189            .chain(self.output_dependent.iter().map(|(k, value)| {
190                (
191                    CachedDataItemKey::OutputDependent { task: *k },
192                    CachedDataItemValueRef::OutputDependent { value },
193                )
194            }))
195    }
196
197    pub fn len(&self) -> usize {
198        use crate::data_storage::Storage;
199        self.dynamic.len()
200            + self.aggregation_number.len()
201            + self.output.len()
202            + self.upper.len()
203            + self.output_dependent.len()
204    }
205}
206
207#[derive(Debug, Clone)]
208pub struct InnerStorage {
209    aggregation_number: OptionStorage<AggregationNumber>,
210    output_dependent: AutoMapStorage<TaskId, ()>,
211    output: OptionStorage<OutputValue>,
212    upper: AutoMapStorage<TaskId, u32>,
213    dynamic: DynamicStorage,
214    state: InnerStorageState,
215}
216
217impl InnerStorage {
218    fn new() -> Self {
219        Self {
220            aggregation_number: Default::default(),
221            output_dependent: Default::default(),
222            output: Default::default(),
223            upper: Default::default(),
224            dynamic: DynamicStorage::new(),
225            state: InnerStorageState::default(),
226        }
227    }
228
229    pub fn state(&self) -> &InnerStorageState {
230        &self.state
231    }
232
233    pub fn state_mut(&mut self) -> &mut InnerStorageState {
234        &mut self.state
235    }
236}
237
/// Internal helper for `generate_inner_storage!`; not meant to be invoked directly.
///
/// Every dispatch arm takes a comma-terminated list of `Tag [key_field] => field,` entries.
/// For each entry it expands to an `if let` that matches the tag and then `return`s from the
/// enclosing function with the result of the operation on the dedicated `field`. The recursive
/// arms peel off one entry at a time. If no entry matches, control falls through to the code
/// after the invocation; in `generate_inner_storage!` that is the `self.dynamic` fallback.
///
/// NOTE(review): although `#[macro_export]`ed, expansions refer to `CachedDataItemValue*` and
/// `InnerStorageIter` unqualified, so they rely on those names being in scope at the call site.
#[macro_export]
macro_rules! generate_inner_storage_internal {
    // Matching on CachedDataItem with a $value
    // Keyed entry: the (owned) key field is passed as the first argument to `$fn`.
    (CachedDataItem: $self:ident, $item:ident, $value:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident $key_field:ident => $field:ident,) => {
        if let CachedDataItem::$tag { $key_field, $value } = $item {
            let result = $self.$field.$fn($key_field, $($args)*);
            return $crate::generate_inner_storage_internal!(return_value: result, $return_ty: $tag $key_field => $field);
        }
    };
    // Key-less entry: `()` stands in for the key.
    (CachedDataItem: $self:ident, $item:ident, $value:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident => $field:ident,) => {
        if let CachedDataItem::$tag { $value } = $item {
            let result = $self.$field.$fn((), $($args)*);
            return $crate::generate_inner_storage_internal!(return_value: result, $return_ty: $tag => $field);
        }
    };
    // More than one entry: handle the first one, then recurse on the rest.
    (CachedDataItem: $self:ident, $item:ident, $value:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
        $crate::generate_inner_storage_internal!(CachedDataItem: $self, $item, $value, $return_ty, $fn($($args)*): $tag $($key_field)? => $field,);
        $crate::generate_inner_storage_internal!(CachedDataItem: $self, $item, $value, $return_ty, $fn($($args)*): $($config)+)
    };
    // Matching on CachedDataItemKey without a $value
    // The key is matched through a reference (callers pass `&CachedDataItemKey`), so the keyed
    // entry passes the key field by reference and the key-less entry passes `&()`.
    (CachedDataItemKey: $self:ident, $item:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident $key_field:ident => $field:ident,) => {
        if let CachedDataItemKey::$tag { $key_field } = $item {
            let result = $self.$field.$fn($key_field, $($args)*);
            return $crate::generate_inner_storage_internal!(return_value: result, $return_ty: $tag $key_field => $field);
        }
    };
    (CachedDataItemKey: $self:ident, $item:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident => $field:ident,) => {
        if let CachedDataItemKey::$tag { } = $item {
            let result = $self.$field.$fn(&(), $($args)*);
            return $crate::generate_inner_storage_internal!(return_value: result, $return_ty: $tag => $field);
        }
    };
    (CachedDataItemKey: $self:ident, $item:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
        $crate::generate_inner_storage_internal!(CachedDataItemKey: $self, $item, $return_ty, $fn($($args)*): $tag $($key_field)? => $field,);
        $crate::generate_inner_storage_internal!(CachedDataItemKey: $self, $item, $return_ty, $fn($($args)*): $($config)+)
    };
    // Matching on CachedDataItemType without a $value
    // Type-level operations take no key, so `$fn` receives only `$args`.
    (CachedDataItemType: $self:ident, $item:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident $($key_field:ident)? => $field:ident,) => {
        if let CachedDataItemType::$tag = $item {
            let result = $self.$field.$fn($($args)*);
            return $crate::generate_inner_storage_internal!(return_value: result, $return_ty: $tag $($key_field)? => $field);
        }
    };
    (CachedDataItemType: $self:ident, $item:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
        $crate::generate_inner_storage_internal!(CachedDataItemType: $self, $item, $return_ty, $fn($($args)*): $tag $($key_field)? => $field,);
        $crate::generate_inner_storage_internal!(CachedDataItemType: $self, $item, $return_ty, $fn($($args)*): $($config)+)
    };

    // fn update
    // Adapts the caller's closure, which works on `Option<CachedDataItemValue>`, to the field's
    // `update`, which works on the raw value type. The `unreachable!()` fires if the closure
    // returns a value of a different variant than the key's tag.
    (update: $self:ident, $key:ident, $update:ident: $tag:ident $key_field:ident => $field:ident,) => {
        if let CachedDataItemKey::$tag { $key_field } = $key {
            $self.$field.update($key_field, |old| {
                let old = old.map(|old| CachedDataItemValue::$tag { value: old });
                let new = $update(old);
                new.map(|new| if let CachedDataItemValue::$tag { value } = new {
                    value
                } else {
                    unreachable!()
                })
            });
            return;
        }
    };
    (update: $self:ident, $key:ident, $update:ident: $tag:ident => $field:ident,) => {
        if let CachedDataItemKey::$tag { } = $key {
            $self.$field.update((), |old| {
                let old = old.map(|old| CachedDataItemValue::$tag { value: old });
                let new = $update(old);
                new.map(|new| if let CachedDataItemValue::$tag { value } = new {
                    value
                } else {
                    unreachable!()
                })
            });
            return;
        }
    };
    (update: $self:ident, $key:ident, $update:ident: $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
        $crate::generate_inner_storage_internal!(update: $self, $key, $update: $tag $($key_field)? => $field,);
        $crate::generate_inner_storage_internal!(update: $self, $key, $update: $($config)+)
    };

    // fn get_mut_or_insert_with
    // The insert closure must produce a value of the key's variant; otherwise `unreachable!()`.
    (get_mut_or_insert_with: $self:ident, $key:ident, $insert_with:ident: $tag:ident $key_field:ident => $field:ident,) => {
        if let CachedDataItemKey::$tag { $key_field } = $key {
            let value = $self.$field.get_mut_or_insert_with($key_field, || {
                let value = $insert_with();
                if let CachedDataItemValue::$tag { value } = value {
                    value
                } else {
                    unreachable!()
                }
            });
            return CachedDataItemValueRefMut::$tag { value };
        }
    };
    (get_mut_or_insert_with: $self:ident, $key:ident, $insert_with:ident: $tag:ident => $field:ident,) => {
        if let CachedDataItemKey::$tag { } = $key {
            let value = $self.$field.get_mut_or_insert_with((), || {
                let value = $insert_with();
                if let CachedDataItemValue::$tag { value } = value {
                    value
                } else {
                    unreachable!()
                }
            });
            return CachedDataItemValueRefMut::$tag { value };
        }
    };
    (get_mut_or_insert_with: $self:ident, $key:ident, $insert_with:ident: $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
        $crate::generate_inner_storage_internal!(get_mut_or_insert_with: $self, $key, $insert_with: $tag $($key_field)? => $field,);
        $crate::generate_inner_storage_internal!(get_mut_or_insert_with: $self, $key, $insert_with: $($config)+)
    };

    // fn extract_if
    // Wraps the field's raw (key, value) pairs into `CachedDataItemKey`/`CachedDataItemValueRef`
    // for the predicate, and the extracted pairs into `CachedDataItem`. The result is returned
    // in the `InnerStorageIter` variant named after the tag.
    (extract_if: $self:ident, $ty:ident, $f:ident: $tag:ident $key_field:ident => $field:ident,) => {
        if let CachedDataItemType::$tag = $ty {
            let iter = $self.$field.extract_if(move |key, value| {
                $f(CachedDataItemKey::$tag { $key_field: *key }, CachedDataItemValueRef::$tag { value })
            }).map(|($key_field, value)| CachedDataItem::$tag { $key_field, value });
            return InnerStorageIter::$tag(iter);
        }
    };
    (extract_if: $self:ident, $ty:ident, $f:ident: $tag:ident => $field:ident,) => {
        if let CachedDataItemType::$tag = $ty {
            let iter = $self.$field.extract_if(move |_, value| {
                $f(CachedDataItemKey::$tag { }, CachedDataItemValueRef::$tag { value })
            }).map(|(_, value)| CachedDataItem::$tag { value });
            return InnerStorageIter::$tag(iter);
        }
    };
    (extract_if: $self:ident, $ty:ident, $f:ident: $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
        $crate::generate_inner_storage_internal!(extract_if: $self, $ty, $f: $tag $($key_field)? => $field,);
        $crate::generate_inner_storage_internal!(extract_if: $self, $ty, $f: $($config)+)
    };

    // fn iter
    // Same wrapping as `extract_if`, but borrowing: yields (key, value-ref) pairs.
    (iter: $self:ident, $ty:ident: $tag:ident $key_field:ident => $field:ident,) => {
        if let CachedDataItemType::$tag = $ty {
            let iter = $self.$field.iter().map(|($key_field, value)| (CachedDataItemKey::$tag { $key_field: *$key_field }, CachedDataItemValueRef::$tag { value }));
            return InnerStorageIter::$tag(iter);
        }
    };
    (iter: $self:ident, $ty:ident: $tag:ident => $field:ident,) => {
        if let CachedDataItemType::$tag = $ty {
            let iter = $self.$field.iter().map(|(_, value)| (CachedDataItemKey::$tag { }, CachedDataItemValueRef::$tag { value }));
            return InnerStorageIter::$tag(iter);
        }
    };
    (iter: $self:ident, $ty:ident: $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
        $crate::generate_inner_storage_internal!(iter: $self, $ty: $tag $($key_field)? => $field,);
        $crate::generate_inner_storage_internal!(iter: $self, $ty: $($config)+)
    };


    // Return value handling
    // `none` passes the field's result through unchanged. The `option_*` modes map the `Some`
    // payload into the owned / shared-ref / mutable-ref value enum variant for `$tag`.
    (return_value: $result:ident, none: $($more:tt)*) => {
        $result
    };
    (return_value: $result:ident, option_value: $tag:ident $($more:tt)*) => {
        $result.map(|value| CachedDataItemValue::$tag { value })
    };
    (return_value: $result:ident, option_ref: $tag:ident $($more:tt)*) => {
        $result.map(|value| CachedDataItemValueRef::$tag { value })
    };
    (return_value: $result:ident, option_ref_mut: $tag:ident $($more:tt)*) => {
        $result.map(|value| CachedDataItemValueRefMut::$tag { value })
    };

    // Input value handling
    // Unwraps an `Option<CachedDataItemValue>` of variant `$tag` into its raw value, panicking
    // via `unreachable!()` on any other variant. Not used by `generate_inner_storage!` below.
    (input_value: $input:ident, option_value: $tag:ident $($more:tt)*) => {
        $input.map(|value| {
            if let CachedDataItemValue::$tag { value } = value {
                value
            } else {
                unreachable!()
            }
        })
    };

}
419
// Generates the item-level API on `InnerStorage` (`add`, `insert`, `remove`, `count`, `get`,
// `contains_key`, `get_mut`, `shrink_to_fit`, `update`, `extract_if`, `get_mut_or_insert_with`,
// `iter`).
//
// `$config` is a list of `Tag [key_field] => field,` entries. In every method, items/keys/types
// whose tag is listed are routed to the named dedicated field by
// `generate_inner_storage_internal!` (which returns early on a match); everything else falls
// through to `self.dynamic`.
macro_rules! generate_inner_storage {
    ($($config:tt)*) => {
        impl InnerStorage {
            /// Adds `item` to the storage responsible for its tag and returns that storage's
            /// `add` result (NOTE(review): presumably whether the item was newly added).
            pub fn add(&mut self, item: CachedDataItem) -> bool {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(CachedDataItem: self, item, value, none, add(value): $($config)*);
                self.dynamic.add(item)
            }

            /// Inserts `item` and returns the responsible storage's `insert` result as a
            /// `CachedDataItemValue` (presumably the value previously stored under the key).
            pub fn insert(&mut self, item: CachedDataItem) -> Option<CachedDataItemValue> {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(CachedDataItem: self, item, value, option_value, insert(value): $($config)*);
                self.dynamic.insert(item)
            }

            /// Removes the item stored under `key`, returning its value if there was one.
            pub fn remove(&mut self, key: &CachedDataItemKey) -> Option<CachedDataItemValue> {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(CachedDataItemKey: self, key, option_value, remove(): $($config)*);
                self.dynamic.remove(key)
            }

            /// Number of stored items of type `ty` (`len()` of the dedicated field, if any).
            pub fn count(&self, ty: CachedDataItemType) -> usize {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(CachedDataItemType: self, ty, none, len(): $($config)*);
                self.dynamic.count(ty)
            }

            /// Shared reference to the value stored under `key`, if any.
            pub fn get(&self, key: &CachedDataItemKey) -> Option<CachedDataItemValueRef<'_>> {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(CachedDataItemKey: self, key, option_ref, get(): $($config)*);
                self.dynamic.get(key)
            }

            /// Whether a value is stored under `key`.
            pub fn contains_key(&self, key: &CachedDataItemKey) -> bool {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(CachedDataItemKey: self, key, none, contains_key(): $($config)*);
                self.dynamic.contains_key(key)
            }

            /// Mutable reference to the value stored under `key`, if any.
            pub fn get_mut(&mut self, key: &CachedDataItemKey) -> Option<CachedDataItemValueRefMut<'_>> {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(CachedDataItemKey: self, key, option_ref_mut, get_mut(): $($config)*);
                self.dynamic.get_mut(key)
            }

            /// Calls `shrink_to_fit` on the storage responsible for items of type `ty`.
            pub fn shrink_to_fit(&mut self, ty: CachedDataItemType) {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(CachedDataItemType: self, ty, none, shrink_to_fit(): $($config)*);
                self.dynamic.shrink_to_fit(ty)
            }

            /// Passes the current value for `key` (or `None`) to `update` and hands the result
            /// back to the responsible storage's `update`. For dedicated fields, `update` must
            /// return a value of the key's variant (or `None`), otherwise it panics.
            pub fn update(
                &mut self,
                key: CachedDataItemKey,
                update: impl FnOnce(Option<CachedDataItemValue>) -> Option<CachedDataItemValue>,
            ) {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(update: self, key, update: $($config)*);
                self.dynamic.update(key, update)
            }

            /// Delegates to the responsible storage's `extract_if` for items of type `ty`,
            /// yielding the extracted items as `CachedDataItem`s (presumably those for which `f`
            /// returns `true` — confirm against `data_storage`).
            pub fn extract_if<'l, F>(
                &'l mut self,
                ty: CachedDataItemType,
                mut f: F,
            ) -> impl Iterator<Item = CachedDataItem> + use<'l, F>
            where
                F: for<'a> FnMut(CachedDataItemKey, CachedDataItemValueRef<'a>) -> bool + 'l,
            {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(extract_if: self, ty, f: $($config)*);
                InnerStorageIter::Dynamic(self.dynamic.extract_if(ty, f))
            }

            /// Mutable reference to the value under `key`, first inserting the value produced by
            /// `f` if the key is absent. For dedicated fields `f` must produce the key's variant.
            pub fn get_mut_or_insert_with(
                &mut self,
                key: CachedDataItemKey,
                f: impl FnOnce() -> CachedDataItemValue,
            ) -> CachedDataItemValueRefMut<'_>
            {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(get_mut_or_insert_with: self, key, f: $($config)*);
                self.dynamic.get_mut_or_insert_with(key, f)
            }

            /// Iterates over all stored (key, value) pairs of type `ty`.
            pub fn iter(
                &self,
                ty: CachedDataItemType,
            ) -> impl Iterator<Item = (CachedDataItemKey, CachedDataItemValueRef<'_>)>
            {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(iter: self, ty: $($config)*);
                InnerStorageIter::Dynamic(self.dynamic.iter(ty))
            }

        }
    };
}
518
// Item kinds stored in dedicated `InnerStorage` fields instead of `dynamic`.
// Entry syntax: `Tag [key_field] => field`, where `key_field` names the key field of the
// `CachedDataItem::Tag` variant (omitted for key-less, singleton items).
// Each tag used here must also have a same-named `InnerStorageIter` variant.
generate_inner_storage!(
    AggregationNumber => aggregation_number,
    OutputDependent task => output_dependent,
    Output => output,
    Upper task => upper,
);
525
/// Iterator returned by `InnerStorage::iter` and `InnerStorage::extract_if`.
///
/// Each variant wraps the iterator of one dedicated storage field (or of the dynamic storage),
/// so that functions whose branches produce different iterator types can still return a single
/// concrete type.
enum InnerStorageIter<A, B, C, D, E> {
    AggregationNumber(A),
    OutputDependent(B),
    Output(C),
    Upper(D),
    Dynamic(E),
}

impl<T, A, B, C, D, E> Iterator for InnerStorageIter<A, B, C, D, E>
where
    A: Iterator<Item = T>,
    B: Iterator<Item = T>,
    C: Iterator<Item = T>,
    D: Iterator<Item = T>,
    E: Iterator<Item = T>,
{
    type Item = T;

    fn next(&mut self) -> Option<Self::Item> {
        match self {
            InnerStorageIter::AggregationNumber(iter) => iter.next(),
            InnerStorageIter::OutputDependent(iter) => iter.next(),
            InnerStorageIter::Output(iter) => iter.next(),
            InnerStorageIter::Upper(iter) => iter.next(),
            InnerStorageIter::Dynamic(iter) => iter.next(),
        }
    }

    /// Forwards the wrapped iterator's bounds. Without this override the wrapper would always
    /// report `(0, None)`, which prevents consumers such as `collect` from preallocating.
    fn size_hint(&self) -> (usize, Option<usize>) {
        match self {
            InnerStorageIter::AggregationNumber(iter) => iter.size_hint(),
            InnerStorageIter::OutputDependent(iter) => iter.size_hint(),
            InnerStorageIter::Output(iter) => iter.size_hint(),
            InnerStorageIter::Upper(iter) => iter.size_hint(),
            InnerStorageIter::Dynamic(iter) => iter.size_hint(),
        }
    }
}
554
555impl InnerStorage {
556    pub fn iter_all(
557        &self,
558    ) -> impl Iterator<Item = (CachedDataItemKey, CachedDataItemValueRef<'_>)> {
559        use crate::data_storage::Storage;
560        self.dynamic
561            .iter_all()
562            .chain(self.aggregation_number.iter().map(|(_, value)| {
563                (
564                    CachedDataItemKey::AggregationNumber {},
565                    CachedDataItemValueRef::AggregationNumber { value },
566                )
567            }))
568            .chain(self.output.iter().map(|(_, value)| {
569                (
570                    CachedDataItemKey::Output {},
571                    CachedDataItemValueRef::Output { value },
572                )
573            }))
574            .chain(self.upper.iter().map(|(k, value)| {
575                (
576                    CachedDataItemKey::Upper { task: *k },
577                    CachedDataItemValueRef::Upper { value },
578                )
579            }))
580            .chain(self.output_dependent.iter().map(|(k, value)| {
581                (
582                    CachedDataItemKey::OutputDependent { task: *k },
583                    CachedDataItemValueRef::OutputDependent { value },
584                )
585            }))
586    }
587
588    pub fn len(&self) -> usize {
589        use crate::data_storage::Storage;
590        self.dynamic.len()
591            + self.aggregation_number.len()
592            + self.output.len()
593            + self.upper.len()
594            + self.output_dependent.len()
595    }
596}
597
/// Per-task entry in `Storage::modified`, recording how the task relates to the current or next
/// snapshot.
///
/// `Storage::take_snapshot` collects `Modified` keys and takes the boxed copy out of
/// `Snapshot(Some(..))`. `Storage::end_snapshot` drops `Modified` entries and turns the remaining
/// `Snapshot` entries into `Modified`.
enum ModifiedState {
    /// It was modified before snapshot mode was entered, but it was not accessed during snapshot
    /// mode.
    Modified,
    /// Snapshot(Some):
    /// It was modified before snapshot mode was entered and it was accessed again during snapshot
    /// mode. A copy of the version of the item when snapshot mode was entered is stored here.
    /// Snapshot(None):
    /// It was not modified before snapshot mode was entered, but it was accessed during snapshot
    /// mode. Or the snapshot was already taken out by the snapshot operation.
    Snapshot(Option<Box<InnerStorageSnapshot>>),
}
610
/// Storage for all tasks' `InnerStorage`, plus bookkeeping of which tasks were modified so they
/// can be snapshotted.
pub struct Storage {
    // Set while a snapshot is being taken; see `start_snapshot` / `end_snapshot`.
    snapshot_mode: AtomicBool,
    // Tasks that are modified and/or hold a snapshot copy; see `ModifiedState`.
    modified: FxDashMap<TaskId, ModifiedState>,
    // The actual per-task data.
    map: FxDashMap<TaskId, Box<InnerStorage>>,
}

impl Storage {
    /// Creates an empty storage whose two maps use `shard_amount` shards.
    ///
    /// `small_preallocation` selects initial capacities of 1024 (map) / 0 (modified) instead of
    /// 1024 * 1024 (map) / 1024 (modified).
    pub fn new(shard_amount: usize, small_preallocation: bool) -> Self {
        let map_capacity: usize = if small_preallocation {
            1024
        } else {
            1024 * 1024
        };
        let modified_capacity: usize = if small_preallocation { 0 } else { 1024 };

        Self {
            snapshot_mode: AtomicBool::new(false),
            modified: FxDashMap::with_capacity_and_hasher_and_shard_amount(
                modified_capacity,
                Default::default(),
                shard_amount,
            ),
            map: FxDashMap::with_capacity_and_hasher_and_shard_amount(
                map_capacity,
                Default::default(),
                shard_amount,
            ),
        }
    }

    /// Processes every modified item (resp. a snapshot of it) with the given functions and returns
    /// the results. Ends snapshot mode afterwards.
    /// preprocess is potentially called within a lock, so it should be fast.
    /// process is called outside of locks, so it could do more expensive operations.
    ///
    /// Returns one `SnapshotShard` per shard of `modified`. Each carries the keys found in state
    /// `Modified` and the snapshot copies taken out of `Snapshot(Some(..))` entries. Every shard
    /// holds a clone of a shared `Arc<SnapshotGuard>`. NOTE(review): `end_snapshot` is not called
    /// in this view; presumably `SnapshotGuard`'s `Drop` calls it once the last shard is
    /// dropped — confirm.
    pub fn take_snapshot<
        'l,
        T,
        R,
        PP: for<'a> Fn(TaskId, &'a InnerStorage) -> T + Sync,
        P: Fn(TaskId, T) -> R + Sync,
        PS: Fn(TaskId, Box<InnerStorageSnapshot>) -> R + Sync,
    >(
        &'l self,
        preprocess: &'l PP,
        process: &'l P,
        process_snapshot: &'l PS,
    ) -> Vec<SnapshotShard<'l, PP, P, PS>> {
        if !self.snapshot_mode() {
            self.start_snapshot();
        }

        // Shared by all returned shards (cloned into each below).
        let guard = Arc::new(SnapshotGuard { storage: self });

        // The number of shards is much larger than the number of threads, so the effect of the
        // locks held is negligible.
        parallel::map_collect::<_, _, Vec<_>>(self.modified.shards(), |shard| {
            let mut direct_snapshots: Vec<(TaskId, Box<InnerStorageSnapshot>)> = Vec::new();
            let mut modified: SmallVec<[TaskId; 4]> = SmallVec::new();
            {
                // Take the snapshots from the modified map
                // NOTE: this `guard` (the shard write lock) shadows the outer
                // `Arc<SnapshotGuard>` only inside this block.
                let guard = shard.write();
                // Safety: guard must outlive the iterator.
                for bucket in unsafe { guard.iter() } {
                    // Safety: the guard guarantees that the bucket is not removed and the ptr
                    // is valid.
                    let (key, shared_value) = unsafe { bucket.as_mut() };
                    let modified_state = shared_value.get_mut();
                    match modified_state {
                        ModifiedState::Modified => {
                            // Data is read later (via `preprocess`/`process`), not copied here.
                            modified.push(*key);
                        }
                        ModifiedState::Snapshot(snapshot) => {
                            // Leaves `Snapshot(None)` behind, so the entry stays in `modified`.
                            if let Some(snapshot) = snapshot.take() {
                                direct_snapshots.push((*key, snapshot));
                            }
                        }
                    }
                }
                // Safety: guard must outlive the iterator.
                drop(guard);
            }

            SnapshotShard {
                direct_snapshots,
                modified,
                storage: self,
                // Refers to the outer `Arc<SnapshotGuard>`; the shard lock is out of scope here.
                guard: Some(guard.clone()),
                process,
                preprocess,
                process_snapshot,
            }
        })
    }

    /// Start snapshot mode.
    pub fn start_snapshot(&self) {
        self.snapshot_mode
            .store(true, std::sync::atomic::Ordering::Release);
    }

    /// End snapshot mode.
    /// Items that have snapshots will be kept as modified since they have been accessed during the
    /// snapshot mode. Items that are modified will be removed and considered as unmodified.
    /// When items are accessed in future they will be marked as modified.
    ///
    /// The order of the steps below matters: modified entries are dropped while still in
    /// snapshot mode, and the snapshot-mode flag is cleared before snapshots are converted.
    fn end_snapshot(&self) {
        // We are still in snapshot mode, so all accessed items would be stored as snapshot.
        // This means we can start by removing all modified items.
        let mut removed_modified = Vec::new();
        self.modified.retain(|key, inner| {
            if matches!(inner, ModifiedState::Modified) {
                removed_modified.push(*key);
                false
            } else {
                true
            }
        });

        // We also need to unset all the modified flags.
        for key in removed_modified {
            if let Some(mut inner) = self.map.get_mut(&key) {
                let state = inner.state_mut();
                state.set_data_modified(false);
                state.set_meta_modified(false);
            }
        }

        // Now modified only contains snapshots.
        // We leave snapshot mode. Any access would be stored as modified and not as snapshot.
        self.snapshot_mode
            .store(false, std::sync::atomic::Ordering::Release);

        // We can change all the snapshots to modified now.
        // Any snapshot copy still held in `Snapshot(Some(..))` is dropped by this overwrite.
        let mut removed_snapshots = Vec::new();
        for mut item in self.modified.iter_mut() {
            match item.value() {
                ModifiedState::Snapshot(_) => {
                    removed_snapshots.push(*item.key());
                    *item.value_mut() = ModifiedState::Modified;
                }
                ModifiedState::Modified => {
                    // This means it was concurrently modified.
                    // It's already in the correct state.
                }
            }
        }

        // And update the flags
        // Each `*_snapshot` flag is turned into the corresponding `*_modified` flag.
        for key in removed_snapshots {
            if let Some(mut inner) = self.map.get_mut(&key) {
                let state = inner.state_mut();
                if state.meta_snapshot() {
                    state.set_meta_snapshot(false);
                    state.set_meta_modified(true);
                }
                if state.data_snapshot() {
                    state.set_data_snapshot(false);
                    state.set_data_modified(true);
                }
            }
        }

        // Remove excessive capacity in modified
        self.modified.shrink_to_fit();
    }

    /// Whether snapshot mode is currently active.
    fn snapshot_mode(&self) -> bool {
        self.snapshot_mode
            .load(std::sync::atomic::Ordering::Acquire)
    }

    /// Returns a write guard for `key`, inserting an empty `InnerStorage` first if the task has
    /// no entry yet.
    pub fn access_mut(&self, key: TaskId) -> StorageWriteGuard<'_> {
        let inner = match self.map.entry(key) {
            dashmap::mapref::entry::Entry::Occupied(e) => e.into_ref(),
            dashmap::mapref::entry::Entry::Vacant(e) => e.insert(Box::new(InnerStorage::new())),
        };
        StorageWriteGuard {
            storage: self,
            inner: inner.into(),
        }
    }

    /// Returns write guards for two tasks at once, creating missing entries, via
    /// `get_multiple_mut`. NOTE(review): behavior for `key1 == key2` is determined by
    /// `get_multiple_mut` — confirm before relying on it.
    pub fn access_pair_mut(
        &self,
        key1: TaskId,
        key2: TaskId,
    ) -> (StorageWriteGuard<'_>, StorageWriteGuard<'_>) {
        let (a, b) = get_multiple_mut(&self.map, key1, key2, || Box::new(InnerStorage::new()));
        (
            StorageWriteGuard {
                storage: self,
                inner: a,
            },
            StorageWriteGuard {
                storage: self,
                inner: b,
            },
        )
    }

    /// Applies the `dash_map_drop_contents::drop_contents` helper to both maps (presumably
    /// dropping all entries — see that helper).
    pub fn drop_contents(&self) {
        drop_contents(&self.map);
        drop_contents(&self.modified);
    }
}
815
/// Mutable access to one task's `InnerStorage`, together with the `Storage` it lives in.
///
/// The storage reference lets `track_modification` record changes in `Storage::modified`.
pub struct StorageWriteGuard<'a> {
    /// Owning storage. Used to query snapshot mode and to update its `modified` map.
    storage: &'a Storage,
    /// Mutable map guard for the task's entry in `Storage::map`.
    inner: RefMut<'a, TaskId, Box<InnerStorage>>,
}
820
impl StorageWriteGuard<'_> {
    /// Tracks mutation of this task
    ///
    /// Records that `category` of the guarded task is about to be mutated. Call this before the
    /// mutation is applied: in snapshot mode the current contents of the task may be copied as
    /// the version that belongs to the running snapshot.
    ///
    /// If the task already has the snapshot bit for `category`, nothing happens. Otherwise the
    /// effect depends on `(snapshot mode, category already modified)`:
    /// - `(false, false)`: set the `*_modified` bit. Insert `ModifiedState::Modified` into
    ///   `Storage::modified` only if the task had no modified or snapshot bit at all.
    /// - `(false, true)`: nothing.
    /// - `(true, false)`: set the `*_snapshot` bit. Insert `ModifiedState::Snapshot(None)` only
    ///   if the task had no snapshot bit yet.
    /// - `(true, true)`: set the `*_snapshot` bit. Insert `ModifiedState::Snapshot(Some(..))`,
    ///   holding an `InnerStorageSnapshot` built from the current contents, only if the task had
    ///   no snapshot bit yet.
    ///
    /// In every arm the `modified` entry is inserted before the state bit is set, while this
    /// guard still holds the task's map entry.
    ///
    /// NOTE(review): the `modified` flag checked here is per category. Suppose only the *other*
    /// category is modified, so the `modified` entry is presumably `ModifiedState::Modified`, and
    /// no snapshot bit is set yet. Then the `(true, false)` arm overwrites that entry with
    /// `Snapshot(None)`. A `SnapshotShard` that has not reached the task yet then emits nothing
    /// for it. `end_snapshot` leaves the other category's `*_modified` bit set, so a later
    /// snapshot should pick it up. Confirm that this delay is intended.
    pub fn track_modification(&mut self, category: SpecificTaskDataCategory) {
        let state = self.inner.state();
        let snapshot = match category {
            SpecificTaskDataCategory::Meta => state.meta_snapshot(),
            SpecificTaskDataCategory::Data => state.data_snapshot(),
        };
        // A set snapshot bit means this category was already tracked in snapshot mode.
        if !snapshot {
            let modified = match category {
                SpecificTaskDataCategory::Meta => state.meta_modified(),
                SpecificTaskDataCategory::Data => state.data_modified(),
            };
            match (self.storage.snapshot_mode(), modified) {
                (false, false) => {
                    // Not in snapshot mode and item is unmodified
                    // Only the first tracked category of a task adds the `modified` entry.
                    if !state.any_snapshot() && !state.any_modified() {
                        self.storage
                            .modified
                            .insert(*self.inner.key(), ModifiedState::Modified);
                    }
                    let state = self.inner.state_mut();
                    match category {
                        SpecificTaskDataCategory::Meta => state.set_meta_modified(true),
                        SpecificTaskDataCategory::Data => state.set_data_modified(true),
                    }
                }
                (false, true) => {
                    // Not in snapshot mode and item is already modified
                    // Do nothing
                }
                (true, false) => {
                    // In snapshot mode and item is unmodified (so it's not part of the snapshot)
                    // No copy is needed, so the entry is `Snapshot(None)`.
                    if !state.any_snapshot() {
                        self.storage
                            .modified
                            .insert(*self.inner.key(), ModifiedState::Snapshot(None));
                    }
                    let state = self.inner.state_mut();
                    match category {
                        SpecificTaskDataCategory::Meta => state.set_meta_snapshot(true),
                        SpecificTaskDataCategory::Data => state.set_data_snapshot(true),
                    }
                }
                (true, true) => {
                    // In snapshot mode and item is modified (so it's part of the snapshot)
                    // We need to store the original version that is part of the snapshot
                    // The conversion runs before the caller mutates anything, so it captures
                    // the pre-mutation contents.
                    if !state.any_snapshot() {
                        self.storage.modified.insert(
                            *self.inner.key(),
                            ModifiedState::Snapshot(Some(Box::new((&**self.inner).into()))),
                        );
                    }
                    let state = self.inner.state_mut();
                    match category {
                        SpecificTaskDataCategory::Meta => state.set_meta_snapshot(true),
                        SpecificTaskDataCategory::Data => state.set_data_snapshot(true),
                    }
                }
            }
        }
    }
}
884
885impl Deref for StorageWriteGuard<'_> {
886    type Target = InnerStorage;
887
888    fn deref(&self) -> &Self::Target {
889        &self.inner
890    }
891}
892
893impl DerefMut for StorageWriteGuard<'_> {
894    fn deref_mut(&mut self) -> &mut Self::Target {
895        &mut self.inner
896    }
897}
898
/// Evaluates to `$task.count(CachedDataItemType::$key)`, presumably the number of items of that
/// type stored on the task.
///
/// Like the other accessor macros in this module (`get!`, `get_mut!`, `update!`, ...), the
/// expansion brings `crate::backend::operation::TaskGuard` into scope. Call sites therefore do
/// not need to import the trait themselves. If `count` is an inherent method instead, the import
/// is simply unused (hence the `allow`).
macro_rules! count {
    ($task:ident, $key:ident) => {{
        #[allow(unused_imports)]
        use $crate::backend::operation::TaskGuard;
        $task.count($crate::data::CachedDataItemType::$key)
    }};
}
902
/// Looks up `CachedDataItemKey::$key $input` on `$task`.
///
/// Evaluates to `Some(value)` when the lookup returns a `CachedDataItemValueRef::$key`, and to
/// `None` otherwise. `$input` is the braced key-field token tree; when omitted it defaults
/// to `{}`.
macro_rules! get {
    ($task:ident, $key:ident $input:tt) => {{
        #[allow(unused_imports)]
        use $crate::backend::operation::TaskGuard;
        match $task.get(&$crate::data::CachedDataItemKey::$key $input) {
            Some($crate::data::CachedDataItemValueRef::$key { value }) => Some(value),
            _ => None,
        }
    }};
    ($task:ident, $key:ident) => {
        $crate::backend::storage::get!($task, $key {})
    };
}
919
/// Mutable counterpart of `get!`: looks up `CachedDataItemKey::$key $input` via
/// `$task.get_mut`.
///
/// Evaluates to `Some(value)` when the lookup returns a `CachedDataItemValueRefMut::$key`, and to
/// `None` otherwise. It only compiles for keys that have a `crate::data::allow_mut_access::$key`
/// unit value. `$input` defaults to `{}` when omitted.
macro_rules! get_mut {
    ($task:ident, $key:ident $input:tt) => {{
        #[allow(unused_imports)]
        use $crate::backend::operation::TaskGuard;
        // Compile-time opt-in: mutable access must be explicitly allowed for this key.
        let () = $crate::data::allow_mut_access::$key;
        match $task.get_mut(&$crate::data::CachedDataItemKey::$key $input) {
            Some($crate::data::CachedDataItemValueRefMut::$key { value }) => Some(value),
            _ => None,
        }
    }};
    ($task:ident, $key:ident) => {
        $crate::backend::storage::get_mut!($task, $key {})
    };
}
937
/// Returns the `value` field of the `CachedDataItemKey::$key $input` entry of `$task`.
///
/// If the entry is missing, it is inserted first via `$task.get_mut_or_insert_with`. The insert
/// closure wraps `$f()`'s result in `CachedDataItemValue::$key`.
///
/// `$f` is evaluated exactly once into `functor`. Whether `functor` is then called depends on
/// `get_mut_or_insert_with` (presumably only when the key is missing). Like `get_mut!`, this only
/// compiles for keys with a `crate::data::allow_mut_access::$key` unit value. `$input` defaults to
/// `{}` when omitted.
///
/// # Panics
///
/// Hits `unreachable!()` if the returned reference is not the `$key` variant.
macro_rules! get_mut_or_insert_with {
    ($task:ident, $key:ident $input:tt, $f:expr) => {{
        #[allow(unused_imports)]
        use $crate::backend::operation::TaskGuard;
        // Compile-time opt-in: mutable access must be explicitly allowed for this key.
        let () = $crate::data::allow_mut_access::$key;
        let functor = $f;
        let $crate::data::CachedDataItemValueRefMut::$key {
            value,
        } = $task.get_mut_or_insert_with($crate::data::CachedDataItemKey::$key $input, move || $crate::data::CachedDataItemValue::$key { value: functor() }) else {
            unreachable!()
        };
        value
    }};
    ($task:ident, $key:ident, $f:expr) => {
        $crate::backend::storage::get_mut_or_insert_with!($task, $key {}, $f)
    };
}
955
/// Creates an iterator over all [`CachedDataItemKey::$key`][crate::data::CachedDataItemKey]s in
/// `$task` matching the given `$key_pattern`, optional `$value_pattern`, and optional `if $cond`.
///
/// Each element in the iterator is determined by `$iter_item`, which may use fields extracted by
/// `$key_pattern` or `$value_pattern`.
///
/// Both forms iterate `$task.iter(CachedDataItemType::$key)` and keep only entries that match.
/// - `iter_many!(task, Key { .. } => item)` matches the key only.
/// - `iter_many!(task, Key { .. } value_pat => item)` also matches the value. The value must be a
///   `CachedDataItemValueRef::$key`, and its `value` field is matched against `$value_pattern`.
///
/// The result is a lazy `filter_map` iterator.
macro_rules! iter_many {
    ($task:ident, $key:ident $key_pattern:tt $(if $cond:expr)? => $iter_item:expr) => {{
        #[allow(unused_imports)]
        use $crate::backend::operation::TaskGuard;
        $task
            .iter($crate::data::CachedDataItemType::$key)
            // Key-only form: the value half of each pair is ignored.
            .filter_map(|(key, _)| match key {
                $crate::data::CachedDataItemKey::$key $key_pattern $(if $cond)? => Some(
                    $iter_item
                ),
                _ => None,
            })
    }};
    ($task:ident, $key:ident $input:tt $value_pattern:tt $(if $cond:expr)? => $iter_item:expr) => {{
        #[allow(unused_imports)]
        use $crate::backend::operation::TaskGuard;
        $task
            .iter($crate::data::CachedDataItemType::$key)
            // Key-and-value form: both halves of each pair must match.
            .filter_map(|(key, value)| match (key, value) {
                (
                    $crate::data::CachedDataItemKey::$key $input,
                    $crate::data::CachedDataItemValueRef::$key { value: $value_pattern }
                ) $(if $cond)? => Some($iter_item),
                _ => None,
            })
    }};
}
988
/// A thin wrapper around [`iter_many`] that calls [`Iterator::collect`].
///
/// Accepts exactly the same arguments as `iter_many!`. The collection type comes from the
/// surrounding context, just as it does for a plain `.collect()`.
///
/// Note that the return type of [`Iterator::collect`] may be ambiguous in certain contexts, so
/// using this macro may require explicit type annotations on variables.
macro_rules! get_many {
    ($($args:tt)*) => {
        ::std::iter::Iterator::collect($crate::backend::storage::iter_many!($($args)*))
    };
}
998
/// Updates the `CachedDataItemKey::$key $input` entry of `$task` with a closure over the raw
/// `value` field.
///
/// `$update` receives `Option<value>`: the old entry's `value` when it is a
/// `CachedDataItemValue::$key`, and `None` otherwise. It returns `Option<new value>`, which is
/// wrapped back into `CachedDataItemValue::$key` and handed to `$task.update`. What `$task.update`
/// does with a `None` result (presumably removing the entry) and what it evaluates to are defined
/// by `TaskGuard`, which is not visible here. `$input` defaults to `{}` when omitted.
macro_rules! update {
    ($task:ident, $key:ident $input:tt, $update:expr) => {{
        #[allow(unused_imports)]
        use $crate::backend::operation::TaskGuard;
        // `$update` may be an `FnMut`, so it is bound mutably.
        #[allow(unused_mut)]
        let mut update = $update;
        $task.update($crate::data::CachedDataItemKey::$key $input, |old| {
            // Unwrap the old value to its `value` field; any other variant is treated as absent.
            update(old.and_then(|old| {
                if let $crate::data::CachedDataItemValue::$key { value } = old {
                    Some(value)
                } else {
                    None
                }
            }))
            .map(|new| $crate::data::CachedDataItemValue::$key { value: new })
        })
    }};
    ($task:ident, $key:ident, $update:expr) => {
        $crate::backend::storage::update!($task, $key {}, $update)
    };
}
1020
/// Adds `$update` to the counter stored under `CachedDataItemKey::$key $input`. The
/// `-$update` form subtracts it instead.
///
/// A missing entry counts as 0. When the result is 0, the update closure returns `None`, which
/// `update!` passes on to `TaskGuard::update`; presumably the entry is then removed.
///
/// Evaluates to `true` when the count crossed between `<= 0` and `> 0`, in either direction.
///
/// The `-` arms are listed before the plain arms. `macro_rules!` tries arms in order, so a
/// leading `-` is routed to the subtracting form. `$input` defaults to `{}` when omitted.
macro_rules! update_count {
    ($task:ident, $key:ident $input:tt, -$update:expr) => {{
        let update = $update;
        let mut state_change = false;
        $crate::backend::storage::update!($task, $key $input, |old: Option<_>| {
            // `state_change` is written from inside the closure and read after `update!` returns.
            #[allow(unused_comparisons, reason = "type of update might be unsigned, where update < 0 is always false")]
            if let Some(old) = old {
                let new = old - update;
                state_change = old <= 0 && new > 0 || old > 0 && new <= 0;
                (new != 0).then_some(new)
            } else {
                // Missing entry: old is 0, so the new value is `-update`.
                state_change = update < 0;
                (update != 0).then_some(-update)
            }
        });
        state_change
    }};
    ($task:ident, $key:ident $input:tt, $update:expr) => {
        // `match` binds `$update` so it is evaluated exactly once.
        match $update {
            update => {
                let mut state_change = false;
                $crate::backend::storage::update!($task, $key $input, |old: Option<_>| {
                    if let Some(old) = old {
                        let new = old + update;
                        state_change = old <= 0 && new > 0 || old > 0 && new <= 0;
                        (new != 0).then_some(new)
                    } else {
                        // Missing entry: old is 0, so the new value is `update`.
                        state_change = update > 0;
                        (update != 0).then_some(update)
                    }
                });
                state_change
            }
        }
    };
    ($task:ident, $key:ident, -$update:expr) => {
        $crate::backend::storage::update_count!($task, $key {}, -$update)
    };    ($task:ident, $key:ident, $update:expr) => {
        $crate::backend::storage::update_count!($task, $key {}, $update)
    };
}
1062
/// Removes the `CachedDataItemKey::$key $input` entry from `$task`.
///
/// Evaluates to `Some(value)` when the removed value is a `CachedDataItemValue::$key`, and to
/// `None` otherwise (including when nothing was removed). `$input` defaults to `{}` when omitted.
macro_rules! remove {
    ($task:ident, $key:ident $input:tt) => {{
        #[allow(unused_imports)]
        use $crate::backend::operation::TaskGuard;
        match $task.remove(&$crate::data::CachedDataItemKey::$key $input) {
            Some($crate::data::CachedDataItemValue::$key { value }) => Some(value),
            _ => None,
        }
    }};
    ($task:ident, $key:ident) => {
        $crate::backend::storage::remove!($task, $key {})
    };
}
1079
1080pub(crate) use count;
1081pub(crate) use get;
1082pub(crate) use get_many;
1083pub(crate) use get_mut;
1084pub(crate) use get_mut_or_insert_with;
1085pub(crate) use iter_many;
1086pub(crate) use remove;
1087pub(crate) use update;
1088pub(crate) use update_count;
1089
/// Ends snapshot mode when dropped: its `Drop` impl calls `Storage::end_snapshot`.
///
/// `SnapshotShard` holds it as `Option<Arc<SnapshotGuard>>` and clears that handle once the shard
/// is exhausted. `end_snapshot` therefore runs when the last `Arc` handle goes away.
pub struct SnapshotGuard<'l> {
    storage: &'l Storage,
}
1093
1094impl Drop for SnapshotGuard<'_> {
1095    fn drop(&mut self) {
1096        self.storage.end_snapshot();
1097    }
1098}
1099
/// One shard of a storage snapshot, consumed as an iterator.
///
/// It first yields every entry of `direct_snapshots`. It then yields at most one item per task
/// in `modified`, built either from the live storage or from a stored snapshot copy. It holds a
/// `SnapshotGuard` handle until it is exhausted.
pub struct SnapshotShard<'l, PP, P, PS> {
    /// Snapshot copies emitted directly through `process_snapshot` (popped from the back).
    direct_snapshots: Vec<(TaskId, Box<InnerStorageSnapshot>)>,
    /// Tasks of this shard that are looked up in `storage` when reached (popped from the back).
    modified: SmallVec<[TaskId; 4]>,
    /// Storage the tasks are read from.
    storage: &'l Storage,
    /// Shared guard handle; set to `None` once the shard is exhausted.
    guard: Option<Arc<SnapshotGuard<'l>>>,
    /// Turns the output of `preprocess` into an iterator item.
    process: &'l P,
    /// Reads a live task's `InnerStorage` while the map entry's read guard is held.
    preprocess: &'l PP,
    /// Turns a snapshot copy into an iterator item.
    process_snapshot: &'l PS,
}
1109
impl<'l, T, R, PP, P, PS> Iterator for SnapshotShard<'l, PP, P, PS>
where
    PP: for<'a> Fn(TaskId, &'a InnerStorage) -> T + Sync,
    P: Fn(TaskId, T) -> R + Sync,
    PS: Fn(TaskId, Box<InnerStorageSnapshot>) -> R + Sync,
{
    type Item = R;

    /// Produces the next item of this shard.
    ///
    /// First drains `direct_snapshots` through `process_snapshot`. Then, for each task popped
    /// from `modified`:
    /// - Without any snapshot bit, the live `InnerStorage` is passed to `preprocess` while the
    ///   map entry's read guard is held. The result goes to `process` after the guard is
    ///   released.
    /// - With a snapshot bit, the copy stored in the task's `ModifiedState::Snapshot` entry is
    ///   taken (leaving `None` behind) and passed to `process_snapshot`. Tasks whose copy is
    ///   `None` are skipped.
    ///
    /// Once the shard is exhausted, its `SnapshotGuard` handle is released. Dropping the last
    /// handle runs `Storage::end_snapshot`.
    ///
    /// # Panics
    ///
    /// Panics if a popped task has no entry in `Storage::map`. Also panics if its snapshot bit is
    /// set but it has no `Snapshot` entry in `Storage::modified`.
    fn next(&mut self) -> Option<Self::Item> {
        if let Some((task_id, snapshot)) = self.direct_snapshots.pop() {
            return Some((self.process_snapshot)(task_id, snapshot));
        }
        while let Some(task_id) = self.modified.pop() {
            // NOTE(review): assumes tasks are not removed from `map` while a snapshot is
            // running. Confirm.
            let inner = self.storage.map.get(&task_id).unwrap();
            let state = inner.state();
            if !state.any_snapshot() {
                // Untouched since the snapshot started: read the live data under the map guard,
                // then release the guard before the (possibly expensive) `process` step.
                let preprocessed = (self.preprocess)(task_id, &inner);
                drop(inner);
                return Some((self.process)(task_id, preprocessed));
            } else {
                // Release the map guard before locking the `modified` entry.
                drop(inner);
                let maybe_snapshot = {
                    let mut modified_state = self.storage.modified.get_mut(&task_id).unwrap();
                    // `track_modification` inserts the `Snapshot` entry before setting the bit.
                    // Presumably the bits cannot be cleared here, because `end_snapshot` only
                    // runs after the guard this shard still holds is dropped.
                    let ModifiedState::Snapshot(snapshot) = &mut *modified_state else {
                        unreachable!("The snapshot bit was set, so it must be in Snapshot state");
                    };
                    snapshot.take()
                };
                // `None` means no copy was stored for this task; emit nothing for it.
                if let Some(snapshot) = maybe_snapshot {
                    return Some((self.process_snapshot)(task_id, snapshot));
                }
            }
        }
        // Exhausted: release this shard's hold on snapshot mode.
        self.guard = None;
        None
    }
}