turbo_tasks_backend/backend/
storage.rs

1use std::{
2    hash::Hash,
3    ops::{Deref, DerefMut},
4    sync::{Arc, atomic::AtomicBool},
5    thread::available_parallelism,
6};
7
8use bitfield::bitfield;
9use rayon::iter::{IndexedParallelIterator, IntoParallelRefIterator, ParallelIterator};
10use turbo_tasks::{FxDashMap, TaskId};
11
12use crate::{
13    backend::dynamic_storage::DynamicStorage,
14    data::{
15        AggregationNumber, CachedDataItem, CachedDataItemKey, CachedDataItemType,
16        CachedDataItemValue, CachedDataItemValueRef, CachedDataItemValueRefMut, OutputValue,
17    },
18    data_storage::{AutoMapStorage, OptionStorage},
19    utils::dash_map_multi::{RefMut, get_multiple_mut},
20};
21
22#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
23pub enum TaskDataCategory {
24    Meta,
25    Data,
26    All,
27}
28
29impl TaskDataCategory {
30    pub fn into_specific(self) -> SpecificTaskDataCategory {
31        match self {
32            TaskDataCategory::Meta => SpecificTaskDataCategory::Meta,
33            TaskDataCategory::Data => SpecificTaskDataCategory::Data,
34            TaskDataCategory::All => unreachable!(),
35        }
36    }
37}
38
39#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
40pub enum SpecificTaskDataCategory {
41    Meta,
42    Data,
43}
44
45impl IntoIterator for TaskDataCategory {
46    type Item = TaskDataCategory;
47
48    type IntoIter = TaskDataCategoryIterator;
49
50    fn into_iter(self) -> Self::IntoIter {
51        match self {
52            TaskDataCategory::Meta => TaskDataCategoryIterator::Meta,
53            TaskDataCategory::Data => TaskDataCategoryIterator::Data,
54            TaskDataCategory::All => TaskDataCategoryIterator::All,
55        }
56    }
57}
58
59pub enum TaskDataCategoryIterator {
60    All,
61    Meta,
62    Data,
63    None,
64}
65
66impl Iterator for TaskDataCategoryIterator {
67    type Item = TaskDataCategory;
68
69    fn next(&mut self) -> Option<Self::Item> {
70        match self {
71            TaskDataCategoryIterator::All => {
72                *self = TaskDataCategoryIterator::Data;
73                Some(TaskDataCategory::Meta)
74            }
75            TaskDataCategoryIterator::Meta => {
76                *self = TaskDataCategoryIterator::None;
77                Some(TaskDataCategory::Meta)
78            }
79            TaskDataCategoryIterator::Data => {
80                *self = TaskDataCategoryIterator::None;
81                Some(TaskDataCategory::Data)
82            }
83            TaskDataCategoryIterator::None => None,
84        }
85    }
86}
87
88bitfield! {
89    // Note: Due to alignment in InnerStorage it doesn't matter if this struct is 1 or 4 bytes.
90    #[derive(Clone, Default)]
91    pub struct InnerStorageState(u32);
92    impl Debug;
93    pub meta_restored, set_meta_restored: 0;
94    pub data_restored, set_data_restored: 1;
95    /// Item was modified before snapshot mode was entered.
96    pub meta_modified, set_meta_modified: 2;
97    pub data_modified, set_data_modified: 3;
98    /// Item was modified after snapshot mode was entered. A snapshot was taken.
99    pub meta_snapshot, set_meta_snapshot: 4;
100    pub data_snapshot, set_data_snapshot: 4;
101}
102
103impl InnerStorageState {
104    pub fn set_restored(&mut self, category: TaskDataCategory) {
105        match category {
106            TaskDataCategory::Meta => {
107                self.set_meta_restored(true);
108            }
109            TaskDataCategory::Data => {
110                self.set_data_restored(true);
111            }
112            TaskDataCategory::All => {
113                self.set_meta_restored(true);
114                self.set_data_restored(true);
115            }
116        }
117    }
118
119    pub fn is_restored(&self, category: TaskDataCategory) -> bool {
120        match category {
121            TaskDataCategory::Meta => self.meta_restored(),
122            TaskDataCategory::Data => self.data_restored(),
123            TaskDataCategory::All => self.meta_restored() && self.data_restored(),
124        }
125    }
126
127    pub fn any_snapshot(&self) -> bool {
128        self.meta_snapshot() || self.data_snapshot()
129    }
130
131    pub fn any_modified(&self) -> bool {
132        self.meta_modified() || self.data_modified()
133    }
134}
135
136pub struct InnerStorageSnapshot {
137    aggregation_number: OptionStorage<AggregationNumber>,
138    output_dependent: AutoMapStorage<TaskId, ()>,
139    output: OptionStorage<OutputValue>,
140    upper: AutoMapStorage<TaskId, i32>,
141    dynamic: DynamicStorage,
142    pub meta_modified: bool,
143    pub data_modified: bool,
144}
145
146impl From<&InnerStorage> for InnerStorageSnapshot {
147    fn from(inner: &InnerStorage) -> Self {
148        Self {
149            aggregation_number: inner.aggregation_number.clone(),
150            output_dependent: inner.output_dependent.clone(),
151            output: inner.output.clone(),
152            upper: inner.upper.clone(),
153            dynamic: inner.dynamic.snapshot_for_persisting(),
154            meta_modified: inner.state.meta_modified(),
155            data_modified: inner.state.data_modified(),
156        }
157    }
158}
159
160impl InnerStorageSnapshot {
161    pub fn iter_all(
162        &self,
163    ) -> impl Iterator<Item = (CachedDataItemKey, CachedDataItemValueRef<'_>)> {
164        use crate::data_storage::Storage;
165        self.dynamic
166            .iter_all()
167            .chain(self.aggregation_number.iter().map(|(_, value)| {
168                (
169                    CachedDataItemKey::AggregationNumber {},
170                    CachedDataItemValueRef::AggregationNumber { value },
171                )
172            }))
173            .chain(self.output.iter().map(|(_, value)| {
174                (
175                    CachedDataItemKey::Output {},
176                    CachedDataItemValueRef::Output { value },
177                )
178            }))
179            .chain(self.upper.iter().map(|(k, value)| {
180                (
181                    CachedDataItemKey::Upper { task: *k },
182                    CachedDataItemValueRef::Upper { value },
183                )
184            }))
185            .chain(self.output_dependent.iter().map(|(k, value)| {
186                (
187                    CachedDataItemKey::OutputDependent { task: *k },
188                    CachedDataItemValueRef::OutputDependent { value },
189                )
190            }))
191    }
192
193    pub fn len(&self) -> usize {
194        use crate::data_storage::Storage;
195        self.dynamic.len()
196            + self.aggregation_number.len()
197            + self.output.len()
198            + self.upper.len()
199            + self.output_dependent.len()
200    }
201}
202
203#[derive(Debug, Clone)]
204pub struct InnerStorage {
205    aggregation_number: OptionStorage<AggregationNumber>,
206    output_dependent: AutoMapStorage<TaskId, ()>,
207    output: OptionStorage<OutputValue>,
208    upper: AutoMapStorage<TaskId, i32>,
209    dynamic: DynamicStorage,
210    state: InnerStorageState,
211}
212
213impl InnerStorage {
214    fn new() -> Self {
215        Self {
216            aggregation_number: Default::default(),
217            output_dependent: Default::default(),
218            output: Default::default(),
219            upper: Default::default(),
220            dynamic: DynamicStorage::new(),
221            state: InnerStorageState::default(),
222        }
223    }
224
225    pub fn state(&self) -> &InnerStorageState {
226        &self.state
227    }
228
229    pub fn state_mut(&mut self) -> &mut InnerStorageState {
230        &mut self.state
231    }
232}
233
234#[macro_export]
235macro_rules! generate_inner_storage_internal {
236    // Matching on CachedDataItem with a $value
237    (CachedDataItem: $self:ident, $item:ident, $value:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident $key_field:ident => $field:ident,) => {
238        if let CachedDataItem::$tag { $key_field, $value } = $item {
239            let result = $self.$field.$fn($key_field, $($args)*);
240            return $crate::generate_inner_storage_internal!(return_value: result, $return_ty: $tag $key_field => $field);
241        }
242    };
243    (CachedDataItem: $self:ident, $item:ident, $value:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident => $field:ident,) => {
244        if let CachedDataItem::$tag { $value } = $item {
245            let result = $self.$field.$fn((), $($args)*);
246            return $crate::generate_inner_storage_internal!(return_value: result, $return_ty: $tag => $field);
247        }
248    };
249    (CachedDataItem: $self:ident, $item:ident, $value:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
250        $crate::generate_inner_storage_internal!(CachedDataItem: $self, $item, $value, $return_ty, $fn($($args)*): $tag $($key_field)? => $field,);
251        $crate::generate_inner_storage_internal!(CachedDataItem: $self, $item, $value, $return_ty, $fn($($args)*): $($config)+)
252    };
253    // Matching on CachedDataItemKey without a $value
254    (CachedDataItemKey: $self:ident, $item:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident $key_field:ident => $field:ident,) => {
255        if let CachedDataItemKey::$tag { $key_field } = $item {
256            let result = $self.$field.$fn($key_field, $($args)*);
257            return $crate::generate_inner_storage_internal!(return_value: result, $return_ty: $tag $key_field => $field);
258        }
259    };
260    (CachedDataItemKey: $self:ident, $item:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident => $field:ident,) => {
261        if let CachedDataItemKey::$tag { } = $item {
262            let result = $self.$field.$fn(&(), $($args)*);
263            return $crate::generate_inner_storage_internal!(return_value: result, $return_ty: $tag => $field);
264        }
265    };
266    (CachedDataItemKey: $self:ident, $item:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
267        $crate::generate_inner_storage_internal!(CachedDataItemKey: $self, $item, $return_ty, $fn($($args)*): $tag $($key_field)? => $field,);
268        $crate::generate_inner_storage_internal!(CachedDataItemKey: $self, $item, $return_ty, $fn($($args)*): $($config)+)
269    };
270    // Matching on CachedDataItemType without a $value
271    (CachedDataItemType: $self:ident, $item:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident $($key_field:ident)? => $field:ident,) => {
272        if let CachedDataItemType::$tag = $item {
273            let result = $self.$field.$fn($($args)*);
274            return $crate::generate_inner_storage_internal!(return_value: result, $return_ty: $tag $($key_field)? => $field);
275        }
276    };
277    (CachedDataItemType: $self:ident, $item:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
278        $crate::generate_inner_storage_internal!(CachedDataItemType: $self, $item, $return_ty, $fn($($args)*): $tag $($key_field)? => $field,);
279        $crate::generate_inner_storage_internal!(CachedDataItemType: $self, $item, $return_ty, $fn($($args)*): $($config)+)
280    };
281
282    // fn update
283    (update: $self:ident, $key:ident, $update:ident: $tag:ident $key_field:ident => $field:ident,) => {
284        if let CachedDataItemKey::$tag { $key_field } = $key {
285            $self.$field.update($key_field, |old| {
286                let old = old.map(|old| CachedDataItemValue::$tag { value: old });
287                let new = $update(old);
288                new.map(|new| if let CachedDataItemValue::$tag { value } = new {
289                    value
290                } else {
291                    unreachable!()
292                })
293            });
294            return;
295        }
296    };
297    (update: $self:ident, $key:ident, $update:ident: $tag:ident => $field:ident,) => {
298        if let CachedDataItemKey::$tag { } = $key {
299            $self.$field.update((), |old| {
300                let old = old.map(|old| CachedDataItemValue::$tag { value: old });
301                let new = $update(old);
302                new.map(|new| if let CachedDataItemValue::$tag { value } = new {
303                    value
304                } else {
305                    unreachable!()
306                })
307            });
308            return;
309        }
310    };
311    (update: $self:ident, $key:ident, $update:ident: $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
312        $crate::generate_inner_storage_internal!(update: $self, $key, $update: $tag $($key_field)? => $field,);
313        $crate::generate_inner_storage_internal!(update: $self, $key, $update: $($config)+)
314    };
315
316    // fn get_mut_or_insert_with
317    (get_mut_or_insert_with: $self:ident, $key:ident, $insert_with:ident: $tag:ident $key_field:ident => $field:ident,) => {
318        if let CachedDataItemKey::$tag { $key_field } = $key {
319            let value = $self.$field.get_mut_or_insert_with($key_field, || {
320                let value = $insert_with();
321                if let CachedDataItemValue::$tag { value } = value {
322                    value
323                } else {
324                    unreachable!()
325                }
326            });
327            return CachedDataItemValueRefMut::$tag { value };
328        }
329    };
330    (get_mut_or_insert_with: $self:ident, $key:ident, $insert_with:ident: $tag:ident => $field:ident,) => {
331        if let CachedDataItemKey::$tag { } = $key {
332            let value = $self.$field.get_mut_or_insert_with((), || {
333                let value = $insert_with();
334                if let CachedDataItemValue::$tag { value } = value {
335                    value
336                } else {
337                    unreachable!()
338                }
339            });
340            return CachedDataItemValueRefMut::$tag { value };
341        }
342    };
343    (get_mut_or_insert_with: $self:ident, $key:ident, $insert_with:ident: $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
344        $crate::generate_inner_storage_internal!(get_mut_or_insert_with: $self, $key, $insert_with: $tag $($key_field)? => $field,);
345        $crate::generate_inner_storage_internal!(get_mut_or_insert_with: $self, $key, $insert_with: $($config)+)
346    };
347
348    // fn extract_if
349    (extract_if: $self:ident, $ty:ident, $f:ident: $tag:ident $key_field:ident => $field:ident,) => {
350        if let CachedDataItemType::$tag = $ty {
351            let iter = $self.$field.extract_if(move |key, value| {
352                $f(CachedDataItemKey::$tag { $key_field: *key }, CachedDataItemValueRef::$tag { value })
353            }).map(|($key_field, value)| CachedDataItem::$tag { $key_field, value });
354            return InnerStorageIter::$tag(iter);
355        }
356    };
357    (extract_if: $self:ident, $ty:ident, $f:ident: $tag:ident => $field:ident,) => {
358        if let CachedDataItemType::$tag = $ty {
359            let iter = $self.$field.extract_if(move |_, value| {
360                $f(CachedDataItemKey::$tag { }, CachedDataItemValueRef::$tag { value })
361            }).map(|(_, value)| CachedDataItem::$tag { value });
362            return InnerStorageIter::$tag(iter);
363        }
364    };
365    (extract_if: $self:ident, $ty:ident, $f:ident: $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
366        $crate::generate_inner_storage_internal!(extract_if: $self, $ty, $f: $tag $($key_field)? => $field,);
367        $crate::generate_inner_storage_internal!(extract_if: $self, $ty, $f: $($config)+)
368    };
369
370    // fn iter
371    (iter: $self:ident, $ty:ident: $tag:ident $key_field:ident => $field:ident,) => {
372        if let CachedDataItemType::$tag = $ty {
373            let iter = $self.$field.iter().map(|($key_field, value)| (CachedDataItemKey::$tag { $key_field: *$key_field }, CachedDataItemValueRef::$tag { value }));
374            return InnerStorageIter::$tag(iter);
375        }
376    };
377    (iter: $self:ident, $ty:ident: $tag:ident => $field:ident,) => {
378        if let CachedDataItemType::$tag = $ty {
379            let iter = $self.$field.iter().map(|(_, value)| (CachedDataItemKey::$tag { }, CachedDataItemValueRef::$tag { value }));
380            return InnerStorageIter::$tag(iter);
381        }
382    };
383    (iter: $self:ident, $ty:ident: $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
384        $crate::generate_inner_storage_internal!(iter: $self, $ty: $tag $($key_field)? => $field,);
385        $crate::generate_inner_storage_internal!(iter: $self, $ty: $($config)+)
386    };
387
388
389    // Return value handling
390    (return_value: $result:ident, none: $($more:tt)*) => {
391        $result
392    };
393    (return_value: $result:ident, option_value: $tag:ident $($more:tt)*) => {
394        $result.map(|value| CachedDataItemValue::$tag { value })
395    };
396    (return_value: $result:ident, option_ref: $tag:ident $($more:tt)*) => {
397        $result.map(|value| CachedDataItemValueRef::$tag { value })
398    };
399    (return_value: $result:ident, option_ref_mut: $tag:ident $($more:tt)*) => {
400        $result.map(|value| CachedDataItemValueRefMut::$tag { value })
401    };
402
403    // Input value handling
404    (input_value: $input:ident, option_value: $tag:ident $($more:tt)*) => {
405        $input.map(|value| {
406            if let CachedDataItemValue::$tag { value } = value {
407                value
408            } else {
409                unreachable!()
410            }
411        })
412    };
413
414}
415
416macro_rules! generate_inner_storage {
417    ($($config:tt)*) => {
418        impl InnerStorage {
419            pub fn add(&mut self, item: CachedDataItem) -> bool {
420                use crate::data_storage::Storage;
421                $crate::generate_inner_storage_internal!(CachedDataItem: self, item, value, none, add(value): $($config)*);
422                self.dynamic.add(item)
423            }
424
425            pub fn insert(&mut self, item: CachedDataItem) -> Option<CachedDataItemValue> {
426                use crate::data_storage::Storage;
427                $crate::generate_inner_storage_internal!(CachedDataItem: self, item, value, option_value, insert(value): $($config)*);
428                self.dynamic.insert(item)
429            }
430
431            pub fn remove(&mut self, key: &CachedDataItemKey) -> Option<CachedDataItemValue> {
432                use crate::data_storage::Storage;
433                $crate::generate_inner_storage_internal!(CachedDataItemKey: self, key, option_value, remove(): $($config)*);
434                self.dynamic.remove(key)
435            }
436
437            pub fn count(&self, ty: CachedDataItemType) -> usize {
438                use crate::data_storage::Storage;
439                $crate::generate_inner_storage_internal!(CachedDataItemType: self, ty, none, len(): $($config)*);
440                self.dynamic.count(ty)
441            }
442
443            pub fn get(&self, key: &CachedDataItemKey) -> Option<CachedDataItemValueRef> {
444                use crate::data_storage::Storage;
445                $crate::generate_inner_storage_internal!(CachedDataItemKey: self, key, option_ref, get(): $($config)*);
446                self.dynamic.get(key)
447            }
448
449            pub fn contains_key(&self, key: &CachedDataItemKey) -> bool {
450                use crate::data_storage::Storage;
451                $crate::generate_inner_storage_internal!(CachedDataItemKey: self, key, none, contains_key(): $($config)*);
452                self.dynamic.contains_key(key)
453            }
454
455            pub fn get_mut(&mut self, key: &CachedDataItemKey) -> Option<CachedDataItemValueRefMut> {
456                use crate::data_storage::Storage;
457                $crate::generate_inner_storage_internal!(CachedDataItemKey: self, key, option_ref_mut, get_mut(): $($config)*);
458                self.dynamic.get_mut(key)
459            }
460
461            pub fn shrink_to_fit(&mut self, ty: CachedDataItemType) {
462                use crate::data_storage::Storage;
463                $crate::generate_inner_storage_internal!(CachedDataItemType: self, ty, none, shrink_to_fit(): $($config)*);
464                self.dynamic.shrink_to_fit(ty)
465            }
466
467            pub fn update(
468                &mut self,
469                key: CachedDataItemKey,
470                update: impl FnOnce(Option<CachedDataItemValue>) -> Option<CachedDataItemValue>,
471            ) {
472                use crate::data_storage::Storage;
473                $crate::generate_inner_storage_internal!(update: self, key, update: $($config)*);
474                self.dynamic.update(key, update)
475            }
476
477            pub fn extract_if<'l, F>(
478                &'l mut self,
479                ty: CachedDataItemType,
480                mut f: F,
481            ) -> impl Iterator<Item = CachedDataItem> + use<'l, F>
482            where
483                F: for<'a> FnMut(CachedDataItemKey, CachedDataItemValueRef<'a>) -> bool + 'l,
484            {
485                use crate::data_storage::Storage;
486                $crate::generate_inner_storage_internal!(extract_if: self, ty, f: $($config)*);
487                InnerStorageIter::Dynamic(self.dynamic.extract_if(ty, f))
488            }
489
490            pub fn get_mut_or_insert_with(
491                &mut self,
492                key: CachedDataItemKey,
493                f: impl FnOnce() -> CachedDataItemValue,
494            ) -> CachedDataItemValueRefMut<'_>
495            {
496                use crate::data_storage::Storage;
497                $crate::generate_inner_storage_internal!(get_mut_or_insert_with: self, key, f: $($config)*);
498                self.dynamic.get_mut_or_insert_with(key, f)
499            }
500
501            pub fn iter(
502                &self,
503                ty: CachedDataItemType,
504            ) -> impl Iterator<Item = (CachedDataItemKey, CachedDataItemValueRef<'_>)>
505            {
506                use crate::data_storage::Storage;
507                $crate::generate_inner_storage_internal!(iter: self, ty: $($config)*);
508                InnerStorageIter::Dynamic(self.dynamic.iter(ty))
509            }
510
511        }
512    };
513}
514
515generate_inner_storage!(
516    AggregationNumber => aggregation_number,
517    OutputDependent task => output_dependent,
518    Output => output,
519    Upper task => upper,
520);
521
522enum InnerStorageIter<A, B, C, D, E> {
523    AggregationNumber(A),
524    OutputDependent(B),
525    Output(C),
526    Upper(D),
527    Dynamic(E),
528}
529
530impl<T, A, B, C, D, E> Iterator for InnerStorageIter<A, B, C, D, E>
531where
532    A: Iterator<Item = T>,
533    B: Iterator<Item = T>,
534    C: Iterator<Item = T>,
535    D: Iterator<Item = T>,
536    E: Iterator<Item = T>,
537{
538    type Item = T;
539
540    fn next(&mut self) -> Option<Self::Item> {
541        match self {
542            InnerStorageIter::AggregationNumber(iter) => iter.next(),
543            InnerStorageIter::OutputDependent(iter) => iter.next(),
544            InnerStorageIter::Output(iter) => iter.next(),
545            InnerStorageIter::Upper(iter) => iter.next(),
546            InnerStorageIter::Dynamic(iter) => iter.next(),
547        }
548    }
549}
550
551impl InnerStorage {
552    pub fn iter_all(
553        &self,
554    ) -> impl Iterator<Item = (CachedDataItemKey, CachedDataItemValueRef<'_>)> {
555        use crate::data_storage::Storage;
556        self.dynamic
557            .iter_all()
558            .chain(self.aggregation_number.iter().map(|(_, value)| {
559                (
560                    CachedDataItemKey::AggregationNumber {},
561                    CachedDataItemValueRef::AggregationNumber { value },
562                )
563            }))
564            .chain(self.output.iter().map(|(_, value)| {
565                (
566                    CachedDataItemKey::Output {},
567                    CachedDataItemValueRef::Output { value },
568                )
569            }))
570            .chain(self.upper.iter().map(|(k, value)| {
571                (
572                    CachedDataItemKey::Upper { task: *k },
573                    CachedDataItemValueRef::Upper { value },
574                )
575            }))
576            .chain(self.output_dependent.iter().map(|(k, value)| {
577                (
578                    CachedDataItemKey::OutputDependent { task: *k },
579                    CachedDataItemValueRef::OutputDependent { value },
580                )
581            }))
582    }
583
584    pub fn len(&self) -> usize {
585        use crate::data_storage::Storage;
586        self.dynamic.len()
587            + self.aggregation_number.len()
588            + self.output.len()
589            + self.upper.len()
590            + self.output_dependent.len()
591    }
592}
593
594enum ModifiedState {
595    /// It was modified before snapshot mode was entered, but it was not accessed during snapshot
596    /// mode.
597    Modified,
598    /// Snapshot(Some):
599    /// It was modified before snapshot mode was entered and it was accessed again during snapshot
600    /// mode. A copy of the version of the item when snapshot mode was entered is stored here.
601    /// Snapshot(None):
602    /// It was not modified before snapshot mode was entered, but it was accessed during snapshot
603    /// mode. Or the snapshot was already taken out by the snapshot operation.
604    Snapshot(Option<Box<InnerStorageSnapshot>>),
605}
606
607pub struct Storage {
608    snapshot_mode: AtomicBool,
609    modified: FxDashMap<TaskId, ModifiedState>,
610    map: FxDashMap<TaskId, Box<InnerStorage>>,
611}
612
613impl Storage {
614    pub fn new() -> Self {
615        let shard_amount =
616            (available_parallelism().map_or(4, |v| v.get()) * 64).next_power_of_two();
617        Self {
618            snapshot_mode: AtomicBool::new(false),
619            modified: FxDashMap::with_capacity_and_hasher_and_shard_amount(
620                1024,
621                Default::default(),
622                shard_amount,
623            ),
624            map: FxDashMap::with_capacity_and_hasher_and_shard_amount(
625                1024 * 1024,
626                Default::default(),
627                shard_amount,
628            ),
629        }
630    }
631
632    /// Processes every modified item (resp. a snapshot of it) with the given functions and returns
633    /// the results. Ends snapshot mode afterwards.
634    /// preprocess is potentially called within a lock, so it should be fast.
635    /// process is called outside of locks, so it could do more expensive operations.
636    pub fn take_snapshot<
637        'l,
638        T,
639        R,
640        PP: for<'a> Fn(TaskId, &'a InnerStorage) -> T + Sync,
641        P: Fn(TaskId, T) -> R + Sync,
642        PS: Fn(TaskId, Box<InnerStorageSnapshot>) -> R + Sync,
643    >(
644        &'l self,
645        preprocess: &'l PP,
646        process: &'l P,
647        process_snapshot: &'l PS,
648    ) -> Vec<SnapshotShard<'l, PP, P, PS>> {
649        if !self.snapshot_mode() {
650            self.start_snapshot();
651        }
652
653        let guard = Arc::new(SnapshotGuard { storage: self });
654
655        // The number of shards is much larger than the number of threads, so the effect of the
656        // locks held is negligible.
657        self.modified
658            .shards()
659            .par_iter()
660            .with_max_len(1)
661            .map(|shard| {
662                let mut direct_snapshots: Vec<(TaskId, Box<InnerStorageSnapshot>)> = Vec::new();
663                let mut modified: Vec<TaskId> = Vec::new();
664                {
665                    // Take the snapshots from the modified map
666                    let guard = shard.write();
667                    // Safety: guard must outlive the iterator.
668                    for bucket in unsafe { guard.iter() } {
669                        // Safety: the guard guarantees that the bucket is not removed and the ptr
670                        // is valid.
671                        let (key, shared_value) = unsafe { bucket.as_mut() };
672                        let modified_state = shared_value.get_mut();
673                        match modified_state {
674                            ModifiedState::Modified => {
675                                modified.push(*key);
676                            }
677                            ModifiedState::Snapshot(snapshot) => {
678                                if let Some(snapshot) = snapshot.take() {
679                                    direct_snapshots.push((*key, snapshot));
680                                }
681                            }
682                        }
683                    }
684                    // Safety: guard must outlive the iterator.
685                    drop(guard);
686                }
687
688                SnapshotShard {
689                    direct_snapshots,
690                    modified,
691                    storage: self,
692                    guard: Some(guard.clone()),
693                    process,
694                    preprocess,
695                    process_snapshot,
696                }
697            })
698            .collect::<Vec<_>>()
699    }
700
701    /// Start snapshot mode.
702    pub fn start_snapshot(&self) {
703        self.snapshot_mode
704            .store(true, std::sync::atomic::Ordering::Release);
705    }
706
707    /// End snapshot mode.
708    /// Items that have snapshots will be kept as modified since they have been accessed during the
709    /// snapshot mode. Items that are modified will be removed and considered as unmodified.
710    /// When items are accessed in future they will be marked as modified.
711    fn end_snapshot(&self) {
712        // We are still in snapshot mode, so all accessed items would be stored as snapshot.
713        // This means we can start by removing all modified items.
714        let mut removed_modified = Vec::new();
715        self.modified.retain(|key, inner| {
716            if matches!(inner, ModifiedState::Modified) {
717                removed_modified.push(*key);
718                false
719            } else {
720                true
721            }
722        });
723
724        // We also need to unset all the modified flags.
725        for key in removed_modified {
726            if let Some(mut inner) = self.map.get_mut(&key) {
727                let state = inner.state_mut();
728                state.set_data_modified(false);
729                state.set_meta_modified(false);
730            }
731        }
732
733        // Now modified only contains snapshots.
734        // We leave snapshot mode. Any access would be stored as modified and not as snapshot.
735        self.snapshot_mode
736            .store(false, std::sync::atomic::Ordering::Release);
737
738        // We can change all the snapshots to modified now.
739        let mut removed_snapshots = Vec::new();
740        for mut item in self.modified.iter_mut() {
741            match item.value() {
742                ModifiedState::Snapshot(_) => {
743                    removed_snapshots.push(*item.key());
744                    *item.value_mut() = ModifiedState::Modified;
745                }
746                ModifiedState::Modified => {
747                    // This means it was concurrently modified.
748                    // It's already in the correct state.
749                }
750            }
751        }
752
753        // And update the flags
754        for key in removed_snapshots {
755            if let Some(mut inner) = self.map.get_mut(&key) {
756                let state = inner.state_mut();
757                if state.meta_snapshot() {
758                    state.set_meta_snapshot(false);
759                    state.set_meta_modified(true);
760                }
761                if state.data_snapshot() {
762                    state.set_data_snapshot(false);
763                    state.set_data_modified(true);
764                }
765            }
766        }
767
768        // Remove excessive capacity in modified
769        self.modified.shrink_to_fit();
770    }
771
772    fn snapshot_mode(&self) -> bool {
773        self.snapshot_mode
774            .load(std::sync::atomic::Ordering::Acquire)
775    }
776
777    pub fn access_mut(&self, key: TaskId) -> StorageWriteGuard<'_> {
778        let inner = match self.map.entry(key) {
779            dashmap::mapref::entry::Entry::Occupied(e) => e.into_ref(),
780            dashmap::mapref::entry::Entry::Vacant(e) => e.insert(Box::new(InnerStorage::new())),
781        };
782        StorageWriteGuard {
783            storage: self,
784            inner: inner.into(),
785        }
786    }
787
788    pub fn access_pair_mut(
789        &self,
790        key1: TaskId,
791        key2: TaskId,
792    ) -> (StorageWriteGuard<'_>, StorageWriteGuard<'_>) {
793        let (a, b) = get_multiple_mut(&self.map, key1, key2, || Box::new(InnerStorage::new()));
794        (
795            StorageWriteGuard {
796                storage: self,
797                inner: a,
798            },
799            StorageWriteGuard {
800                storage: self,
801                inner: b,
802            },
803        )
804    }
805}
806
807pub struct StorageWriteGuard<'a> {
808    storage: &'a Storage,
809    inner: RefMut<'a, TaskId, Box<InnerStorage>>,
810}
811
812impl StorageWriteGuard<'_> {
813    /// Tracks mutation of this task
814    pub fn track_modification(&mut self, category: SpecificTaskDataCategory) {
815        let state = self.inner.state();
816        let snapshot = match category {
817            SpecificTaskDataCategory::Meta => state.meta_snapshot(),
818            SpecificTaskDataCategory::Data => state.data_snapshot(),
819        };
820        if !snapshot {
821            let modified = match category {
822                SpecificTaskDataCategory::Meta => state.meta_modified(),
823                SpecificTaskDataCategory::Data => state.data_modified(),
824            };
825            match (self.storage.snapshot_mode(), modified) {
826                (false, false) => {
827                    // Not in snapshot mode and item is unmodified
828                    if !state.any_snapshot() && !state.any_modified() {
829                        self.storage
830                            .modified
831                            .insert(*self.inner.key(), ModifiedState::Modified);
832                    }
833                    let state = self.inner.state_mut();
834                    match category {
835                        SpecificTaskDataCategory::Meta => state.set_meta_modified(true),
836                        SpecificTaskDataCategory::Data => state.set_data_modified(true),
837                    }
838                }
839                (false, true) => {
840                    // Not in snapshot mode and item is already modfied
841                    // Do nothing
842                }
843                (true, false) => {
844                    // In snapshot mode and item is unmodified (so it's not part of the snapshot)
845                    if !state.any_snapshot() {
846                        self.storage
847                            .modified
848                            .insert(*self.inner.key(), ModifiedState::Snapshot(None));
849                    }
850                    let state = self.inner.state_mut();
851                    match category {
852                        SpecificTaskDataCategory::Meta => state.set_meta_snapshot(true),
853                        SpecificTaskDataCategory::Data => state.set_data_snapshot(true),
854                    }
855                }
856                (true, true) => {
857                    // In snapshot mode and item is modified (so it's part of the snapshot)
858                    // We need to store the original version that is part of the snapshot
859                    if !state.any_snapshot() {
860                        self.storage.modified.insert(
861                            *self.inner.key(),
862                            ModifiedState::Snapshot(Some(Box::new((&**self.inner).into()))),
863                        );
864                    }
865                    let state = self.inner.state_mut();
866                    match category {
867                        SpecificTaskDataCategory::Meta => state.set_meta_snapshot(true),
868                        SpecificTaskDataCategory::Data => state.set_data_snapshot(true),
869                    }
870                }
871            }
872        }
873    }
874}
875
876impl Deref for StorageWriteGuard<'_> {
877    type Target = InnerStorage;
878
879    fn deref(&self) -> &Self::Target {
880        &self.inner
881    }
882}
883
884impl DerefMut for StorageWriteGuard<'_> {
885    fn deref_mut(&mut self) -> &mut Self::Target {
886        &mut self.inner
887    }
888}
889
890macro_rules! count {
891    ($task:ident, $key:ident) => {{ $task.count($crate::data::CachedDataItemType::$key) }};
892}
893
894macro_rules! get {
895    ($task:ident, $key:ident $input:tt) => {{
896        #[allow(unused_imports)]
897        use $crate::backend::operation::TaskGuard;
898        if let Some($crate::data::CachedDataItemValueRef::$key {
899            value,
900        }) = $task.get(&$crate::data::CachedDataItemKey::$key $input) {
901            Some(value)
902        } else {
903            None
904        }
905    }};
906    ($task:ident, $key:ident) => {
907        $crate::backend::storage::get!($task, $key {})
908    };
909}
910
911macro_rules! get_mut {
912    ($task:ident, $key:ident $input:tt) => {{
913        #[allow(unused_imports)]
914        use $crate::backend::operation::TaskGuard;
915        if let Some($crate::data::CachedDataItemValueRefMut::$key {
916            value,
917        }) = $task.get_mut(&$crate::data::CachedDataItemKey::$key $input) {
918            let () = $crate::data::allow_mut_access::$key;
919            Some(value)
920        } else {
921            None
922        }
923    }};
924    ($task:ident, $key:ident) => {
925        $crate::backend::storage::get_mut!($task, $key {})
926    };
927}
928
929macro_rules! get_mut_or_insert_with {
930    ($task:ident, $key:ident $input:tt, $f:expr) => {{
931        #[allow(unused_imports)]
932        use $crate::backend::operation::TaskGuard;
933        let () = $crate::data::allow_mut_access::$key;
934        let functor = $f;
935        let $crate::data::CachedDataItemValueRefMut::$key {
936            value,
937        } = $task.get_mut_or_insert_with($crate::data::CachedDataItemKey::$key $input, move || $crate::data::CachedDataItemValue::$key { value: functor() }) else {
938            unreachable!()
939        };
940        value
941    }};
942    ($task:ident, $key:ident, $f:expr) => {
943        $crate::backend::storage::get_mut_or_insert_with!($task, $key {}, $f)
944    };
945}
946
947/// Creates an iterator over all [`CachedDataItemKey::$key`][crate::data::CachedDataItemKey]s in
948/// `$task` matching the given `$key_pattern`, optional `$value_pattern`, and optional `if $cond`.
949///
950/// Each element in the iterator is determined by `$iter_item`, which may use fields extracted by
951/// `$key_pattern` or `$value_pattern`.
952macro_rules! iter_many {
953    ($task:ident, $key:ident $key_pattern:tt $(if $cond:expr)? => $iter_item:expr) => {{
954        #[allow(unused_imports)]
955        use $crate::backend::operation::TaskGuard;
956        $task
957            .iter($crate::data::CachedDataItemType::$key)
958            .filter_map(|(key, _)| match key {
959                $crate::data::CachedDataItemKey::$key $key_pattern $(if $cond)? => Some(
960                    $iter_item
961                ),
962                _ => None,
963            })
964    }};
965    ($task:ident, $key:ident $input:tt $value_pattern:tt $(if $cond:expr)? => $iter_item:expr) => {{
966        #[allow(unused_imports)]
967        use $crate::backend::operation::TaskGuard;
968        $task
969            .iter($crate::data::CachedDataItemType::$key)
970            .filter_map(|(key, value)| match (key, value) {
971                (
972                    $crate::data::CachedDataItemKey::$key $input,
973                    $crate::data::CachedDataItemValueRef::$key { value: $value_pattern }
974                ) $(if $cond)? => Some($iter_item),
975                _ => None,
976            })
977    }};
978}
979
980/// A thin wrapper around [`iter_many`] that calls [`Iterator::collect`].
981///
982/// Note that the return type of [`Iterator::collect`] may be ambiguous in certain contexts, so
983/// using this macro may require explicit type annotations on variables.
984macro_rules! get_many {
985    ($($args:tt)*) => {
986        $crate::backend::storage::iter_many!($($args)*).collect()
987    };
988}
989
990macro_rules! update {
991    ($task:ident, $key:ident $input:tt, $update:expr) => {{
992        #[allow(unused_imports)]
993        use $crate::backend::operation::TaskGuard;
994        #[allow(unused_mut)]
995        let mut update = $update;
996        $task.update($crate::data::CachedDataItemKey::$key $input, |old| {
997            update(old.and_then(|old| {
998                if let $crate::data::CachedDataItemValue::$key { value } = old {
999                    Some(value)
1000                } else {
1001                    None
1002                }
1003            }))
1004            .map(|new| $crate::data::CachedDataItemValue::$key { value: new })
1005        })
1006    }};
1007    ($task:ident, $key:ident, $update:expr) => {
1008        $crate::backend::storage::update!($task, $key {}, $update)
1009    };
1010}
1011
1012macro_rules! update_count {
1013    ($task:ident, $key:ident $input:tt, -$update:expr) => {{
1014        let update = $update;
1015        let mut state_change = false;
1016        $crate::backend::storage::update!($task, $key $input, |old: Option<_>| {
1017            #[allow(unused_comparisons, reason = "type of update might be unsigned, where update < 0 is always false")]
1018            if let Some(old) = old {
1019                let new = old - update;
1020                state_change = old <= 0 && new > 0 || old > 0 && new <= 0;
1021                (new != 0).then_some(new)
1022            } else {
1023                state_change = update < 0;
1024                (update != 0).then_some(-update)
1025            }
1026        });
1027        state_change
1028    }};
1029    ($task:ident, $key:ident $input:tt, $update:expr) => {
1030        match $update {
1031            update => {
1032                let mut state_change = false;
1033                $crate::backend::storage::update!($task, $key $input, |old: Option<_>| {
1034                    if let Some(old) = old {
1035                        let new = old + update;
1036                        state_change = old <= 0 && new > 0 || old > 0 && new <= 0;
1037                        (new != 0).then_some(new)
1038                    } else {
1039                        state_change = update > 0;
1040                        (update != 0).then_some(update)
1041                    }
1042                });
1043                state_change
1044            }
1045        }
1046    };
1047    ($task:ident, $key:ident, -$update:expr) => {
1048        $crate::backend::storage::update_count!($task, $key {}, -$update)
1049    };    ($task:ident, $key:ident, $update:expr) => {
1050        $crate::backend::storage::update_count!($task, $key {}, $update)
1051    };
1052}
1053
1054macro_rules! remove {
1055    ($task:ident, $key:ident $input:tt) => {{
1056        #[allow(unused_imports)]
1057        use $crate::backend::operation::TaskGuard;
1058        if let Some($crate::data::CachedDataItemValue::$key { value }) = $task.remove(
1059            &$crate::data::CachedDataItemKey::$key $input
1060        ) {
1061            Some(value)
1062        } else {
1063            None
1064        }
1065    }};
1066    ($task:ident, $key:ident) => {
1067        $crate::backend::storage::remove!($task, $key {})
1068    };
1069}
1070
1071pub(crate) use count;
1072pub(crate) use get;
1073pub(crate) use get_many;
1074pub(crate) use get_mut;
1075pub(crate) use get_mut_or_insert_with;
1076pub(crate) use iter_many;
1077pub(crate) use remove;
1078pub(crate) use update;
1079pub(crate) use update_count;
1080
1081pub struct SnapshotGuard<'l> {
1082    storage: &'l Storage,
1083}
1084
1085impl Drop for SnapshotGuard<'_> {
1086    fn drop(&mut self) {
1087        self.storage.end_snapshot();
1088    }
1089}
1090
1091pub struct SnapshotShard<'l, PP, P, PS> {
1092    direct_snapshots: Vec<(TaskId, Box<InnerStorageSnapshot>)>,
1093    modified: Vec<TaskId>,
1094    storage: &'l Storage,
1095    guard: Option<Arc<SnapshotGuard<'l>>>,
1096    process: &'l P,
1097    preprocess: &'l PP,
1098    process_snapshot: &'l PS,
1099}
1100
1101impl<'l, T, R, PP, P, PS> Iterator for SnapshotShard<'l, PP, P, PS>
1102where
1103    PP: for<'a> Fn(TaskId, &'a InnerStorage) -> T + Sync,
1104    P: Fn(TaskId, T) -> R + Sync,
1105    PS: Fn(TaskId, Box<InnerStorageSnapshot>) -> R + Sync,
1106{
1107    type Item = R;
1108
1109    fn next(&mut self) -> Option<Self::Item> {
1110        if let Some((task_id, snapshot)) = self.direct_snapshots.pop() {
1111            return Some((self.process_snapshot)(task_id, snapshot));
1112        }
1113        while let Some(task_id) = self.modified.pop() {
1114            let inner = self.storage.map.get(&task_id).unwrap();
1115            let state = inner.state();
1116            if !state.any_snapshot() {
1117                let preprocessed = (self.preprocess)(task_id, &inner);
1118                drop(inner);
1119                return Some((self.process)(task_id, preprocessed));
1120            } else {
1121                drop(inner);
1122                let maybe_snapshot = {
1123                    let mut modified_state = self.storage.modified.get_mut(&task_id).unwrap();
1124                    let ModifiedState::Snapshot(snapshot) = &mut *modified_state else {
1125                        unreachable!("The snapshot bit was set, so it must be in Snapshot state");
1126                    };
1127                    snapshot.take()
1128                };
1129                if let Some(snapshot) = maybe_snapshot {
1130                    return Some((self.process_snapshot)(task_id, snapshot));
1131                }
1132            }
1133        }
1134        self.guard = None;
1135        None
1136    }
1137}