1use std::{
2 hash::Hash,
3 ops::{Deref, DerefMut},
4 sync::{Arc, atomic::AtomicBool},
5 thread::available_parallelism,
6};
7
8use bitfield::bitfield;
9use rayon::iter::{IndexedParallelIterator, IntoParallelRefIterator, ParallelIterator};
10use turbo_tasks::{FxDashMap, TaskId};
11
12use crate::{
13 backend::dynamic_storage::DynamicStorage,
14 data::{
15 AggregationNumber, CachedDataItem, CachedDataItemKey, CachedDataItemType,
16 CachedDataItemValue, CachedDataItemValueRef, CachedDataItemValueRefMut, OutputValue,
17 },
18 data_storage::{AutoMapStorage, OptionStorage},
19 utils::dash_map_multi::{RefMut, get_multiple_mut},
20};
21
22#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
23pub enum TaskDataCategory {
24 Meta,
25 Data,
26 All,
27}
28
29impl TaskDataCategory {
30 pub fn into_specific(self) -> SpecificTaskDataCategory {
31 match self {
32 TaskDataCategory::Meta => SpecificTaskDataCategory::Meta,
33 TaskDataCategory::Data => SpecificTaskDataCategory::Data,
34 TaskDataCategory::All => unreachable!(),
35 }
36 }
37}
38
39#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
40pub enum SpecificTaskDataCategory {
41 Meta,
42 Data,
43}
44
45impl IntoIterator for TaskDataCategory {
46 type Item = TaskDataCategory;
47
48 type IntoIter = TaskDataCategoryIterator;
49
50 fn into_iter(self) -> Self::IntoIter {
51 match self {
52 TaskDataCategory::Meta => TaskDataCategoryIterator::Meta,
53 TaskDataCategory::Data => TaskDataCategoryIterator::Data,
54 TaskDataCategory::All => TaskDataCategoryIterator::All,
55 }
56 }
57}
58
59pub enum TaskDataCategoryIterator {
60 All,
61 Meta,
62 Data,
63 None,
64}
65
66impl Iterator for TaskDataCategoryIterator {
67 type Item = TaskDataCategory;
68
69 fn next(&mut self) -> Option<Self::Item> {
70 match self {
71 TaskDataCategoryIterator::All => {
72 *self = TaskDataCategoryIterator::Data;
73 Some(TaskDataCategory::Meta)
74 }
75 TaskDataCategoryIterator::Meta => {
76 *self = TaskDataCategoryIterator::None;
77 Some(TaskDataCategory::Meta)
78 }
79 TaskDataCategoryIterator::Data => {
80 *self = TaskDataCategoryIterator::None;
81 Some(TaskDataCategory::Data)
82 }
83 TaskDataCategoryIterator::None => None,
84 }
85 }
86}
87
88bitfield! {
89 #[derive(Clone, Default)]
91 pub struct InnerStorageState(u32);
92 impl Debug;
93 pub meta_restored, set_meta_restored: 0;
94 pub data_restored, set_data_restored: 1;
95 pub meta_modified, set_meta_modified: 2;
97 pub data_modified, set_data_modified: 3;
98 pub meta_snapshot, set_meta_snapshot: 4;
100 pub data_snapshot, set_data_snapshot: 4;
101}
102
103impl InnerStorageState {
104 pub fn set_restored(&mut self, category: TaskDataCategory) {
105 match category {
106 TaskDataCategory::Meta => {
107 self.set_meta_restored(true);
108 }
109 TaskDataCategory::Data => {
110 self.set_data_restored(true);
111 }
112 TaskDataCategory::All => {
113 self.set_meta_restored(true);
114 self.set_data_restored(true);
115 }
116 }
117 }
118
119 pub fn is_restored(&self, category: TaskDataCategory) -> bool {
120 match category {
121 TaskDataCategory::Meta => self.meta_restored(),
122 TaskDataCategory::Data => self.data_restored(),
123 TaskDataCategory::All => self.meta_restored() && self.data_restored(),
124 }
125 }
126
127 pub fn any_snapshot(&self) -> bool {
128 self.meta_snapshot() || self.data_snapshot()
129 }
130
131 pub fn any_modified(&self) -> bool {
132 self.meta_modified() || self.data_modified()
133 }
134}
135
136pub struct InnerStorageSnapshot {
137 aggregation_number: OptionStorage<AggregationNumber>,
138 output_dependent: AutoMapStorage<TaskId, ()>,
139 output: OptionStorage<OutputValue>,
140 upper: AutoMapStorage<TaskId, i32>,
141 dynamic: DynamicStorage,
142 pub meta_modified: bool,
143 pub data_modified: bool,
144}
145
146impl From<&InnerStorage> for InnerStorageSnapshot {
147 fn from(inner: &InnerStorage) -> Self {
148 Self {
149 aggregation_number: inner.aggregation_number.clone(),
150 output_dependent: inner.output_dependent.clone(),
151 output: inner.output.clone(),
152 upper: inner.upper.clone(),
153 dynamic: inner.dynamic.snapshot_for_persisting(),
154 meta_modified: inner.state.meta_modified(),
155 data_modified: inner.state.data_modified(),
156 }
157 }
158}
159
160impl InnerStorageSnapshot {
161 pub fn iter_all(
162 &self,
163 ) -> impl Iterator<Item = (CachedDataItemKey, CachedDataItemValueRef<'_>)> {
164 use crate::data_storage::Storage;
165 self.dynamic
166 .iter_all()
167 .chain(self.aggregation_number.iter().map(|(_, value)| {
168 (
169 CachedDataItemKey::AggregationNumber {},
170 CachedDataItemValueRef::AggregationNumber { value },
171 )
172 }))
173 .chain(self.output.iter().map(|(_, value)| {
174 (
175 CachedDataItemKey::Output {},
176 CachedDataItemValueRef::Output { value },
177 )
178 }))
179 .chain(self.upper.iter().map(|(k, value)| {
180 (
181 CachedDataItemKey::Upper { task: *k },
182 CachedDataItemValueRef::Upper { value },
183 )
184 }))
185 .chain(self.output_dependent.iter().map(|(k, value)| {
186 (
187 CachedDataItemKey::OutputDependent { task: *k },
188 CachedDataItemValueRef::OutputDependent { value },
189 )
190 }))
191 }
192
193 pub fn len(&self) -> usize {
194 use crate::data_storage::Storage;
195 self.dynamic.len()
196 + self.aggregation_number.len()
197 + self.output.len()
198 + self.upper.len()
199 + self.output_dependent.len()
200 }
201}
202
203#[derive(Debug, Clone)]
204pub struct InnerStorage {
205 aggregation_number: OptionStorage<AggregationNumber>,
206 output_dependent: AutoMapStorage<TaskId, ()>,
207 output: OptionStorage<OutputValue>,
208 upper: AutoMapStorage<TaskId, i32>,
209 dynamic: DynamicStorage,
210 state: InnerStorageState,
211}
212
213impl InnerStorage {
214 fn new() -> Self {
215 Self {
216 aggregation_number: Default::default(),
217 output_dependent: Default::default(),
218 output: Default::default(),
219 upper: Default::default(),
220 dynamic: DynamicStorage::new(),
221 state: InnerStorageState::default(),
222 }
223 }
224
225 pub fn state(&self) -> &InnerStorageState {
226 &self.state
227 }
228
229 pub fn state_mut(&mut self) -> &mut InnerStorageState {
230 &mut self.state
231 }
232}
233
234#[macro_export]
235macro_rules! generate_inner_storage_internal {
236 (CachedDataItem: $self:ident, $item:ident, $value:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident $key_field:ident => $field:ident,) => {
238 if let CachedDataItem::$tag { $key_field, $value } = $item {
239 let result = $self.$field.$fn($key_field, $($args)*);
240 return $crate::generate_inner_storage_internal!(return_value: result, $return_ty: $tag $key_field => $field);
241 }
242 };
243 (CachedDataItem: $self:ident, $item:ident, $value:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident => $field:ident,) => {
244 if let CachedDataItem::$tag { $value } = $item {
245 let result = $self.$field.$fn((), $($args)*);
246 return $crate::generate_inner_storage_internal!(return_value: result, $return_ty: $tag => $field);
247 }
248 };
249 (CachedDataItem: $self:ident, $item:ident, $value:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
250 $crate::generate_inner_storage_internal!(CachedDataItem: $self, $item, $value, $return_ty, $fn($($args)*): $tag $($key_field)? => $field,);
251 $crate::generate_inner_storage_internal!(CachedDataItem: $self, $item, $value, $return_ty, $fn($($args)*): $($config)+)
252 };
253 (CachedDataItemKey: $self:ident, $item:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident $key_field:ident => $field:ident,) => {
255 if let CachedDataItemKey::$tag { $key_field } = $item {
256 let result = $self.$field.$fn($key_field, $($args)*);
257 return $crate::generate_inner_storage_internal!(return_value: result, $return_ty: $tag $key_field => $field);
258 }
259 };
260 (CachedDataItemKey: $self:ident, $item:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident => $field:ident,) => {
261 if let CachedDataItemKey::$tag { } = $item {
262 let result = $self.$field.$fn(&(), $($args)*);
263 return $crate::generate_inner_storage_internal!(return_value: result, $return_ty: $tag => $field);
264 }
265 };
266 (CachedDataItemKey: $self:ident, $item:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
267 $crate::generate_inner_storage_internal!(CachedDataItemKey: $self, $item, $return_ty, $fn($($args)*): $tag $($key_field)? => $field,);
268 $crate::generate_inner_storage_internal!(CachedDataItemKey: $self, $item, $return_ty, $fn($($args)*): $($config)+)
269 };
270 (CachedDataItemType: $self:ident, $item:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident $($key_field:ident)? => $field:ident,) => {
272 if let CachedDataItemType::$tag = $item {
273 let result = $self.$field.$fn($($args)*);
274 return $crate::generate_inner_storage_internal!(return_value: result, $return_ty: $tag $($key_field)? => $field);
275 }
276 };
277 (CachedDataItemType: $self:ident, $item:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
278 $crate::generate_inner_storage_internal!(CachedDataItemType: $self, $item, $return_ty, $fn($($args)*): $tag $($key_field)? => $field,);
279 $crate::generate_inner_storage_internal!(CachedDataItemType: $self, $item, $return_ty, $fn($($args)*): $($config)+)
280 };
281
282 (update: $self:ident, $key:ident, $update:ident: $tag:ident $key_field:ident => $field:ident,) => {
284 if let CachedDataItemKey::$tag { $key_field } = $key {
285 $self.$field.update($key_field, |old| {
286 let old = old.map(|old| CachedDataItemValue::$tag { value: old });
287 let new = $update(old);
288 new.map(|new| if let CachedDataItemValue::$tag { value } = new {
289 value
290 } else {
291 unreachable!()
292 })
293 });
294 return;
295 }
296 };
297 (update: $self:ident, $key:ident, $update:ident: $tag:ident => $field:ident,) => {
298 if let CachedDataItemKey::$tag { } = $key {
299 $self.$field.update((), |old| {
300 let old = old.map(|old| CachedDataItemValue::$tag { value: old });
301 let new = $update(old);
302 new.map(|new| if let CachedDataItemValue::$tag { value } = new {
303 value
304 } else {
305 unreachable!()
306 })
307 });
308 return;
309 }
310 };
311 (update: $self:ident, $key:ident, $update:ident: $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
312 $crate::generate_inner_storage_internal!(update: $self, $key, $update: $tag $($key_field)? => $field,);
313 $crate::generate_inner_storage_internal!(update: $self, $key, $update: $($config)+)
314 };
315
316 (get_mut_or_insert_with: $self:ident, $key:ident, $insert_with:ident: $tag:ident $key_field:ident => $field:ident,) => {
318 if let CachedDataItemKey::$tag { $key_field } = $key {
319 let value = $self.$field.get_mut_or_insert_with($key_field, || {
320 let value = $insert_with();
321 if let CachedDataItemValue::$tag { value } = value {
322 value
323 } else {
324 unreachable!()
325 }
326 });
327 return CachedDataItemValueRefMut::$tag { value };
328 }
329 };
330 (get_mut_or_insert_with: $self:ident, $key:ident, $insert_with:ident: $tag:ident => $field:ident,) => {
331 if let CachedDataItemKey::$tag { } = $key {
332 let value = $self.$field.get_mut_or_insert_with((), || {
333 let value = $insert_with();
334 if let CachedDataItemValue::$tag { value } = value {
335 value
336 } else {
337 unreachable!()
338 }
339 });
340 return CachedDataItemValueRefMut::$tag { value };
341 }
342 };
343 (get_mut_or_insert_with: $self:ident, $key:ident, $insert_with:ident: $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
344 $crate::generate_inner_storage_internal!(get_mut_or_insert_with: $self, $key, $insert_with: $tag $($key_field)? => $field,);
345 $crate::generate_inner_storage_internal!(get_mut_or_insert_with: $self, $key, $insert_with: $($config)+)
346 };
347
348 (extract_if: $self:ident, $ty:ident, $f:ident: $tag:ident $key_field:ident => $field:ident,) => {
350 if let CachedDataItemType::$tag = $ty {
351 let iter = $self.$field.extract_if(move |key, value| {
352 $f(CachedDataItemKey::$tag { $key_field: *key }, CachedDataItemValueRef::$tag { value })
353 }).map(|($key_field, value)| CachedDataItem::$tag { $key_field, value });
354 return InnerStorageIter::$tag(iter);
355 }
356 };
357 (extract_if: $self:ident, $ty:ident, $f:ident: $tag:ident => $field:ident,) => {
358 if let CachedDataItemType::$tag = $ty {
359 let iter = $self.$field.extract_if(move |_, value| {
360 $f(CachedDataItemKey::$tag { }, CachedDataItemValueRef::$tag { value })
361 }).map(|(_, value)| CachedDataItem::$tag { value });
362 return InnerStorageIter::$tag(iter);
363 }
364 };
365 (extract_if: $self:ident, $ty:ident, $f:ident: $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
366 $crate::generate_inner_storage_internal!(extract_if: $self, $ty, $f: $tag $($key_field)? => $field,);
367 $crate::generate_inner_storage_internal!(extract_if: $self, $ty, $f: $($config)+)
368 };
369
370 (iter: $self:ident, $ty:ident: $tag:ident $key_field:ident => $field:ident,) => {
372 if let CachedDataItemType::$tag = $ty {
373 let iter = $self.$field.iter().map(|($key_field, value)| (CachedDataItemKey::$tag { $key_field: *$key_field }, CachedDataItemValueRef::$tag { value }));
374 return InnerStorageIter::$tag(iter);
375 }
376 };
377 (iter: $self:ident, $ty:ident: $tag:ident => $field:ident,) => {
378 if let CachedDataItemType::$tag = $ty {
379 let iter = $self.$field.iter().map(|(_, value)| (CachedDataItemKey::$tag { }, CachedDataItemValueRef::$tag { value }));
380 return InnerStorageIter::$tag(iter);
381 }
382 };
383 (iter: $self:ident, $ty:ident: $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
384 $crate::generate_inner_storage_internal!(iter: $self, $ty: $tag $($key_field)? => $field,);
385 $crate::generate_inner_storage_internal!(iter: $self, $ty: $($config)+)
386 };
387
388
389 (return_value: $result:ident, none: $($more:tt)*) => {
391 $result
392 };
393 (return_value: $result:ident, option_value: $tag:ident $($more:tt)*) => {
394 $result.map(|value| CachedDataItemValue::$tag { value })
395 };
396 (return_value: $result:ident, option_ref: $tag:ident $($more:tt)*) => {
397 $result.map(|value| CachedDataItemValueRef::$tag { value })
398 };
399 (return_value: $result:ident, option_ref_mut: $tag:ident $($more:tt)*) => {
400 $result.map(|value| CachedDataItemValueRefMut::$tag { value })
401 };
402
403 (input_value: $input:ident, option_value: $tag:ident $($more:tt)*) => {
405 $input.map(|value| {
406 if let CachedDataItemValue::$tag { value } = value {
407 value
408 } else {
409 unreachable!()
410 }
411 })
412 };
413
414}
415
416macro_rules! generate_inner_storage {
417 ($($config:tt)*) => {
418 impl InnerStorage {
419 pub fn add(&mut self, item: CachedDataItem) -> bool {
420 use crate::data_storage::Storage;
421 $crate::generate_inner_storage_internal!(CachedDataItem: self, item, value, none, add(value): $($config)*);
422 self.dynamic.add(item)
423 }
424
425 pub fn insert(&mut self, item: CachedDataItem) -> Option<CachedDataItemValue> {
426 use crate::data_storage::Storage;
427 $crate::generate_inner_storage_internal!(CachedDataItem: self, item, value, option_value, insert(value): $($config)*);
428 self.dynamic.insert(item)
429 }
430
431 pub fn remove(&mut self, key: &CachedDataItemKey) -> Option<CachedDataItemValue> {
432 use crate::data_storage::Storage;
433 $crate::generate_inner_storage_internal!(CachedDataItemKey: self, key, option_value, remove(): $($config)*);
434 self.dynamic.remove(key)
435 }
436
437 pub fn count(&self, ty: CachedDataItemType) -> usize {
438 use crate::data_storage::Storage;
439 $crate::generate_inner_storage_internal!(CachedDataItemType: self, ty, none, len(): $($config)*);
440 self.dynamic.count(ty)
441 }
442
443 pub fn get(&self, key: &CachedDataItemKey) -> Option<CachedDataItemValueRef> {
444 use crate::data_storage::Storage;
445 $crate::generate_inner_storage_internal!(CachedDataItemKey: self, key, option_ref, get(): $($config)*);
446 self.dynamic.get(key)
447 }
448
449 pub fn contains_key(&self, key: &CachedDataItemKey) -> bool {
450 use crate::data_storage::Storage;
451 $crate::generate_inner_storage_internal!(CachedDataItemKey: self, key, none, contains_key(): $($config)*);
452 self.dynamic.contains_key(key)
453 }
454
455 pub fn get_mut(&mut self, key: &CachedDataItemKey) -> Option<CachedDataItemValueRefMut> {
456 use crate::data_storage::Storage;
457 $crate::generate_inner_storage_internal!(CachedDataItemKey: self, key, option_ref_mut, get_mut(): $($config)*);
458 self.dynamic.get_mut(key)
459 }
460
461 pub fn shrink_to_fit(&mut self, ty: CachedDataItemType) {
462 use crate::data_storage::Storage;
463 $crate::generate_inner_storage_internal!(CachedDataItemType: self, ty, none, shrink_to_fit(): $($config)*);
464 self.dynamic.shrink_to_fit(ty)
465 }
466
467 pub fn update(
468 &mut self,
469 key: CachedDataItemKey,
470 update: impl FnOnce(Option<CachedDataItemValue>) -> Option<CachedDataItemValue>,
471 ) {
472 use crate::data_storage::Storage;
473 $crate::generate_inner_storage_internal!(update: self, key, update: $($config)*);
474 self.dynamic.update(key, update)
475 }
476
477 pub fn extract_if<'l, F>(
478 &'l mut self,
479 ty: CachedDataItemType,
480 mut f: F,
481 ) -> impl Iterator<Item = CachedDataItem> + use<'l, F>
482 where
483 F: for<'a> FnMut(CachedDataItemKey, CachedDataItemValueRef<'a>) -> bool + 'l,
484 {
485 use crate::data_storage::Storage;
486 $crate::generate_inner_storage_internal!(extract_if: self, ty, f: $($config)*);
487 InnerStorageIter::Dynamic(self.dynamic.extract_if(ty, f))
488 }
489
490 pub fn get_mut_or_insert_with(
491 &mut self,
492 key: CachedDataItemKey,
493 f: impl FnOnce() -> CachedDataItemValue,
494 ) -> CachedDataItemValueRefMut<'_>
495 {
496 use crate::data_storage::Storage;
497 $crate::generate_inner_storage_internal!(get_mut_or_insert_with: self, key, f: $($config)*);
498 self.dynamic.get_mut_or_insert_with(key, f)
499 }
500
501 pub fn iter(
502 &self,
503 ty: CachedDataItemType,
504 ) -> impl Iterator<Item = (CachedDataItemKey, CachedDataItemValueRef<'_>)>
505 {
506 use crate::data_storage::Storage;
507 $crate::generate_inner_storage_internal!(iter: self, ty: $($config)*);
508 InnerStorageIter::Dynamic(self.dynamic.iter(ty))
509 }
510
511 }
512 };
513}
514
515generate_inner_storage!(
516 AggregationNumber => aggregation_number,
517 OutputDependent task => output_dependent,
518 Output => output,
519 Upper task => upper,
520);
521
522enum InnerStorageIter<A, B, C, D, E> {
523 AggregationNumber(A),
524 OutputDependent(B),
525 Output(C),
526 Upper(D),
527 Dynamic(E),
528}
529
530impl<T, A, B, C, D, E> Iterator for InnerStorageIter<A, B, C, D, E>
531where
532 A: Iterator<Item = T>,
533 B: Iterator<Item = T>,
534 C: Iterator<Item = T>,
535 D: Iterator<Item = T>,
536 E: Iterator<Item = T>,
537{
538 type Item = T;
539
540 fn next(&mut self) -> Option<Self::Item> {
541 match self {
542 InnerStorageIter::AggregationNumber(iter) => iter.next(),
543 InnerStorageIter::OutputDependent(iter) => iter.next(),
544 InnerStorageIter::Output(iter) => iter.next(),
545 InnerStorageIter::Upper(iter) => iter.next(),
546 InnerStorageIter::Dynamic(iter) => iter.next(),
547 }
548 }
549}
550
551impl InnerStorage {
552 pub fn iter_all(
553 &self,
554 ) -> impl Iterator<Item = (CachedDataItemKey, CachedDataItemValueRef<'_>)> {
555 use crate::data_storage::Storage;
556 self.dynamic
557 .iter_all()
558 .chain(self.aggregation_number.iter().map(|(_, value)| {
559 (
560 CachedDataItemKey::AggregationNumber {},
561 CachedDataItemValueRef::AggregationNumber { value },
562 )
563 }))
564 .chain(self.output.iter().map(|(_, value)| {
565 (
566 CachedDataItemKey::Output {},
567 CachedDataItemValueRef::Output { value },
568 )
569 }))
570 .chain(self.upper.iter().map(|(k, value)| {
571 (
572 CachedDataItemKey::Upper { task: *k },
573 CachedDataItemValueRef::Upper { value },
574 )
575 }))
576 .chain(self.output_dependent.iter().map(|(k, value)| {
577 (
578 CachedDataItemKey::OutputDependent { task: *k },
579 CachedDataItemValueRef::OutputDependent { value },
580 )
581 }))
582 }
583
584 pub fn len(&self) -> usize {
585 use crate::data_storage::Storage;
586 self.dynamic.len()
587 + self.aggregation_number.len()
588 + self.output.len()
589 + self.upper.len()
590 + self.output_dependent.len()
591 }
592}
593
594enum ModifiedState {
595 Modified,
598 Snapshot(Option<Box<InnerStorageSnapshot>>),
605}
606
607pub struct Storage {
608 snapshot_mode: AtomicBool,
609 modified: FxDashMap<TaskId, ModifiedState>,
610 map: FxDashMap<TaskId, Box<InnerStorage>>,
611}
612
613impl Storage {
614 pub fn new() -> Self {
615 let shard_amount =
616 (available_parallelism().map_or(4, |v| v.get()) * 64).next_power_of_two();
617 Self {
618 snapshot_mode: AtomicBool::new(false),
619 modified: FxDashMap::with_capacity_and_hasher_and_shard_amount(
620 1024,
621 Default::default(),
622 shard_amount,
623 ),
624 map: FxDashMap::with_capacity_and_hasher_and_shard_amount(
625 1024 * 1024,
626 Default::default(),
627 shard_amount,
628 ),
629 }
630 }
631
632 pub fn take_snapshot<
637 'l,
638 T,
639 R,
640 PP: for<'a> Fn(TaskId, &'a InnerStorage) -> T + Sync,
641 P: Fn(TaskId, T) -> R + Sync,
642 PS: Fn(TaskId, Box<InnerStorageSnapshot>) -> R + Sync,
643 >(
644 &'l self,
645 preprocess: &'l PP,
646 process: &'l P,
647 process_snapshot: &'l PS,
648 ) -> Vec<SnapshotShard<'l, PP, P, PS>> {
649 if !self.snapshot_mode() {
650 self.start_snapshot();
651 }
652
653 let guard = Arc::new(SnapshotGuard { storage: self });
654
655 self.modified
658 .shards()
659 .par_iter()
660 .with_max_len(1)
661 .map(|shard| {
662 let mut direct_snapshots: Vec<(TaskId, Box<InnerStorageSnapshot>)> = Vec::new();
663 let mut modified: Vec<TaskId> = Vec::new();
664 {
665 let guard = shard.write();
667 for bucket in unsafe { guard.iter() } {
669 let (key, shared_value) = unsafe { bucket.as_mut() };
672 let modified_state = shared_value.get_mut();
673 match modified_state {
674 ModifiedState::Modified => {
675 modified.push(*key);
676 }
677 ModifiedState::Snapshot(snapshot) => {
678 if let Some(snapshot) = snapshot.take() {
679 direct_snapshots.push((*key, snapshot));
680 }
681 }
682 }
683 }
684 drop(guard);
686 }
687
688 SnapshotShard {
689 direct_snapshots,
690 modified,
691 storage: self,
692 guard: Some(guard.clone()),
693 process,
694 preprocess,
695 process_snapshot,
696 }
697 })
698 .collect::<Vec<_>>()
699 }
700
701 pub fn start_snapshot(&self) {
703 self.snapshot_mode
704 .store(true, std::sync::atomic::Ordering::Release);
705 }
706
707 fn end_snapshot(&self) {
712 let mut removed_modified = Vec::new();
715 self.modified.retain(|key, inner| {
716 if matches!(inner, ModifiedState::Modified) {
717 removed_modified.push(*key);
718 false
719 } else {
720 true
721 }
722 });
723
724 for key in removed_modified {
726 if let Some(mut inner) = self.map.get_mut(&key) {
727 let state = inner.state_mut();
728 state.set_data_modified(false);
729 state.set_meta_modified(false);
730 }
731 }
732
733 self.snapshot_mode
736 .store(false, std::sync::atomic::Ordering::Release);
737
738 let mut removed_snapshots = Vec::new();
740 for mut item in self.modified.iter_mut() {
741 match item.value() {
742 ModifiedState::Snapshot(_) => {
743 removed_snapshots.push(*item.key());
744 *item.value_mut() = ModifiedState::Modified;
745 }
746 ModifiedState::Modified => {
747 }
750 }
751 }
752
753 for key in removed_snapshots {
755 if let Some(mut inner) = self.map.get_mut(&key) {
756 let state = inner.state_mut();
757 if state.meta_snapshot() {
758 state.set_meta_snapshot(false);
759 state.set_meta_modified(true);
760 }
761 if state.data_snapshot() {
762 state.set_data_snapshot(false);
763 state.set_data_modified(true);
764 }
765 }
766 }
767
768 self.modified.shrink_to_fit();
770 }
771
772 fn snapshot_mode(&self) -> bool {
773 self.snapshot_mode
774 .load(std::sync::atomic::Ordering::Acquire)
775 }
776
777 pub fn access_mut(&self, key: TaskId) -> StorageWriteGuard<'_> {
778 let inner = match self.map.entry(key) {
779 dashmap::mapref::entry::Entry::Occupied(e) => e.into_ref(),
780 dashmap::mapref::entry::Entry::Vacant(e) => e.insert(Box::new(InnerStorage::new())),
781 };
782 StorageWriteGuard {
783 storage: self,
784 inner: inner.into(),
785 }
786 }
787
788 pub fn access_pair_mut(
789 &self,
790 key1: TaskId,
791 key2: TaskId,
792 ) -> (StorageWriteGuard<'_>, StorageWriteGuard<'_>) {
793 let (a, b) = get_multiple_mut(&self.map, key1, key2, || Box::new(InnerStorage::new()));
794 (
795 StorageWriteGuard {
796 storage: self,
797 inner: a,
798 },
799 StorageWriteGuard {
800 storage: self,
801 inner: b,
802 },
803 )
804 }
805}
806
807pub struct StorageWriteGuard<'a> {
808 storage: &'a Storage,
809 inner: RefMut<'a, TaskId, Box<InnerStorage>>,
810}
811
812impl StorageWriteGuard<'_> {
813 pub fn track_modification(&mut self, category: SpecificTaskDataCategory) {
815 let state = self.inner.state();
816 let snapshot = match category {
817 SpecificTaskDataCategory::Meta => state.meta_snapshot(),
818 SpecificTaskDataCategory::Data => state.data_snapshot(),
819 };
820 if !snapshot {
821 let modified = match category {
822 SpecificTaskDataCategory::Meta => state.meta_modified(),
823 SpecificTaskDataCategory::Data => state.data_modified(),
824 };
825 match (self.storage.snapshot_mode(), modified) {
826 (false, false) => {
827 if !state.any_snapshot() && !state.any_modified() {
829 self.storage
830 .modified
831 .insert(*self.inner.key(), ModifiedState::Modified);
832 }
833 let state = self.inner.state_mut();
834 match category {
835 SpecificTaskDataCategory::Meta => state.set_meta_modified(true),
836 SpecificTaskDataCategory::Data => state.set_data_modified(true),
837 }
838 }
839 (false, true) => {
840 }
843 (true, false) => {
844 if !state.any_snapshot() {
846 self.storage
847 .modified
848 .insert(*self.inner.key(), ModifiedState::Snapshot(None));
849 }
850 let state = self.inner.state_mut();
851 match category {
852 SpecificTaskDataCategory::Meta => state.set_meta_snapshot(true),
853 SpecificTaskDataCategory::Data => state.set_data_snapshot(true),
854 }
855 }
856 (true, true) => {
857 if !state.any_snapshot() {
860 self.storage.modified.insert(
861 *self.inner.key(),
862 ModifiedState::Snapshot(Some(Box::new((&**self.inner).into()))),
863 );
864 }
865 let state = self.inner.state_mut();
866 match category {
867 SpecificTaskDataCategory::Meta => state.set_meta_snapshot(true),
868 SpecificTaskDataCategory::Data => state.set_data_snapshot(true),
869 }
870 }
871 }
872 }
873 }
874}
875
876impl Deref for StorageWriteGuard<'_> {
877 type Target = InnerStorage;
878
879 fn deref(&self) -> &Self::Target {
880 &self.inner
881 }
882}
883
884impl DerefMut for StorageWriteGuard<'_> {
885 fn deref_mut(&mut self) -> &mut Self::Target {
886 &mut self.inner
887 }
888}
889
890macro_rules! count {
891 ($task:ident, $key:ident) => {{ $task.count($crate::data::CachedDataItemType::$key) }};
892}
893
894macro_rules! get {
895 ($task:ident, $key:ident $input:tt) => {{
896 #[allow(unused_imports)]
897 use $crate::backend::operation::TaskGuard;
898 if let Some($crate::data::CachedDataItemValueRef::$key {
899 value,
900 }) = $task.get(&$crate::data::CachedDataItemKey::$key $input) {
901 Some(value)
902 } else {
903 None
904 }
905 }};
906 ($task:ident, $key:ident) => {
907 $crate::backend::storage::get!($task, $key {})
908 };
909}
910
911macro_rules! get_mut {
912 ($task:ident, $key:ident $input:tt) => {{
913 #[allow(unused_imports)]
914 use $crate::backend::operation::TaskGuard;
915 if let Some($crate::data::CachedDataItemValueRefMut::$key {
916 value,
917 }) = $task.get_mut(&$crate::data::CachedDataItemKey::$key $input) {
918 let () = $crate::data::allow_mut_access::$key;
919 Some(value)
920 } else {
921 None
922 }
923 }};
924 ($task:ident, $key:ident) => {
925 $crate::backend::storage::get_mut!($task, $key {})
926 };
927}
928
929macro_rules! get_mut_or_insert_with {
930 ($task:ident, $key:ident $input:tt, $f:expr) => {{
931 #[allow(unused_imports)]
932 use $crate::backend::operation::TaskGuard;
933 let () = $crate::data::allow_mut_access::$key;
934 let functor = $f;
935 let $crate::data::CachedDataItemValueRefMut::$key {
936 value,
937 } = $task.get_mut_or_insert_with($crate::data::CachedDataItemKey::$key $input, move || $crate::data::CachedDataItemValue::$key { value: functor() }) else {
938 unreachable!()
939 };
940 value
941 }};
942 ($task:ident, $key:ident, $f:expr) => {
943 $crate::backend::storage::get_mut_or_insert_with!($task, $key {}, $f)
944 };
945}
946
947macro_rules! iter_many {
953 ($task:ident, $key:ident $key_pattern:tt $(if $cond:expr)? => $iter_item:expr) => {{
954 #[allow(unused_imports)]
955 use $crate::backend::operation::TaskGuard;
956 $task
957 .iter($crate::data::CachedDataItemType::$key)
958 .filter_map(|(key, _)| match key {
959 $crate::data::CachedDataItemKey::$key $key_pattern $(if $cond)? => Some(
960 $iter_item
961 ),
962 _ => None,
963 })
964 }};
965 ($task:ident, $key:ident $input:tt $value_pattern:tt $(if $cond:expr)? => $iter_item:expr) => {{
966 #[allow(unused_imports)]
967 use $crate::backend::operation::TaskGuard;
968 $task
969 .iter($crate::data::CachedDataItemType::$key)
970 .filter_map(|(key, value)| match (key, value) {
971 (
972 $crate::data::CachedDataItemKey::$key $input,
973 $crate::data::CachedDataItemValueRef::$key { value: $value_pattern }
974 ) $(if $cond)? => Some($iter_item),
975 _ => None,
976 })
977 }};
978}
979
980macro_rules! get_many {
985 ($($args:tt)*) => {
986 $crate::backend::storage::iter_many!($($args)*).collect()
987 };
988}
989
990macro_rules! update {
991 ($task:ident, $key:ident $input:tt, $update:expr) => {{
992 #[allow(unused_imports)]
993 use $crate::backend::operation::TaskGuard;
994 #[allow(unused_mut)]
995 let mut update = $update;
996 $task.update($crate::data::CachedDataItemKey::$key $input, |old| {
997 update(old.and_then(|old| {
998 if let $crate::data::CachedDataItemValue::$key { value } = old {
999 Some(value)
1000 } else {
1001 None
1002 }
1003 }))
1004 .map(|new| $crate::data::CachedDataItemValue::$key { value: new })
1005 })
1006 }};
1007 ($task:ident, $key:ident, $update:expr) => {
1008 $crate::backend::storage::update!($task, $key {}, $update)
1009 };
1010}
1011
1012macro_rules! update_count {
1013 ($task:ident, $key:ident $input:tt, -$update:expr) => {{
1014 let update = $update;
1015 let mut state_change = false;
1016 $crate::backend::storage::update!($task, $key $input, |old: Option<_>| {
1017 #[allow(unused_comparisons, reason = "type of update might be unsigned, where update < 0 is always false")]
1018 if let Some(old) = old {
1019 let new = old - update;
1020 state_change = old <= 0 && new > 0 || old > 0 && new <= 0;
1021 (new != 0).then_some(new)
1022 } else {
1023 state_change = update < 0;
1024 (update != 0).then_some(-update)
1025 }
1026 });
1027 state_change
1028 }};
1029 ($task:ident, $key:ident $input:tt, $update:expr) => {
1030 match $update {
1031 update => {
1032 let mut state_change = false;
1033 $crate::backend::storage::update!($task, $key $input, |old: Option<_>| {
1034 if let Some(old) = old {
1035 let new = old + update;
1036 state_change = old <= 0 && new > 0 || old > 0 && new <= 0;
1037 (new != 0).then_some(new)
1038 } else {
1039 state_change = update > 0;
1040 (update != 0).then_some(update)
1041 }
1042 });
1043 state_change
1044 }
1045 }
1046 };
1047 ($task:ident, $key:ident, -$update:expr) => {
1048 $crate::backend::storage::update_count!($task, $key {}, -$update)
1049 }; ($task:ident, $key:ident, $update:expr) => {
1050 $crate::backend::storage::update_count!($task, $key {}, $update)
1051 };
1052}
1053
1054macro_rules! remove {
1055 ($task:ident, $key:ident $input:tt) => {{
1056 #[allow(unused_imports)]
1057 use $crate::backend::operation::TaskGuard;
1058 if let Some($crate::data::CachedDataItemValue::$key { value }) = $task.remove(
1059 &$crate::data::CachedDataItemKey::$key $input
1060 ) {
1061 Some(value)
1062 } else {
1063 None
1064 }
1065 }};
1066 ($task:ident, $key:ident) => {
1067 $crate::backend::storage::remove!($task, $key {})
1068 };
1069}
1070
1071pub(crate) use count;
1072pub(crate) use get;
1073pub(crate) use get_many;
1074pub(crate) use get_mut;
1075pub(crate) use get_mut_or_insert_with;
1076pub(crate) use iter_many;
1077pub(crate) use remove;
1078pub(crate) use update;
1079pub(crate) use update_count;
1080
1081pub struct SnapshotGuard<'l> {
1082 storage: &'l Storage,
1083}
1084
1085impl Drop for SnapshotGuard<'_> {
1086 fn drop(&mut self) {
1087 self.storage.end_snapshot();
1088 }
1089}
1090
1091pub struct SnapshotShard<'l, PP, P, PS> {
1092 direct_snapshots: Vec<(TaskId, Box<InnerStorageSnapshot>)>,
1093 modified: Vec<TaskId>,
1094 storage: &'l Storage,
1095 guard: Option<Arc<SnapshotGuard<'l>>>,
1096 process: &'l P,
1097 preprocess: &'l PP,
1098 process_snapshot: &'l PS,
1099}
1100
1101impl<'l, T, R, PP, P, PS> Iterator for SnapshotShard<'l, PP, P, PS>
1102where
1103 PP: for<'a> Fn(TaskId, &'a InnerStorage) -> T + Sync,
1104 P: Fn(TaskId, T) -> R + Sync,
1105 PS: Fn(TaskId, Box<InnerStorageSnapshot>) -> R + Sync,
1106{
1107 type Item = R;
1108
1109 fn next(&mut self) -> Option<Self::Item> {
1110 if let Some((task_id, snapshot)) = self.direct_snapshots.pop() {
1111 return Some((self.process_snapshot)(task_id, snapshot));
1112 }
1113 while let Some(task_id) = self.modified.pop() {
1114 let inner = self.storage.map.get(&task_id).unwrap();
1115 let state = inner.state();
1116 if !state.any_snapshot() {
1117 let preprocessed = (self.preprocess)(task_id, &inner);
1118 drop(inner);
1119 return Some((self.process)(task_id, preprocessed));
1120 } else {
1121 drop(inner);
1122 let maybe_snapshot = {
1123 let mut modified_state = self.storage.modified.get_mut(&task_id).unwrap();
1124 let ModifiedState::Snapshot(snapshot) = &mut *modified_state else {
1125 unreachable!("The snapshot bit was set, so it must be in Snapshot state");
1126 };
1127 snapshot.take()
1128 };
1129 if let Some(snapshot) = maybe_snapshot {
1130 return Some((self.process_snapshot)(task_id, snapshot));
1131 }
1132 }
1133 }
1134 self.guard = None;
1135 None
1136 }
1137}