1use std::{
2 hash::Hash,
3 ops::{Deref, DerefMut},
4 sync::{Arc, atomic::AtomicBool},
5 thread::available_parallelism,
6};
7
8use bitfield::bitfield;
9use rayon::iter::{IndexedParallelIterator, IntoParallelRefIterator, ParallelIterator};
10use smallvec::SmallVec;
11use turbo_tasks::{FxDashMap, TaskId};
12
13use crate::{
14 backend::dynamic_storage::DynamicStorage,
15 data::{
16 AggregationNumber, CachedDataItem, CachedDataItemKey, CachedDataItemType,
17 CachedDataItemValue, CachedDataItemValueRef, CachedDataItemValueRefMut, OutputValue,
18 },
19 data_storage::{AutoMapStorage, OptionStorage},
20 utils::dash_map_multi::{RefMut, get_multiple_mut},
21};
22
/// Selects which portion of a task's stored data an operation refers to.
///
/// `All` addresses the `Meta` and the `Data` portion together.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TaskDataCategory {
    Meta,
    Data,
    All,
}

impl TaskDataCategory {
    /// Maps a single-category value onto [`SpecificTaskDataCategory`].
    ///
    /// # Panics
    ///
    /// Panics when called on [`TaskDataCategory::All`], which has no
    /// single-category counterpart.
    pub fn into_specific(self) -> SpecificTaskDataCategory {
        match self {
            Self::All => unreachable!(),
            Self::Data => SpecificTaskDataCategory::Data,
            Self::Meta => SpecificTaskDataCategory::Meta,
        }
    }
}

/// A category that always names exactly one portion of the task data.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum SpecificTaskDataCategory {
    Meta,
    Data,
}
45
46impl IntoIterator for TaskDataCategory {
47 type Item = TaskDataCategory;
48
49 type IntoIter = TaskDataCategoryIterator;
50
51 fn into_iter(self) -> Self::IntoIter {
52 match self {
53 TaskDataCategory::Meta => TaskDataCategoryIterator::Meta,
54 TaskDataCategory::Data => TaskDataCategoryIterator::Data,
55 TaskDataCategory::All => TaskDataCategoryIterator::All,
56 }
57 }
58}
59
60pub enum TaskDataCategoryIterator {
61 All,
62 Meta,
63 Data,
64 None,
65}
66
67impl Iterator for TaskDataCategoryIterator {
68 type Item = TaskDataCategory;
69
70 fn next(&mut self) -> Option<Self::Item> {
71 match self {
72 TaskDataCategoryIterator::All => {
73 *self = TaskDataCategoryIterator::Data;
74 Some(TaskDataCategory::Meta)
75 }
76 TaskDataCategoryIterator::Meta => {
77 *self = TaskDataCategoryIterator::None;
78 Some(TaskDataCategory::Meta)
79 }
80 TaskDataCategoryIterator::Data => {
81 *self = TaskDataCategoryIterator::None;
82 Some(TaskDataCategory::Data)
83 }
84 TaskDataCategoryIterator::None => None,
85 }
86 }
87}
88
bitfield! {
    // Per-task bookkeeping flags packed into a single `u32`. Every flag exists
    // once for the meta category and once for the data category.
    #[derive(Clone, Default)]
    pub struct InnerStorageState(u32);
    impl Debug;
    // Bits 0/1: set through `InnerStorageState::set_restored`.
    pub meta_restored, set_meta_restored: 0;
    pub data_restored, set_data_restored: 1;
    // Bits 2/3: set by `StorageWriteGuard::track_modification` when the category
    // is modified while snapshot mode is off; cleared again by
    // `Storage::end_snapshot`.
    pub meta_modified, set_meta_modified: 2;
    pub data_modified, set_data_modified: 3;
    // Bits 4/5: set by `StorageWriteGuard::track_modification` when the category
    // is modified while snapshot mode is on; `Storage::end_snapshot` turns them
    // back into the corresponding `*_modified` bit.
    pub meta_snapshot, set_meta_snapshot: 4;
    pub data_snapshot, set_data_snapshot: 5;
}
103
104impl InnerStorageState {
105 pub fn set_restored(&mut self, category: TaskDataCategory) {
106 match category {
107 TaskDataCategory::Meta => {
108 self.set_meta_restored(true);
109 }
110 TaskDataCategory::Data => {
111 self.set_data_restored(true);
112 }
113 TaskDataCategory::All => {
114 self.set_meta_restored(true);
115 self.set_data_restored(true);
116 }
117 }
118 }
119
120 pub fn is_restored(&self, category: TaskDataCategory) -> bool {
121 match category {
122 TaskDataCategory::Meta => self.meta_restored(),
123 TaskDataCategory::Data => self.data_restored(),
124 TaskDataCategory::All => self.meta_restored() && self.data_restored(),
125 }
126 }
127
128 pub fn any_snapshot(&self) -> bool {
129 self.meta_snapshot() || self.data_snapshot()
130 }
131
132 pub fn any_modified(&self) -> bool {
133 self.meta_modified() || self.data_modified()
134 }
135}
136
/// A copy of one task's [`InnerStorage`] contents together with the modified
/// flags it had when the copy was made (see the `From<&InnerStorage>` impl).
pub struct InnerStorageSnapshot {
    // The first four fields are clones of the equally named `InnerStorage` fields.
    aggregation_number: OptionStorage<AggregationNumber>,
    output_dependent: AutoMapStorage<TaskId, ()>,
    output: OptionStorage<OutputValue>,
    upper: AutoMapStorage<TaskId, u32>,
    // Produced by `DynamicStorage::snapshot_for_persisting`.
    dynamic: DynamicStorage,
    /// Value of the `meta_modified` state flag at the time of the copy.
    pub meta_modified: bool,
    /// Value of the `data_modified` state flag at the time of the copy.
    pub data_modified: bool,
}
146
147impl From<&InnerStorage> for InnerStorageSnapshot {
148 fn from(inner: &InnerStorage) -> Self {
149 Self {
150 aggregation_number: inner.aggregation_number.clone(),
151 output_dependent: inner.output_dependent.clone(),
152 output: inner.output.clone(),
153 upper: inner.upper.clone(),
154 dynamic: inner.dynamic.snapshot_for_persisting(),
155 meta_modified: inner.state.meta_modified(),
156 data_modified: inner.state.data_modified(),
157 }
158 }
159}
160
161impl InnerStorageSnapshot {
162 pub fn iter_all(
163 &self,
164 ) -> impl Iterator<Item = (CachedDataItemKey, CachedDataItemValueRef<'_>)> {
165 use crate::data_storage::Storage;
166 self.dynamic
167 .iter_all()
168 .chain(self.aggregation_number.iter().map(|(_, value)| {
169 (
170 CachedDataItemKey::AggregationNumber {},
171 CachedDataItemValueRef::AggregationNumber { value },
172 )
173 }))
174 .chain(self.output.iter().map(|(_, value)| {
175 (
176 CachedDataItemKey::Output {},
177 CachedDataItemValueRef::Output { value },
178 )
179 }))
180 .chain(self.upper.iter().map(|(k, value)| {
181 (
182 CachedDataItemKey::Upper { task: *k },
183 CachedDataItemValueRef::Upper { value },
184 )
185 }))
186 .chain(self.output_dependent.iter().map(|(k, value)| {
187 (
188 CachedDataItemKey::OutputDependent { task: *k },
189 CachedDataItemValueRef::OutputDependent { value },
190 )
191 }))
192 }
193
194 pub fn len(&self) -> usize {
195 use crate::data_storage::Storage;
196 self.dynamic.len()
197 + self.aggregation_number.len()
198 + self.output.len()
199 + self.upper.len()
200 + self.output_dependent.len()
201 }
202}
203
204#[derive(Debug, Clone)]
205pub struct InnerStorage {
206 aggregation_number: OptionStorage<AggregationNumber>,
207 output_dependent: AutoMapStorage<TaskId, ()>,
208 output: OptionStorage<OutputValue>,
209 upper: AutoMapStorage<TaskId, u32>,
210 dynamic: DynamicStorage,
211 state: InnerStorageState,
212}
213
214impl InnerStorage {
215 fn new() -> Self {
216 Self {
217 aggregation_number: Default::default(),
218 output_dependent: Default::default(),
219 output: Default::default(),
220 upper: Default::default(),
221 dynamic: DynamicStorage::new(),
222 state: InnerStorageState::default(),
223 }
224 }
225
226 pub fn state(&self) -> &InnerStorageState {
227 &self.state
228 }
229
230 pub fn state_mut(&mut self) -> &mut InnerStorageState {
231 &mut self.state
232 }
233}
234
/// Implementation detail of the storage accessor generators.
///
/// The macro is driven by a config list of the form
/// `Tag [key_field] => field, Tag [key_field] => field, ...`.
/// For each mode (`CachedDataItem:`, `CachedDataItemKey:`, `CachedDataItemType:`,
/// `update:`, `get_mut_or_insert_with:`, `extract_if:`, `iter:`) there are
/// terminal arms handling exactly one config entry and one recursive arm that
/// peels off the first entry and recurses on the rest.
///
/// Every terminal arm expands to an `if let` on the item/key/type; when the
/// tag matches, the operation is forwarded to `$self.$field` and the enclosing
/// function `return`s. When nothing matches, control falls through to whatever
/// the caller wrote after the macro invocation.
#[macro_export]
macro_rules! generate_inner_storage_internal {
    // --- CachedDataItem: dispatch on an owned item (key + value) ---
    // Entry with a key field: forward `(key, args...)` to the field's `$fn`.
    (CachedDataItem: $self:ident, $item:ident, $value:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident $key_field:ident => $field:ident,) => {
        if let CachedDataItem::$tag { $key_field, $value } = $item {
            let result = $self.$field.$fn($key_field, $($args)*);
            return $crate::generate_inner_storage_internal!(return_value: result, $return_ty: $tag $key_field => $field);
        }
    };
    // Entry without a key field: the unit value `()` is passed as the key.
    (CachedDataItem: $self:ident, $item:ident, $value:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident => $field:ident,) => {
        if let CachedDataItem::$tag { $value } = $item {
            let result = $self.$field.$fn((), $($args)*);
            return $crate::generate_inner_storage_internal!(return_value: result, $return_ty: $tag => $field);
        }
    };
    // Recursive step: handle the first entry, then the remaining config.
    (CachedDataItem: $self:ident, $item:ident, $value:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
        $crate::generate_inner_storage_internal!(CachedDataItem: $self, $item, $value, $return_ty, $fn($($args)*): $tag $($key_field)? => $field,);
        $crate::generate_inner_storage_internal!(CachedDataItem: $self, $item, $value, $return_ty, $fn($($args)*): $($config)+)
    };
    // --- CachedDataItemKey: dispatch on a key ---
    (CachedDataItemKey: $self:ident, $item:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident $key_field:ident => $field:ident,) => {
        if let CachedDataItemKey::$tag { $key_field } = $item {
            let result = $self.$field.$fn($key_field, $($args)*);
            return $crate::generate_inner_storage_internal!(return_value: result, $return_ty: $tag $key_field => $field);
        }
    };
    // Keyless entry: a reference to the unit value `&()` is passed as the key.
    (CachedDataItemKey: $self:ident, $item:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident => $field:ident,) => {
        if let CachedDataItemKey::$tag { } = $item {
            let result = $self.$field.$fn(&(), $($args)*);
            return $crate::generate_inner_storage_internal!(return_value: result, $return_ty: $tag => $field);
        }
    };
    (CachedDataItemKey: $self:ident, $item:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
        $crate::generate_inner_storage_internal!(CachedDataItemKey: $self, $item, $return_ty, $fn($($args)*): $tag $($key_field)? => $field,);
        $crate::generate_inner_storage_internal!(CachedDataItemKey: $self, $item, $return_ty, $fn($($args)*): $($config)+)
    };
    // --- CachedDataItemType: dispatch on the item type only; no key is passed ---
    (CachedDataItemType: $self:ident, $item:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident $($key_field:ident)? => $field:ident,) => {
        if let CachedDataItemType::$tag = $item {
            let result = $self.$field.$fn($($args)*);
            return $crate::generate_inner_storage_internal!(return_value: result, $return_ty: $tag $($key_field)? => $field);
        }
    };
    (CachedDataItemType: $self:ident, $item:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
        $crate::generate_inner_storage_internal!(CachedDataItemType: $self, $item, $return_ty, $fn($($args)*): $tag $($key_field)? => $field,);
        $crate::generate_inner_storage_internal!(CachedDataItemType: $self, $item, $return_ty, $fn($($args)*): $($config)+)
    };

    // --- update: wrap the typed old value into `CachedDataItemValue`, call the
    // caller's closure, and unwrap its result back into the typed value. A result
    // of a different variant hits `unreachable!()`. ---
    (update: $self:ident, $key:ident, $update:ident: $tag:ident $key_field:ident => $field:ident,) => {
        if let CachedDataItemKey::$tag { $key_field } = $key {
            $self.$field.update($key_field, |old| {
                let old = old.map(|old| CachedDataItemValue::$tag { value: old });
                let new = $update(old);
                new.map(|new| if let CachedDataItemValue::$tag { value } = new {
                    value
                } else {
                    unreachable!()
                })
            });
            return;
        }
    };
    (update: $self:ident, $key:ident, $update:ident: $tag:ident => $field:ident,) => {
        if let CachedDataItemKey::$tag { } = $key {
            $self.$field.update((), |old| {
                let old = old.map(|old| CachedDataItemValue::$tag { value: old });
                let new = $update(old);
                new.map(|new| if let CachedDataItemValue::$tag { value } = new {
                    value
                } else {
                    unreachable!()
                })
            });
            return;
        }
    };
    (update: $self:ident, $key:ident, $update:ident: $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
        $crate::generate_inner_storage_internal!(update: $self, $key, $update: $tag $($key_field)? => $field,);
        $crate::generate_inner_storage_internal!(update: $self, $key, $update: $($config)+)
    };

    // --- get_mut_or_insert_with: the caller's closure produces a
    // `CachedDataItemValue`; it must be of the matching variant, otherwise
    // `unreachable!()` is hit. ---
    (get_mut_or_insert_with: $self:ident, $key:ident, $insert_with:ident: $tag:ident $key_field:ident => $field:ident,) => {
        if let CachedDataItemKey::$tag { $key_field } = $key {
            let value = $self.$field.get_mut_or_insert_with($key_field, || {
                let value = $insert_with();
                if let CachedDataItemValue::$tag { value } = value {
                    value
                } else {
                    unreachable!()
                }
            });
            return CachedDataItemValueRefMut::$tag { value };
        }
    };
    (get_mut_or_insert_with: $self:ident, $key:ident, $insert_with:ident: $tag:ident => $field:ident,) => {
        if let CachedDataItemKey::$tag { } = $key {
            let value = $self.$field.get_mut_or_insert_with((), || {
                let value = $insert_with();
                if let CachedDataItemValue::$tag { value } = value {
                    value
                } else {
                    unreachable!()
                }
            });
            return CachedDataItemValueRefMut::$tag { value };
        }
    };
    (get_mut_or_insert_with: $self:ident, $key:ident, $insert_with:ident: $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
        $crate::generate_inner_storage_internal!(get_mut_or_insert_with: $self, $key, $insert_with: $tag $($key_field)? => $field,);
        $crate::generate_inner_storage_internal!(get_mut_or_insert_with: $self, $key, $insert_with: $($config)+)
    };

    // --- extract_if: adapt the caller's predicate to the typed field, map the
    // extracted pairs back into `CachedDataItem`s and wrap the iterator in the
    // matching `InnerStorageIter` variant. ---
    (extract_if: $self:ident, $ty:ident, $f:ident: $tag:ident $key_field:ident => $field:ident,) => {
        if let CachedDataItemType::$tag = $ty {
            let iter = $self.$field.extract_if(move |key, value| {
                $f(CachedDataItemKey::$tag { $key_field: *key }, CachedDataItemValueRef::$tag { value })
            }).map(|($key_field, value)| CachedDataItem::$tag { $key_field, value });
            return InnerStorageIter::$tag(iter);
        }
    };
    (extract_if: $self:ident, $ty:ident, $f:ident: $tag:ident => $field:ident,) => {
        if let CachedDataItemType::$tag = $ty {
            let iter = $self.$field.extract_if(move |_, value| {
                $f(CachedDataItemKey::$tag { }, CachedDataItemValueRef::$tag { value })
            }).map(|(_, value)| CachedDataItem::$tag { value });
            return InnerStorageIter::$tag(iter);
        }
    };
    (extract_if: $self:ident, $ty:ident, $f:ident: $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
        $crate::generate_inner_storage_internal!(extract_if: $self, $ty, $f: $tag $($key_field)? => $field,);
        $crate::generate_inner_storage_internal!(extract_if: $self, $ty, $f: $($config)+)
    };

    // --- iter: map the typed field's entries to key/value-reference pairs and
    // wrap the iterator in the matching `InnerStorageIter` variant. ---
    (iter: $self:ident, $ty:ident: $tag:ident $key_field:ident => $field:ident,) => {
        if let CachedDataItemType::$tag = $ty {
            let iter = $self.$field.iter().map(|($key_field, value)| (CachedDataItemKey::$tag { $key_field: *$key_field }, CachedDataItemValueRef::$tag { value }));
            return InnerStorageIter::$tag(iter);
        }
    };
    (iter: $self:ident, $ty:ident: $tag:ident => $field:ident,) => {
        if let CachedDataItemType::$tag = $ty {
            let iter = $self.$field.iter().map(|(_, value)| (CachedDataItemKey::$tag { }, CachedDataItemValueRef::$tag { value }));
            return InnerStorageIter::$tag(iter);
        }
    };
    (iter: $self:ident, $ty:ident: $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
        $crate::generate_inner_storage_internal!(iter: $self, $ty: $tag $($key_field)? => $field,);
        $crate::generate_inner_storage_internal!(iter: $self, $ty: $($config)+)
    };


    // --- return_value: convert the typed result of the field operation into the
    // type the generated function returns, selected by `$return_ty`. ---
    // `none`: pass the result through unchanged.
    (return_value: $result:ident, none: $($more:tt)*) => {
        $result
    };
    // `option_value`: wrap an owned value into `CachedDataItemValue::$tag`.
    (return_value: $result:ident, option_value: $tag:ident $($more:tt)*) => {
        $result.map(|value| CachedDataItemValue::$tag { value })
    };
    // `option_ref`: wrap a shared reference into `CachedDataItemValueRef::$tag`.
    (return_value: $result:ident, option_ref: $tag:ident $($more:tt)*) => {
        $result.map(|value| CachedDataItemValueRef::$tag { value })
    };
    // `option_ref_mut`: wrap a mutable reference into `CachedDataItemValueRefMut::$tag`.
    (return_value: $result:ident, option_ref_mut: $tag:ident $($more:tt)*) => {
        $result.map(|value| CachedDataItemValueRefMut::$tag { value })
    };

    // --- input_value: inverse of `return_value: option_value`; unwraps a
    // `CachedDataItemValue::$tag` and hits `unreachable!()` for any other variant.
    // Not invoked anywhere in the part of the file visible here. ---
    (input_value: $input:ident, option_value: $tag:ident $($more:tt)*) => {
        $input.map(|value| {
            if let CachedDataItemValue::$tag { value } = value {
                value
            } else {
                unreachable!()
            }
        })
    };

}
416
/// Generates the keyed accessor methods of [`InnerStorage`].
///
/// The argument is a config list `Tag [key_field] => field, ...` naming the item
/// kinds that live in a dedicated field. Every generated method first tries
/// those fields through `generate_inner_storage_internal!` (which returns early
/// on a match) and otherwise falls back to the same-named operation on
/// `self.dynamic`.
macro_rules! generate_inner_storage {
    ($($config:tt)*) => {
        impl InnerStorage {
            // Forwards to `add` of the responsible storage and returns its `bool`
            // result unchanged.
            pub fn add(&mut self, item: CachedDataItem) -> bool {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(CachedDataItem: self, item, value, none, add(value): $($config)*);
                self.dynamic.add(item)
            }

            // Forwards to `insert`; the `Option` returned by the storage is handed
            // back as a `CachedDataItemValue` (presumably the replaced value —
            // confirm against the `Storage` trait).
            pub fn insert(&mut self, item: CachedDataItem) -> Option<CachedDataItemValue> {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(CachedDataItem: self, item, value, option_value, insert(value): $($config)*);
                self.dynamic.insert(item)
            }

            // Forwards to `remove` and returns the storage's result as an owned value.
            pub fn remove(&mut self, key: &CachedDataItemKey) -> Option<CachedDataItemValue> {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(CachedDataItemKey: self, key, option_value, remove(): $($config)*);
                self.dynamic.remove(key)
            }

            // Number of stored items of type `ty`: `len()` of the dedicated field, or
            // `count(ty)` of the dynamic storage.
            pub fn count(&self, ty: CachedDataItemType) -> usize {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(CachedDataItemType: self, ty, none, len(): $($config)*);
                self.dynamic.count(ty)
            }

            // Shared reference to the value stored under `key`, if any.
            pub fn get(&self, key: &CachedDataItemKey) -> Option<CachedDataItemValueRef<'_>> {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(CachedDataItemKey: self, key, option_ref, get(): $($config)*);
                self.dynamic.get(key)
            }

            // Forwards to `contains_key` of the responsible storage.
            pub fn contains_key(&self, key: &CachedDataItemKey) -> bool {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(CachedDataItemKey: self, key, none, contains_key(): $($config)*);
                self.dynamic.contains_key(key)
            }

            // Mutable reference to the value stored under `key`, if any.
            pub fn get_mut(&mut self, key: &CachedDataItemKey) -> Option<CachedDataItemValueRefMut<'_>> {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(CachedDataItemKey: self, key, option_ref_mut, get_mut(): $($config)*);
                self.dynamic.get_mut(key)
            }

            // Forwards to `shrink_to_fit` of the storage holding items of type `ty`.
            pub fn shrink_to_fit(&mut self, ty: CachedDataItemType) {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(CachedDataItemType: self, ty, none, shrink_to_fit(): $($config)*);
                self.dynamic.shrink_to_fit(ty)
            }

            // Calls `update` with the current value under `key` (or `None`) and hands
            // its result back to the storage. For the dedicated fields the closure
            // must return the same value variant as the key, otherwise the generated
            // code hits `unreachable!()`.
            pub fn update(
                &mut self,
                key: CachedDataItemKey,
                update: impl FnOnce(Option<CachedDataItemValue>) -> Option<CachedDataItemValue>,
            ) {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(update: self, key, update: $($config)*);
                self.dynamic.update(key, update)
            }

            // Forwards to `extract_if` of the storage holding items of type `ty` and
            // yields the extracted entries as owned `CachedDataItem`s. The concrete
            // iterator is hidden behind `InnerStorageIter`.
            pub fn extract_if<'l, F>(
                &'l mut self,
                ty: CachedDataItemType,
                mut f: F,
            ) -> impl Iterator<Item = CachedDataItem> + use<'l, F>
            where
                F: for<'a> FnMut(CachedDataItemKey, CachedDataItemValueRef<'a>) -> bool + 'l,
            {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(extract_if: self, ty, f: $($config)*);
                InnerStorageIter::Dynamic(self.dynamic.extract_if(ty, f))
            }

            // Mutable reference to the value under `key`; `f` supplies the value to
            // insert and must produce the variant matching `key`.
            pub fn get_mut_or_insert_with(
                &mut self,
                key: CachedDataItemKey,
                f: impl FnOnce() -> CachedDataItemValue,
            ) -> CachedDataItemValueRefMut<'_>
            {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(get_mut_or_insert_with: self, key, f: $($config)*);
                self.dynamic.get_mut_or_insert_with(key, f)
            }

            // Iterates over all items of type `ty` as key/value-reference pairs.
            pub fn iter(
                &self,
                ty: CachedDataItemType,
            ) -> impl Iterator<Item = (CachedDataItemKey, CachedDataItemValueRef<'_>)>
            {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(iter: self, ty: $($config)*);
                InnerStorageIter::Dynamic(self.dynamic.iter(ty))
            }

        }
    };
}
515
// Item kinds stored in a dedicated `InnerStorage` field. `OutputDependent` and
// `Upper` are keyed by their `task` field, the other two are keyless. The tags
// listed here must stay in sync with the variants of `InnerStorageIter`.
generate_inner_storage!(
    AggregationNumber => aggregation_number,
    OutputDependent task => output_dependent,
    Output => output,
    Upper task => upper,
);
522
/// Unifies the differently typed iterators of the dedicated `InnerStorage`
/// fields and of the dynamic storage behind one type, so that functions
/// returning `impl Iterator` can return any of them.
///
/// Each variant wraps the iterator of the equally named item kind; `Dynamic`
/// wraps the iterator of the dynamic storage.
enum InnerStorageIter<A, B, C, D, E> {
    AggregationNumber(A),
    OutputDependent(B),
    Output(C),
    Upper(D),
    Dynamic(E),
}

impl<T, A, B, C, D, E> Iterator for InnerStorageIter<A, B, C, D, E>
where
    A: Iterator<Item = T>,
    B: Iterator<Item = T>,
    C: Iterator<Item = T>,
    D: Iterator<Item = T>,
    E: Iterator<Item = T>,
{
    type Item = T;

    /// Delegates to the wrapped iterator.
    fn next(&mut self) -> Option<Self::Item> {
        match self {
            InnerStorageIter::AggregationNumber(iter) => iter.next(),
            InnerStorageIter::OutputDependent(iter) => iter.next(),
            InnerStorageIter::Output(iter) => iter.next(),
            InnerStorageIter::Upper(iter) => iter.next(),
            InnerStorageIter::Dynamic(iter) => iter.next(),
        }
    }

    /// Forwards the wrapped iterator's hint. Without this the default
    /// `(0, None)` would be reported and consumers such as `collect()` could
    /// not reserve capacity up front.
    fn size_hint(&self) -> (usize, Option<usize>) {
        match self {
            InnerStorageIter::AggregationNumber(iter) => iter.size_hint(),
            InnerStorageIter::OutputDependent(iter) => iter.size_hint(),
            InnerStorageIter::Output(iter) => iter.size_hint(),
            InnerStorageIter::Upper(iter) => iter.size_hint(),
            InnerStorageIter::Dynamic(iter) => iter.size_hint(),
        }
    }
}
551
552impl InnerStorage {
553 pub fn iter_all(
554 &self,
555 ) -> impl Iterator<Item = (CachedDataItemKey, CachedDataItemValueRef<'_>)> {
556 use crate::data_storage::Storage;
557 self.dynamic
558 .iter_all()
559 .chain(self.aggregation_number.iter().map(|(_, value)| {
560 (
561 CachedDataItemKey::AggregationNumber {},
562 CachedDataItemValueRef::AggregationNumber { value },
563 )
564 }))
565 .chain(self.output.iter().map(|(_, value)| {
566 (
567 CachedDataItemKey::Output {},
568 CachedDataItemValueRef::Output { value },
569 )
570 }))
571 .chain(self.upper.iter().map(|(k, value)| {
572 (
573 CachedDataItemKey::Upper { task: *k },
574 CachedDataItemValueRef::Upper { value },
575 )
576 }))
577 .chain(self.output_dependent.iter().map(|(k, value)| {
578 (
579 CachedDataItemKey::OutputDependent { task: *k },
580 CachedDataItemValueRef::OutputDependent { value },
581 )
582 }))
583 }
584
585 pub fn len(&self) -> usize {
586 use crate::data_storage::Storage;
587 self.dynamic.len()
588 + self.aggregation_number.len()
589 + self.output.len()
590 + self.upper.len()
591 + self.output_dependent.len()
592 }
593}
594
/// Entry of `Storage::modified`, recording how a task was modified relative to
/// snapshot mode.
enum ModifiedState {
    /// Inserted by `track_modification` for a first modification while snapshot
    /// mode is off.
    Modified,
    /// Inserted by `track_modification` for a first modification while snapshot
    /// mode is on.
    ///
    /// `Some` holds a copy of the task's storage made right before that
    /// modification; it is only created when the category had already been
    /// modified before. `None` is inserted otherwise, and is also what remains
    /// after the snapshot operation has taken the copy out.
    Snapshot(Option<Box<InnerStorageSnapshot>>),
}
607
/// Concurrent storage of all tasks' data with modification tracking for
/// snapshots.
pub struct Storage {
    /// `true` between `start_snapshot` and the end of `end_snapshot`'s first
    /// phase; decides whether `track_modification` records a plain modification
    /// or a snapshot entry.
    snapshot_mode: AtomicBool,
    /// Tasks with tracked modifications and the kind of modification.
    modified: FxDashMap<TaskId, ModifiedState>,
    /// The per-task data, created on first mutable access.
    map: FxDashMap<TaskId, Box<InnerStorage>>,
}
613
614impl Storage {
615 pub fn new(small_preallocation: bool) -> Self {
616 let map_capacity: usize = if small_preallocation {
617 1024
618 } else {
619 1024 * 1024
620 };
621 let modified_capacity: usize = if small_preallocation { 0 } else { 1024 };
622 let shard_factor: usize = if small_preallocation { 4 } else { 64 };
623
624 let shard_amount =
625 (available_parallelism().map_or(4, |v| v.get()) * shard_factor).next_power_of_two();
626
627 Self {
628 snapshot_mode: AtomicBool::new(false),
629 modified: FxDashMap::with_capacity_and_hasher_and_shard_amount(
630 modified_capacity,
631 Default::default(),
632 shard_amount,
633 ),
634 map: FxDashMap::with_capacity_and_hasher_and_shard_amount(
635 map_capacity,
636 Default::default(),
637 shard_amount,
638 ),
639 }
640 }
641
    /// Collects the tracked modifications into one [`SnapshotShard`] per shard
    /// of the `modified` map.
    ///
    /// Snapshot mode is entered first unless it is already active. Every
    /// returned shard is an iterator that lazily turns its tasks into `R`
    /// values:
    /// - `process_snapshot` receives stored snapshot copies,
    /// - `preprocess` runs on the live `InnerStorage` while its map entry is
    ///   held, and `process` turns that intermediate `T` into `R` afterwards.
    ///
    /// All shards share one `SnapshotGuard`; `end_snapshot` runs once the last
    /// reference to it is dropped.
    pub fn take_snapshot<
        'l,
        T,
        R,
        PP: for<'a> Fn(TaskId, &'a InnerStorage) -> T + Sync,
        P: Fn(TaskId, T) -> R + Sync,
        PS: Fn(TaskId, Box<InnerStorageSnapshot>) -> R + Sync,
    >(
        &'l self,
        preprocess: &'l PP,
        process: &'l P,
        process_snapshot: &'l PS,
    ) -> Vec<SnapshotShard<'l, PP, P, PS>> {
        if !self.snapshot_mode() {
            self.start_snapshot();
        }

        // Shared by all shards; its `Drop` impl calls `end_snapshot`.
        let guard = Arc::new(SnapshotGuard { storage: self });

        self.modified
            .shards()
            .par_iter()
            // One shard per rayon work unit.
            .with_max_len(1)
            .map(|shard| {
                let mut direct_snapshots: Vec<(TaskId, Box<InnerStorageSnapshot>)> = Vec::new();
                let mut modified: SmallVec<[TaskId; 4]> = SmallVec::new();
                {
                    // NOTE: this `guard` is the shard's write lock and shadows the
                    // `Arc<SnapshotGuard>` above only inside this block.
                    let guard = shard.write();
                    // NOTE(review): raw-table iteration; presumably sound because the
                    // shard's write lock is held for the whole loop — confirm against
                    // the dashmap/hashbrown contract.
                    for bucket in unsafe { guard.iter() } {
                        // NOTE(review): mutable bucket access, presumably justified by
                        // the same exclusive lock — confirm.
                        let (key, shared_value) = unsafe { bucket.as_mut() };
                        let modified_state = shared_value.get_mut();
                        match modified_state {
                            ModifiedState::Modified => {
                                // The live data is read later, in `SnapshotShard::next`.
                                modified.push(*key);
                            }
                            ModifiedState::Snapshot(snapshot) => {
                                // Move an existing copy out, leaving `Snapshot(None)`.
                                if let Some(snapshot) = snapshot.take() {
                                    direct_snapshots.push((*key, snapshot));
                                }
                            }
                        }
                    }
                    // Release the shard lock before building the result.
                    drop(guard);
                }

                SnapshotShard {
                    direct_snapshots,
                    modified,
                    storage: self,
                    guard: Some(guard.clone()),
                    process,
                    preprocess,
                    process_snapshot,
                }
            })
            .collect::<Vec<_>>()
    }
710
711 pub fn start_snapshot(&self) {
713 self.snapshot_mode
714 .store(true, std::sync::atomic::Ordering::Release);
715 }
716
717 fn end_snapshot(&self) {
722 let mut removed_modified = Vec::new();
725 self.modified.retain(|key, inner| {
726 if matches!(inner, ModifiedState::Modified) {
727 removed_modified.push(*key);
728 false
729 } else {
730 true
731 }
732 });
733
734 for key in removed_modified {
736 if let Some(mut inner) = self.map.get_mut(&key) {
737 let state = inner.state_mut();
738 state.set_data_modified(false);
739 state.set_meta_modified(false);
740 }
741 }
742
743 self.snapshot_mode
746 .store(false, std::sync::atomic::Ordering::Release);
747
748 let mut removed_snapshots = Vec::new();
750 for mut item in self.modified.iter_mut() {
751 match item.value() {
752 ModifiedState::Snapshot(_) => {
753 removed_snapshots.push(*item.key());
754 *item.value_mut() = ModifiedState::Modified;
755 }
756 ModifiedState::Modified => {
757 }
760 }
761 }
762
763 for key in removed_snapshots {
765 if let Some(mut inner) = self.map.get_mut(&key) {
766 let state = inner.state_mut();
767 if state.meta_snapshot() {
768 state.set_meta_snapshot(false);
769 state.set_meta_modified(true);
770 }
771 if state.data_snapshot() {
772 state.set_data_snapshot(false);
773 state.set_data_modified(true);
774 }
775 }
776 }
777
778 self.modified.shrink_to_fit();
780 }
781
782 fn snapshot_mode(&self) -> bool {
783 self.snapshot_mode
784 .load(std::sync::atomic::Ordering::Acquire)
785 }
786
787 pub fn access_mut(&self, key: TaskId) -> StorageWriteGuard<'_> {
788 let inner = match self.map.entry(key) {
789 dashmap::mapref::entry::Entry::Occupied(e) => e.into_ref(),
790 dashmap::mapref::entry::Entry::Vacant(e) => e.insert(Box::new(InnerStorage::new())),
791 };
792 StorageWriteGuard {
793 storage: self,
794 inner: inner.into(),
795 }
796 }
797
798 pub fn access_pair_mut(
799 &self,
800 key1: TaskId,
801 key2: TaskId,
802 ) -> (StorageWriteGuard<'_>, StorageWriteGuard<'_>) {
803 let (a, b) = get_multiple_mut(&self.map, key1, key2, || Box::new(InnerStorage::new()));
804 (
805 StorageWriteGuard {
806 storage: self,
807 inner: a,
808 },
809 StorageWriteGuard {
810 storage: self,
811 inner: b,
812 },
813 )
814 }
815}
816
/// Exclusive access to one task's [`InnerStorage`].
///
/// Dereferences to the `InnerStorage`. Changes are not tracked automatically;
/// `track_modification` has to be called to record them.
pub struct StorageWriteGuard<'a> {
    /// The owning storage, needed to reach the snapshot-mode flag and the
    /// `modified` map.
    storage: &'a Storage,
    /// The held entry of the task map.
    inner: RefMut<'a, TaskId, Box<InnerStorage>>,
}
821
822impl StorageWriteGuard<'_> {
823 pub fn track_modification(&mut self, category: SpecificTaskDataCategory) {
825 let state = self.inner.state();
826 let snapshot = match category {
827 SpecificTaskDataCategory::Meta => state.meta_snapshot(),
828 SpecificTaskDataCategory::Data => state.data_snapshot(),
829 };
830 if !snapshot {
831 let modified = match category {
832 SpecificTaskDataCategory::Meta => state.meta_modified(),
833 SpecificTaskDataCategory::Data => state.data_modified(),
834 };
835 match (self.storage.snapshot_mode(), modified) {
836 (false, false) => {
837 if !state.any_snapshot() && !state.any_modified() {
839 self.storage
840 .modified
841 .insert(*self.inner.key(), ModifiedState::Modified);
842 }
843 let state = self.inner.state_mut();
844 match category {
845 SpecificTaskDataCategory::Meta => state.set_meta_modified(true),
846 SpecificTaskDataCategory::Data => state.set_data_modified(true),
847 }
848 }
849 (false, true) => {
850 }
853 (true, false) => {
854 if !state.any_snapshot() {
856 self.storage
857 .modified
858 .insert(*self.inner.key(), ModifiedState::Snapshot(None));
859 }
860 let state = self.inner.state_mut();
861 match category {
862 SpecificTaskDataCategory::Meta => state.set_meta_snapshot(true),
863 SpecificTaskDataCategory::Data => state.set_data_snapshot(true),
864 }
865 }
866 (true, true) => {
867 if !state.any_snapshot() {
870 self.storage.modified.insert(
871 *self.inner.key(),
872 ModifiedState::Snapshot(Some(Box::new((&**self.inner).into()))),
873 );
874 }
875 let state = self.inner.state_mut();
876 match category {
877 SpecificTaskDataCategory::Meta => state.set_meta_snapshot(true),
878 SpecificTaskDataCategory::Data => state.set_data_snapshot(true),
879 }
880 }
881 }
882 }
883 }
884}
885
886impl Deref for StorageWriteGuard<'_> {
887 type Target = InnerStorage;
888
889 fn deref(&self) -> &Self::Target {
890 &self.inner
891 }
892}
893
894impl DerefMut for StorageWriteGuard<'_> {
895 fn deref_mut(&mut self) -> &mut Self::Target {
896 &mut self.inner
897 }
898}
899
/// `count!(task, Key)` expands to `task.count(CachedDataItemType::Key)`, i.e.
/// the number of items of type `Key` stored for `task`.
macro_rules! count {
    ($task:ident, $key:ident) => {{ $task.count($crate::data::CachedDataItemType::$key) }};
}
903
/// Looks up an item on `task` and unwraps it to the typed value reference.
///
/// - `get!(task, Key { field: .. })` looks up the key built from the given
///   fields.
/// - `get!(task, Key)` is shorthand for a key without fields (`Key {}`).
///
/// Evaluates to `Some(value)` when `task.get(..)` returns the `Key` variant,
/// and to `None` otherwise.
macro_rules! get {
    ($task:ident, $key:ident $input:tt) => {{
        // Brings the trait into scope in case `$task` is accessed through it.
        #[allow(unused_imports)]
        use $crate::backend::operation::TaskGuard;
        if let Some($crate::data::CachedDataItemValueRef::$key {
            value,
        }) = $task.get(&$crate::data::CachedDataItemKey::$key $input) {
            Some(value)
        } else {
            None
        }
    }};
    ($task:ident, $key:ident) => {
        $crate::backend::storage::get!($task, $key {})
    };
}
920
/// Mutable counterpart of `get!`: looks up an item on `task` and unwraps it to
/// the typed mutable value reference.
///
/// - `get_mut!(task, Key { field: .. })` looks up the key built from the given
///   fields.
/// - `get_mut!(task, Key)` is shorthand for a key without fields (`Key {}`).
///
/// Evaluates to `Some(value)` when `task.get_mut(..)` returns the `Key`
/// variant, and to `None` otherwise.
macro_rules! get_mut {
    ($task:ident, $key:ident $input:tt) => {{
        #[allow(unused_imports)]
        use $crate::backend::operation::TaskGuard;
        if let Some($crate::data::CachedDataItemValueRefMut::$key {
            value,
        }) = $task.get_mut(&$crate::data::CachedDataItemKey::$key $input) {
            // Only compiles when `allow_mut_access::$key` exists and has type `()`.
            // NOTE(review): presumably a compile-time gate on which item types may
            // be accessed mutably — confirm in `crate::data`.
            let () = $crate::data::allow_mut_access::$key;
            Some(value)
        } else {
            None
        }
    }};
    ($task:ident, $key:ident) => {
        $crate::backend::storage::get_mut!($task, $key {})
    };
}
938
/// Evaluates to the typed mutable value reference of an item on `task`, with
/// `$f` supplying the typed value to insert.
///
/// - `get_mut_or_insert_with!(task, Key { field: .. }, f)` uses the key built
///   from the given fields.
/// - `get_mut_or_insert_with!(task, Key, f)` is shorthand for `Key {}`.
///
/// `f` is evaluated once as an expression and the resulting callable is wrapped
/// so that its result becomes a `CachedDataItemValue::Key`. A returned
/// reference of a different variant hits `unreachable!()`.
macro_rules! get_mut_or_insert_with {
    ($task:ident, $key:ident $input:tt, $f:expr) => {{
        #[allow(unused_imports)]
        use $crate::backend::operation::TaskGuard;
        // Only compiles when `allow_mut_access::$key` exists and has type `()`.
        // NOTE(review): presumably a compile-time gate on mutable access — confirm
        // in `crate::data`.
        let () = $crate::data::allow_mut_access::$key;
        let functor = $f;
        let $crate::data::CachedDataItemValueRefMut::$key {
            value,
        } = $task.get_mut_or_insert_with($crate::data::CachedDataItemKey::$key $input, move || $crate::data::CachedDataItemValue::$key { value: functor() }) else {
            unreachable!()
        };
        value
    }};
    ($task:ident, $key:ident, $f:expr) => {
        $crate::backend::storage::get_mut_or_insert_with!($task, $key {}, $f)
    };
}
956
/// Creates an iterator over the items of type `Key` stored on `task`, filtered
/// and mapped by patterns.
///
/// Two forms exist:
/// - `iter_many!(task, Key { key_pattern } [if cond] => expr)` matches on the
///   key only.
/// - `iter_many!(task, Key { key_pattern } value_pattern [if cond] => expr)`
///   additionally matches the value against `value_pattern`.
///
/// Items that match the pattern(s) and the optional `if` condition yield
/// `expr`; all other items are skipped.
macro_rules! iter_many {
    // Key-only form.
    ($task:ident, $key:ident $key_pattern:tt $(if $cond:expr)? => $iter_item:expr) => {{
        #[allow(unused_imports)]
        use $crate::backend::operation::TaskGuard;
        $task
            .iter($crate::data::CachedDataItemType::$key)
            .filter_map(|(key, _)| match key {
                $crate::data::CachedDataItemKey::$key $key_pattern $(if $cond)? => Some(
                    $iter_item
                ),
                _ => None,
            })
    }};
    // Key-and-value form.
    ($task:ident, $key:ident $input:tt $value_pattern:tt $(if $cond:expr)? => $iter_item:expr) => {{
        #[allow(unused_imports)]
        use $crate::backend::operation::TaskGuard;
        $task
            .iter($crate::data::CachedDataItemType::$key)
            .filter_map(|(key, value)| match (key, value) {
                (
                    $crate::data::CachedDataItemKey::$key $input,
                    $crate::data::CachedDataItemValueRef::$key { value: $value_pattern }
                ) $(if $cond)? => Some($iter_item),
                _ => None,
            })
    }};
}
989
/// Same arguments as `iter_many!`, but `collect()`s the resulting iterator.
/// The target collection type is inferred at the call site.
macro_rules! get_many {
    ($($args:tt)*) => {
        $crate::backend::storage::iter_many!($($args)*).collect()
    };
}
999
/// Updates an item on `task` through a closure over the typed value.
///
/// - `update!(task, Key { field: .. }, |old: Option<V>| -> Option<V> { .. })`
/// - `update!(task, Key, ..)` is shorthand for `Key {}`.
///
/// The closure receives the current typed value (`None` when the stored value
/// is absent or of a different variant) and its result is wrapped back into
/// `CachedDataItemValue::Key` before being handed to `task.update`.
macro_rules! update {
    ($task:ident, $key:ident $input:tt, $update:expr) => {{
        #[allow(unused_imports)]
        use $crate::backend::operation::TaskGuard;
        // Bound mutably so that `FnMut` closures can be passed as well.
        #[allow(unused_mut)]
        let mut update = $update;
        $task.update($crate::data::CachedDataItemKey::$key $input, |old| {
            update(old.and_then(|old| {
                if let $crate::data::CachedDataItemValue::$key { value } = old {
                    Some(value)
                } else {
                    None
                }
            }))
            .map(|new| $crate::data::CachedDataItemValue::$key { value: new })
        })
    }};
    ($task:ident, $key:ident, $update:expr) => {
        $crate::backend::storage::update!($task, $key {}, $update)
    };
}
1021
/// Adds to (or, with a leading `-`, subtracts from) a numeric item on `task`
/// and reports whether the count changed sides.
///
/// - `update_count!(task, Key { field: .. }, n)` adds `n`.
/// - `update_count!(task, Key { field: .. }, -n)` subtracts `n`.
/// - Both forms accept a bare `Key` as shorthand for `Key {}`.
///
/// A missing item is treated as the starting point: the new value is `n`
/// (respectively `-n`). A resulting value of `0` is returned to `update!` as
/// `None` instead of being stored.
///
/// Evaluates to a `bool` that is `true` when the value moved between "`<= 0`"
/// and "`> 0`" in either direction. For a missing item it is `true` when `n > 0`
/// in the adding form, or when `n < 0` in the subtracting form.
macro_rules! update_count {
    // Subtracting form.
    ($task:ident, $key:ident $input:tt, -$update:expr) => {{
        let update = $update;
        let mut state_change = false;
        $crate::backend::storage::update!($task, $key $input, |old: Option<_>| {
            #[allow(unused_comparisons, reason = "type of update might be unsigned, where update < 0 is always false")]
            if let Some(old) = old {
                let new = old - update;
                state_change = old <= 0 && new > 0 || old > 0 && new <= 0;
                (new != 0).then_some(new)
            } else {
                state_change = update < 0;
                (update != 0).then_some(-update)
            }
        });
        state_change
    }};
    // Adding form. `match` binds the evaluated expression to `update`.
    ($task:ident, $key:ident $input:tt, $update:expr) => {
        match $update {
            update => {
                let mut state_change = false;
                $crate::backend::storage::update!($task, $key $input, |old: Option<_>| {
                    if let Some(old) = old {
                        let new = old + update;
                        state_change = old <= 0 && new > 0 || old > 0 && new <= 0;
                        (new != 0).then_some(new)
                    } else {
                        state_change = update > 0;
                        (update != 0).then_some(update)
                    }
                });
                state_change
            }
        }
    };
    // Shorthands for keys without fields.
    ($task:ident, $key:ident, -$update:expr) => {
        $crate::backend::storage::update_count!($task, $key {}, -$update)
    };
    ($task:ident, $key:ident, $update:expr) => {
        $crate::backend::storage::update_count!($task, $key {}, $update)
    };
}
1063
/// Removes an item from `task` and unwraps it to the typed owned value.
///
/// - `remove!(task, Key { field: .. })` removes the key built from the given
///   fields.
/// - `remove!(task, Key)` is shorthand for `Key {}`.
///
/// Evaluates to `Some(value)` when `task.remove(..)` returns the `Key` variant,
/// and to `None` otherwise.
macro_rules! remove {
    ($task:ident, $key:ident $input:tt) => {{
        #[allow(unused_imports)]
        use $crate::backend::operation::TaskGuard;
        if let Some($crate::data::CachedDataItemValue::$key { value }) = $task.remove(
            &$crate::data::CachedDataItemKey::$key $input
        ) {
            Some(value)
        } else {
            None
        }
    }};
    ($task:ident, $key:ident) => {
        $crate::backend::storage::remove!($task, $key {})
    };
}
1080
1081pub(crate) use count;
1082pub(crate) use get;
1083pub(crate) use get_many;
1084pub(crate) use get_mut;
1085pub(crate) use get_mut_or_insert_with;
1086pub(crate) use iter_many;
1087pub(crate) use remove;
1088pub(crate) use update;
1089pub(crate) use update_count;
1090
/// Ends the snapshot of `storage` when dropped.
///
/// `Storage::take_snapshot` shares one guard between all shards through an
/// `Arc`, so `end_snapshot` runs when the last reference goes away.
pub struct SnapshotGuard<'l> {
    storage: &'l Storage,
}

impl Drop for SnapshotGuard<'_> {
    fn drop(&mut self) {
        self.storage.end_snapshot();
    }
}
1100
/// The snapshot work of one shard of the `modified` map, consumed as an
/// iterator of processed results.
pub struct SnapshotShard<'l, PP, P, PS> {
    /// Snapshot copies that were already taken out of the `modified` map.
    direct_snapshots: Vec<(TaskId, Box<InnerStorageSnapshot>)>,
    /// Tasks that were in `Modified` state; their data is read from the live
    /// storage during iteration.
    modified: SmallVec<[TaskId; 4]>,
    storage: &'l Storage,
    /// Keeps the snapshot alive; released once the iterator is exhausted.
    guard: Option<Arc<SnapshotGuard<'l>>>,
    /// Turns the output of `preprocess` into the final result.
    process: &'l P,
    /// Runs on the live `InnerStorage` while its map entry is held.
    preprocess: &'l PP,
    /// Turns a snapshot copy into the final result.
    process_snapshot: &'l PS,
}
1110
1111impl<'l, T, R, PP, P, PS> Iterator for SnapshotShard<'l, PP, P, PS>
1112where
1113 PP: for<'a> Fn(TaskId, &'a InnerStorage) -> T + Sync,
1114 P: Fn(TaskId, T) -> R + Sync,
1115 PS: Fn(TaskId, Box<InnerStorageSnapshot>) -> R + Sync,
1116{
1117 type Item = R;
1118
1119 fn next(&mut self) -> Option<Self::Item> {
1120 if let Some((task_id, snapshot)) = self.direct_snapshots.pop() {
1121 return Some((self.process_snapshot)(task_id, snapshot));
1122 }
1123 while let Some(task_id) = self.modified.pop() {
1124 let inner = self.storage.map.get(&task_id).unwrap();
1125 let state = inner.state();
1126 if !state.any_snapshot() {
1127 let preprocessed = (self.preprocess)(task_id, &inner);
1128 drop(inner);
1129 return Some((self.process)(task_id, preprocessed));
1130 } else {
1131 drop(inner);
1132 let maybe_snapshot = {
1133 let mut modified_state = self.storage.modified.get_mut(&task_id).unwrap();
1134 let ModifiedState::Snapshot(snapshot) = &mut *modified_state else {
1135 unreachable!("The snapshot bit was set, so it must be in Snapshot state");
1136 };
1137 snapshot.take()
1138 };
1139 if let Some(snapshot) = maybe_snapshot {
1140 return Some((self.process_snapshot)(task_id, snapshot));
1141 }
1142 }
1143 }
1144 self.guard = None;
1145 None
1146 }
1147}