turbo_tasks_backend/backend/
storage.rs

1use std::{
2    hash::Hash,
3    ops::{Deref, DerefMut},
4    sync::{Arc, atomic::AtomicBool},
5    thread::available_parallelism,
6};
7
8use bitfield::bitfield;
9use smallvec::SmallVec;
10use turbo_tasks::{FxDashMap, TaskId, parallel};
11
12use crate::{
13    backend::dynamic_storage::DynamicStorage,
14    data::{
15        AggregationNumber, CachedDataItem, CachedDataItemKey, CachedDataItemType,
16        CachedDataItemValue, CachedDataItemValueRef, CachedDataItemValueRefMut, OutputValue,
17    },
18    data_storage::{AutoMapStorage, OptionStorage},
19    utils::{
20        dash_map_drop_contents::drop_contents,
21        dash_map_multi::{RefMut, get_multiple_mut},
22    },
23};
24
/// Selects which category of a task's data an operation applies to.
///
/// `All` addresses both categories at once. Iterating a category (see the
/// [`IntoIterator`] impl) yields the individual categories it covers: `Meta`
/// and `Data` yield themselves, `All` yields `Meta` followed by `Data`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TaskDataCategory {
    Meta,
    Data,
    All,
}

impl TaskDataCategory {
    /// Converts into the matching [`SpecificTaskDataCategory`].
    ///
    /// # Panics
    ///
    /// Panics when called on [`TaskDataCategory::All`], which has no specific
    /// counterpart. Callers are expected to split `All` first (e.g. by
    /// iterating it).
    pub fn into_specific(self) -> SpecificTaskDataCategory {
        match self {
            TaskDataCategory::Meta => SpecificTaskDataCategory::Meta,
            TaskDataCategory::Data => SpecificTaskDataCategory::Data,
            TaskDataCategory::All => {
                unreachable!("TaskDataCategory::All has no specific counterpart")
            }
        }
    }
}

/// A [`TaskDataCategory`] that is guaranteed to name exactly one category.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum SpecificTaskDataCategory {
    Meta,
    Data,
}

impl IntoIterator for TaskDataCategory {
    type Item = TaskDataCategory;

    type IntoIter = TaskDataCategoryIterator;

    /// Starts an iteration over the individual categories covered by `self`.
    fn into_iter(self) -> Self::IntoIter {
        match self {
            TaskDataCategory::Meta => TaskDataCategoryIterator::Meta,
            TaskDataCategory::Data => TaskDataCategoryIterator::Data,
            TaskDataCategory::All => TaskDataCategoryIterator::All,
        }
    }
}

/// Iterator state for walking the categories covered by a [`TaskDataCategory`].
///
/// Each variant names what is still left to yield: `All` yields `Meta` and
/// then `Data`, `Meta`/`Data` yield one item, `None` is exhausted.
#[derive(Debug, Clone)]
pub enum TaskDataCategoryIterator {
    All,
    Meta,
    Data,
    None,
}

impl Iterator for TaskDataCategoryIterator {
    type Item = TaskDataCategory;

    fn next(&mut self) -> Option<Self::Item> {
        // Determine the item to yield together with the state that follows it.
        let (item, rest) = match self {
            TaskDataCategoryIterator::All => {
                (TaskDataCategory::Meta, TaskDataCategoryIterator::Data)
            }
            TaskDataCategoryIterator::Meta => {
                (TaskDataCategory::Meta, TaskDataCategoryIterator::None)
            }
            TaskDataCategoryIterator::Data => {
                (TaskDataCategory::Data, TaskDataCategoryIterator::None)
            }
            TaskDataCategoryIterator::None => return None,
        };
        *self = rest;
        Some(item)
    }

    /// The number of remaining items is fully determined by the state, so the
    /// hint is exact. This lets `collect` and friends allocate precisely.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = match self {
            TaskDataCategoryIterator::All => 2,
            TaskDataCategoryIterator::Meta | TaskDataCategoryIterator::Data => 1,
            TaskDataCategoryIterator::None => 0,
        };
        (remaining, Some(remaining))
    }
}

impl ExactSizeIterator for TaskDataCategoryIterator {}

// Once the `None` state is reached, `next` keeps returning `None`.
impl std::iter::FusedIterator for TaskDataCategoryIterator {}
90
// Per-task state flags for `InnerStorage`, packed as single bits into a `u32`.
// The derived `Default` yields the all-zero value, i.e. every flag unset.
bitfield! {
    // Note: Due to alignment in InnerStorage it doesn't matter if this struct is 1 or 4 bytes.
    #[derive(Clone, Default)]
    pub struct InnerStorageState(u32);
    impl Debug;
    /// The meta category has been marked as restored (see `set_restored`).
    pub meta_restored, set_meta_restored: 0;
    /// The data category has been marked as restored (see `set_restored`).
    pub data_restored, set_data_restored: 1;
    /// Item was modified before snapshot mode was entered.
    pub meta_modified, set_meta_modified: 2;
    /// Data counterpart of `meta_modified`.
    pub data_modified, set_data_modified: 3;
    /// Item was modified after snapshot mode was entered. A snapshot was taken.
    pub meta_snapshot, set_meta_snapshot: 4;
    /// Data counterpart of `meta_snapshot`.
    pub data_snapshot, set_data_snapshot: 5;
    /// Prefetched dependencies
    pub prefetched, set_prefetched: 6;
}
107
108impl InnerStorageState {
109    pub fn set_restored(&mut self, category: TaskDataCategory) {
110        match category {
111            TaskDataCategory::Meta => {
112                self.set_meta_restored(true);
113            }
114            TaskDataCategory::Data => {
115                self.set_data_restored(true);
116            }
117            TaskDataCategory::All => {
118                self.set_meta_restored(true);
119                self.set_data_restored(true);
120            }
121        }
122    }
123
124    pub fn is_restored(&self, category: TaskDataCategory) -> bool {
125        match category {
126            TaskDataCategory::Meta => self.meta_restored(),
127            TaskDataCategory::Data => self.data_restored(),
128            TaskDataCategory::All => self.meta_restored() && self.data_restored(),
129        }
130    }
131
132    pub fn any_snapshot(&self) -> bool {
133        self.meta_snapshot() || self.data_snapshot()
134    }
135
136    pub fn any_modified(&self) -> bool {
137        self.meta_modified() || self.data_modified()
138    }
139}
140
141pub struct InnerStorageSnapshot {
142    aggregation_number: OptionStorage<AggregationNumber>,
143    output_dependent: AutoMapStorage<TaskId, ()>,
144    output: OptionStorage<OutputValue>,
145    upper: AutoMapStorage<TaskId, u32>,
146    dynamic: DynamicStorage,
147    pub meta_modified: bool,
148    pub data_modified: bool,
149}
150
151impl From<&InnerStorage> for InnerStorageSnapshot {
152    fn from(inner: &InnerStorage) -> Self {
153        Self {
154            aggregation_number: inner.aggregation_number.clone(),
155            output_dependent: inner.output_dependent.clone(),
156            output: inner.output.clone(),
157            upper: inner.upper.clone(),
158            dynamic: inner.dynamic.snapshot_for_persisting(),
159            meta_modified: inner.state.meta_modified(),
160            data_modified: inner.state.data_modified(),
161        }
162    }
163}
164
165impl InnerStorageSnapshot {
166    pub fn iter_all(
167        &self,
168    ) -> impl Iterator<Item = (CachedDataItemKey, CachedDataItemValueRef<'_>)> {
169        use crate::data_storage::Storage;
170        self.dynamic
171            .iter_all()
172            .chain(self.aggregation_number.iter().map(|(_, value)| {
173                (
174                    CachedDataItemKey::AggregationNumber {},
175                    CachedDataItemValueRef::AggregationNumber { value },
176                )
177            }))
178            .chain(self.output.iter().map(|(_, value)| {
179                (
180                    CachedDataItemKey::Output {},
181                    CachedDataItemValueRef::Output { value },
182                )
183            }))
184            .chain(self.upper.iter().map(|(k, value)| {
185                (
186                    CachedDataItemKey::Upper { task: *k },
187                    CachedDataItemValueRef::Upper { value },
188                )
189            }))
190            .chain(self.output_dependent.iter().map(|(k, value)| {
191                (
192                    CachedDataItemKey::OutputDependent { task: *k },
193                    CachedDataItemValueRef::OutputDependent { value },
194                )
195            }))
196    }
197
198    pub fn len(&self) -> usize {
199        use crate::data_storage::Storage;
200        self.dynamic.len()
201            + self.aggregation_number.len()
202            + self.output.len()
203            + self.upper.len()
204            + self.output_dependent.len()
205    }
206}
207
208#[derive(Debug, Clone)]
209pub struct InnerStorage {
210    aggregation_number: OptionStorage<AggregationNumber>,
211    output_dependent: AutoMapStorage<TaskId, ()>,
212    output: OptionStorage<OutputValue>,
213    upper: AutoMapStorage<TaskId, u32>,
214    dynamic: DynamicStorage,
215    state: InnerStorageState,
216}
217
218impl InnerStorage {
219    fn new() -> Self {
220        Self {
221            aggregation_number: Default::default(),
222            output_dependent: Default::default(),
223            output: Default::default(),
224            upper: Default::default(),
225            dynamic: DynamicStorage::new(),
226            state: InnerStorageState::default(),
227        }
228    }
229
230    pub fn state(&self) -> &InnerStorageState {
231        &self.state
232    }
233
234    pub fn state_mut(&mut self) -> &mut InnerStorageState {
235        &mut self.state
236    }
237}
238
/// Internal helper for `generate_inner_storage!`.
///
/// Every invocation receives a config list of entries in one of two forms:
///
/// - `Tag key_field => field,` for variants that carry a key field
/// - `Tag => field,` for variants without a key (the unit key is used)
///
/// and expands to one `if let ... { ...; return ...; }` statement per entry.
/// If no entry matches at runtime, control falls through to whatever follows
/// the invocation. Because the expansion contains `return`, the macro can only
/// be used as statements inside a function whose return type fits the
/// selected mode.
///
/// Each mode consists of arms for a single entry plus a recursive arm that
/// peels the first entry off the list and recurses on the rest.
#[macro_export]
macro_rules! generate_inner_storage_internal {
    // Matching on CachedDataItem with a $value
    // Keyed variant: the key field is passed to the storage method first, then the extra args.
    (CachedDataItem: $self:ident, $item:ident, $value:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident $key_field:ident => $field:ident,) => {
        if let CachedDataItem::$tag { $key_field, $value } = $item {
            let result = $self.$field.$fn($key_field, $($args)*);
            return $crate::generate_inner_storage_internal!(return_value: result, $return_ty: $tag $key_field => $field);
        }
    };
    // Unkeyed variant: the unit value `()` is passed as key (by value).
    (CachedDataItem: $self:ident, $item:ident, $value:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident => $field:ident,) => {
        if let CachedDataItem::$tag { $value } = $item {
            let result = $self.$field.$fn((), $($args)*);
            return $crate::generate_inner_storage_internal!(return_value: result, $return_ty: $tag => $field);
        }
    };
    // Recursion: handle the first entry, then the remaining ones.
    (CachedDataItem: $self:ident, $item:ident, $value:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
        $crate::generate_inner_storage_internal!(CachedDataItem: $self, $item, $value, $return_ty, $fn($($args)*): $tag $($key_field)? => $field,);
        $crate::generate_inner_storage_internal!(CachedDataItem: $self, $item, $value, $return_ty, $fn($($args)*): $($config)+)
    };
    // Matching on CachedDataItemKey without a $value
    (CachedDataItemKey: $self:ident, $item:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident $key_field:ident => $field:ident,) => {
        if let CachedDataItemKey::$tag { $key_field } = $item {
            let result = $self.$field.$fn($key_field, $($args)*);
            return $crate::generate_inner_storage_internal!(return_value: result, $return_ty: $tag $key_field => $field);
        }
    };
    // Unkeyed variant: here the unit key is passed by reference (`&()`).
    (CachedDataItemKey: $self:ident, $item:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident => $field:ident,) => {
        if let CachedDataItemKey::$tag { } = $item {
            let result = $self.$field.$fn(&(), $($args)*);
            return $crate::generate_inner_storage_internal!(return_value: result, $return_ty: $tag => $field);
        }
    };
    // Recursion: handle the first entry, then the remaining ones.
    (CachedDataItemKey: $self:ident, $item:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
        $crate::generate_inner_storage_internal!(CachedDataItemKey: $self, $item, $return_ty, $fn($($args)*): $tag $($key_field)? => $field,);
        $crate::generate_inner_storage_internal!(CachedDataItemKey: $self, $item, $return_ty, $fn($($args)*): $($config)+)
    };
    // Matching on CachedDataItemType without a $value
    // No key is involved: the storage method is called with the extra args only.
    (CachedDataItemType: $self:ident, $item:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident $($key_field:ident)? => $field:ident,) => {
        if let CachedDataItemType::$tag = $item {
            let result = $self.$field.$fn($($args)*);
            return $crate::generate_inner_storage_internal!(return_value: result, $return_ty: $tag $($key_field)? => $field);
        }
    };
    // Recursion: handle the first entry, then the remaining ones.
    (CachedDataItemType: $self:ident, $item:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
        $crate::generate_inner_storage_internal!(CachedDataItemType: $self, $item, $return_ty, $fn($($args)*): $tag $($key_field)? => $field,);
        $crate::generate_inner_storage_internal!(CachedDataItemType: $self, $item, $return_ty, $fn($($args)*): $($config)+)
    };

    // fn update
    // The field's old value is wrapped into `CachedDataItemValue::$tag` before it is handed to
    // `$update`; the result is unwrapped again. `$update` returning a different variant hits
    // `unreachable!()`.
    (update: $self:ident, $key:ident, $update:ident: $tag:ident $key_field:ident => $field:ident,) => {
        if let CachedDataItemKey::$tag { $key_field } = $key {
            $self.$field.update($key_field, |old| {
                let old = old.map(|old| CachedDataItemValue::$tag { value: old });
                let new = $update(old);
                new.map(|new| if let CachedDataItemValue::$tag { value } = new {
                    value
                } else {
                    unreachable!()
                })
            });
            return;
        }
    };
    (update: $self:ident, $key:ident, $update:ident: $tag:ident => $field:ident,) => {
        if let CachedDataItemKey::$tag { } = $key {
            $self.$field.update((), |old| {
                let old = old.map(|old| CachedDataItemValue::$tag { value: old });
                let new = $update(old);
                new.map(|new| if let CachedDataItemValue::$tag { value } = new {
                    value
                } else {
                    unreachable!()
                })
            });
            return;
        }
    };
    // Recursion: handle the first entry, then the remaining ones.
    (update: $self:ident, $key:ident, $update:ident: $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
        $crate::generate_inner_storage_internal!(update: $self, $key, $update: $tag $($key_field)? => $field,);
        $crate::generate_inner_storage_internal!(update: $self, $key, $update: $($config)+)
    };

    // fn get_mut_or_insert_with
    // `$insert_with` must produce the `CachedDataItemValue` variant that belongs to the key;
    // any other variant hits `unreachable!()`.
    (get_mut_or_insert_with: $self:ident, $key:ident, $insert_with:ident: $tag:ident $key_field:ident => $field:ident,) => {
        if let CachedDataItemKey::$tag { $key_field } = $key {
            let value = $self.$field.get_mut_or_insert_with($key_field, || {
                let value = $insert_with();
                if let CachedDataItemValue::$tag { value } = value {
                    value
                } else {
                    unreachable!()
                }
            });
            return CachedDataItemValueRefMut::$tag { value };
        }
    };
    (get_mut_or_insert_with: $self:ident, $key:ident, $insert_with:ident: $tag:ident => $field:ident,) => {
        if let CachedDataItemKey::$tag { } = $key {
            let value = $self.$field.get_mut_or_insert_with((), || {
                let value = $insert_with();
                if let CachedDataItemValue::$tag { value } = value {
                    value
                } else {
                    unreachable!()
                }
            });
            return CachedDataItemValueRefMut::$tag { value };
        }
    };
    // Recursion: handle the first entry, then the remaining ones.
    (get_mut_or_insert_with: $self:ident, $key:ident, $insert_with:ident: $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
        $crate::generate_inner_storage_internal!(get_mut_or_insert_with: $self, $key, $insert_with: $tag $($key_field)? => $field,);
        $crate::generate_inner_storage_internal!(get_mut_or_insert_with: $self, $key, $insert_with: $($config)+)
    };

    // fn extract_if
    // The predicate `$f` sees the typed key/value-ref pair; extracted entries are converted back
    // into `CachedDataItem`s. The iterator is wrapped in the `InnerStorageIter` variant named
    // like the tag, so that enum needs one variant per config entry.
    (extract_if: $self:ident, $ty:ident, $f:ident: $tag:ident $key_field:ident => $field:ident,) => {
        if let CachedDataItemType::$tag = $ty {
            let iter = $self.$field.extract_if(move |key, value| {
                $f(CachedDataItemKey::$tag { $key_field: *key }, CachedDataItemValueRef::$tag { value })
            }).map(|($key_field, value)| CachedDataItem::$tag { $key_field, value });
            return InnerStorageIter::$tag(iter);
        }
    };
    (extract_if: $self:ident, $ty:ident, $f:ident: $tag:ident => $field:ident,) => {
        if let CachedDataItemType::$tag = $ty {
            let iter = $self.$field.extract_if(move |_, value| {
                $f(CachedDataItemKey::$tag { }, CachedDataItemValueRef::$tag { value })
            }).map(|(_, value)| CachedDataItem::$tag { value });
            return InnerStorageIter::$tag(iter);
        }
    };
    // Recursion: handle the first entry, then the remaining ones.
    (extract_if: $self:ident, $ty:ident, $f:ident: $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
        $crate::generate_inner_storage_internal!(extract_if: $self, $ty, $f: $tag $($key_field)? => $field,);
        $crate::generate_inner_storage_internal!(extract_if: $self, $ty, $f: $($config)+)
    };

    // fn iter
    // Maps the field's entries to typed key/value-ref pairs, wrapped in the `InnerStorageIter`
    // variant named like the tag.
    (iter: $self:ident, $ty:ident: $tag:ident $key_field:ident => $field:ident,) => {
        if let CachedDataItemType::$tag = $ty {
            let iter = $self.$field.iter().map(|($key_field, value)| (CachedDataItemKey::$tag { $key_field: *$key_field }, CachedDataItemValueRef::$tag { value }));
            return InnerStorageIter::$tag(iter);
        }
    };
    (iter: $self:ident, $ty:ident: $tag:ident => $field:ident,) => {
        if let CachedDataItemType::$tag = $ty {
            let iter = $self.$field.iter().map(|(_, value)| (CachedDataItemKey::$tag { }, CachedDataItemValueRef::$tag { value }));
            return InnerStorageIter::$tag(iter);
        }
    };
    // Recursion: handle the first entry, then the remaining ones.
    (iter: $self:ident, $ty:ident: $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
        $crate::generate_inner_storage_internal!(iter: $self, $ty: $tag $($key_field)? => $field,);
        $crate::generate_inner_storage_internal!(iter: $self, $ty: $($config)+)
    };


    // Return value handling
    // `none`: the storage method's result is returned unchanged.
    (return_value: $result:ident, none: $($more:tt)*) => {
        $result
    };
    // `option_value`: wraps an owned value into `CachedDataItemValue::$tag`.
    (return_value: $result:ident, option_value: $tag:ident $($more:tt)*) => {
        $result.map(|value| CachedDataItemValue::$tag { value })
    };
    // `option_ref`: wraps a shared reference into `CachedDataItemValueRef::$tag`.
    (return_value: $result:ident, option_ref: $tag:ident $($more:tt)*) => {
        $result.map(|value| CachedDataItemValueRef::$tag { value })
    };
    // `option_ref_mut`: wraps a mutable reference into `CachedDataItemValueRefMut::$tag`.
    (return_value: $result:ident, option_ref_mut: $tag:ident $($more:tt)*) => {
        $result.map(|value| CachedDataItemValueRefMut::$tag { value })
    };

    // Input value handling
    // Unwraps an optional `CachedDataItemValue::$tag` into its inner value; any other variant
    // hits `unreachable!()`. NOTE(review): no invocation of this arm is visible in this part of
    // the file.
    (input_value: $input:ident, option_value: $tag:ident $($more:tt)*) => {
        $input.map(|value| {
            if let CachedDataItemValue::$tag { value } = value {
                value
            } else {
                unreachable!()
            }
        })
    };

}
420
// Generates the item-level API of `InnerStorage` from a config list of
// `Tag [key_field] => field,` entries.
//
// Every generated method follows the same shape: the
// `generate_inner_storage_internal!` expansion returns early when the item,
// key or type belongs to one of the configured dedicated fields; otherwise
// the call is forwarded to `self.dynamic`.
macro_rules! generate_inner_storage {
    ($($config:tt)*) => {
        impl InnerStorage {
            /// Adds `item` via the `add` method of the responsible storage and returns its
            /// `bool` result.
            pub fn add(&mut self, item: CachedDataItem) -> bool {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(CachedDataItem: self, item, value, none, add(value): $($config)*);
                self.dynamic.add(item)
            }

            /// Inserts `item` via the `insert` method of the responsible storage and returns
            /// the value it reports (presumably the previous value, verify against the
            /// `Storage` trait).
            pub fn insert(&mut self, item: CachedDataItem) -> Option<CachedDataItemValue> {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(CachedDataItem: self, item, value, option_value, insert(value): $($config)*);
                self.dynamic.insert(item)
            }

            /// Removes the item stored under `key` and returns the value reported by the
            /// responsible storage's `remove`.
            pub fn remove(&mut self, key: &CachedDataItemKey) -> Option<CachedDataItemValue> {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(CachedDataItemKey: self, key, option_value, remove(): $($config)*);
                self.dynamic.remove(key)
            }

            /// Number of items of type `ty`: the `len()` of a dedicated field, or the dynamic
            /// storage's `count(ty)`.
            pub fn count(&self, ty: CachedDataItemType) -> usize {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(CachedDataItemType: self, ty, none, len(): $($config)*);
                self.dynamic.count(ty)
            }

            /// Looks up the value stored under `key`.
            pub fn get(&self, key: &CachedDataItemKey) -> Option<CachedDataItemValueRef<'_>> {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(CachedDataItemKey: self, key, option_ref, get(): $($config)*);
                self.dynamic.get(key)
            }

            /// Reports whether the responsible storage contains `key`.
            pub fn contains_key(&self, key: &CachedDataItemKey) -> bool {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(CachedDataItemKey: self, key, none, contains_key(): $($config)*);
                self.dynamic.contains_key(key)
            }

            /// Looks up the value stored under `key` for mutation.
            pub fn get_mut(&mut self, key: &CachedDataItemKey) -> Option<CachedDataItemValueRefMut<'_>> {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(CachedDataItemKey: self, key, option_ref_mut, get_mut(): $($config)*);
                self.dynamic.get_mut(key)
            }

            /// Calls `shrink_to_fit` on the storage responsible for items of type `ty`.
            pub fn shrink_to_fit(&mut self, ty: CachedDataItemType) {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(CachedDataItemType: self, ty, none, shrink_to_fit(): $($config)*);
                self.dynamic.shrink_to_fit(ty)
            }

            /// Passes the current value under `key` (if any) to `update` and hands the result
            /// to the responsible storage's `update`.
            ///
            /// For dedicated fields `update` must return the value variant belonging to `key`;
            /// a different variant hits `unreachable!()`.
            pub fn update(
                &mut self,
                key: CachedDataItemKey,
                update: impl FnOnce(Option<CachedDataItemValue>) -> Option<CachedDataItemValue>,
            ) {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(update: self, key, update: $($config)*);
                self.dynamic.update(key, update)
            }

            /// Forwards to the `extract_if` of the storage responsible for `ty`, with `f` as
            /// predicate, and yields the extracted entries as `CachedDataItem`s.
            pub fn extract_if<'l, F>(
                &'l mut self,
                ty: CachedDataItemType,
                mut f: F,
            ) -> impl Iterator<Item = CachedDataItem> + use<'l, F>
            where
                F: for<'a> FnMut(CachedDataItemKey, CachedDataItemValueRef<'a>) -> bool + 'l,
            {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(extract_if: self, ty, f: $($config)*);
                InnerStorageIter::Dynamic(self.dynamic.extract_if(ty, f))
            }

            /// Returns a mutable reference to the value under `key`, forwarding `f` to the
            /// responsible storage's `get_mut_or_insert_with`.
            ///
            /// For dedicated fields `f` must produce the value variant belonging to `key`;
            /// a different variant hits `unreachable!()`.
            pub fn get_mut_or_insert_with(
                &mut self,
                key: CachedDataItemKey,
                f: impl FnOnce() -> CachedDataItemValue,
            ) -> CachedDataItemValueRefMut<'_>
            {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(get_mut_or_insert_with: self, key, f: $($config)*);
                self.dynamic.get_mut_or_insert_with(key, f)
            }

            /// Iterates over all items of type `ty` as key/value-reference pairs.
            pub fn iter(
                &self,
                ty: CachedDataItemType,
            ) -> impl Iterator<Item = (CachedDataItemKey, CachedDataItemValueRef<'_>)>
            {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(iter: self, ty: $($config)*);
                InnerStorageIter::Dynamic(self.dynamic.iter(ty))
            }

        }
    };
}
519
// Item kinds that live in a dedicated `InnerStorage` field, written as
// `Tag [key_field] => field`. All other kinds are handled by `dynamic`.
// The generated `iter`/`extract_if` methods use `InnerStorageIter::<Tag>`, so
// `InnerStorageIter` needs a variant for every tag listed here.
generate_inner_storage!(
    AggregationNumber => aggregation_number,
    OutputDependent task => output_dependent,
    Output => output,
    Upper task => upper,
);
526
/// Unifies the differently typed iterators produced for the dedicated fields
/// and for the dynamic storage into a single iterator type.
///
/// There is one variant per dedicated field plus `Dynamic` for everything
/// else. All wrapped iterators must yield the same item type.
enum InnerStorageIter<A, B, C, D, E> {
    AggregationNumber(A),
    OutputDependent(B),
    Output(C),
    Upper(D),
    Dynamic(E),
}

impl<T, A, B, C, D, E> Iterator for InnerStorageIter<A, B, C, D, E>
where
    A: Iterator<Item = T>,
    B: Iterator<Item = T>,
    C: Iterator<Item = T>,
    D: Iterator<Item = T>,
    E: Iterator<Item = T>,
{
    type Item = T;

    /// Delegates to the wrapped iterator.
    fn next(&mut self) -> Option<Self::Item> {
        match self {
            InnerStorageIter::AggregationNumber(iter) => iter.next(),
            InnerStorageIter::OutputDependent(iter) => iter.next(),
            InnerStorageIter::Output(iter) => iter.next(),
            InnerStorageIter::Upper(iter) => iter.next(),
            InnerStorageIter::Dynamic(iter) => iter.next(),
        }
    }

    /// Forwards the wrapped iterator's hint. Without this the default
    /// `(0, None)` would hide the known length from consumers such as
    /// `collect`, which use it to pre-allocate.
    fn size_hint(&self) -> (usize, Option<usize>) {
        match self {
            InnerStorageIter::AggregationNumber(iter) => iter.size_hint(),
            InnerStorageIter::OutputDependent(iter) => iter.size_hint(),
            InnerStorageIter::Output(iter) => iter.size_hint(),
            InnerStorageIter::Upper(iter) => iter.size_hint(),
            InnerStorageIter::Dynamic(iter) => iter.size_hint(),
        }
    }
}
555
556impl InnerStorage {
557    pub fn iter_all(
558        &self,
559    ) -> impl Iterator<Item = (CachedDataItemKey, CachedDataItemValueRef<'_>)> {
560        use crate::data_storage::Storage;
561        self.dynamic
562            .iter_all()
563            .chain(self.aggregation_number.iter().map(|(_, value)| {
564                (
565                    CachedDataItemKey::AggregationNumber {},
566                    CachedDataItemValueRef::AggregationNumber { value },
567                )
568            }))
569            .chain(self.output.iter().map(|(_, value)| {
570                (
571                    CachedDataItemKey::Output {},
572                    CachedDataItemValueRef::Output { value },
573                )
574            }))
575            .chain(self.upper.iter().map(|(k, value)| {
576                (
577                    CachedDataItemKey::Upper { task: *k },
578                    CachedDataItemValueRef::Upper { value },
579                )
580            }))
581            .chain(self.output_dependent.iter().map(|(k, value)| {
582                (
583                    CachedDataItemKey::OutputDependent { task: *k },
584                    CachedDataItemValueRef::OutputDependent { value },
585                )
586            }))
587    }
588
589    pub fn len(&self) -> usize {
590        use crate::data_storage::Storage;
591        self.dynamic.len()
592            + self.aggregation_number.len()
593            + self.output.len()
594            + self.upper.len()
595            + self.output_dependent.len()
596    }
597}
598
/// Modification tracking state of a single task; the value type of
/// `Storage::modified`.
enum ModifiedState {
    /// It was modified before snapshot mode was entered, but it was not accessed during snapshot
    /// mode.
    Modified,
    /// - `Snapshot(Some(_))`: It was modified before snapshot mode was entered and it was
    ///   accessed again during snapshot mode. A copy of the version of the item when snapshot
    ///   mode was entered is stored here.
    /// - `Snapshot(None)`: It was not modified before snapshot mode was entered, but it was
    ///   accessed during snapshot mode. Or the snapshot was already taken out by the snapshot
    ///   operation.
    Snapshot(Option<Box<InnerStorageSnapshot>>),
}
611
/// Concurrent per-task storage with modification tracking and a snapshot mode.
pub struct Storage {
    /// Whether snapshot mode is active. Set by `start_snapshot`, cleared by `end_snapshot`.
    snapshot_mode: AtomicBool,
    /// Modification tracking state of the tasks that are currently tracked as modified.
    modified: FxDashMap<TaskId, ModifiedState>,
    /// The stored data of every task, created on first mutable access.
    map: FxDashMap<TaskId, Box<InnerStorage>>,
}
617
618impl Storage {
619    pub fn new(small_preallocation: bool) -> Self {
620        let map_capacity: usize = if small_preallocation {
621            1024
622        } else {
623            1024 * 1024
624        };
625        let modified_capacity: usize = if small_preallocation { 0 } else { 1024 };
626        let shard_factor: usize = if small_preallocation { 4 } else { 64 };
627
628        let shard_amount =
629            (available_parallelism().map_or(4, |v| v.get()) * shard_factor).next_power_of_two();
630
631        Self {
632            snapshot_mode: AtomicBool::new(false),
633            modified: FxDashMap::with_capacity_and_hasher_and_shard_amount(
634                modified_capacity,
635                Default::default(),
636                shard_amount,
637            ),
638            map: FxDashMap::with_capacity_and_hasher_and_shard_amount(
639                map_capacity,
640                Default::default(),
641                shard_amount,
642            ),
643        }
644    }
645
    /// Processes every modified item (resp. a snapshot of it) with the given functions and returns
    /// the results. Ends snapshot mode afterwards.
    /// preprocess is potentially called within a lock, so it should be fast.
    /// process is called outside of locks, so it could do more expensive operations.
    ///
    /// Enters snapshot mode first if it is not active yet. The returned vector holds one
    /// `SnapshotShard` per shard of the `modified` map. Each shard carries the snapshots that
    /// were already stored for its tasks (`direct_snapshots`), the ids of tasks that are only
    /// flagged as modified (`modified`), and the three callbacks. The callbacks are not invoked
    /// in this function itself.
    ///
    /// NOTE(review): ending snapshot mode is presumably done by `SnapshotGuard` once the last
    /// shard releases its clone; confirm against the `SnapshotGuard` and `SnapshotShard`
    /// definitions.
    pub fn take_snapshot<
        'l,
        T,
        R,
        PP: for<'a> Fn(TaskId, &'a InnerStorage) -> T + Sync,
        P: Fn(TaskId, T) -> R + Sync,
        PS: Fn(TaskId, Box<InnerStorageSnapshot>) -> R + Sync,
    >(
        &'l self,
        preprocess: &'l PP,
        process: &'l P,
        process_snapshot: &'l PS,
    ) -> Vec<SnapshotShard<'l, PP, P, PS>> {
        if !self.snapshot_mode() {
            self.start_snapshot();
        }

        // Shared between all returned shards; every shard gets its own clone below.
        let guard = Arc::new(SnapshotGuard { storage: self });

        // The number of shards is much larger than the number of threads, so the effect of the
        // locks held is negligible.
        parallel::map_collect::<_, _, Vec<_>>(self.modified.shards(), |shard| {
            let mut direct_snapshots: Vec<(TaskId, Box<InnerStorageSnapshot>)> = Vec::new();
            let mut modified: SmallVec<[TaskId; 4]> = SmallVec::new();
            {
                // Take the snapshots from the modified map
                // Note: this `guard` is the shard's write lock. It shadows the snapshot guard
                // above only within this block.
                let guard = shard.write();
                // Safety: guard must outlive the iterator.
                for bucket in unsafe { guard.iter() } {
                    // Safety: the guard guarantees that the bucket is not removed and the ptr
                    // is valid.
                    let (key, shared_value) = unsafe { bucket.as_mut() };
                    let modified_state = shared_value.get_mut();
                    match modified_state {
                        ModifiedState::Modified => {
                            modified.push(*key);
                        }
                        ModifiedState::Snapshot(snapshot) => {
                            // Taking the snapshot out leaves `Snapshot(None)` behind, so the
                            // entry stays marked as accessed during snapshot mode.
                            if let Some(snapshot) = snapshot.take() {
                                direct_snapshots.push((*key, snapshot));
                            }
                        }
                    }
                }
                // Safety: guard must outlive the iterator.
                drop(guard);
            }

            SnapshotShard {
                direct_snapshots,
                modified,
                storage: self,
                guard: Some(guard.clone()),
                process,
                preprocess,
                process_snapshot,
            }
        })
    }
709
710    /// Start snapshot mode.
711    pub fn start_snapshot(&self) {
712        self.snapshot_mode
713            .store(true, std::sync::atomic::Ordering::Release);
714    }
715
    /// End snapshot mode.
    /// Items that have snapshots will be kept as modified since they have been accessed during the
    /// snapshot mode. Items that are modified will be removed and considered as unmodified.
    /// When items are accessed in future they will be marked as modified.
    ///
    /// The steps below are order-sensitive: `Modified` entries are dropped while snapshot mode
    /// is still active, and `Snapshot` entries are only converted after the flag was cleared.
    fn end_snapshot(&self) {
        // We are still in snapshot mode, so all accessed items would be stored as snapshot.
        // This means we can start by removing all modified items.
        let mut removed_modified = Vec::new();
        self.modified.retain(|key, inner| {
            if matches!(inner, ModifiedState::Modified) {
                removed_modified.push(*key);
                false
            } else {
                true
            }
        });

        // We also need to unset all the modified flags.
        for key in removed_modified {
            if let Some(mut inner) = self.map.get_mut(&key) {
                let state = inner.state_mut();
                state.set_data_modified(false);
                state.set_meta_modified(false);
            }
        }

        // Now modified only contains snapshots.
        // We leave snapshot mode. Any access would be stored as modified and not as snapshot.
        self.snapshot_mode
            .store(false, std::sync::atomic::Ordering::Release);

        // We can change all the snapshots to modified now.
        // The keys are collected first; the per-task flags are updated in a second pass below.
        let mut removed_snapshots = Vec::new();
        for mut item in self.modified.iter_mut() {
            match item.value() {
                ModifiedState::Snapshot(_) => {
                    removed_snapshots.push(*item.key());
                    *item.value_mut() = ModifiedState::Modified;
                }
                ModifiedState::Modified => {
                    // This means it was concurrently modified.
                    // It's already in the correct state.
                }
            }
        }

        // And update the flags
        // Each snapshot flag is translated into the matching modified flag.
        for key in removed_snapshots {
            if let Some(mut inner) = self.map.get_mut(&key) {
                let state = inner.state_mut();
                if state.meta_snapshot() {
                    state.set_meta_snapshot(false);
                    state.set_meta_modified(true);
                }
                if state.data_snapshot() {
                    state.set_data_snapshot(false);
                    state.set_data_modified(true);
                }
            }
        }

        // Remove excessive capacity in modified
        self.modified.shrink_to_fit();
    }
780
781    fn snapshot_mode(&self) -> bool {
782        self.snapshot_mode
783            .load(std::sync::atomic::Ordering::Acquire)
784    }
785
786    pub fn access_mut(&self, key: TaskId) -> StorageWriteGuard<'_> {
787        let inner = match self.map.entry(key) {
788            dashmap::mapref::entry::Entry::Occupied(e) => e.into_ref(),
789            dashmap::mapref::entry::Entry::Vacant(e) => e.insert(Box::new(InnerStorage::new())),
790        };
791        StorageWriteGuard {
792            storage: self,
793            inner: inner.into(),
794        }
795    }
796
797    pub fn access_pair_mut(
798        &self,
799        key1: TaskId,
800        key2: TaskId,
801    ) -> (StorageWriteGuard<'_>, StorageWriteGuard<'_>) {
802        let (a, b) = get_multiple_mut(&self.map, key1, key2, || Box::new(InnerStorage::new()));
803        (
804            StorageWriteGuard {
805                storage: self,
806                inner: a,
807            },
808            StorageWriteGuard {
809                storage: self,
810                inner: b,
811            },
812        )
813    }
814
815    pub fn drop_contents(&self) {
816        drop_contents(&self.map);
817        drop_contents(&self.modified);
818    }
819}
820
821pub struct StorageWriteGuard<'a> {
822    storage: &'a Storage,
823    inner: RefMut<'a, TaskId, Box<InnerStorage>>,
824}
825
826impl StorageWriteGuard<'_> {
827    /// Tracks mutation of this task
828    pub fn track_modification(&mut self, category: SpecificTaskDataCategory) {
829        let state = self.inner.state();
830        let snapshot = match category {
831            SpecificTaskDataCategory::Meta => state.meta_snapshot(),
832            SpecificTaskDataCategory::Data => state.data_snapshot(),
833        };
834        if !snapshot {
835            let modified = match category {
836                SpecificTaskDataCategory::Meta => state.meta_modified(),
837                SpecificTaskDataCategory::Data => state.data_modified(),
838            };
839            match (self.storage.snapshot_mode(), modified) {
840                (false, false) => {
841                    // Not in snapshot mode and item is unmodified
842                    if !state.any_snapshot() && !state.any_modified() {
843                        self.storage
844                            .modified
845                            .insert(*self.inner.key(), ModifiedState::Modified);
846                    }
847                    let state = self.inner.state_mut();
848                    match category {
849                        SpecificTaskDataCategory::Meta => state.set_meta_modified(true),
850                        SpecificTaskDataCategory::Data => state.set_data_modified(true),
851                    }
852                }
853                (false, true) => {
854                    // Not in snapshot mode and item is already modified
855                    // Do nothing
856                }
857                (true, false) => {
858                    // In snapshot mode and item is unmodified (so it's not part of the snapshot)
859                    if !state.any_snapshot() {
860                        self.storage
861                            .modified
862                            .insert(*self.inner.key(), ModifiedState::Snapshot(None));
863                    }
864                    let state = self.inner.state_mut();
865                    match category {
866                        SpecificTaskDataCategory::Meta => state.set_meta_snapshot(true),
867                        SpecificTaskDataCategory::Data => state.set_data_snapshot(true),
868                    }
869                }
870                (true, true) => {
871                    // In snapshot mode and item is modified (so it's part of the snapshot)
872                    // We need to store the original version that is part of the snapshot
873                    if !state.any_snapshot() {
874                        self.storage.modified.insert(
875                            *self.inner.key(),
876                            ModifiedState::Snapshot(Some(Box::new((&**self.inner).into()))),
877                        );
878                    }
879                    let state = self.inner.state_mut();
880                    match category {
881                        SpecificTaskDataCategory::Meta => state.set_meta_snapshot(true),
882                        SpecificTaskDataCategory::Data => state.set_data_snapshot(true),
883                    }
884                }
885            }
886        }
887    }
888}
889
890impl Deref for StorageWriteGuard<'_> {
891    type Target = InnerStorage;
892
893    fn deref(&self) -> &Self::Target {
894        &self.inner
895    }
896}
897
898impl DerefMut for StorageWriteGuard<'_> {
899    fn deref_mut(&mut self) -> &mut Self::Target {
900        &mut self.inner
901    }
902}
903
/// Expands to `$task.count(..)` for the [`CachedDataItemType::$key`][crate::data::CachedDataItemType]
/// item type.
macro_rules! count {
    ($task:ident, $key:ident) => {{
        $task.count($crate::data::CachedDataItemType::$key)
    }};
}
907
/// Looks up the [`CachedDataItemKey::$key`][crate::data::CachedDataItemKey] item of `$task`.
///
/// Evaluates to `Some(value)` when `$task.get(..)` returns the matching `$key` variant and to
/// `None` otherwise. The key fields (`$input`) may be omitted, in which case `{}` is used.
macro_rules! get {
    ($task:ident, $key:ident $input:tt) => {{
        #[allow(unused_imports)]
        use $crate::backend::operation::TaskGuard;
        match $task.get(&$crate::data::CachedDataItemKey::$key $input) {
            Some($crate::data::CachedDataItemValueRef::$key { value }) => Some(value),
            _ => None,
        }
    }};
    ($task:ident, $key:ident) => {
        $crate::backend::storage::get!($task, $key {})
    };
}
924
/// Mutable counterpart of `get!`: looks up the
/// [`CachedDataItemKey::$key`][crate::data::CachedDataItemKey] item of `$task` via
/// `$task.get_mut(..)`.
///
/// Evaluates to `Some(value)` with a mutable reference when the matching `$key` variant is
/// returned and to `None` otherwise. The expansion references
/// `$crate::data::allow_mut_access::$key`, so it only compiles for keys that have such an item.
/// The key fields (`$input`) may be omitted, in which case `{}` is used.
macro_rules! get_mut {
    ($task:ident, $key:ident $input:tt) => {{
        #[allow(unused_imports)]
        use $crate::backend::operation::TaskGuard;
        match $task.get_mut(&$crate::data::CachedDataItemKey::$key $input) {
            Some($crate::data::CachedDataItemValueRefMut::$key { value }) => {
                let () = $crate::data::allow_mut_access::$key;
                Some(value)
            }
            _ => None,
        }
    }};
    ($task:ident, $key:ident) => {
        $crate::backend::storage::get_mut!($task, $key {})
    };
}
942
/// Evaluates to a mutable reference to the value of the
/// [`CachedDataItemKey::$key`][crate::data::CachedDataItemKey] item of `$task`, passing a
/// constructor built from `$f` to `$task.get_mut_or_insert_with(..)` for the case that the item
/// has to be created.
///
/// The expansion references `$crate::data::allow_mut_access::$key`, so it only compiles for keys
/// that have such an item. The key fields (`$input`) may be omitted, in which case `{}` is used.
macro_rules! get_mut_or_insert_with {
    ($task:ident, $key:ident $input:tt, $f:expr) => {{
        #[allow(unused_imports)]
        use $crate::backend::operation::TaskGuard;
        let () = $crate::data::allow_mut_access::$key;
        let init = $f;
        match $task.get_mut_or_insert_with(
            $crate::data::CachedDataItemKey::$key $input,
            move || $crate::data::CachedDataItemValue::$key { value: init() },
        ) {
            $crate::data::CachedDataItemValueRefMut::$key { value } => value,
            _ => unreachable!(),
        }
    }};
    ($task:ident, $key:ident, $f:expr) => {
        $crate::backend::storage::get_mut_or_insert_with!($task, $key {}, $f)
    };
}
960
/// Builds an iterator over the [`CachedDataItemKey::$key`][crate::data::CachedDataItemKey] items
/// of `$task`.
///
/// Items are selected by `$key_pattern`, an optional `$value_pattern`, and an optional
/// `if $cond` guard; everything else is skipped.
///
/// Every selected item yields `$iter_item`, an expression that may refer to bindings introduced
/// by `$key_pattern` or `$value_pattern`.
macro_rules! iter_many {
    ($task:ident, $key:ident $key_pattern:tt $(if $cond:expr)? => $iter_item:expr) => {{
        #[allow(unused_imports)]
        use $crate::backend::operation::TaskGuard;
        $task
            .iter($crate::data::CachedDataItemType::$key)
            .filter_map(|(item_key, _)| {
                match item_key {
                    $crate::data::CachedDataItemKey::$key $key_pattern $(if $cond)? => {
                        Some($iter_item)
                    }
                    _ => None,
                }
            })
    }};
    ($task:ident, $key:ident $input:tt $value_pattern:tt $(if $cond:expr)? => $iter_item:expr) => {{
        #[allow(unused_imports)]
        use $crate::backend::operation::TaskGuard;
        $task
            .iter($crate::data::CachedDataItemType::$key)
            .filter_map(|(item_key, item_value)| {
                match (item_key, item_value) {
                    (
                        $crate::data::CachedDataItemKey::$key $input,
                        $crate::data::CachedDataItemValueRef::$key { value: $value_pattern },
                    ) $(if $cond)? => Some($iter_item),
                    _ => None,
                }
            })
    }};
}
993
/// Collects the iterator produced by [`iter_many`] via [`Iterator::collect`].
///
/// All arguments are forwarded to [`iter_many`] unchanged. Because the target collection of
/// [`Iterator::collect`] is inferred, callers may need an explicit type annotation on the
/// receiving variable.
macro_rules! get_many {
    ($($args:tt)*) => {
        Iterator::collect($crate::backend::storage::iter_many!($($args)*))
    };
}
1003
/// Updates the [`CachedDataItemKey::$key`][crate::data::CachedDataItemKey] item of `$task`
/// through `$task.update(..)`.
///
/// `$update` is called with the unwrapped old value (`None` if there is no old item or it is not
/// the `$key` variant) and returns the optional new value, which is wrapped back into the `$key`
/// variant of `CachedDataItemValue`. The key fields (`$input`) may be omitted, in which case `{}`
/// is used.
macro_rules! update {
    ($task:ident, $key:ident $input:tt, $update:expr) => {{
        #[allow(unused_imports)]
        use $crate::backend::operation::TaskGuard;
        #[allow(unused_mut)]
        let mut apply = $update;
        $task.update($crate::data::CachedDataItemKey::$key $input, |previous| {
            let previous = previous.and_then(|item| match item {
                $crate::data::CachedDataItemValue::$key { value } => Some(value),
                _ => None,
            });
            apply(previous).map(|value| $crate::data::CachedDataItemValue::$key { value })
        })
    }};
    ($task:ident, $key:ident, $update:expr) => {
        $crate::backend::storage::update!($task, $key {}, $update)
    };
}
1025
/// Adds `$update` to (or, in the `-$update` form, subtracts it from) the counter stored in the
/// [`CachedDataItemKey::$key`][crate::data::CachedDataItemKey] item of `$task`.
///
/// A missing item counts as zero, and a counter that reaches zero is removed. The macro
/// evaluates to `true` when the counter moved between "positive" and "not positive" in either
/// direction. The key fields (`$input`) may be omitted, in which case `{}` is used.
macro_rules! update_count {
    ($task:ident, $key:ident $input:tt, -$update:expr) => {{
        let delta = $update;
        let mut crossed = false;
        $crate::backend::storage::update!($task, $key $input, |current: Option<_>| {
            #[allow(unused_comparisons, reason = "type of update might be unsigned, where update < 0 is always false")]
            match current {
                Some(current) => {
                    let next = current - delta;
                    crossed = (current <= 0 && next > 0) || (current > 0 && next <= 0);
                    (next != 0).then_some(next)
                }
                None => {
                    // Starting from an implicit zero, only a negative delta makes it positive.
                    crossed = delta < 0;
                    (delta != 0).then_some(-delta)
                }
            }
        });
        crossed
    }};
    ($task:ident, $key:ident $input:tt, $update:expr) => {
        match $update {
            delta => {
                let mut crossed = false;
                $crate::backend::storage::update!($task, $key $input, |current: Option<_>| {
                    match current {
                        Some(current) => {
                            let next = current + delta;
                            crossed = (current <= 0 && next > 0) || (current > 0 && next <= 0);
                            (next != 0).then_some(next)
                        }
                        None => {
                            // Starting from an implicit zero, only a positive delta makes it
                            // positive.
                            crossed = delta > 0;
                            (delta != 0).then_some(delta)
                        }
                    }
                });
                crossed
            }
        }
    };
    ($task:ident, $key:ident, -$update:expr) => {
        $crate::backend::storage::update_count!($task, $key {}, -$update)
    };
    ($task:ident, $key:ident, $update:expr) => {
        $crate::backend::storage::update_count!($task, $key {}, $update)
    };
}
1067
/// Removes the [`CachedDataItemKey::$key`][crate::data::CachedDataItemKey] item from `$task`
/// via `$task.remove(..)`.
///
/// Evaluates to `Some(value)` when the removed item is the matching `$key` variant and to `None`
/// otherwise. The key fields (`$input`) may be omitted, in which case `{}` is used.
macro_rules! remove {
    ($task:ident, $key:ident $input:tt) => {{
        #[allow(unused_imports)]
        use $crate::backend::operation::TaskGuard;
        match $task.remove(&$crate::data::CachedDataItemKey::$key $input) {
            Some($crate::data::CachedDataItemValue::$key { value }) => Some(value),
            _ => None,
        }
    }};
    ($task:ident, $key:ident) => {
        $crate::backend::storage::remove!($task, $key {})
    };
}
1084
1085pub(crate) use count;
1086pub(crate) use get;
1087pub(crate) use get_many;
1088pub(crate) use get_mut;
1089pub(crate) use get_mut_or_insert_with;
1090pub(crate) use iter_many;
1091pub(crate) use remove;
1092pub(crate) use update;
1093pub(crate) use update_count;
1094
1095pub struct SnapshotGuard<'l> {
1096    storage: &'l Storage,
1097}
1098
1099impl Drop for SnapshotGuard<'_> {
1100    fn drop(&mut self) {
1101        self.storage.end_snapshot();
1102    }
1103}
1104
1105pub struct SnapshotShard<'l, PP, P, PS> {
1106    direct_snapshots: Vec<(TaskId, Box<InnerStorageSnapshot>)>,
1107    modified: SmallVec<[TaskId; 4]>,
1108    storage: &'l Storage,
1109    guard: Option<Arc<SnapshotGuard<'l>>>,
1110    process: &'l P,
1111    preprocess: &'l PP,
1112    process_snapshot: &'l PS,
1113}
1114
1115impl<'l, T, R, PP, P, PS> Iterator for SnapshotShard<'l, PP, P, PS>
1116where
1117    PP: for<'a> Fn(TaskId, &'a InnerStorage) -> T + Sync,
1118    P: Fn(TaskId, T) -> R + Sync,
1119    PS: Fn(TaskId, Box<InnerStorageSnapshot>) -> R + Sync,
1120{
1121    type Item = R;
1122
1123    fn next(&mut self) -> Option<Self::Item> {
1124        if let Some((task_id, snapshot)) = self.direct_snapshots.pop() {
1125            return Some((self.process_snapshot)(task_id, snapshot));
1126        }
1127        while let Some(task_id) = self.modified.pop() {
1128            let inner = self.storage.map.get(&task_id).unwrap();
1129            let state = inner.state();
1130            if !state.any_snapshot() {
1131                let preprocessed = (self.preprocess)(task_id, &inner);
1132                drop(inner);
1133                return Some((self.process)(task_id, preprocessed));
1134            } else {
1135                drop(inner);
1136                let maybe_snapshot = {
1137                    let mut modified_state = self.storage.modified.get_mut(&task_id).unwrap();
1138                    let ModifiedState::Snapshot(snapshot) = &mut *modified_state else {
1139                        unreachable!("The snapshot bit was set, so it must be in Snapshot state");
1140                    };
1141                    snapshot.take()
1142                };
1143                if let Some(snapshot) = maybe_snapshot {
1144                    return Some((self.process_snapshot)(task_id, snapshot));
1145                }
1146            }
1147        }
1148        self.guard = None;
1149        None
1150    }
1151}