turbo_tasks_backend/backend/
storage.rs

1use std::{
2    hash::Hash,
3    ops::{Deref, DerefMut},
4    sync::{Arc, atomic::AtomicBool},
5};
6
7use bitfield::bitfield;
8use smallvec::SmallVec;
9use turbo_tasks::{FxDashMap, TaskId, parallel};
10
11use crate::{
12    backend::dynamic_storage::DynamicStorage,
13    data::{
14        AggregationNumber, CachedDataItem, CachedDataItemKey, CachedDataItemType,
15        CachedDataItemValue, CachedDataItemValueRef, CachedDataItemValueRefMut, OutputValue,
16    },
17    data_storage::{AutoMapStorage, OptionStorage},
18    utils::{
19        dash_map_drop_contents::drop_contents,
20        dash_map_multi::{RefMut, get_multiple_mut},
21    },
22};
23
/// Selects which part of a task's cached data an operation applies to.
///
/// `All` stands for both `Meta` and `Data`; iterating over a `TaskDataCategory` yields the
/// concrete categories it covers (`Meta`, then `Data` for `All`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TaskDataCategory {
    Meta,
    Data,
    All,
}

impl TaskDataCategory {
    /// Converts this category into the matching [`SpecificTaskDataCategory`].
    ///
    /// # Panics
    ///
    /// Panics when called on [`TaskDataCategory::All`], which does not correspond to a single
    /// specific category. To handle `All`, iterate over it (it yields `Meta` and then `Data`)
    /// and convert each item individually.
    #[track_caller]
    pub fn into_specific(self) -> SpecificTaskDataCategory {
        match self {
            TaskDataCategory::Meta => SpecificTaskDataCategory::Meta,
            TaskDataCategory::Data => SpecificTaskDataCategory::Data,
            TaskDataCategory::All => unreachable!(
                "TaskDataCategory::All cannot be converted into a SpecificTaskDataCategory"
            ),
        }
    }
}

/// A [`TaskDataCategory`] that refers to exactly one part of a task's data.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum SpecificTaskDataCategory {
    Meta,
    Data,
}

impl IntoIterator for TaskDataCategory {
    type Item = TaskDataCategory;

    type IntoIter = TaskDataCategoryIterator;

    /// Yields `Meta` for `Meta`, `Data` for `Data`, and `Meta` followed by `Data` for `All`.
    fn into_iter(self) -> Self::IntoIter {
        match self {
            TaskDataCategory::Meta => TaskDataCategoryIterator::Meta,
            TaskDataCategory::Data => TaskDataCategoryIterator::Data,
            TaskDataCategory::All => TaskDataCategoryIterator::All,
        }
    }
}

/// Iterator returned by `TaskDataCategory::into_iter`.
///
/// Each variant describes what is still to be yielded: `All` still has `Meta` and `Data`,
/// `Meta`/`Data` have exactly that one item left, and `None` is exhausted.
pub enum TaskDataCategoryIterator {
    All,
    Meta,
    Data,
    None,
}

impl Iterator for TaskDataCategoryIterator {
    type Item = TaskDataCategory;

    fn next(&mut self) -> Option<Self::Item> {
        match self {
            TaskDataCategoryIterator::All => {
                *self = TaskDataCategoryIterator::Data;
                Some(TaskDataCategory::Meta)
            }
            TaskDataCategoryIterator::Meta => {
                *self = TaskDataCategoryIterator::None;
                Some(TaskDataCategory::Meta)
            }
            TaskDataCategoryIterator::Data => {
                *self = TaskDataCategoryIterator::None;
                Some(TaskDataCategory::Data)
            }
            TaskDataCategoryIterator::None => None,
        }
    }

    // The remaining count is fully determined by the current state, so report it exactly
    // instead of the default `(0, None)`; this also backs the `ExactSizeIterator` impl.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = match self {
            TaskDataCategoryIterator::All => 2,
            TaskDataCategoryIterator::Meta | TaskDataCategoryIterator::Data => 1,
            TaskDataCategoryIterator::None => 0,
        };
        (remaining, Some(remaining))
    }
}

impl ExactSizeIterator for TaskDataCategoryIterator {}

// Once the `None` state is reached, `next` keeps returning `None`.
impl std::iter::FusedIterator for TaskDataCategoryIterator {}
89
bitfield! {
    // Per-task state flags packed into the bits of a `u32`; the number after the colon is the
    // bit index of each flag. The derived `Default` yields 0, i.e. all flags cleared.
    // Note: Due to alignment in InnerStorage it doesn't matter if this struct is 1 or 4 bytes.
    #[derive(Clone, Default)]
    pub struct InnerStorageState(u32);
    impl Debug;
    /// The meta category has been restored (see `set_restored` / `is_restored`).
    pub meta_restored, set_meta_restored: 0;
    /// The data category has been restored (see `set_restored` / `is_restored`).
    pub data_restored, set_data_restored: 1;
    /// Item was modified before snapshot mode was entered.
    pub meta_modified, set_meta_modified: 2;
    /// Counterpart of `meta_modified` for the data category.
    pub data_modified, set_data_modified: 3;
    /// Item was modified after snapshot mode was entered. A snapshot was taken.
    pub meta_snapshot, set_meta_snapshot: 4;
    /// Counterpart of `meta_snapshot` for the data category.
    pub data_snapshot, set_data_snapshot: 5;
    /// Prefetched dependencies
    pub prefetched, set_prefetched: 6;
}
106
107impl InnerStorageState {
108    pub fn set_restored(&mut self, category: TaskDataCategory) {
109        match category {
110            TaskDataCategory::Meta => {
111                self.set_meta_restored(true);
112            }
113            TaskDataCategory::Data => {
114                self.set_data_restored(true);
115            }
116            TaskDataCategory::All => {
117                self.set_meta_restored(true);
118                self.set_data_restored(true);
119            }
120        }
121    }
122
123    pub fn is_restored(&self, category: TaskDataCategory) -> bool {
124        match category {
125            TaskDataCategory::Meta => self.meta_restored(),
126            TaskDataCategory::Data => self.data_restored(),
127            TaskDataCategory::All => self.meta_restored() && self.data_restored(),
128        }
129    }
130
131    pub fn any_snapshot(&self) -> bool {
132        self.meta_snapshot() || self.data_snapshot()
133    }
134
135    pub fn any_modified(&self) -> bool {
136        self.meta_modified() || self.data_modified()
137    }
138}
139
140pub struct InnerStorageSnapshot {
141    aggregation_number: OptionStorage<AggregationNumber>,
142    output_dependent: AutoMapStorage<TaskId, ()>,
143    output: OptionStorage<OutputValue>,
144    upper: AutoMapStorage<TaskId, u32>,
145    dynamic: DynamicStorage,
146    pub meta_modified: bool,
147    pub data_modified: bool,
148}
149
150impl From<&InnerStorage> for InnerStorageSnapshot {
151    fn from(inner: &InnerStorage) -> Self {
152        Self {
153            aggregation_number: inner.aggregation_number.clone(),
154            output_dependent: inner.output_dependent.clone(),
155            output: inner.output.clone(),
156            upper: inner.upper.clone(),
157            dynamic: inner.dynamic.snapshot_for_persisting(),
158            meta_modified: inner.state.meta_modified(),
159            data_modified: inner.state.data_modified(),
160        }
161    }
162}
163
164impl InnerStorageSnapshot {
165    pub fn iter_all(
166        &self,
167    ) -> impl Iterator<Item = (CachedDataItemKey, CachedDataItemValueRef<'_>)> {
168        use crate::data_storage::Storage;
169        self.dynamic
170            .iter_all()
171            .chain(self.aggregation_number.iter().map(|(_, value)| {
172                (
173                    CachedDataItemKey::AggregationNumber {},
174                    CachedDataItemValueRef::AggregationNumber { value },
175                )
176            }))
177            .chain(self.output.iter().map(|(_, value)| {
178                (
179                    CachedDataItemKey::Output {},
180                    CachedDataItemValueRef::Output { value },
181                )
182            }))
183            .chain(self.upper.iter().map(|(k, value)| {
184                (
185                    CachedDataItemKey::Upper { task: *k },
186                    CachedDataItemValueRef::Upper { value },
187                )
188            }))
189            .chain(self.output_dependent.iter().map(|(k, value)| {
190                (
191                    CachedDataItemKey::OutputDependent { task: *k },
192                    CachedDataItemValueRef::OutputDependent { value },
193                )
194            }))
195    }
196
197    pub fn len(&self) -> usize {
198        use crate::data_storage::Storage;
199        self.dynamic.len()
200            + self.aggregation_number.len()
201            + self.output.len()
202            + self.upper.len()
203            + self.output_dependent.len()
204    }
205}
206
207#[derive(Debug, Clone)]
208pub struct InnerStorage {
209    aggregation_number: OptionStorage<AggregationNumber>,
210    output_dependent: AutoMapStorage<TaskId, ()>,
211    output: OptionStorage<OutputValue>,
212    upper: AutoMapStorage<TaskId, u32>,
213    dynamic: DynamicStorage,
214    state: InnerStorageState,
215}
216
217impl InnerStorage {
218    fn new() -> Self {
219        Self {
220            aggregation_number: Default::default(),
221            output_dependent: Default::default(),
222            output: Default::default(),
223            upper: Default::default(),
224            dynamic: DynamicStorage::new(),
225            state: InnerStorageState::default(),
226        }
227    }
228
229    pub fn state(&self) -> &InnerStorageState {
230        &self.state
231    }
232
233    pub fn state_mut(&mut self) -> &mut InnerStorageState {
234        &mut self.state
235    }
236}
237
/// Internal helper for `generate_inner_storage!`.
///
/// Each entry point (`CachedDataItem:`, `CachedDataItemKey:`, `CachedDataItemType:`,
/// `update:`, `extend:`, `get_mut_or_insert_with:`, `extract_if:`, `iter:`) expands to a
/// sequence of `if let` checks, one per configured `Tag [key_field] => field,` entry. When the
/// item / key / type matches `Tag`, the operation is forwarded to `self.field` and the
/// enclosing function `return`s. If no entry matches, control falls through to the code that
/// follows the macro call.
///
/// Entries written as `Tag key_field => field` are keyed: the key is taken from the
/// `key_field` field of the item/key. Entries written as `Tag => field` have no key and pass
/// `()` (or `&()` for key lookups) instead.
///
/// The `return_value:` arms convert the raw storage result according to `$return_ty`
/// (`none`, `option_value`, `option_ref`, `option_ref_mut`); the `input_value:` arm unwraps a
/// `CachedDataItemValue` back into the raw value.
#[macro_export]
macro_rules! generate_inner_storage_internal {
    // Matching on CachedDataItem with a $value
    // The single-entry arms only match when the entry is the last one in the config (it must be
    // followed directly by the closing delimiter); the third arm peels off the first entry of a
    // longer list and recurses on the rest.
    (CachedDataItem: $self:ident, $item:ident, $value:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident $key_field:ident => $field:ident,) => {
        if let CachedDataItem::$tag { $key_field, $value } = $item {
            let result = $self.$field.$fn($key_field, $($args)*);
            return $crate::generate_inner_storage_internal!(return_value: result, $return_ty: $tag $key_field => $field);
        }
    };
    (CachedDataItem: $self:ident, $item:ident, $value:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident => $field:ident,) => {
        if let CachedDataItem::$tag { $value } = $item {
            // Key-less storage: `()` stands in for the key.
            let result = $self.$field.$fn((), $($args)*);
            return $crate::generate_inner_storage_internal!(return_value: result, $return_ty: $tag => $field);
        }
    };
    (CachedDataItem: $self:ident, $item:ident, $value:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
        $crate::generate_inner_storage_internal!(CachedDataItem: $self, $item, $value, $return_ty, $fn($($args)*): $tag $($key_field)? => $field,);
        $crate::generate_inner_storage_internal!(CachedDataItem: $self, $item, $value, $return_ty, $fn($($args)*): $($config)+)
    };
    // Matching on CachedDataItemKey without a $value
    // `$item` is a key (a reference in the generated accessors), so the keyed arm passes the
    // bound key field on and the key-less arm passes `&()`.
    (CachedDataItemKey: $self:ident, $item:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident $key_field:ident => $field:ident,) => {
        if let CachedDataItemKey::$tag { $key_field } = $item {
            let result = $self.$field.$fn($key_field, $($args)*);
            return $crate::generate_inner_storage_internal!(return_value: result, $return_ty: $tag $key_field => $field);
        }
    };
    (CachedDataItemKey: $self:ident, $item:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident => $field:ident,) => {
        if let CachedDataItemKey::$tag { } = $item {
            let result = $self.$field.$fn(&(), $($args)*);
            return $crate::generate_inner_storage_internal!(return_value: result, $return_ty: $tag => $field);
        }
    };
    (CachedDataItemKey: $self:ident, $item:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
        $crate::generate_inner_storage_internal!(CachedDataItemKey: $self, $item, $return_ty, $fn($($args)*): $tag $($key_field)? => $field,);
        $crate::generate_inner_storage_internal!(CachedDataItemKey: $self, $item, $return_ty, $fn($($args)*): $($config)+)
    };
    // Matching on CachedDataItemType without a $value
    // Type-level operations take no key, so keyed and key-less entries share one arm.
    (CachedDataItemType: $self:ident, $item:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident $($key_field:ident)? => $field:ident,) => {
        if let CachedDataItemType::$tag = $item {
            let result = $self.$field.$fn($($args)*);
            return $crate::generate_inner_storage_internal!(return_value: result, $return_ty: $tag $($key_field)? => $field);
        }
    };
    (CachedDataItemType: $self:ident, $item:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
        $crate::generate_inner_storage_internal!(CachedDataItemType: $self, $item, $return_ty, $fn($($args)*): $tag $($key_field)? => $field,);
        $crate::generate_inner_storage_internal!(CachedDataItemType: $self, $item, $return_ty, $fn($($args)*): $($config)+)
    };

    // fn update
    // The user closure sees the old raw value wrapped as `CachedDataItemValue::$tag` and must
    // return the same variant (or `None`); any other variant hits `unreachable!()`.
    (update: $self:ident, $key:ident, $update:ident: $tag:ident $key_field:ident => $field:ident,) => {
        if let CachedDataItemKey::$tag { $key_field } = $key {
            $self.$field.update($key_field, |old| {
                let old = old.map(|old| CachedDataItemValue::$tag { value: old });
                let new = $update(old);
                new.map(|new| if let CachedDataItemValue::$tag { value } = new {
                    value
                } else {
                    unreachable!()
                })
            });
            return;
        }
    };
    (update: $self:ident, $key:ident, $update:ident: $tag:ident => $field:ident,) => {
        if let CachedDataItemKey::$tag { } = $key {
            $self.$field.update((), |old| {
                let old = old.map(|old| CachedDataItemValue::$tag { value: old });
                let new = $update(old);
                new.map(|new| if let CachedDataItemValue::$tag { value } = new {
                    value
                } else {
                    unreachable!()
                })
            });
            return;
        }
    };
    (update: $self:ident, $key:ident, $update:ident: $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
        $crate::generate_inner_storage_internal!(update: $self, $key, $update: $tag $($key_field)? => $field,);
        $crate::generate_inner_storage_internal!(update: $self, $key, $update: $($config)+)
    };

    // fn extend
    // Every item is split into (key, value) via `KeyValuePair::into_key_and_value` and must be of
    // type `$tag`; an item of any other type hits `unreachable!()`.
    (extend: $self:ident, $ty:ident, $items:ident: $tag:ident $key_field:ident => $field:ident,) => {
        if let CachedDataItemType::$tag = $ty {
            return $self.$field.extend($items.map(|item| {
                let pair = turbo_tasks::KeyValuePair::into_key_and_value(item);
                if let (CachedDataItemKey::$tag { $key_field }, CachedDataItemValue::$tag { value }) = pair {
                    ($key_field, value)
                } else {
                    unreachable!()
                }
            }));
        }
    };
    (extend: $self:ident, $ty:ident, $items:ident: $tag:ident => $field:ident,) => {
        if let CachedDataItemType::$tag = $ty {
            return $self.$field.extend($items.map(|item| {
                let pair = turbo_tasks::KeyValuePair::into_key_and_value(item);
                if let (_, CachedDataItemValue::$tag { value }) = pair {
                    ((), value)
                } else {
                    unreachable!()
                }
            }));
        }
    };
    (extend: $self:ident, $ty:ident, $items:ident: $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
        $crate::generate_inner_storage_internal!(extend: $self, $ty, $items: $tag $($key_field)? => $field,);
        $crate::generate_inner_storage_internal!(extend: $self, $ty, $items: $($config)+)
    };

    // fn get_mut_or_insert_with
    // `$insert_with` must produce a `CachedDataItemValue::$tag`; any other variant hits
    // `unreachable!()`.
    (get_mut_or_insert_with: $self:ident, $key:ident, $insert_with:ident: $tag:ident $key_field:ident => $field:ident,) => {
        if let CachedDataItemKey::$tag { $key_field } = $key {
            let value = $self.$field.get_mut_or_insert_with($key_field, || {
                let value = $insert_with();
                if let CachedDataItemValue::$tag { value } = value {
                    value
                } else {
                    unreachable!()
                }
            });
            return CachedDataItemValueRefMut::$tag { value };
        }
    };
    (get_mut_or_insert_with: $self:ident, $key:ident, $insert_with:ident: $tag:ident => $field:ident,) => {
        if let CachedDataItemKey::$tag { } = $key {
            let value = $self.$field.get_mut_or_insert_with((), || {
                let value = $insert_with();
                if let CachedDataItemValue::$tag { value } = value {
                    value
                } else {
                    unreachable!()
                }
            });
            return CachedDataItemValueRefMut::$tag { value };
        }
    };
    (get_mut_or_insert_with: $self:ident, $key:ident, $insert_with:ident: $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
        $crate::generate_inner_storage_internal!(get_mut_or_insert_with: $self, $key, $insert_with: $tag $($key_field)? => $field,);
        $crate::generate_inner_storage_internal!(get_mut_or_insert_with: $self, $key, $insert_with: $($config)+)
    };

    // fn extract_if
    // The field's iterator is wrapped in `InnerStorageIter::$tag` so that every branch of the
    // generated method returns the same concrete type.
    (extract_if: $self:ident, $ty:ident, $f:ident: $tag:ident $key_field:ident => $field:ident,) => {
        if let CachedDataItemType::$tag = $ty {
            let iter = $self.$field.extract_if(move |key, value| {
                $f(CachedDataItemKey::$tag { $key_field: *key }, CachedDataItemValueRef::$tag { value })
            }).map(|($key_field, value)| CachedDataItem::$tag { $key_field, value });
            return InnerStorageIter::$tag(iter);
        }
    };
    (extract_if: $self:ident, $ty:ident, $f:ident: $tag:ident => $field:ident,) => {
        if let CachedDataItemType::$tag = $ty {
            let iter = $self.$field.extract_if(move |_, value| {
                $f(CachedDataItemKey::$tag { }, CachedDataItemValueRef::$tag { value })
            }).map(|(_, value)| CachedDataItem::$tag { value });
            return InnerStorageIter::$tag(iter);
        }
    };
    (extract_if: $self:ident, $ty:ident, $f:ident: $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
        $crate::generate_inner_storage_internal!(extract_if: $self, $ty, $f: $tag $($key_field)? => $field,);
        $crate::generate_inner_storage_internal!(extract_if: $self, $ty, $f: $($config)+)
    };

    // fn iter
    // Same `InnerStorageIter` wrapping as `extract_if`; keys are copied out of the storage.
    (iter: $self:ident, $ty:ident: $tag:ident $key_field:ident => $field:ident,) => {
        if let CachedDataItemType::$tag = $ty {
            let iter = $self.$field.iter().map(|($key_field, value)| (CachedDataItemKey::$tag { $key_field: *$key_field }, CachedDataItemValueRef::$tag { value }));
            return InnerStorageIter::$tag(iter);
        }
    };
    (iter: $self:ident, $ty:ident: $tag:ident => $field:ident,) => {
        if let CachedDataItemType::$tag = $ty {
            let iter = $self.$field.iter().map(|(_, value)| (CachedDataItemKey::$tag { }, CachedDataItemValueRef::$tag { value }));
            return InnerStorageIter::$tag(iter);
        }
    };
    (iter: $self:ident, $ty:ident: $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
        $crate::generate_inner_storage_internal!(iter: $self, $ty: $tag $($key_field)? => $field,);
        $crate::generate_inner_storage_internal!(iter: $self, $ty: $($config)+)
    };


    // Return value handling
    // `none` passes the storage result through unchanged; the `option_*` kinds wrap the `Some`
    // payload into the `$tag` variant of the corresponding value enum.
    (return_value: $result:ident, none: $($more:tt)*) => {
        $result
    };
    (return_value: $result:ident, option_value: $tag:ident $($more:tt)*) => {
        $result.map(|value| CachedDataItemValue::$tag { value })
    };
    (return_value: $result:ident, option_ref: $tag:ident $($more:tt)*) => {
        $result.map(|value| CachedDataItemValueRef::$tag { value })
    };
    (return_value: $result:ident, option_ref_mut: $tag:ident $($more:tt)*) => {
        $result.map(|value| CachedDataItemValueRefMut::$tag { value })
    };

    // Input value handling
    // Unwraps `Option<CachedDataItemValue>` into the raw `$tag` payload. NOTE(review): no use of
    // this arm is visible in `generate_inner_storage!`; confirm whether it is still needed.
    (input_value: $input:ident, option_value: $tag:ident $($more:tt)*) => {
        $input.map(|value| {
            if let CachedDataItemValue::$tag { value } = value {
                value
            } else {
                unreachable!()
            }
        })
    };

}
449
// Generates the public accessor methods of `InnerStorage`.
//
// `$config` is a list of `Tag [key_field] => field,` entries. In every generated method, item
// types listed in `$config` are dispatched to their dedicated field via
// `generate_inner_storage_internal!` (which `return`s on a match); every other item type falls
// through to the final call on `self.dynamic`.
macro_rules! generate_inner_storage {
    ($($config:tt)*) => {
        impl InnerStorage {
            // Adds `item`; returns the underlying storage's `add` result unchanged.
            pub fn add(&mut self, item: CachedDataItem) -> bool {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(CachedDataItem: self, item, value, none, add(value): $($config)*);
                self.dynamic.add(item)
            }

            // Adds all `items`. Every item must be of type `ty`; for the dedicated fields a
            // mismatching item hits `unreachable!()`.
            pub fn extend(&mut self, ty: CachedDataItemType, items: impl Iterator<Item = CachedDataItem>) -> bool {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(extend: self, ty, items: $($config)*);
                self.dynamic.extend(ty, items)
            }

            // Inserts `item`; the storage's `insert` result (presumably the replaced value) is
            // returned wrapped as a `CachedDataItemValue`.
            pub fn insert(&mut self, item: CachedDataItem) -> Option<CachedDataItemValue> {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(CachedDataItem: self, item, value, option_value, insert(value): $($config)*);
                self.dynamic.insert(item)
            }

            // Removes the entry for `key`; the removed raw value is wrapped as a
            // `CachedDataItemValue`.
            pub fn remove(&mut self, key: &CachedDataItemKey) -> Option<CachedDataItemValue> {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(CachedDataItemKey: self, key, option_value, remove(): $($config)*);
                self.dynamic.remove(key)
            }

            // Number of items of type `ty` (the `len()` of a dedicated field).
            pub fn count(&self, ty: CachedDataItemType) -> usize {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(CachedDataItemType: self, ty, none, len(): $($config)*);
                self.dynamic.count(ty)
            }

            // Shared reference to the value stored under `key`, if any.
            pub fn get(&self, key: &CachedDataItemKey) -> Option<CachedDataItemValueRef<'_>> {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(CachedDataItemKey: self, key, option_ref, get(): $($config)*);
                self.dynamic.get(key)
            }

            pub fn contains_key(&self, key: &CachedDataItemKey) -> bool {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(CachedDataItemKey: self, key, none, contains_key(): $($config)*);
                self.dynamic.contains_key(key)
            }

            // Mutable reference to the value stored under `key`, if any.
            pub fn get_mut(&mut self, key: &CachedDataItemKey) -> Option<CachedDataItemValueRefMut<'_>> {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(CachedDataItemKey: self, key, option_ref_mut, get_mut(): $($config)*);
                self.dynamic.get_mut(key)
            }

            pub fn shrink_to_fit(&mut self, ty: CachedDataItemType) {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(CachedDataItemType: self, ty, none, shrink_to_fit(): $($config)*);
                self.dynamic.shrink_to_fit(ty)
            }

            // Replaces the value under `key` with `update(old)`; for dedicated fields `update`
            // must return the same variant as the key (or `None`).
            pub fn update(
                &mut self,
                key: CachedDataItemKey,
                update: impl FnOnce(Option<CachedDataItemValue>) -> Option<CachedDataItemValue>,
            ) {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(update: self, key, update: $($config)*);
                self.dynamic.update(key, update)
            }

            // Extracts items of type `ty` for which `f` returns true. Every branch returns an
            // `InnerStorageIter` so the `impl Iterator` has a single concrete type;
            // `use<'l, F>` states that it captures the `'l` borrow of `self` and `f`.
            pub fn extract_if<'l, F>(
                &'l mut self,
                ty: CachedDataItemType,
                mut f: F,
            ) -> impl Iterator<Item = CachedDataItem> + use<'l, F>
            where
                F: for<'a> FnMut(CachedDataItemKey, CachedDataItemValueRef<'a>) -> bool + 'l,
            {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(extract_if: self, ty, f: $($config)*);
                InnerStorageIter::Dynamic(self.dynamic.extract_if(ty, f))
            }

            // Mutable reference to the value under `key`, inserting `f()` first if absent. For
            // dedicated fields `f` must return the same variant as the key.
            pub fn get_mut_or_insert_with(
                &mut self,
                key: CachedDataItemKey,
                f: impl FnOnce() -> CachedDataItemValue,
            ) -> CachedDataItemValueRefMut<'_>
            {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(get_mut_or_insert_with: self, key, f: $($config)*);
                self.dynamic.get_mut_or_insert_with(key, f)
            }

            // Iterates over all items of type `ty` (wrapped in `InnerStorageIter`, see
            // `extract_if`).
            pub fn iter(
                &self,
                ty: CachedDataItemType,
            ) -> impl Iterator<Item = (CachedDataItemKey, CachedDataItemValueRef<'_>)>
            {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(iter: self, ty: $($config)*);
                InnerStorageIter::Dynamic(self.dynamic.iter(ty))
            }

        }
    };
}

// Item types stored in dedicated fields of `InnerStorage`. Entries with a key field (`task`)
// map to keyed `AutoMapStorage` fields; entries without one map to `OptionStorage` fields that
// hold at most one value. Each tag here also needs a matching `InnerStorageIter` variant.
generate_inner_storage!(
    AggregationNumber => aggregation_number,
    OutputDependent task => output_dependent,
    Output => output,
    Upper task => upper,
);
561
/// Iterator over the items of exactly one storage field of an [`InnerStorage`].
///
/// The generated `InnerStorage::iter` and `InnerStorage::extract_if` methods produce a
/// different concrete iterator type per field; wrapping each one in its own variant lets every
/// branch share a single `impl Iterator` return type. `Dynamic` wraps the iterator of the
/// dynamic storage that holds all remaining item types.
enum InnerStorageIter<A, B, C, D, E> {
    AggregationNumber(A),
    OutputDependent(B),
    Output(C),
    Upper(D),
    Dynamic(E),
}

impl<T, A, B, C, D, E> Iterator for InnerStorageIter<A, B, C, D, E>
where
    A: Iterator<Item = T>,
    B: Iterator<Item = T>,
    C: Iterator<Item = T>,
    D: Iterator<Item = T>,
    E: Iterator<Item = T>,
{
    type Item = T;

    fn next(&mut self) -> Option<Self::Item> {
        match self {
            InnerStorageIter::AggregationNumber(iter) => iter.next(),
            InnerStorageIter::OutputDependent(iter) => iter.next(),
            InnerStorageIter::Output(iter) => iter.next(),
            InnerStorageIter::Upper(iter) => iter.next(),
            InnerStorageIter::Dynamic(iter) => iter.next(),
        }
    }

    // Forward the wrapped iterator's bounds. The default `(0, None)` would throw them away and
    // keep consumers such as `collect` from preallocating.
    fn size_hint(&self) -> (usize, Option<usize>) {
        match self {
            InnerStorageIter::AggregationNumber(iter) => iter.size_hint(),
            InnerStorageIter::OutputDependent(iter) => iter.size_hint(),
            InnerStorageIter::Output(iter) => iter.size_hint(),
            InnerStorageIter::Upper(iter) => iter.size_hint(),
            InnerStorageIter::Dynamic(iter) => iter.size_hint(),
        }
    }
}
590
591impl InnerStorage {
592    pub fn iter_all(
593        &self,
594    ) -> impl Iterator<Item = (CachedDataItemKey, CachedDataItemValueRef<'_>)> {
595        use crate::data_storage::Storage;
596        self.dynamic
597            .iter_all()
598            .chain(self.aggregation_number.iter().map(|(_, value)| {
599                (
600                    CachedDataItemKey::AggregationNumber {},
601                    CachedDataItemValueRef::AggregationNumber { value },
602                )
603            }))
604            .chain(self.output.iter().map(|(_, value)| {
605                (
606                    CachedDataItemKey::Output {},
607                    CachedDataItemValueRef::Output { value },
608                )
609            }))
610            .chain(self.upper.iter().map(|(k, value)| {
611                (
612                    CachedDataItemKey::Upper { task: *k },
613                    CachedDataItemValueRef::Upper { value },
614                )
615            }))
616            .chain(self.output_dependent.iter().map(|(k, value)| {
617                (
618                    CachedDataItemKey::OutputDependent { task: *k },
619                    CachedDataItemValueRef::OutputDependent { value },
620                )
621            }))
622    }
623
624    pub fn len(&self) -> usize {
625        use crate::data_storage::Storage;
626        self.dynamic.len()
627            + self.aggregation_number.len()
628            + self.output.len()
629            + self.upper.len()
630            + self.output_dependent.len()
631    }
632}
633
/// Snapshot-related state of a task, stored in the `modified` map of [`Storage`].
enum ModifiedState {
    /// It was modified before snapshot mode was entered, but it was not accessed during snapshot
    /// mode.
    Modified,
    /// Snapshot(Some):
    /// It was modified before snapshot mode was entered and it was accessed again during snapshot
    /// mode. A copy of the version of the item when snapshot mode was entered is stored here.
    ///
    /// Snapshot(None):
    /// It was not modified before snapshot mode was entered, but it was accessed during snapshot
    /// mode. Or the snapshot has already been taken out by the snapshot operation
    /// (`take_snapshot` replaces `Some` with `None` via `Option::take`).
    Snapshot(Option<Box<InnerStorageSnapshot>>),
}

/// Concurrent storage of every task's [`InnerStorage`], with support for a snapshot mode.
pub struct Storage {
    /// Whether snapshot mode is active (set by `start_snapshot`, cleared by `end_snapshot`).
    snapshot_mode: AtomicBool,
    /// Tasks that are modified or carry a snapshot. `take_snapshot` only visits tasks present
    /// in this map.
    modified: FxDashMap<TaskId, ModifiedState>,
    /// The per-task data, keyed by task id.
    map: FxDashMap<TaskId, Box<InnerStorage>>,
}
652
653impl Storage {
654    pub fn new(shard_amount: usize, small_preallocation: bool) -> Self {
655        let map_capacity: usize = if small_preallocation {
656            1024
657        } else {
658            1024 * 1024
659        };
660        let modified_capacity: usize = if small_preallocation { 0 } else { 1024 };
661
662        Self {
663            snapshot_mode: AtomicBool::new(false),
664            modified: FxDashMap::with_capacity_and_hasher_and_shard_amount(
665                modified_capacity,
666                Default::default(),
667                shard_amount,
668            ),
669            map: FxDashMap::with_capacity_and_hasher_and_shard_amount(
670                map_capacity,
671                Default::default(),
672                shard_amount,
673            ),
674        }
675    }
676
/// Processes every modified item (resp. a snapshot of it) with the given functions and returns
/// the results. Ends snapshot mode afterwards.
/// preprocess is potentially called within a lock, so it should be fast.
/// process is called outside of locks, so it could do more expensive operations.
///
/// Enters snapshot mode first if it is not already active. Returns one `SnapshotShard` per
/// shard of the `modified` map. Each shard carries the ids of tasks still in
/// `ModifiedState::Modified`, plus the snapshots stored as `ModifiedState::Snapshot(Some(_))`,
/// which are moved out of the map (leaving `Snapshot(None)` behind).
///
/// NOTE(review): ending snapshot mode does not happen in this function body; presumably it
/// happens once the last clone of the `Arc<SnapshotGuard>` held by the shards is dropped —
/// confirm in `SnapshotGuard`'s `Drop` impl.
pub fn take_snapshot<
    'l,
    T,
    R,
    PP: for<'a> Fn(TaskId, &'a InnerStorage) -> T + Sync,
    P: Fn(TaskId, T) -> R + Sync,
    PS: Fn(TaskId, Box<InnerStorageSnapshot>) -> R + Sync,
>(
    &'l self,
    preprocess: &'l PP,
    process: &'l P,
    process_snapshot: &'l PS,
) -> Vec<SnapshotShard<'l, PP, P, PS>> {
    if !self.snapshot_mode() {
        self.start_snapshot();
    }

    // Shared by all returned shards; each shard receives its own clone below.
    let guard = Arc::new(SnapshotGuard { storage: self });

    // The number of shards is much larger than the number of threads, so the effect of the
    // locks held is negligible.
    parallel::map_collect::<_, _, Vec<_>>(self.modified.shards(), |shard| {
        let mut direct_snapshots: Vec<(TaskId, Box<InnerStorageSnapshot>)> = Vec::new();
        let mut modified: SmallVec<[TaskId; 4]> = SmallVec::new();
        {
            // Take the snapshots from the modified map
            // Note: this `guard` is the shard's write lock; it shadows the outer
            // `Arc<SnapshotGuard>` only inside this block.
            let guard = shard.write();
            // Safety: guard must outlive the iterator.
            for bucket in unsafe { guard.iter() } {
                // Safety: the guard guarantees that the bucket is not removed and the ptr
                // is valid.
                let (key, shared_value) = unsafe { bucket.as_mut() };
                let modified_state = shared_value.get_mut();
                match modified_state {
                    // Entries stay in the map; only the id is collected here.
                    ModifiedState::Modified => {
                        modified.push(*key);
                    }
                    // `take()` leaves `Snapshot(None)` behind, so each stored snapshot is
                    // handed out at most once.
                    ModifiedState::Snapshot(snapshot) => {
                        if let Some(snapshot) = snapshot.take() {
                            direct_snapshots.push((*key, snapshot));
                        }
                    }
                }
            }
            // Safety: guard must outlive the iterator.
            drop(guard);
        }

        // Outside the block above, `guard` refers to the outer `Arc<SnapshotGuard>` again.
        SnapshotShard {
            direct_snapshots,
            modified,
            storage: self,
            guard: Some(guard.clone()),
            process,
            preprocess,
            process_snapshot,
        }
    })
}
740
741    /// Start snapshot mode.
742    pub fn start_snapshot(&self) {
743        self.snapshot_mode
744            .store(true, std::sync::atomic::Ordering::Release);
745    }
746
    /// Ends snapshot mode. Called from `Drop for SnapshotGuard`.
    ///
    /// Tasks recorded as [`ModifiedState::Modified`] are removed from `modified`, and their
    /// modified bits are cleared. They presumably were captured by the snapshot that is ending,
    /// and they are tracked again on their next modification.
    ///
    /// Tasks recorded as [`ModifiedState::Snapshot`] were accessed during snapshot mode. They
    /// stay in `modified`, converted to `Modified`, and their snapshot bits become modified bits.
    fn end_snapshot(&self) {
        // We are still in snapshot mode, so all accessed items would be stored as snapshot.
        // This means we can start by removing all modified items.
        let mut removed_modified = Vec::new();
        self.modified.retain(|key, inner| {
            if matches!(inner, ModifiedState::Modified) {
                removed_modified.push(*key);
                false
            } else {
                true
            }
        });

        // We also need to unset all the modified flags of the removed tasks.
        for key in removed_modified {
            if let Some(mut inner) = self.map.get_mut(&key) {
                let state = inner.state_mut();
                state.set_data_modified(false);
                state.set_meta_modified(false);
            }
        }

        // Now modified only contains snapshots.
        // We leave snapshot mode. Any access would be stored as modified and not as snapshot.
        self.snapshot_mode
            .store(false, std::sync::atomic::Ordering::Release);

        // We can change all the snapshots to modified now.
        // Overwriting an entry also drops any snapshot copy it still holds.
        let mut removed_snapshots = Vec::new();
        for mut item in self.modified.iter_mut() {
            match item.value() {
                ModifiedState::Snapshot(_) => {
                    removed_snapshots.push(*item.key());
                    *item.value_mut() = ModifiedState::Modified;
                }
                ModifiedState::Modified => {
                    // This means it was concurrently modified.
                    // It's already in the correct state.
                }
            }
        }

        // And update the flags: every category that carries a snapshot bit becomes modified.
        for key in removed_snapshots {
            if let Some(mut inner) = self.map.get_mut(&key) {
                let state = inner.state_mut();
                if state.meta_snapshot() {
                    state.set_meta_snapshot(false);
                    state.set_meta_modified(true);
                }
                if state.data_snapshot() {
                    state.set_data_snapshot(false);
                    state.set_data_modified(true);
                }
            }
        }

        // Remove excessive capacity in modified (it may have grown during the snapshot).
        self.modified.shrink_to_fit();
    }
811
812    fn snapshot_mode(&self) -> bool {
813        self.snapshot_mode
814            .load(std::sync::atomic::Ordering::Acquire)
815    }
816
817    pub fn access_mut(&self, key: TaskId) -> StorageWriteGuard<'_> {
818        let inner = match self.map.entry(key) {
819            dashmap::mapref::entry::Entry::Occupied(e) => e.into_ref(),
820            dashmap::mapref::entry::Entry::Vacant(e) => e.insert(Box::new(InnerStorage::new())),
821        };
822        StorageWriteGuard {
823            storage: self,
824            inner: inner.into(),
825        }
826    }
827
828    pub fn access_pair_mut(
829        &self,
830        key1: TaskId,
831        key2: TaskId,
832    ) -> (StorageWriteGuard<'_>, StorageWriteGuard<'_>) {
833        let (a, b) = get_multiple_mut(&self.map, key1, key2, || Box::new(InnerStorage::new()));
834        (
835            StorageWriteGuard {
836                storage: self,
837                inner: a,
838            },
839            StorageWriteGuard {
840                storage: self,
841                inner: b,
842            },
843        )
844    }
845
    /// Drops the contents of the task map, then of the `modified` tracking map.
    ///
    /// Both calls delegate to `utils::dash_map_drop_contents::drop_contents` (presumably
    /// emptying each map; see that helper for the exact strategy).
    pub fn drop_contents(&self) {
        drop_contents(&self.map);
        drop_contents(&self.modified);
    }
850}
851
/// Write access to a single task's [`InnerStorage`], returned by [`Storage::access_mut`] and
/// [`Storage::access_pair_mut`].
///
/// Dereferences (mutably) to the locked `InnerStorage`. Modifications are recorded in the owning
/// storage's `modified` map through [`StorageWriteGuard::track_modification`].
pub struct StorageWriteGuard<'a> {
    // The storage the task belongs to; its `modified` map and snapshot flag are consulted when
    // tracking modifications.
    storage: &'a Storage,
    // The locked map entry of the task.
    inner: RefMut<'a, TaskId, Box<InnerStorage>>,
}
856
857impl StorageWriteGuard<'_> {
858    /// Tracks mutation of this task
859    pub fn track_modification(&mut self, category: SpecificTaskDataCategory) {
860        let state = self.inner.state();
861        let snapshot = match category {
862            SpecificTaskDataCategory::Meta => state.meta_snapshot(),
863            SpecificTaskDataCategory::Data => state.data_snapshot(),
864        };
865        if !snapshot {
866            let modified = match category {
867                SpecificTaskDataCategory::Meta => state.meta_modified(),
868                SpecificTaskDataCategory::Data => state.data_modified(),
869            };
870            match (self.storage.snapshot_mode(), modified) {
871                (false, false) => {
872                    // Not in snapshot mode and item is unmodified
873                    if !state.any_snapshot() && !state.any_modified() {
874                        self.storage
875                            .modified
876                            .insert(*self.inner.key(), ModifiedState::Modified);
877                    }
878                    let state = self.inner.state_mut();
879                    match category {
880                        SpecificTaskDataCategory::Meta => state.set_meta_modified(true),
881                        SpecificTaskDataCategory::Data => state.set_data_modified(true),
882                    }
883                }
884                (false, true) => {
885                    // Not in snapshot mode and item is already modified
886                    // Do nothing
887                }
888                (true, false) => {
889                    // In snapshot mode and item is unmodified (so it's not part of the snapshot)
890                    if !state.any_snapshot() {
891                        self.storage
892                            .modified
893                            .insert(*self.inner.key(), ModifiedState::Snapshot(None));
894                    }
895                    let state = self.inner.state_mut();
896                    match category {
897                        SpecificTaskDataCategory::Meta => state.set_meta_snapshot(true),
898                        SpecificTaskDataCategory::Data => state.set_data_snapshot(true),
899                    }
900                }
901                (true, true) => {
902                    // In snapshot mode and item is modified (so it's part of the snapshot)
903                    // We need to store the original version that is part of the snapshot
904                    if !state.any_snapshot() {
905                        self.storage.modified.insert(
906                            *self.inner.key(),
907                            ModifiedState::Snapshot(Some(Box::new((&**self.inner).into()))),
908                        );
909                    }
910                    let state = self.inner.state_mut();
911                    match category {
912                        SpecificTaskDataCategory::Meta => state.set_meta_snapshot(true),
913                        SpecificTaskDataCategory::Data => state.set_data_snapshot(true),
914                    }
915                }
916            }
917        }
918    }
919}
920
921impl Deref for StorageWriteGuard<'_> {
922    type Target = InnerStorage;
923
924    fn deref(&self) -> &Self::Target {
925        &self.inner
926    }
927}
928
929impl DerefMut for StorageWriteGuard<'_> {
930    fn deref_mut(&mut self) -> &mut Self::Target {
931        &mut self.inner
932    }
933}
934
/// Expands to `$task.count(CachedDataItemType::$key)`.
///
/// This is presumably the number of `$key` items stored on the task; see the `count` method of
/// `$task`'s type.
macro_rules! count {
    ($task:ident, $key:ident) => {{ $task.count($crate::data::CachedDataItemType::$key) }};
}
938
/// Looks up the `$key` item of `$task` and evaluates to `Some(&value)` when it is present.
///
/// `$input` is the key's field list as a single token tree; the shorthand form without it
/// passes `{}`. Evaluates to `None` if `$task.get` returns nothing or a value of another variant.
macro_rules! get {
    ($task:ident, $key:ident $input:tt) => {{
        #[allow(unused_imports)]
        use $crate::backend::operation::TaskGuard;
        match $task.get(&$crate::data::CachedDataItemKey::$key $input) {
            Some($crate::data::CachedDataItemValueRef::$key { value }) => Some(value),
            _ => None,
        }
    }};
    ($task:ident, $key:ident) => {
        $crate::backend::storage::get!($task, $key {})
    };
}
955
/// Looks up the `$key` item of `$task` for mutation and evaluates to `Some(&mut value)` when it
/// is present.
///
/// The macro only compiles for keys that have a `$crate::data::allow_mut_access::$key` marker.
/// `$input` is the key's field list as a single token tree; the shorthand form passes `{}`.
macro_rules! get_mut {
    ($task:ident, $key:ident $input:tt) => {{
        #[allow(unused_imports)]
        use $crate::backend::operation::TaskGuard;
        match $task.get_mut(&$crate::data::CachedDataItemKey::$key $input) {
            Some($crate::data::CachedDataItemValueRefMut::$key { value }) => {
                // Compile-time gate: the marker must exist for this key.
                let () = $crate::data::allow_mut_access::$key;
                Some(value)
            }
            _ => None,
        }
    }};
    ($task:ident, $key:ident) => {
        $crate::backend::storage::get_mut!($task, $key {})
    };
}
973
/// Evaluates to a mutable reference to the `$key` value of `$task`, inserting a value built by
/// calling `$f` when needed.
///
/// `$f` is evaluated once into a closure. `TaskGuard::get_mut_or_insert_with` presumably calls it
/// only when the item is absent. The macro only compiles for keys that have a
/// `$crate::data::allow_mut_access::$key` marker. `$input` is the key's field list as a single
/// token tree; the shorthand form passes `{}`.
macro_rules! get_mut_or_insert_with {
    ($task:ident, $key:ident $input:tt, $f:expr) => {{
        #[allow(unused_imports)]
        use $crate::backend::operation::TaskGuard;
        // Compile-time gate: the marker must exist for this key.
        let () = $crate::data::allow_mut_access::$key;
        let functor = $f;
        // The inserted/returned value is expected to have the same variant as the key, hence
        // the `unreachable!()` fallback.
        let $crate::data::CachedDataItemValueRefMut::$key {
            value,
        } = $task.get_mut_or_insert_with($crate::data::CachedDataItemKey::$key $input, move || $crate::data::CachedDataItemValue::$key { value: functor() }) else {
            unreachable!()
        };
        value
    }};
    ($task:ident, $key:ident, $f:expr) => {
        $crate::backend::storage::get_mut_or_insert_with!($task, $key {}, $f)
    };
}
991
/// Creates an iterator over the `$key` items of `$task` (see
/// [`CachedDataItemKey`][crate::data::CachedDataItemKey]) that match a pattern, mapping each
/// match to `$iter_item`.
///
/// Two forms are accepted:
///
/// - `iter_many!(task, Key { .. } [if cond] => item)` matches only the key against the key
///   pattern (`$key_pattern`).
/// - `iter_many!(task, Key { .. } value_pattern [if cond] => item)` matches the key against the
///   key pattern (`$input`) and the item's value against `$value_pattern`.
///
/// Both the key pattern and the value pattern are single token trees. `$iter_item` and the
/// optional `if` guard may use any bindings introduced by those patterns. Items that do not
/// match are skipped.
macro_rules! iter_many {
    // Key-only form.
    ($task:ident, $key:ident $key_pattern:tt $(if $cond:expr)? => $iter_item:expr) => {{
        #[allow(unused_imports)]
        use $crate::backend::operation::TaskGuard;
        $task
            .iter($crate::data::CachedDataItemType::$key)
            .filter_map(|(key, _)| match key {
                $crate::data::CachedDataItemKey::$key $key_pattern $(if $cond)? => Some(
                    $iter_item
                ),
                _ => None,
            })
    }};
    // Key + value form.
    ($task:ident, $key:ident $input:tt $value_pattern:tt $(if $cond:expr)? => $iter_item:expr) => {{
        #[allow(unused_imports)]
        use $crate::backend::operation::TaskGuard;
        $task
            .iter($crate::data::CachedDataItemType::$key)
            .filter_map(|(key, value)| match (key, value) {
                (
                    $crate::data::CachedDataItemKey::$key $input,
                    $crate::data::CachedDataItemValueRef::$key { value: $value_pattern }
                ) $(if $cond)? => Some($iter_item),
                _ => None,
            })
    }};
}
1024
/// A thin wrapper around [`iter_many`] that calls [`Iterator::collect`] on its result.
///
/// Accepts exactly the same arguments as `iter_many!`. The target collection type is inferred
/// from context, so it may be ambiguous; callers may then need an explicit type annotation
/// (e.g. `let items: Vec<_> = get_many!(...);`).
macro_rules! get_many {
    ($($args:tt)*) => {
        $crate::backend::storage::iter_many!($($args)*).collect()
    };
}
1034
/// Updates the `$key` item of `$task` through the closure `$update`.
///
/// `$update` receives `Some(value)` with the current value, or `None` when there is no value of
/// this variant. It returns the new value as an `Option`; `None` is passed on to `$task.update`
/// as `None`, presumably removing the item (see `TaskGuard::update`). Evaluates to whatever
/// `$task.update` returns. `$input` is the key's field list; the shorthand form passes `{}`.
macro_rules! update {
    ($task:ident, $key:ident $input:tt, $update:expr) => {{
        #[allow(unused_imports)]
        use $crate::backend::operation::TaskGuard;
        // `mut` so that `FnMut` closures can be passed.
        #[allow(unused_mut)]
        let mut update = $update;
        $task.update($crate::data::CachedDataItemKey::$key $input, |old| {
            // Unwrap the typed payload of the old value (if any) ...
            update(old.and_then(|old| {
                if let $crate::data::CachedDataItemValue::$key { value } = old {
                    Some(value)
                } else {
                    None
                }
            }))
            // ... and re-wrap the new payload into the item enum.
            .map(|new| $crate::data::CachedDataItemValue::$key { value: new })
        })
    }};
    ($task:ident, $key:ident, $update:expr) => {
        $crate::backend::storage::update!($task, $key {}, $update)
    };
}
1056
/// Adjusts the counter stored under `$key` on `$task` and reports whether it crossed zero.
///
/// - `update_count!(task, Key { .. }, delta)` adds `delta`.
/// - `update_count!(task, Key { .. }, -delta)` subtracts `delta`.
///
/// The key field list may be omitted for keys without fields; the shorthand arms pass `{}`.
/// A missing item counts as `0`. A resulting `0` is handed to `update!` as `None`, so the item is
/// presumably removed rather than stored as zero.
///
/// Evaluates to `true` when the count goes from `<= 0` to `> 0` or from `> 0` to `<= 0`.
macro_rules! update_count {
    ($task:ident, $key:ident $input:tt, -$update:expr) => {{
        let update = $update;
        let mut state_change = false;
        $crate::backend::storage::update!($task, $key $input, |old: Option<_>| {
            #[allow(unused_comparisons, reason = "type of update might be unsigned, where update < 0 is always false")]
            if let Some(old) = old {
                let new = old - update;
                state_change = old <= 0 && new > 0 || old > 0 && new <= 0;
                (new != 0).then_some(new)
            } else {
                // Missing item == 0, so the new value is `-update`.
                state_change = update < 0;
                (update != 0).then_some(-update)
            }
        });
        state_change
    }};
    ($task:ident, $key:ident $input:tt, $update:expr) => {
        // `match` evaluates `$update` exactly once while keeping its temporaries alive.
        match $update {
            update => {
                let mut state_change = false;
                $crate::backend::storage::update!($task, $key $input, |old: Option<_>| {
                    if let Some(old) = old {
                        let new = old + update;
                        state_change = old <= 0 && new > 0 || old > 0 && new <= 0;
                        (new != 0).then_some(new)
                    } else {
                        // Missing item == 0, so the new value is `update`.
                        state_change = update > 0;
                        (update != 0).then_some(update)
                    }
                });
                state_change
            }
        }
    };
    ($task:ident, $key:ident, -$update:expr) => {
        $crate::backend::storage::update_count!($task, $key {}, -$update)
    };
    ($task:ident, $key:ident, $update:expr) => {
        $crate::backend::storage::update_count!($task, $key {}, $update)
    };
}
1098
/// Removes the `$key` item from `$task` and evaluates to its typed payload.
///
/// Evaluates to `None` if `$task.remove` returns nothing or a value of another variant.
/// `$input` is the key's field list as a single token tree; the shorthand form passes `{}`.
macro_rules! remove {
    ($task:ident, $key:ident $input:tt) => {{
        #[allow(unused_imports)]
        use $crate::backend::operation::TaskGuard;
        match $task.remove(&$crate::data::CachedDataItemKey::$key $input) {
            Some($crate::data::CachedDataItemValue::$key { value }) => Some(value),
            _ => None,
        }
    }};
    ($task:ident, $key:ident) => {
        $crate::backend::storage::remove!($task, $key {})
    };
}
1115
1116pub(crate) use count;
1117pub(crate) use get;
1118pub(crate) use get_many;
1119pub(crate) use get_mut;
1120pub(crate) use get_mut_or_insert_with;
1121pub(crate) use iter_many;
1122pub(crate) use remove;
1123pub(crate) use update;
1124pub(crate) use update_count;
1125
/// Guard for a running snapshot: dropping it calls `Storage::end_snapshot` on `storage`.
pub struct SnapshotGuard<'l> {
    // The storage whose snapshot mode ends when this guard is dropped.
    storage: &'l Storage,
}
1129
1130impl Drop for SnapshotGuard<'_> {
1131    fn drop(&mut self) {
1132        self.storage.end_snapshot();
1133    }
1134}
1135
/// Iterator over the processed snapshot data of one shard of the `modified` map.
///
/// NOTE: fields are dropped in declaration order, so `guard` (whose last `Arc` clone ends
/// snapshot mode via `Drop for SnapshotGuard`) is released after `direct_snapshots` and
/// `modified`. Keep this in mind when reordering fields.
pub struct SnapshotShard<'l, PP, P, PS> {
    // Snapshot copies taken out of the shard while it was scanned; emitted first.
    direct_snapshots: Vec<(TaskId, Box<InnerStorageSnapshot>)>,
    // Tasks that were in `ModifiedState::Modified` when the shard was scanned.
    modified: SmallVec<[TaskId; 4]>,
    // The storage being snapshotted.
    storage: &'l Storage,
    // Shared handle to the snapshot guard; set to `None` once the iterator is exhausted.
    guard: Option<Arc<SnapshotGuard<'l>>>,
    // Turns a preprocessed live task into an output item.
    process: &'l P,
    // Reads a live task while its map entry is locked.
    preprocess: &'l PP,
    // Turns a snapshot copy into an output item.
    process_snapshot: &'l PS,
}
1145
1146impl<'l, T, R, PP, P, PS> Iterator for SnapshotShard<'l, PP, P, PS>
1147where
1148    PP: for<'a> Fn(TaskId, &'a InnerStorage) -> T + Sync,
1149    P: Fn(TaskId, T) -> R + Sync,
1150    PS: Fn(TaskId, Box<InnerStorageSnapshot>) -> R + Sync,
1151{
1152    type Item = R;
1153
1154    fn next(&mut self) -> Option<Self::Item> {
1155        if let Some((task_id, snapshot)) = self.direct_snapshots.pop() {
1156            return Some((self.process_snapshot)(task_id, snapshot));
1157        }
1158        while let Some(task_id) = self.modified.pop() {
1159            let inner = self.storage.map.get(&task_id).unwrap();
1160            let state = inner.state();
1161            if !state.any_snapshot() {
1162                let preprocessed = (self.preprocess)(task_id, &inner);
1163                drop(inner);
1164                return Some((self.process)(task_id, preprocessed));
1165            } else {
1166                drop(inner);
1167                let maybe_snapshot = {
1168                    let mut modified_state = self.storage.modified.get_mut(&task_id).unwrap();
1169                    let ModifiedState::Snapshot(snapshot) = &mut *modified_state else {
1170                        unreachable!("The snapshot bit was set, so it must be in Snapshot state");
1171                    };
1172                    snapshot.take()
1173                };
1174                if let Some(snapshot) = maybe_snapshot {
1175                    return Some((self.process_snapshot)(task_id, snapshot));
1176                }
1177            }
1178        }
1179        self.guard = None;
1180        None
1181    }
1182}