1use std::{
2 hash::Hash,
3 ops::{Deref, DerefMut},
4 sync::{Arc, atomic::AtomicBool},
5};
6
7use bitfield::bitfield;
8use smallvec::SmallVec;
9use turbo_tasks::{FxDashMap, TaskId, parallel};
10
11use crate::{
12 backend::dynamic_storage::DynamicStorage,
13 data::{
14 AggregationNumber, CachedDataItem, CachedDataItemKey, CachedDataItemType,
15 CachedDataItemValue, CachedDataItemValueRef, CachedDataItemValueRefMut, OutputValue,
16 },
17 data_storage::{AutoMapStorage, OptionStorage},
18 utils::{
19 dash_map_drop_contents::drop_contents,
20 dash_map_multi::{RefMut, get_multiple_mut},
21 },
22};
23
24#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
25pub enum TaskDataCategory {
26 Meta,
27 Data,
28 All,
29}
30
31impl TaskDataCategory {
32 pub fn into_specific(self) -> SpecificTaskDataCategory {
33 match self {
34 TaskDataCategory::Meta => SpecificTaskDataCategory::Meta,
35 TaskDataCategory::Data => SpecificTaskDataCategory::Data,
36 TaskDataCategory::All => unreachable!(),
37 }
38 }
39}
40
41#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
42pub enum SpecificTaskDataCategory {
43 Meta,
44 Data,
45}
46
47impl IntoIterator for TaskDataCategory {
48 type Item = TaskDataCategory;
49
50 type IntoIter = TaskDataCategoryIterator;
51
52 fn into_iter(self) -> Self::IntoIter {
53 match self {
54 TaskDataCategory::Meta => TaskDataCategoryIterator::Meta,
55 TaskDataCategory::Data => TaskDataCategoryIterator::Data,
56 TaskDataCategory::All => TaskDataCategoryIterator::All,
57 }
58 }
59}
60
61pub enum TaskDataCategoryIterator {
62 All,
63 Meta,
64 Data,
65 None,
66}
67
68impl Iterator for TaskDataCategoryIterator {
69 type Item = TaskDataCategory;
70
71 fn next(&mut self) -> Option<Self::Item> {
72 match self {
73 TaskDataCategoryIterator::All => {
74 *self = TaskDataCategoryIterator::Data;
75 Some(TaskDataCategory::Meta)
76 }
77 TaskDataCategoryIterator::Meta => {
78 *self = TaskDataCategoryIterator::None;
79 Some(TaskDataCategory::Meta)
80 }
81 TaskDataCategoryIterator::Data => {
82 *self = TaskDataCategoryIterator::None;
83 Some(TaskDataCategory::Data)
84 }
85 TaskDataCategoryIterator::None => None,
86 }
87 }
88}
89
90bitfield! {
91 #[derive(Clone, Default)]
93 pub struct InnerStorageState(u32);
94 impl Debug;
95 pub meta_restored, set_meta_restored: 0;
96 pub data_restored, set_data_restored: 1;
97 pub meta_modified, set_meta_modified: 2;
99 pub data_modified, set_data_modified: 3;
100 pub meta_snapshot, set_meta_snapshot: 4;
102 pub data_snapshot, set_data_snapshot: 5;
103 pub prefetched, set_prefetched: 6;
105}
106
107impl InnerStorageState {
108 pub fn set_restored(&mut self, category: TaskDataCategory) {
109 match category {
110 TaskDataCategory::Meta => {
111 self.set_meta_restored(true);
112 }
113 TaskDataCategory::Data => {
114 self.set_data_restored(true);
115 }
116 TaskDataCategory::All => {
117 self.set_meta_restored(true);
118 self.set_data_restored(true);
119 }
120 }
121 }
122
123 pub fn is_restored(&self, category: TaskDataCategory) -> bool {
124 match category {
125 TaskDataCategory::Meta => self.meta_restored(),
126 TaskDataCategory::Data => self.data_restored(),
127 TaskDataCategory::All => self.meta_restored() && self.data_restored(),
128 }
129 }
130
131 pub fn any_snapshot(&self) -> bool {
132 self.meta_snapshot() || self.data_snapshot()
133 }
134
135 pub fn any_modified(&self) -> bool {
136 self.meta_modified() || self.data_modified()
137 }
138}
139
140pub struct InnerStorageSnapshot {
141 aggregation_number: OptionStorage<AggregationNumber>,
142 output_dependent: AutoMapStorage<TaskId, ()>,
143 output: OptionStorage<OutputValue>,
144 upper: AutoMapStorage<TaskId, u32>,
145 dynamic: DynamicStorage,
146 pub meta_modified: bool,
147 pub data_modified: bool,
148}
149
150impl From<&InnerStorage> for InnerStorageSnapshot {
151 fn from(inner: &InnerStorage) -> Self {
152 Self {
153 aggregation_number: inner.aggregation_number.clone(),
154 output_dependent: inner.output_dependent.clone(),
155 output: inner.output.clone(),
156 upper: inner.upper.clone(),
157 dynamic: inner.dynamic.snapshot_for_persisting(),
158 meta_modified: inner.state.meta_modified(),
159 data_modified: inner.state.data_modified(),
160 }
161 }
162}
163
164impl InnerStorageSnapshot {
165 pub fn iter_all(
166 &self,
167 ) -> impl Iterator<Item = (CachedDataItemKey, CachedDataItemValueRef<'_>)> {
168 use crate::data_storage::Storage;
169 self.dynamic
170 .iter_all()
171 .chain(self.aggregation_number.iter().map(|(_, value)| {
172 (
173 CachedDataItemKey::AggregationNumber {},
174 CachedDataItemValueRef::AggregationNumber { value },
175 )
176 }))
177 .chain(self.output.iter().map(|(_, value)| {
178 (
179 CachedDataItemKey::Output {},
180 CachedDataItemValueRef::Output { value },
181 )
182 }))
183 .chain(self.upper.iter().map(|(k, value)| {
184 (
185 CachedDataItemKey::Upper { task: *k },
186 CachedDataItemValueRef::Upper { value },
187 )
188 }))
189 .chain(self.output_dependent.iter().map(|(k, value)| {
190 (
191 CachedDataItemKey::OutputDependent { task: *k },
192 CachedDataItemValueRef::OutputDependent { value },
193 )
194 }))
195 }
196
197 pub fn len(&self) -> usize {
198 use crate::data_storage::Storage;
199 self.dynamic.len()
200 + self.aggregation_number.len()
201 + self.output.len()
202 + self.upper.len()
203 + self.output_dependent.len()
204 }
205}
206
207#[derive(Debug, Clone)]
208pub struct InnerStorage {
209 aggregation_number: OptionStorage<AggregationNumber>,
210 output_dependent: AutoMapStorage<TaskId, ()>,
211 output: OptionStorage<OutputValue>,
212 upper: AutoMapStorage<TaskId, u32>,
213 dynamic: DynamicStorage,
214 state: InnerStorageState,
215}
216
217impl InnerStorage {
218 fn new() -> Self {
219 Self {
220 aggregation_number: Default::default(),
221 output_dependent: Default::default(),
222 output: Default::default(),
223 upper: Default::default(),
224 dynamic: DynamicStorage::new(),
225 state: InnerStorageState::default(),
226 }
227 }
228
229 pub fn state(&self) -> &InnerStorageState {
230 &self.state
231 }
232
233 pub fn state_mut(&mut self) -> &mut InnerStorageState {
234 &mut self.state
235 }
236}
237
238#[macro_export]
239macro_rules! generate_inner_storage_internal {
240 (CachedDataItem: $self:ident, $item:ident, $value:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident $key_field:ident => $field:ident,) => {
242 if let CachedDataItem::$tag { $key_field, $value } = $item {
243 let result = $self.$field.$fn($key_field, $($args)*);
244 return $crate::generate_inner_storage_internal!(return_value: result, $return_ty: $tag $key_field => $field);
245 }
246 };
247 (CachedDataItem: $self:ident, $item:ident, $value:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident => $field:ident,) => {
248 if let CachedDataItem::$tag { $value } = $item {
249 let result = $self.$field.$fn((), $($args)*);
250 return $crate::generate_inner_storage_internal!(return_value: result, $return_ty: $tag => $field);
251 }
252 };
253 (CachedDataItem: $self:ident, $item:ident, $value:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
254 $crate::generate_inner_storage_internal!(CachedDataItem: $self, $item, $value, $return_ty, $fn($($args)*): $tag $($key_field)? => $field,);
255 $crate::generate_inner_storage_internal!(CachedDataItem: $self, $item, $value, $return_ty, $fn($($args)*): $($config)+)
256 };
257 (CachedDataItemKey: $self:ident, $item:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident $key_field:ident => $field:ident,) => {
259 if let CachedDataItemKey::$tag { $key_field } = $item {
260 let result = $self.$field.$fn($key_field, $($args)*);
261 return $crate::generate_inner_storage_internal!(return_value: result, $return_ty: $tag $key_field => $field);
262 }
263 };
264 (CachedDataItemKey: $self:ident, $item:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident => $field:ident,) => {
265 if let CachedDataItemKey::$tag { } = $item {
266 let result = $self.$field.$fn(&(), $($args)*);
267 return $crate::generate_inner_storage_internal!(return_value: result, $return_ty: $tag => $field);
268 }
269 };
270 (CachedDataItemKey: $self:ident, $item:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
271 $crate::generate_inner_storage_internal!(CachedDataItemKey: $self, $item, $return_ty, $fn($($args)*): $tag $($key_field)? => $field,);
272 $crate::generate_inner_storage_internal!(CachedDataItemKey: $self, $item, $return_ty, $fn($($args)*): $($config)+)
273 };
274 (CachedDataItemType: $self:ident, $item:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident $($key_field:ident)? => $field:ident,) => {
276 if let CachedDataItemType::$tag = $item {
277 let result = $self.$field.$fn($($args)*);
278 return $crate::generate_inner_storage_internal!(return_value: result, $return_ty: $tag $($key_field)? => $field);
279 }
280 };
281 (CachedDataItemType: $self:ident, $item:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
282 $crate::generate_inner_storage_internal!(CachedDataItemType: $self, $item, $return_ty, $fn($($args)*): $tag $($key_field)? => $field,);
283 $crate::generate_inner_storage_internal!(CachedDataItemType: $self, $item, $return_ty, $fn($($args)*): $($config)+)
284 };
285
286 (update: $self:ident, $key:ident, $update:ident: $tag:ident $key_field:ident => $field:ident,) => {
288 if let CachedDataItemKey::$tag { $key_field } = $key {
289 $self.$field.update($key_field, |old| {
290 let old = old.map(|old| CachedDataItemValue::$tag { value: old });
291 let new = $update(old);
292 new.map(|new| if let CachedDataItemValue::$tag { value } = new {
293 value
294 } else {
295 unreachable!()
296 })
297 });
298 return;
299 }
300 };
301 (update: $self:ident, $key:ident, $update:ident: $tag:ident => $field:ident,) => {
302 if let CachedDataItemKey::$tag { } = $key {
303 $self.$field.update((), |old| {
304 let old = old.map(|old| CachedDataItemValue::$tag { value: old });
305 let new = $update(old);
306 new.map(|new| if let CachedDataItemValue::$tag { value } = new {
307 value
308 } else {
309 unreachable!()
310 })
311 });
312 return;
313 }
314 };
315 (update: $self:ident, $key:ident, $update:ident: $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
316 $crate::generate_inner_storage_internal!(update: $self, $key, $update: $tag $($key_field)? => $field,);
317 $crate::generate_inner_storage_internal!(update: $self, $key, $update: $($config)+)
318 };
319
320 (get_mut_or_insert_with: $self:ident, $key:ident, $insert_with:ident: $tag:ident $key_field:ident => $field:ident,) => {
322 if let CachedDataItemKey::$tag { $key_field } = $key {
323 let value = $self.$field.get_mut_or_insert_with($key_field, || {
324 let value = $insert_with();
325 if let CachedDataItemValue::$tag { value } = value {
326 value
327 } else {
328 unreachable!()
329 }
330 });
331 return CachedDataItemValueRefMut::$tag { value };
332 }
333 };
334 (get_mut_or_insert_with: $self:ident, $key:ident, $insert_with:ident: $tag:ident => $field:ident,) => {
335 if let CachedDataItemKey::$tag { } = $key {
336 let value = $self.$field.get_mut_or_insert_with((), || {
337 let value = $insert_with();
338 if let CachedDataItemValue::$tag { value } = value {
339 value
340 } else {
341 unreachable!()
342 }
343 });
344 return CachedDataItemValueRefMut::$tag { value };
345 }
346 };
347 (get_mut_or_insert_with: $self:ident, $key:ident, $insert_with:ident: $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
348 $crate::generate_inner_storage_internal!(get_mut_or_insert_with: $self, $key, $insert_with: $tag $($key_field)? => $field,);
349 $crate::generate_inner_storage_internal!(get_mut_or_insert_with: $self, $key, $insert_with: $($config)+)
350 };
351
352 (extract_if: $self:ident, $ty:ident, $f:ident: $tag:ident $key_field:ident => $field:ident,) => {
354 if let CachedDataItemType::$tag = $ty {
355 let iter = $self.$field.extract_if(move |key, value| {
356 $f(CachedDataItemKey::$tag { $key_field: *key }, CachedDataItemValueRef::$tag { value })
357 }).map(|($key_field, value)| CachedDataItem::$tag { $key_field, value });
358 return InnerStorageIter::$tag(iter);
359 }
360 };
361 (extract_if: $self:ident, $ty:ident, $f:ident: $tag:ident => $field:ident,) => {
362 if let CachedDataItemType::$tag = $ty {
363 let iter = $self.$field.extract_if(move |_, value| {
364 $f(CachedDataItemKey::$tag { }, CachedDataItemValueRef::$tag { value })
365 }).map(|(_, value)| CachedDataItem::$tag { value });
366 return InnerStorageIter::$tag(iter);
367 }
368 };
369 (extract_if: $self:ident, $ty:ident, $f:ident: $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
370 $crate::generate_inner_storage_internal!(extract_if: $self, $ty, $f: $tag $($key_field)? => $field,);
371 $crate::generate_inner_storage_internal!(extract_if: $self, $ty, $f: $($config)+)
372 };
373
374 (iter: $self:ident, $ty:ident: $tag:ident $key_field:ident => $field:ident,) => {
376 if let CachedDataItemType::$tag = $ty {
377 let iter = $self.$field.iter().map(|($key_field, value)| (CachedDataItemKey::$tag { $key_field: *$key_field }, CachedDataItemValueRef::$tag { value }));
378 return InnerStorageIter::$tag(iter);
379 }
380 };
381 (iter: $self:ident, $ty:ident: $tag:ident => $field:ident,) => {
382 if let CachedDataItemType::$tag = $ty {
383 let iter = $self.$field.iter().map(|(_, value)| (CachedDataItemKey::$tag { }, CachedDataItemValueRef::$tag { value }));
384 return InnerStorageIter::$tag(iter);
385 }
386 };
387 (iter: $self:ident, $ty:ident: $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
388 $crate::generate_inner_storage_internal!(iter: $self, $ty: $tag $($key_field)? => $field,);
389 $crate::generate_inner_storage_internal!(iter: $self, $ty: $($config)+)
390 };
391
392
393 (return_value: $result:ident, none: $($more:tt)*) => {
395 $result
396 };
397 (return_value: $result:ident, option_value: $tag:ident $($more:tt)*) => {
398 $result.map(|value| CachedDataItemValue::$tag { value })
399 };
400 (return_value: $result:ident, option_ref: $tag:ident $($more:tt)*) => {
401 $result.map(|value| CachedDataItemValueRef::$tag { value })
402 };
403 (return_value: $result:ident, option_ref_mut: $tag:ident $($more:tt)*) => {
404 $result.map(|value| CachedDataItemValueRefMut::$tag { value })
405 };
406
407 (input_value: $input:ident, option_value: $tag:ident $($more:tt)*) => {
409 $input.map(|value| {
410 if let CachedDataItemValue::$tag { value } = value {
411 value
412 } else {
413 unreachable!()
414 }
415 })
416 };
417
418}
419
420macro_rules! generate_inner_storage {
421 ($($config:tt)*) => {
422 impl InnerStorage {
423 pub fn add(&mut self, item: CachedDataItem) -> bool {
424 use crate::data_storage::Storage;
425 $crate::generate_inner_storage_internal!(CachedDataItem: self, item, value, none, add(value): $($config)*);
426 self.dynamic.add(item)
427 }
428
429 pub fn insert(&mut self, item: CachedDataItem) -> Option<CachedDataItemValue> {
430 use crate::data_storage::Storage;
431 $crate::generate_inner_storage_internal!(CachedDataItem: self, item, value, option_value, insert(value): $($config)*);
432 self.dynamic.insert(item)
433 }
434
435 pub fn remove(&mut self, key: &CachedDataItemKey) -> Option<CachedDataItemValue> {
436 use crate::data_storage::Storage;
437 $crate::generate_inner_storage_internal!(CachedDataItemKey: self, key, option_value, remove(): $($config)*);
438 self.dynamic.remove(key)
439 }
440
441 pub fn count(&self, ty: CachedDataItemType) -> usize {
442 use crate::data_storage::Storage;
443 $crate::generate_inner_storage_internal!(CachedDataItemType: self, ty, none, len(): $($config)*);
444 self.dynamic.count(ty)
445 }
446
447 pub fn get(&self, key: &CachedDataItemKey) -> Option<CachedDataItemValueRef<'_>> {
448 use crate::data_storage::Storage;
449 $crate::generate_inner_storage_internal!(CachedDataItemKey: self, key, option_ref, get(): $($config)*);
450 self.dynamic.get(key)
451 }
452
453 pub fn contains_key(&self, key: &CachedDataItemKey) -> bool {
454 use crate::data_storage::Storage;
455 $crate::generate_inner_storage_internal!(CachedDataItemKey: self, key, none, contains_key(): $($config)*);
456 self.dynamic.contains_key(key)
457 }
458
459 pub fn get_mut(&mut self, key: &CachedDataItemKey) -> Option<CachedDataItemValueRefMut<'_>> {
460 use crate::data_storage::Storage;
461 $crate::generate_inner_storage_internal!(CachedDataItemKey: self, key, option_ref_mut, get_mut(): $($config)*);
462 self.dynamic.get_mut(key)
463 }
464
465 pub fn shrink_to_fit(&mut self, ty: CachedDataItemType) {
466 use crate::data_storage::Storage;
467 $crate::generate_inner_storage_internal!(CachedDataItemType: self, ty, none, shrink_to_fit(): $($config)*);
468 self.dynamic.shrink_to_fit(ty)
469 }
470
471 pub fn update(
472 &mut self,
473 key: CachedDataItemKey,
474 update: impl FnOnce(Option<CachedDataItemValue>) -> Option<CachedDataItemValue>,
475 ) {
476 use crate::data_storage::Storage;
477 $crate::generate_inner_storage_internal!(update: self, key, update: $($config)*);
478 self.dynamic.update(key, update)
479 }
480
481 pub fn extract_if<'l, F>(
482 &'l mut self,
483 ty: CachedDataItemType,
484 mut f: F,
485 ) -> impl Iterator<Item = CachedDataItem> + use<'l, F>
486 where
487 F: for<'a> FnMut(CachedDataItemKey, CachedDataItemValueRef<'a>) -> bool + 'l,
488 {
489 use crate::data_storage::Storage;
490 $crate::generate_inner_storage_internal!(extract_if: self, ty, f: $($config)*);
491 InnerStorageIter::Dynamic(self.dynamic.extract_if(ty, f))
492 }
493
494 pub fn get_mut_or_insert_with(
495 &mut self,
496 key: CachedDataItemKey,
497 f: impl FnOnce() -> CachedDataItemValue,
498 ) -> CachedDataItemValueRefMut<'_>
499 {
500 use crate::data_storage::Storage;
501 $crate::generate_inner_storage_internal!(get_mut_or_insert_with: self, key, f: $($config)*);
502 self.dynamic.get_mut_or_insert_with(key, f)
503 }
504
505 pub fn iter(
506 &self,
507 ty: CachedDataItemType,
508 ) -> impl Iterator<Item = (CachedDataItemKey, CachedDataItemValueRef<'_>)>
509 {
510 use crate::data_storage::Storage;
511 $crate::generate_inner_storage_internal!(iter: self, ty: $($config)*);
512 InnerStorageIter::Dynamic(self.dynamic.iter(ty))
513 }
514
515 }
516 };
517}
518
519generate_inner_storage!(
520 AggregationNumber => aggregation_number,
521 OutputDependent task => output_dependent,
522 Output => output,
523 Upper task => upper,
524);
525
526enum InnerStorageIter<A, B, C, D, E> {
527 AggregationNumber(A),
528 OutputDependent(B),
529 Output(C),
530 Upper(D),
531 Dynamic(E),
532}
533
534impl<T, A, B, C, D, E> Iterator for InnerStorageIter<A, B, C, D, E>
535where
536 A: Iterator<Item = T>,
537 B: Iterator<Item = T>,
538 C: Iterator<Item = T>,
539 D: Iterator<Item = T>,
540 E: Iterator<Item = T>,
541{
542 type Item = T;
543
544 fn next(&mut self) -> Option<Self::Item> {
545 match self {
546 InnerStorageIter::AggregationNumber(iter) => iter.next(),
547 InnerStorageIter::OutputDependent(iter) => iter.next(),
548 InnerStorageIter::Output(iter) => iter.next(),
549 InnerStorageIter::Upper(iter) => iter.next(),
550 InnerStorageIter::Dynamic(iter) => iter.next(),
551 }
552 }
553}
554
555impl InnerStorage {
556 pub fn iter_all(
557 &self,
558 ) -> impl Iterator<Item = (CachedDataItemKey, CachedDataItemValueRef<'_>)> {
559 use crate::data_storage::Storage;
560 self.dynamic
561 .iter_all()
562 .chain(self.aggregation_number.iter().map(|(_, value)| {
563 (
564 CachedDataItemKey::AggregationNumber {},
565 CachedDataItemValueRef::AggregationNumber { value },
566 )
567 }))
568 .chain(self.output.iter().map(|(_, value)| {
569 (
570 CachedDataItemKey::Output {},
571 CachedDataItemValueRef::Output { value },
572 )
573 }))
574 .chain(self.upper.iter().map(|(k, value)| {
575 (
576 CachedDataItemKey::Upper { task: *k },
577 CachedDataItemValueRef::Upper { value },
578 )
579 }))
580 .chain(self.output_dependent.iter().map(|(k, value)| {
581 (
582 CachedDataItemKey::OutputDependent { task: *k },
583 CachedDataItemValueRef::OutputDependent { value },
584 )
585 }))
586 }
587
588 pub fn len(&self) -> usize {
589 use crate::data_storage::Storage;
590 self.dynamic.len()
591 + self.aggregation_number.len()
592 + self.output.len()
593 + self.upper.len()
594 + self.output_dependent.len()
595 }
596}
597
598enum ModifiedState {
599 Modified,
602 Snapshot(Option<Box<InnerStorageSnapshot>>),
609}
610
611pub struct Storage {
612 snapshot_mode: AtomicBool,
613 modified: FxDashMap<TaskId, ModifiedState>,
614 map: FxDashMap<TaskId, Box<InnerStorage>>,
615}
616
617impl Storage {
618 pub fn new(shard_amount: usize, small_preallocation: bool) -> Self {
619 let map_capacity: usize = if small_preallocation {
620 1024
621 } else {
622 1024 * 1024
623 };
624 let modified_capacity: usize = if small_preallocation { 0 } else { 1024 };
625
626 Self {
627 snapshot_mode: AtomicBool::new(false),
628 modified: FxDashMap::with_capacity_and_hasher_and_shard_amount(
629 modified_capacity,
630 Default::default(),
631 shard_amount,
632 ),
633 map: FxDashMap::with_capacity_and_hasher_and_shard_amount(
634 map_capacity,
635 Default::default(),
636 shard_amount,
637 ),
638 }
639 }
640
641 pub fn take_snapshot<
646 'l,
647 T,
648 R,
649 PP: for<'a> Fn(TaskId, &'a InnerStorage) -> T + Sync,
650 P: Fn(TaskId, T) -> R + Sync,
651 PS: Fn(TaskId, Box<InnerStorageSnapshot>) -> R + Sync,
652 >(
653 &'l self,
654 preprocess: &'l PP,
655 process: &'l P,
656 process_snapshot: &'l PS,
657 ) -> Vec<SnapshotShard<'l, PP, P, PS>> {
658 if !self.snapshot_mode() {
659 self.start_snapshot();
660 }
661
662 let guard = Arc::new(SnapshotGuard { storage: self });
663
664 parallel::map_collect::<_, _, Vec<_>>(self.modified.shards(), |shard| {
667 let mut direct_snapshots: Vec<(TaskId, Box<InnerStorageSnapshot>)> = Vec::new();
668 let mut modified: SmallVec<[TaskId; 4]> = SmallVec::new();
669 {
670 let guard = shard.write();
672 for bucket in unsafe { guard.iter() } {
674 let (key, shared_value) = unsafe { bucket.as_mut() };
677 let modified_state = shared_value.get_mut();
678 match modified_state {
679 ModifiedState::Modified => {
680 modified.push(*key);
681 }
682 ModifiedState::Snapshot(snapshot) => {
683 if let Some(snapshot) = snapshot.take() {
684 direct_snapshots.push((*key, snapshot));
685 }
686 }
687 }
688 }
689 drop(guard);
691 }
692
693 SnapshotShard {
694 direct_snapshots,
695 modified,
696 storage: self,
697 guard: Some(guard.clone()),
698 process,
699 preprocess,
700 process_snapshot,
701 }
702 })
703 }
704
705 pub fn start_snapshot(&self) {
707 self.snapshot_mode
708 .store(true, std::sync::atomic::Ordering::Release);
709 }
710
711 fn end_snapshot(&self) {
716 let mut removed_modified = Vec::new();
719 self.modified.retain(|key, inner| {
720 if matches!(inner, ModifiedState::Modified) {
721 removed_modified.push(*key);
722 false
723 } else {
724 true
725 }
726 });
727
728 for key in removed_modified {
730 if let Some(mut inner) = self.map.get_mut(&key) {
731 let state = inner.state_mut();
732 state.set_data_modified(false);
733 state.set_meta_modified(false);
734 }
735 }
736
737 self.snapshot_mode
740 .store(false, std::sync::atomic::Ordering::Release);
741
742 let mut removed_snapshots = Vec::new();
744 for mut item in self.modified.iter_mut() {
745 match item.value() {
746 ModifiedState::Snapshot(_) => {
747 removed_snapshots.push(*item.key());
748 *item.value_mut() = ModifiedState::Modified;
749 }
750 ModifiedState::Modified => {
751 }
754 }
755 }
756
757 for key in removed_snapshots {
759 if let Some(mut inner) = self.map.get_mut(&key) {
760 let state = inner.state_mut();
761 if state.meta_snapshot() {
762 state.set_meta_snapshot(false);
763 state.set_meta_modified(true);
764 }
765 if state.data_snapshot() {
766 state.set_data_snapshot(false);
767 state.set_data_modified(true);
768 }
769 }
770 }
771
772 self.modified.shrink_to_fit();
774 }
775
776 fn snapshot_mode(&self) -> bool {
777 self.snapshot_mode
778 .load(std::sync::atomic::Ordering::Acquire)
779 }
780
781 pub fn access_mut(&self, key: TaskId) -> StorageWriteGuard<'_> {
782 let inner = match self.map.entry(key) {
783 dashmap::mapref::entry::Entry::Occupied(e) => e.into_ref(),
784 dashmap::mapref::entry::Entry::Vacant(e) => e.insert(Box::new(InnerStorage::new())),
785 };
786 StorageWriteGuard {
787 storage: self,
788 inner: inner.into(),
789 }
790 }
791
792 pub fn access_pair_mut(
793 &self,
794 key1: TaskId,
795 key2: TaskId,
796 ) -> (StorageWriteGuard<'_>, StorageWriteGuard<'_>) {
797 let (a, b) = get_multiple_mut(&self.map, key1, key2, || Box::new(InnerStorage::new()));
798 (
799 StorageWriteGuard {
800 storage: self,
801 inner: a,
802 },
803 StorageWriteGuard {
804 storage: self,
805 inner: b,
806 },
807 )
808 }
809
810 pub fn drop_contents(&self) {
811 drop_contents(&self.map);
812 drop_contents(&self.modified);
813 }
814}
815
816pub struct StorageWriteGuard<'a> {
817 storage: &'a Storage,
818 inner: RefMut<'a, TaskId, Box<InnerStorage>>,
819}
820
821impl StorageWriteGuard<'_> {
822 pub fn track_modification(&mut self, category: SpecificTaskDataCategory) {
824 let state = self.inner.state();
825 let snapshot = match category {
826 SpecificTaskDataCategory::Meta => state.meta_snapshot(),
827 SpecificTaskDataCategory::Data => state.data_snapshot(),
828 };
829 if !snapshot {
830 let modified = match category {
831 SpecificTaskDataCategory::Meta => state.meta_modified(),
832 SpecificTaskDataCategory::Data => state.data_modified(),
833 };
834 match (self.storage.snapshot_mode(), modified) {
835 (false, false) => {
836 if !state.any_snapshot() && !state.any_modified() {
838 self.storage
839 .modified
840 .insert(*self.inner.key(), ModifiedState::Modified);
841 }
842 let state = self.inner.state_mut();
843 match category {
844 SpecificTaskDataCategory::Meta => state.set_meta_modified(true),
845 SpecificTaskDataCategory::Data => state.set_data_modified(true),
846 }
847 }
848 (false, true) => {
849 }
852 (true, false) => {
853 if !state.any_snapshot() {
855 self.storage
856 .modified
857 .insert(*self.inner.key(), ModifiedState::Snapshot(None));
858 }
859 let state = self.inner.state_mut();
860 match category {
861 SpecificTaskDataCategory::Meta => state.set_meta_snapshot(true),
862 SpecificTaskDataCategory::Data => state.set_data_snapshot(true),
863 }
864 }
865 (true, true) => {
866 if !state.any_snapshot() {
869 self.storage.modified.insert(
870 *self.inner.key(),
871 ModifiedState::Snapshot(Some(Box::new((&**self.inner).into()))),
872 );
873 }
874 let state = self.inner.state_mut();
875 match category {
876 SpecificTaskDataCategory::Meta => state.set_meta_snapshot(true),
877 SpecificTaskDataCategory::Data => state.set_data_snapshot(true),
878 }
879 }
880 }
881 }
882 }
883}
884
885impl Deref for StorageWriteGuard<'_> {
886 type Target = InnerStorage;
887
888 fn deref(&self) -> &Self::Target {
889 &self.inner
890 }
891}
892
893impl DerefMut for StorageWriteGuard<'_> {
894 fn deref_mut(&mut self) -> &mut Self::Target {
895 &mut self.inner
896 }
897}
898
899macro_rules! count {
900 ($task:ident, $key:ident) => {{ $task.count($crate::data::CachedDataItemType::$key) }};
901}
902
903macro_rules! get {
904 ($task:ident, $key:ident $input:tt) => {{
905 #[allow(unused_imports)]
906 use $crate::backend::operation::TaskGuard;
907 if let Some($crate::data::CachedDataItemValueRef::$key {
908 value,
909 }) = $task.get(&$crate::data::CachedDataItemKey::$key $input) {
910 Some(value)
911 } else {
912 None
913 }
914 }};
915 ($task:ident, $key:ident) => {
916 $crate::backend::storage::get!($task, $key {})
917 };
918}
919
920macro_rules! get_mut {
921 ($task:ident, $key:ident $input:tt) => {{
922 #[allow(unused_imports)]
923 use $crate::backend::operation::TaskGuard;
924 if let Some($crate::data::CachedDataItemValueRefMut::$key {
925 value,
926 }) = $task.get_mut(&$crate::data::CachedDataItemKey::$key $input) {
927 let () = $crate::data::allow_mut_access::$key;
928 Some(value)
929 } else {
930 None
931 }
932 }};
933 ($task:ident, $key:ident) => {
934 $crate::backend::storage::get_mut!($task, $key {})
935 };
936}
937
938macro_rules! get_mut_or_insert_with {
939 ($task:ident, $key:ident $input:tt, $f:expr) => {{
940 #[allow(unused_imports)]
941 use $crate::backend::operation::TaskGuard;
942 let () = $crate::data::allow_mut_access::$key;
943 let functor = $f;
944 let $crate::data::CachedDataItemValueRefMut::$key {
945 value,
946 } = $task.get_mut_or_insert_with($crate::data::CachedDataItemKey::$key $input, move || $crate::data::CachedDataItemValue::$key { value: functor() }) else {
947 unreachable!()
948 };
949 value
950 }};
951 ($task:ident, $key:ident, $f:expr) => {
952 $crate::backend::storage::get_mut_or_insert_with!($task, $key {}, $f)
953 };
954}
955
956macro_rules! iter_many {
962 ($task:ident, $key:ident $key_pattern:tt $(if $cond:expr)? => $iter_item:expr) => {{
963 #[allow(unused_imports)]
964 use $crate::backend::operation::TaskGuard;
965 $task
966 .iter($crate::data::CachedDataItemType::$key)
967 .filter_map(|(key, _)| match key {
968 $crate::data::CachedDataItemKey::$key $key_pattern $(if $cond)? => Some(
969 $iter_item
970 ),
971 _ => None,
972 })
973 }};
974 ($task:ident, $key:ident $input:tt $value_pattern:tt $(if $cond:expr)? => $iter_item:expr) => {{
975 #[allow(unused_imports)]
976 use $crate::backend::operation::TaskGuard;
977 $task
978 .iter($crate::data::CachedDataItemType::$key)
979 .filter_map(|(key, value)| match (key, value) {
980 (
981 $crate::data::CachedDataItemKey::$key $input,
982 $crate::data::CachedDataItemValueRef::$key { value: $value_pattern }
983 ) $(if $cond)? => Some($iter_item),
984 _ => None,
985 })
986 }};
987}
988
989macro_rules! get_many {
994 ($($args:tt)*) => {
995 $crate::backend::storage::iter_many!($($args)*).collect()
996 };
997}
998
999macro_rules! update {
1000 ($task:ident, $key:ident $input:tt, $update:expr) => {{
1001 #[allow(unused_imports)]
1002 use $crate::backend::operation::TaskGuard;
1003 #[allow(unused_mut)]
1004 let mut update = $update;
1005 $task.update($crate::data::CachedDataItemKey::$key $input, |old| {
1006 update(old.and_then(|old| {
1007 if let $crate::data::CachedDataItemValue::$key { value } = old {
1008 Some(value)
1009 } else {
1010 None
1011 }
1012 }))
1013 .map(|new| $crate::data::CachedDataItemValue::$key { value: new })
1014 })
1015 }};
1016 ($task:ident, $key:ident, $update:expr) => {
1017 $crate::backend::storage::update!($task, $key {}, $update)
1018 };
1019}
1020
1021macro_rules! update_count {
1022 ($task:ident, $key:ident $input:tt, -$update:expr) => {{
1023 let update = $update;
1024 let mut state_change = false;
1025 $crate::backend::storage::update!($task, $key $input, |old: Option<_>| {
1026 #[allow(unused_comparisons, reason = "type of update might be unsigned, where update < 0 is always false")]
1027 if let Some(old) = old {
1028 let new = old - update;
1029 state_change = old <= 0 && new > 0 || old > 0 && new <= 0;
1030 (new != 0).then_some(new)
1031 } else {
1032 state_change = update < 0;
1033 (update != 0).then_some(-update)
1034 }
1035 });
1036 state_change
1037 }};
1038 ($task:ident, $key:ident $input:tt, $update:expr) => {
1039 match $update {
1040 update => {
1041 let mut state_change = false;
1042 $crate::backend::storage::update!($task, $key $input, |old: Option<_>| {
1043 if let Some(old) = old {
1044 let new = old + update;
1045 state_change = old <= 0 && new > 0 || old > 0 && new <= 0;
1046 (new != 0).then_some(new)
1047 } else {
1048 state_change = update > 0;
1049 (update != 0).then_some(update)
1050 }
1051 });
1052 state_change
1053 }
1054 }
1055 };
1056 ($task:ident, $key:ident, -$update:expr) => {
1057 $crate::backend::storage::update_count!($task, $key {}, -$update)
1058 }; ($task:ident, $key:ident, $update:expr) => {
1059 $crate::backend::storage::update_count!($task, $key {}, $update)
1060 };
1061}
1062
1063macro_rules! remove {
1064 ($task:ident, $key:ident $input:tt) => {{
1065 #[allow(unused_imports)]
1066 use $crate::backend::operation::TaskGuard;
1067 if let Some($crate::data::CachedDataItemValue::$key { value }) = $task.remove(
1068 &$crate::data::CachedDataItemKey::$key $input
1069 ) {
1070 Some(value)
1071 } else {
1072 None
1073 }
1074 }};
1075 ($task:ident, $key:ident) => {
1076 $crate::backend::storage::remove!($task, $key {})
1077 };
1078}
1079
1080pub(crate) use count;
1081pub(crate) use get;
1082pub(crate) use get_many;
1083pub(crate) use get_mut;
1084pub(crate) use get_mut_or_insert_with;
1085pub(crate) use iter_many;
1086pub(crate) use remove;
1087pub(crate) use update;
1088pub(crate) use update_count;
1089
1090pub struct SnapshotGuard<'l> {
1091 storage: &'l Storage,
1092}
1093
1094impl Drop for SnapshotGuard<'_> {
1095 fn drop(&mut self) {
1096 self.storage.end_snapshot();
1097 }
1098}
1099
1100pub struct SnapshotShard<'l, PP, P, PS> {
1101 direct_snapshots: Vec<(TaskId, Box<InnerStorageSnapshot>)>,
1102 modified: SmallVec<[TaskId; 4]>,
1103 storage: &'l Storage,
1104 guard: Option<Arc<SnapshotGuard<'l>>>,
1105 process: &'l P,
1106 preprocess: &'l PP,
1107 process_snapshot: &'l PS,
1108}
1109
1110impl<'l, T, R, PP, P, PS> Iterator for SnapshotShard<'l, PP, P, PS>
1111where
1112 PP: for<'a> Fn(TaskId, &'a InnerStorage) -> T + Sync,
1113 P: Fn(TaskId, T) -> R + Sync,
1114 PS: Fn(TaskId, Box<InnerStorageSnapshot>) -> R + Sync,
1115{
1116 type Item = R;
1117
1118 fn next(&mut self) -> Option<Self::Item> {
1119 if let Some((task_id, snapshot)) = self.direct_snapshots.pop() {
1120 return Some((self.process_snapshot)(task_id, snapshot));
1121 }
1122 while let Some(task_id) = self.modified.pop() {
1123 let inner = self.storage.map.get(&task_id).unwrap();
1124 let state = inner.state();
1125 if !state.any_snapshot() {
1126 let preprocessed = (self.preprocess)(task_id, &inner);
1127 drop(inner);
1128 return Some((self.process)(task_id, preprocessed));
1129 } else {
1130 drop(inner);
1131 let maybe_snapshot = {
1132 let mut modified_state = self.storage.modified.get_mut(&task_id).unwrap();
1133 let ModifiedState::Snapshot(snapshot) = &mut *modified_state else {
1134 unreachable!("The snapshot bit was set, so it must be in Snapshot state");
1135 };
1136 snapshot.take()
1137 };
1138 if let Some(snapshot) = maybe_snapshot {
1139 return Some((self.process_snapshot)(task_id, snapshot));
1140 }
1141 }
1142 }
1143 self.guard = None;
1144 None
1145 }
1146}