1use std::{
2 hash::Hash,
3 ops::{Deref, DerefMut},
4 sync::{Arc, atomic::AtomicBool},
5 thread::available_parallelism,
6};
7
8use bitfield::bitfield;
9use smallvec::SmallVec;
10use turbo_tasks::{FxDashMap, TaskId, parallel};
11
12use crate::{
13 backend::dynamic_storage::DynamicStorage,
14 data::{
15 AggregationNumber, CachedDataItem, CachedDataItemKey, CachedDataItemType,
16 CachedDataItemValue, CachedDataItemValueRef, CachedDataItemValueRefMut, OutputValue,
17 },
18 data_storage::{AutoMapStorage, OptionStorage},
19 utils::{
20 dash_map_drop_contents::drop_contents,
21 dash_map_multi::{RefMut, get_multiple_mut},
22 },
23};
24
/// Selects which part of a task's stored data an operation refers to.
///
/// `All` addresses both the `Meta` and the `Data` part at once.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TaskDataCategory {
    Meta,
    Data,
    All,
}

impl TaskDataCategory {
    /// Converts this category into the matching [`SpecificTaskDataCategory`].
    ///
    /// # Panics
    ///
    /// Panics when called on [`TaskDataCategory::All`], which has no single specific
    /// counterpart.
    pub fn into_specific(self) -> SpecificTaskDataCategory {
        match self {
            Self::Meta => SpecificTaskDataCategory::Meta,
            Self::Data => SpecificTaskDataCategory::Data,
            Self::All => unreachable!(),
        }
    }
}

/// A task data category that names exactly one part (there is no `All` variant).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum SpecificTaskDataCategory {
    Meta,
    Data,
}

/// Iterating a category yields the single-part categories it covers:
/// `Meta` and `Data` yield themselves, `All` yields `Meta` followed by `Data`.
impl IntoIterator for TaskDataCategory {
    type Item = TaskDataCategory;
    type IntoIter = TaskDataCategoryIterator;

    fn into_iter(self) -> Self::IntoIter {
        match self {
            Self::Meta => TaskDataCategoryIterator::Meta,
            Self::Data => TaskDataCategoryIterator::Data,
            Self::All => TaskDataCategoryIterator::All,
        }
    }
}

/// Iterator state for walking the parts covered by a [`TaskDataCategory`].
///
/// Each variant describes what is still left to yield; `None` means exhausted.
pub enum TaskDataCategoryIterator {
    All,
    Meta,
    Data,
    None,
}

impl Iterator for TaskDataCategoryIterator {
    type Item = TaskDataCategory;

    fn next(&mut self) -> Option<Self::Item> {
        // Decide in one place what is yielded now and which state follows.
        let (yielded, following) = match self {
            Self::All => (TaskDataCategory::Meta, Self::Data),
            Self::Meta => (TaskDataCategory::Meta, Self::None),
            Self::Data => (TaskDataCategory::Data, Self::None),
            Self::None => return None,
        };
        *self = following;
        Some(yielded)
    }
}
90
// Per-task bit flags packed into a single `u32`. Each line below declares a getter/setter pair
// for one bit position.
bitfield! {
    #[derive(Clone, Default)]
    pub struct InnerStorageState(u32);
    impl Debug;
    // Bits 0/1: set through `InnerStorageState::set_restored` for the meta/data part.
    pub meta_restored, set_meta_restored: 0;
    pub data_restored, set_data_restored: 1;
    // Bits 2/3: the meta/data part was marked as modified while the storage was NOT in snapshot
    // mode (see `StorageWriteGuard::track_modification`).
    pub meta_modified, set_meta_modified: 2;
    pub data_modified, set_data_modified: 3;
    // Bits 4/5: the meta/data part was marked as modified while the storage WAS in snapshot
    // mode; `Storage::end_snapshot` converts these back into the modified bits.
    pub meta_snapshot, set_meta_snapshot: 4;
    pub data_snapshot, set_data_snapshot: 5;
    // Bit 6: not read or written anywhere in this file.
    // NOTE(review): presumably marks tasks whose data was prefetched — confirm with callers.
    pub prefetched, set_prefetched: 6;
}
107
108impl InnerStorageState {
109 pub fn set_restored(&mut self, category: TaskDataCategory) {
110 match category {
111 TaskDataCategory::Meta => {
112 self.set_meta_restored(true);
113 }
114 TaskDataCategory::Data => {
115 self.set_data_restored(true);
116 }
117 TaskDataCategory::All => {
118 self.set_meta_restored(true);
119 self.set_data_restored(true);
120 }
121 }
122 }
123
124 pub fn is_restored(&self, category: TaskDataCategory) -> bool {
125 match category {
126 TaskDataCategory::Meta => self.meta_restored(),
127 TaskDataCategory::Data => self.data_restored(),
128 TaskDataCategory::All => self.meta_restored() && self.data_restored(),
129 }
130 }
131
132 pub fn any_snapshot(&self) -> bool {
133 self.meta_snapshot() || self.data_snapshot()
134 }
135
136 pub fn any_modified(&self) -> bool {
137 self.meta_modified() || self.data_modified()
138 }
139}
140
/// A copy of an [`InnerStorage`]'s contents together with the values its two modified flags
/// had at the time the copy was made (see the `From<&InnerStorage>` impl).
pub struct InnerStorageSnapshot {
    // The first four fields mirror the dedicated fields of `InnerStorage`.
    aggregation_number: OptionStorage<AggregationNumber>,
    output_dependent: AutoMapStorage<TaskId, ()>,
    output: OptionStorage<OutputValue>,
    upper: AutoMapStorage<TaskId, u32>,
    // Result of `DynamicStorage::snapshot_for_persisting` on the source storage.
    dynamic: DynamicStorage,
    /// Value of the `meta_modified` state bit when the snapshot was created.
    pub meta_modified: bool,
    /// Value of the `data_modified` state bit when the snapshot was created.
    pub data_modified: bool,
}
150
151impl From<&InnerStorage> for InnerStorageSnapshot {
152 fn from(inner: &InnerStorage) -> Self {
153 Self {
154 aggregation_number: inner.aggregation_number.clone(),
155 output_dependent: inner.output_dependent.clone(),
156 output: inner.output.clone(),
157 upper: inner.upper.clone(),
158 dynamic: inner.dynamic.snapshot_for_persisting(),
159 meta_modified: inner.state.meta_modified(),
160 data_modified: inner.state.data_modified(),
161 }
162 }
163}
164
165impl InnerStorageSnapshot {
166 pub fn iter_all(
167 &self,
168 ) -> impl Iterator<Item = (CachedDataItemKey, CachedDataItemValueRef<'_>)> {
169 use crate::data_storage::Storage;
170 self.dynamic
171 .iter_all()
172 .chain(self.aggregation_number.iter().map(|(_, value)| {
173 (
174 CachedDataItemKey::AggregationNumber {},
175 CachedDataItemValueRef::AggregationNumber { value },
176 )
177 }))
178 .chain(self.output.iter().map(|(_, value)| {
179 (
180 CachedDataItemKey::Output {},
181 CachedDataItemValueRef::Output { value },
182 )
183 }))
184 .chain(self.upper.iter().map(|(k, value)| {
185 (
186 CachedDataItemKey::Upper { task: *k },
187 CachedDataItemValueRef::Upper { value },
188 )
189 }))
190 .chain(self.output_dependent.iter().map(|(k, value)| {
191 (
192 CachedDataItemKey::OutputDependent { task: *k },
193 CachedDataItemValueRef::OutputDependent { value },
194 )
195 }))
196 }
197
198 pub fn len(&self) -> usize {
199 use crate::data_storage::Storage;
200 self.dynamic.len()
201 + self.aggregation_number.len()
202 + self.output.len()
203 + self.upper.len()
204 + self.output_dependent.len()
205 }
206}
207
/// The stored items of a single task.
///
/// Four item types (`AggregationNumber`, `OutputDependent`, `Output`, `Upper`) have a dedicated
/// field; the `generate_inner_storage!` invocation in this file routes them there. Every other
/// item type is forwarded to `dynamic`.
#[derive(Debug, Clone)]
pub struct InnerStorage {
    aggregation_number: OptionStorage<AggregationNumber>,
    output_dependent: AutoMapStorage<TaskId, ()>,
    output: OptionStorage<OutputValue>,
    upper: AutoMapStorage<TaskId, u32>,
    // Fallback storage for all item types without a dedicated field.
    dynamic: DynamicStorage,
    // Restored/modified/snapshot bit flags for this task.
    state: InnerStorageState,
}
217
218impl InnerStorage {
219 fn new() -> Self {
220 Self {
221 aggregation_number: Default::default(),
222 output_dependent: Default::default(),
223 output: Default::default(),
224 upper: Default::default(),
225 dynamic: DynamicStorage::new(),
226 state: InnerStorageState::default(),
227 }
228 }
229
230 pub fn state(&self) -> &InnerStorageState {
231 &self.state
232 }
233
234 pub fn state_mut(&mut self) -> &mut InnerStorageState {
235 &mut self.state
236 }
237}
238
/// Implementation detail of `generate_inner_storage!`.
///
/// The input after the colon is a config list of the form `Tag [key_field] => field,` entries.
/// For each entry the macro emits an `if let` that checks whether the item/key/type at hand has
/// variant `Tag`; if so it performs the operation on `self.field` and `return`s from the
/// enclosing function. When no entry matches, control falls through to whatever code follows
/// the macro invocation.
///
/// Every operation has up to three arms: one for an entry with a key field, one for an entry
/// without a key field, and a recursive arm that peels the first entry off a longer list.
#[macro_export]
macro_rules! generate_inner_storage_internal {
    // --- Dispatch on an owned `CachedDataItem` (key field and value are moved out) ---
    // Entry with a key field: the key is passed as first argument to `$fn`.
    (CachedDataItem: $self:ident, $item:ident, $value:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident $key_field:ident => $field:ident,) => {
        if let CachedDataItem::$tag { $key_field, $value } = $item {
            let result = $self.$field.$fn($key_field, $($args)*);
            return $crate::generate_inner_storage_internal!(return_value: result, $return_ty: $tag $key_field => $field);
        }
    };
    // Entry without a key field: `()` is used as the key.
    (CachedDataItem: $self:ident, $item:ident, $value:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident => $field:ident,) => {
        if let CachedDataItem::$tag { $value } = $item {
            let result = $self.$field.$fn((), $($args)*);
            return $crate::generate_inner_storage_internal!(return_value: result, $return_ty: $tag => $field);
        }
    };
    // Recursion: handle the first entry, then the remaining entries.
    (CachedDataItem: $self:ident, $item:ident, $value:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
        $crate::generate_inner_storage_internal!(CachedDataItem: $self, $item, $value, $return_ty, $fn($($args)*): $tag $($key_field)? => $field,);
        $crate::generate_inner_storage_internal!(CachedDataItem: $self, $item, $value, $return_ty, $fn($($args)*): $($config)+)
    };
    // --- Dispatch on a `CachedDataItemKey` ---
    // Entry with a key field: the bound key field is passed as first argument to `$fn`.
    (CachedDataItemKey: $self:ident, $item:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident $key_field:ident => $field:ident,) => {
        if let CachedDataItemKey::$tag { $key_field } = $item {
            let result = $self.$field.$fn($key_field, $($args)*);
            return $crate::generate_inner_storage_internal!(return_value: result, $return_ty: $tag $key_field => $field);
        }
    };
    // Entry without a key field: `&()` is used as the key.
    (CachedDataItemKey: $self:ident, $item:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident => $field:ident,) => {
        if let CachedDataItemKey::$tag { } = $item {
            let result = $self.$field.$fn(&(), $($args)*);
            return $crate::generate_inner_storage_internal!(return_value: result, $return_ty: $tag => $field);
        }
    };
    // Recursion: handle the first entry, then the remaining entries.
    (CachedDataItemKey: $self:ident, $item:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
        $crate::generate_inner_storage_internal!(CachedDataItemKey: $self, $item, $return_ty, $fn($($args)*): $tag $($key_field)? => $field,);
        $crate::generate_inner_storage_internal!(CachedDataItemKey: $self, $item, $return_ty, $fn($($args)*): $($config)+)
    };
    // --- Dispatch on a `CachedDataItemType` (no key is involved, only `$args` are passed) ---
    (CachedDataItemType: $self:ident, $item:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident $($key_field:ident)? => $field:ident,) => {
        if let CachedDataItemType::$tag = $item {
            let result = $self.$field.$fn($($args)*);
            return $crate::generate_inner_storage_internal!(return_value: result, $return_ty: $tag $($key_field)? => $field);
        }
    };
    // Recursion: handle the first entry, then the remaining entries.
    (CachedDataItemType: $self:ident, $item:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
        $crate::generate_inner_storage_internal!(CachedDataItemType: $self, $item, $return_ty, $fn($($args)*): $tag $($key_field)? => $field,);
        $crate::generate_inner_storage_internal!(CachedDataItemType: $self, $item, $return_ty, $fn($($args)*): $($config)+)
    };

    // --- `update`: wraps the typed old value into `CachedDataItemValue::$tag`, calls the
    // caller's closure and unwraps the returned value again. A returned value of a different
    // variant hits `unreachable!()`. ---
    (update: $self:ident, $key:ident, $update:ident: $tag:ident $key_field:ident => $field:ident,) => {
        if let CachedDataItemKey::$tag { $key_field } = $key {
            $self.$field.update($key_field, |old| {
                let old = old.map(|old| CachedDataItemValue::$tag { value: old });
                let new = $update(old);
                new.map(|new| if let CachedDataItemValue::$tag { value } = new {
                    value
                } else {
                    unreachable!()
                })
            });
            return;
        }
    };
    // Same as above for an entry without a key field (`()` is the key).
    (update: $self:ident, $key:ident, $update:ident: $tag:ident => $field:ident,) => {
        if let CachedDataItemKey::$tag { } = $key {
            $self.$field.update((), |old| {
                let old = old.map(|old| CachedDataItemValue::$tag { value: old });
                let new = $update(old);
                new.map(|new| if let CachedDataItemValue::$tag { value } = new {
                    value
                } else {
                    unreachable!()
                })
            });
            return;
        }
    };
    // Recursion: handle the first entry, then the remaining entries.
    (update: $self:ident, $key:ident, $update:ident: $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
        $crate::generate_inner_storage_internal!(update: $self, $key, $update: $tag $($key_field)? => $field,);
        $crate::generate_inner_storage_internal!(update: $self, $key, $update: $($config)+)
    };

    // --- `get_mut_or_insert_with`: unwraps the value produced by the caller's closure from
    // `CachedDataItemValue::$tag` (any other variant hits `unreachable!()`) and returns the
    // stored value as `CachedDataItemValueRefMut::$tag`. ---
    (get_mut_or_insert_with: $self:ident, $key:ident, $insert_with:ident: $tag:ident $key_field:ident => $field:ident,) => {
        if let CachedDataItemKey::$tag { $key_field } = $key {
            let value = $self.$field.get_mut_or_insert_with($key_field, || {
                let value = $insert_with();
                if let CachedDataItemValue::$tag { value } = value {
                    value
                } else {
                    unreachable!()
                }
            });
            return CachedDataItemValueRefMut::$tag { value };
        }
    };
    // Same as above for an entry without a key field (`()` is the key).
    (get_mut_or_insert_with: $self:ident, $key:ident, $insert_with:ident: $tag:ident => $field:ident,) => {
        if let CachedDataItemKey::$tag { } = $key {
            let value = $self.$field.get_mut_or_insert_with((), || {
                let value = $insert_with();
                if let CachedDataItemValue::$tag { value } = value {
                    value
                } else {
                    unreachable!()
                }
            });
            return CachedDataItemValueRefMut::$tag { value };
        }
    };
    // Recursion: handle the first entry, then the remaining entries.
    (get_mut_or_insert_with: $self:ident, $key:ident, $insert_with:ident: $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
        $crate::generate_inner_storage_internal!(get_mut_or_insert_with: $self, $key, $insert_with: $tag $($key_field)? => $field,);
        $crate::generate_inner_storage_internal!(get_mut_or_insert_with: $self, $key, $insert_with: $($config)+)
    };

    // --- `extract_if`: adapts the caller's predicate to the field's typed key/value, maps the
    // extracted entries back to `CachedDataItem`s and wraps the iterator in the
    // `InnerStorageIter` variant named after the tag. ---
    (extract_if: $self:ident, $ty:ident, $f:ident: $tag:ident $key_field:ident => $field:ident,) => {
        if let CachedDataItemType::$tag = $ty {
            let iter = $self.$field.extract_if(move |key, value| {
                $f(CachedDataItemKey::$tag { $key_field: *key }, CachedDataItemValueRef::$tag { value })
            }).map(|($key_field, value)| CachedDataItem::$tag { $key_field, value });
            return InnerStorageIter::$tag(iter);
        }
    };
    // Same as above for an entry without a key field.
    (extract_if: $self:ident, $ty:ident, $f:ident: $tag:ident => $field:ident,) => {
        if let CachedDataItemType::$tag = $ty {
            let iter = $self.$field.extract_if(move |_, value| {
                $f(CachedDataItemKey::$tag { }, CachedDataItemValueRef::$tag { value })
            }).map(|(_, value)| CachedDataItem::$tag { value });
            return InnerStorageIter::$tag(iter);
        }
    };
    // Recursion: handle the first entry, then the remaining entries.
    (extract_if: $self:ident, $ty:ident, $f:ident: $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
        $crate::generate_inner_storage_internal!(extract_if: $self, $ty, $f: $tag $($key_field)? => $field,);
        $crate::generate_inner_storage_internal!(extract_if: $self, $ty, $f: $($config)+)
    };

    // --- `iter`: maps the field's entries to `(CachedDataItemKey, CachedDataItemValueRef)`
    // pairs and wraps the iterator in the `InnerStorageIter` variant named after the tag. ---
    (iter: $self:ident, $ty:ident: $tag:ident $key_field:ident => $field:ident,) => {
        if let CachedDataItemType::$tag = $ty {
            let iter = $self.$field.iter().map(|($key_field, value)| (CachedDataItemKey::$tag { $key_field: *$key_field }, CachedDataItemValueRef::$tag { value }));
            return InnerStorageIter::$tag(iter);
        }
    };
    // Same as above for an entry without a key field.
    (iter: $self:ident, $ty:ident: $tag:ident => $field:ident,) => {
        if let CachedDataItemType::$tag = $ty {
            let iter = $self.$field.iter().map(|(_, value)| (CachedDataItemKey::$tag { }, CachedDataItemValueRef::$tag { value }));
            return InnerStorageIter::$tag(iter);
        }
    };
    // Recursion: handle the first entry, then the remaining entries.
    (iter: $self:ident, $ty:ident: $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
        $crate::generate_inner_storage_internal!(iter: $self, $ty: $tag $($key_field)? => $field,);
        $crate::generate_inner_storage_internal!(iter: $self, $ty: $($config)+)
    };


    // --- `return_value`: converts the typed result of a field operation into the return type
    // of the generated method, selected by the `$return_ty` token. ---
    // `none`: pass the result through unchanged.
    (return_value: $result:ident, none: $($more:tt)*) => {
        $result
    };
    // `option_value`: wrap an owned value into `CachedDataItemValue::$tag`.
    (return_value: $result:ident, option_value: $tag:ident $($more:tt)*) => {
        $result.map(|value| CachedDataItemValue::$tag { value })
    };
    // `option_ref`: wrap a shared reference into `CachedDataItemValueRef::$tag`.
    (return_value: $result:ident, option_ref: $tag:ident $($more:tt)*) => {
        $result.map(|value| CachedDataItemValueRef::$tag { value })
    };
    // `option_ref_mut`: wrap a mutable reference into `CachedDataItemValueRefMut::$tag`.
    (return_value: $result:ident, option_ref_mut: $tag:ident $($more:tt)*) => {
        $result.map(|value| CachedDataItemValueRefMut::$tag { value })
    };

    // --- `input_value`: unwraps an optional `CachedDataItemValue::$tag` into its typed value;
    // any other variant hits `unreachable!()`. ---
    (input_value: $input:ident, option_value: $tag:ident $($more:tt)*) => {
        $input.map(|value| {
            if let CachedDataItemValue::$tag { value } = value {
                value
            } else {
                unreachable!()
            }
        })
    };

}
420
/// Generates the item-level API of `InnerStorage` (`add`, `insert`, `remove`, `count`, `get`,
/// `contains_key`, `get_mut`, `shrink_to_fit`, `update`, `extract_if`,
/// `get_mut_or_insert_with`, `iter`).
///
/// The input is the config list `Tag [key_field] => field, ...`. Each generated method first
/// expands `generate_inner_storage_internal!`, which returns early when the item belongs to one
/// of the listed dedicated fields; otherwise the call is forwarded to `self.dynamic`.
macro_rules! generate_inner_storage {
    ($($config:tt)*) => {
        impl InnerStorage {
            // Forwards to the field's `add`; the `bool` result is passed through unchanged.
            pub fn add(&mut self, item: CachedDataItem) -> bool {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(CachedDataItem: self, item, value, none, add(value): $($config)*);
                self.dynamic.add(item)
            }

            // Forwards to the field's `insert`; a returned value is wrapped as
            // `CachedDataItemValue`.
            pub fn insert(&mut self, item: CachedDataItem) -> Option<CachedDataItemValue> {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(CachedDataItem: self, item, value, option_value, insert(value): $($config)*);
                self.dynamic.insert(item)
            }

            // Forwards to the field's `remove`; a returned value is wrapped as
            // `CachedDataItemValue`.
            pub fn remove(&mut self, key: &CachedDataItemKey) -> Option<CachedDataItemValue> {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(CachedDataItemKey: self, key, option_value, remove(): $($config)*);
                self.dynamic.remove(key)
            }

            // Number of stored items of type `ty` (the field's `len()` for dedicated fields).
            pub fn count(&self, ty: CachedDataItemType) -> usize {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(CachedDataItemType: self, ty, none, len(): $($config)*);
                self.dynamic.count(ty)
            }

            // Shared reference to the value stored under `key`, if any.
            pub fn get(&self, key: &CachedDataItemKey) -> Option<CachedDataItemValueRef<'_>> {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(CachedDataItemKey: self, key, option_ref, get(): $($config)*);
                self.dynamic.get(key)
            }

            // Forwards to the field's `contains_key`.
            pub fn contains_key(&self, key: &CachedDataItemKey) -> bool {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(CachedDataItemKey: self, key, none, contains_key(): $($config)*);
                self.dynamic.contains_key(key)
            }

            // Mutable reference to the value stored under `key`, if any.
            pub fn get_mut(&mut self, key: &CachedDataItemKey) -> Option<CachedDataItemValueRefMut<'_>> {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(CachedDataItemKey: self, key, option_ref_mut, get_mut(): $($config)*);
                self.dynamic.get_mut(key)
            }

            // Forwards to the `shrink_to_fit` of the storage holding items of type `ty`.
            pub fn shrink_to_fit(&mut self, ty: CachedDataItemType) {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(CachedDataItemType: self, ty, none, shrink_to_fit(): $($config)*);
                self.dynamic.shrink_to_fit(ty)
            }

            // Calls `update` with the current value under `key` (if any) and forwards the
            // closure's result to the underlying storage's `update`.
            pub fn update(
                &mut self,
                key: CachedDataItemKey,
                update: impl FnOnce(Option<CachedDataItemValue>) -> Option<CachedDataItemValue>,
            ) {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(update: self, key, update: $($config)*);
                self.dynamic.update(key, update)
            }

            // Returns an iterator over the items of type `ty` produced by the underlying
            // storage's `extract_if` with predicate `f`.
            pub fn extract_if<'l, F>(
                &'l mut self,
                ty: CachedDataItemType,
                mut f: F,
            ) -> impl Iterator<Item = CachedDataItem> + use<'l, F>
            where
                F: for<'a> FnMut(CachedDataItemKey, CachedDataItemValueRef<'a>) -> bool + 'l,
            {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(extract_if: self, ty, f: $($config)*);
                InnerStorageIter::Dynamic(self.dynamic.extract_if(ty, f))
            }

            // Mutable reference to the value under `key`; `f` supplies the value to insert
            // (see the underlying storage's `get_mut_or_insert_with`).
            pub fn get_mut_or_insert_with(
                &mut self,
                key: CachedDataItemKey,
                f: impl FnOnce() -> CachedDataItemValue,
            ) -> CachedDataItemValueRefMut<'_>
            {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(get_mut_or_insert_with: self, key, f: $($config)*);
                self.dynamic.get_mut_or_insert_with(key, f)
            }

            // Iterates over all `(key, value reference)` pairs of items of type `ty`.
            pub fn iter(
                &self,
                ty: CachedDataItemType,
            ) -> impl Iterator<Item = (CachedDataItemKey, CachedDataItemValueRef<'_>)>
            {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(iter: self, ty: $($config)*);
                InnerStorageIter::Dynamic(self.dynamic.iter(ty))
            }

        }
    };
}
519
// Generates the item-level API of `InnerStorage`. Each entry maps an item type (optionally with
// the name of its key field) to the dedicated `InnerStorage` field that stores it; all other
// item types end up in `InnerStorage::dynamic`.
generate_inner_storage!(
    AggregationNumber => aggregation_number,
    OutputDependent task => output_dependent,
    Output => output,
    Upper task => upper,
);
526
/// An iterator that is exactly one of five concrete iterator types.
///
/// It lets functions returning `impl Iterator` hand back a different concrete iterator from
/// each branch: one variant per dedicated `InnerStorage` field plus `Dynamic` for the fallback
/// storage. All wrapped iterators must yield the same item type.
enum InnerStorageIter<A, B, C, D, E> {
    AggregationNumber(A),
    OutputDependent(B),
    Output(C),
    Upper(D),
    Dynamic(E),
}

impl<T, A, B, C, D, E> Iterator for InnerStorageIter<A, B, C, D, E>
where
    A: Iterator<Item = T>,
    B: Iterator<Item = T>,
    C: Iterator<Item = T>,
    D: Iterator<Item = T>,
    E: Iterator<Item = T>,
{
    type Item = T;

    /// Advances whichever iterator is wrapped.
    fn next(&mut self) -> Option<Self::Item> {
        match self {
            InnerStorageIter::AggregationNumber(iter) => iter.next(),
            InnerStorageIter::OutputDependent(iter) => iter.next(),
            InnerStorageIter::Output(iter) => iter.next(),
            InnerStorageIter::Upper(iter) => iter.next(),
            InnerStorageIter::Dynamic(iter) => iter.next(),
        }
    }

    /// Forwards the wrapped iterator's bounds.
    ///
    /// Without this override the default `(0, None)` would be reported, discarding length
    /// information that consumers may use, e.g. for preallocation.
    fn size_hint(&self) -> (usize, Option<usize>) {
        match self {
            InnerStorageIter::AggregationNumber(iter) => iter.size_hint(),
            InnerStorageIter::OutputDependent(iter) => iter.size_hint(),
            InnerStorageIter::Output(iter) => iter.size_hint(),
            InnerStorageIter::Upper(iter) => iter.size_hint(),
            InnerStorageIter::Dynamic(iter) => iter.size_hint(),
        }
    }
}
555
556impl InnerStorage {
557 pub fn iter_all(
558 &self,
559 ) -> impl Iterator<Item = (CachedDataItemKey, CachedDataItemValueRef<'_>)> {
560 use crate::data_storage::Storage;
561 self.dynamic
562 .iter_all()
563 .chain(self.aggregation_number.iter().map(|(_, value)| {
564 (
565 CachedDataItemKey::AggregationNumber {},
566 CachedDataItemValueRef::AggregationNumber { value },
567 )
568 }))
569 .chain(self.output.iter().map(|(_, value)| {
570 (
571 CachedDataItemKey::Output {},
572 CachedDataItemValueRef::Output { value },
573 )
574 }))
575 .chain(self.upper.iter().map(|(k, value)| {
576 (
577 CachedDataItemKey::Upper { task: *k },
578 CachedDataItemValueRef::Upper { value },
579 )
580 }))
581 .chain(self.output_dependent.iter().map(|(k, value)| {
582 (
583 CachedDataItemKey::OutputDependent { task: *k },
584 CachedDataItemValueRef::OutputDependent { value },
585 )
586 }))
587 }
588
589 pub fn len(&self) -> usize {
590 use crate::data_storage::Storage;
591 self.dynamic.len()
592 + self.aggregation_number.len()
593 + self.output.len()
594 + self.upper.len()
595 + self.output_dependent.len()
596 }
597}
598
/// Entry in `Storage::modified` describing how a task was modified relative to snapshot mode.
enum ModifiedState {
    /// Inserted by `track_modification` when the task is first modified while the storage is
    /// not in snapshot mode.
    Modified,
    /// Inserted by `track_modification` when the task is first modified while the storage is in
    /// snapshot mode.
    ///
    /// `Some(..)` holds a copy of the task's storage taken right before that modification; it
    /// is stored when the modified category was already flagged as modified. `None` is stored
    /// otherwise, and also remains after the snapshot operation has taken the copy out.
    Snapshot(Option<Box<InnerStorageSnapshot>>),
}
611
/// Concurrent storage of all tasks' data plus the bookkeeping needed to snapshot modified tasks.
pub struct Storage {
    // `true` between `start_snapshot` and the store in `end_snapshot`.
    snapshot_mode: AtomicBool,
    // Tasks that were modified, keyed by task id (see `ModifiedState`).
    modified: FxDashMap<TaskId, ModifiedState>,
    // The per-task storages, created on first mutable access.
    map: FxDashMap<TaskId, Box<InnerStorage>>,
}
617
impl Storage {
    /// Creates an empty storage.
    ///
    /// With `small_preallocation` the task map is preallocated for 1024 entries (instead of
    /// 1024 * 1024), the modified map is not preallocated (instead of 1024 entries) and a shard
    /// factor of 4 (instead of 64) is used.
    pub fn new(small_preallocation: bool) -> Self {
        let map_capacity: usize = if small_preallocation {
            1024
        } else {
            1024 * 1024
        };
        let modified_capacity: usize = if small_preallocation { 0 } else { 1024 };
        let shard_factor: usize = if small_preallocation { 4 } else { 64 };

        // The shard count scales with the available parallelism (4 is assumed when it cannot be
        // determined) and is rounded up to the next power of two.
        let shard_amount =
            (available_parallelism().map_or(4, |v| v.get()) * shard_factor).next_power_of_two();

        Self {
            snapshot_mode: AtomicBool::new(false),
            modified: FxDashMap::with_capacity_and_hasher_and_shard_amount(
                modified_capacity,
                Default::default(),
                shard_amount,
            ),
            map: FxDashMap::with_capacity_and_hasher_and_shard_amount(
                map_capacity,
                Default::default(),
                shard_amount,
            ),
        }
    }

    /// Collects the modified tasks into one [`SnapshotShard`] iterator per shard of the
    /// modified map.
    ///
    /// Snapshot mode is entered first unless it is already active. All returned shards share a
    /// [`SnapshotGuard`]; once the last reference to it is dropped, `end_snapshot` runs.
    ///
    /// The callbacks are invoked lazily while iterating the shards:
    /// - `preprocess` receives a reference to the live `InnerStorage` of a modified task,
    /// - `process` receives the value returned by `preprocess`,
    /// - `process_snapshot` receives a stored `InnerStorageSnapshot`.
    pub fn take_snapshot<
        'l,
        T,
        R,
        PP: for<'a> Fn(TaskId, &'a InnerStorage) -> T + Sync,
        P: Fn(TaskId, T) -> R + Sync,
        PS: Fn(TaskId, Box<InnerStorageSnapshot>) -> R + Sync,
    >(
        &'l self,
        preprocess: &'l PP,
        process: &'l P,
        process_snapshot: &'l PS,
    ) -> Vec<SnapshotShard<'l, PP, P, PS>> {
        if !self.snapshot_mode() {
            self.start_snapshot();
        }

        // Every shard gets a clone of this guard (see below).
        let guard = Arc::new(SnapshotGuard { storage: self });

        parallel::map_collect::<_, _, Vec<_>>(self.modified.shards(), |shard| {
            let mut direct_snapshots: Vec<(TaskId, Box<InnerStorageSnapshot>)> = Vec::new();
            let mut modified: SmallVec<[TaskId; 4]> = SmallVec::new();
            {
                // Take the shard's write lock for the duration of the scan.
                let guard = shard.write();
                // SAFETY: the write lock `guard` is held until after the loop, so it outlives
                // the iterator created here.
                for bucket in unsafe { guard.iter() } {
                    // SAFETY: the bucket comes from the iterator above and the shard's write
                    // lock is still held while the reference is in use.
                    // NOTE(review): confirm this covers the full contract of `Bucket::as_mut`.
                    let (key, shared_value) = unsafe { bucket.as_mut() };
                    let modified_state = shared_value.get_mut();
                    match modified_state {
                        ModifiedState::Modified => {
                            // The live storage is read later, when the shard is iterated.
                            modified.push(*key);
                        }
                        ModifiedState::Snapshot(snapshot) => {
                            // A stored copy (if any) is taken out, leaving `Snapshot(None)`.
                            if let Some(snapshot) = snapshot.take() {
                                direct_snapshots.push((*key, snapshot));
                            }
                        }
                    }
                }
                drop(guard);
            }

            SnapshotShard {
                direct_snapshots,
                modified,
                storage: self,
                guard: Some(guard.clone()),
                process,
                preprocess,
                process_snapshot,
            }
        })
    }

    /// Enters snapshot mode. From now on `track_modification` records modifications as
    /// snapshot entries instead of plain modified entries.
    pub fn start_snapshot(&self) {
        self.snapshot_mode
            .store(true, std::sync::atomic::Ordering::Release);
    }

    /// Leaves snapshot mode and resets the modification bookkeeping.
    ///
    /// Called when the last [`SnapshotGuard`] reference is dropped.
    fn end_snapshot(&self) {
        // Step 1 (still in snapshot mode): remove all plain `Modified` entries and remember
        // their keys.
        let mut removed_modified = Vec::new();
        self.modified.retain(|key, inner| {
            if matches!(inner, ModifiedState::Modified) {
                removed_modified.push(*key);
                false
            } else {
                true
            }
        });

        // Step 2: clear the modified flags of the tasks removed above.
        for key in removed_modified {
            if let Some(mut inner) = self.map.get_mut(&key) {
                let state = inner.state_mut();
                state.set_data_modified(false);
                state.set_meta_modified(false);
            }
        }

        // Step 3: leave snapshot mode. Modifications tracked from here on are recorded as
        // `Modified` again.
        self.snapshot_mode
            .store(false, std::sync::atomic::Ordering::Release);

        // Step 4: turn every remaining `Snapshot` entry into a `Modified` entry.
        let mut removed_snapshots = Vec::new();
        for mut item in self.modified.iter_mut() {
            match item.value() {
                ModifiedState::Snapshot(_) => {
                    removed_snapshots.push(*item.key());
                    *item.value_mut() = ModifiedState::Modified;
                }
                ModifiedState::Modified => {
                    // Can only have been inserted after step 3; nothing to convert.
                }
            }
        }

        // Step 5: move the snapshot flags of those tasks over to the modified flags.
        for key in removed_snapshots {
            if let Some(mut inner) = self.map.get_mut(&key) {
                let state = inner.state_mut();
                if state.meta_snapshot() {
                    state.set_meta_snapshot(false);
                    state.set_meta_modified(true);
                }
                if state.data_snapshot() {
                    state.set_data_snapshot(false);
                    state.set_data_modified(true);
                }
            }
        }

        // Release capacity that is no longer needed after removing entries.
        self.modified.shrink_to_fit();
    }

    /// Returns whether snapshot mode is currently active.
    fn snapshot_mode(&self) -> bool {
        self.snapshot_mode
            .load(std::sync::atomic::Ordering::Acquire)
    }

    /// Returns a write guard for the storage of task `key`, inserting an empty storage when the
    /// task has none yet.
    pub fn access_mut(&self, key: TaskId) -> StorageWriteGuard<'_> {
        let inner = match self.map.entry(key) {
            dashmap::mapref::entry::Entry::Occupied(e) => e.into_ref(),
            dashmap::mapref::entry::Entry::Vacant(e) => e.insert(Box::new(InnerStorage::new())),
        };
        StorageWriteGuard {
            storage: self,
            inner: inner.into(),
        }
    }

    /// Returns write guards for the storages of two tasks at once via `get_multiple_mut`,
    /// which is given a constructor for empty storages.
    pub fn access_pair_mut(
        &self,
        key1: TaskId,
        key2: TaskId,
    ) -> (StorageWriteGuard<'_>, StorageWriteGuard<'_>) {
        let (a, b) = get_multiple_mut(&self.map, key1, key2, || Box::new(InnerStorage::new()));
        (
            StorageWriteGuard {
                storage: self,
                inner: a,
            },
            StorageWriteGuard {
                storage: self,
                inner: b,
            },
        )
    }

    /// Drops the contents of both the task map and the modified map.
    pub fn drop_contents(&self) {
        drop_contents(&self.map);
        drop_contents(&self.modified);
    }
}
820
/// Mutable access to one task's [`InnerStorage`].
///
/// Dereferences to the `InnerStorage` and additionally offers `track_modification` to record
/// changes in the owning [`Storage`].
pub struct StorageWriteGuard<'a> {
    // The storage this guard was obtained from; needed to update `Storage::modified`.
    storage: &'a Storage,
    // Reference into `Storage::map` for the task's entry.
    inner: RefMut<'a, TaskId, Box<InnerStorage>>,
}
825
impl StorageWriteGuard<'_> {
    /// Records that the given category of this task is about to be modified.
    ///
    /// Depending on whether the storage is in snapshot mode, this sets the category's modified
    /// or snapshot bit and, on the task's first such change, inserts an entry into
    /// `Storage::modified`. When the category's snapshot bit is already set, nothing happens.
    pub fn track_modification(&mut self, category: SpecificTaskDataCategory) {
        let state = self.inner.state();
        let snapshot = match category {
            SpecificTaskDataCategory::Meta => state.meta_snapshot(),
            SpecificTaskDataCategory::Data => state.data_snapshot(),
        };
        if !snapshot {
            let modified = match category {
                SpecificTaskDataCategory::Meta => state.meta_modified(),
                SpecificTaskDataCategory::Data => state.data_modified(),
            };
            match (self.storage.snapshot_mode(), modified) {
                (false, false) => {
                    // Not in snapshot mode, category not yet flagged as modified.
                    // Only the first flag of a task creates the `modified` entry.
                    if !state.any_snapshot() && !state.any_modified() {
                        self.storage
                            .modified
                            .insert(*self.inner.key(), ModifiedState::Modified);
                    }
                    let state = self.inner.state_mut();
                    match category {
                        SpecificTaskDataCategory::Meta => state.set_meta_modified(true),
                        SpecificTaskDataCategory::Data => state.set_data_modified(true),
                    }
                }
                (false, true) => {
                    // Not in snapshot mode and already flagged as modified: nothing to do.
                }
                (true, false) => {
                    // In snapshot mode, category not flagged as modified: no copy of the
                    // current contents is stored.
                    if !state.any_snapshot() {
                        self.storage
                            .modified
                            .insert(*self.inner.key(), ModifiedState::Snapshot(None));
                    }
                    let state = self.inner.state_mut();
                    match category {
                        SpecificTaskDataCategory::Meta => state.set_meta_snapshot(true),
                        SpecificTaskDataCategory::Data => state.set_data_snapshot(true),
                    }
                }
                (true, true) => {
                    // In snapshot mode and the category is flagged as modified: store a copy
                    // of the current contents before the caller changes them.
                    if !state.any_snapshot() {
                        self.storage.modified.insert(
                            *self.inner.key(),
                            ModifiedState::Snapshot(Some(Box::new((&**self.inner).into()))),
                        );
                    }
                    let state = self.inner.state_mut();
                    match category {
                        SpecificTaskDataCategory::Meta => state.set_meta_snapshot(true),
                        SpecificTaskDataCategory::Data => state.set_data_snapshot(true),
                    }
                }
            }
        }
    }
}
889
890impl Deref for StorageWriteGuard<'_> {
891 type Target = InnerStorage;
892
893 fn deref(&self) -> &Self::Target {
894 &self.inner
895 }
896}
897
898impl DerefMut for StorageWriteGuard<'_> {
899 fn deref_mut(&mut self) -> &mut Self::Target {
900 &mut self.inner
901 }
902}
903
/// `count!(task, Key)` expands to `task.count(CachedDataItemType::Key)`.
macro_rules! count {
    ($task:ident, $key:ident) => {{ $task.count($crate::data::CachedDataItemType::$key) }};
}
907
/// `get!(task, Key { fields })` looks up the item with the given key via `task.get(..)` and
/// evaluates to an `Option` holding the `value` of `CachedDataItemValueRef::Key`.
///
/// `get!(task, Key)` is shorthand for a key without fields (`Key {}`).
macro_rules! get {
    ($task:ident, $key:ident $input:tt) => {{
        // Bring the trait into scope in case `$task` provides `get` through it.
        #[allow(unused_imports)]
        use $crate::backend::operation::TaskGuard;
        if let Some($crate::data::CachedDataItemValueRef::$key {
            value,
        }) = $task.get(&$crate::data::CachedDataItemKey::$key $input) {
            Some(value)
        } else {
            None
        }
    }};
    ($task:ident, $key:ident) => {
        $crate::backend::storage::get!($task, $key {})
    };
}
924
/// `get_mut!(task, Key { fields })` looks up the item with the given key via
/// `task.get_mut(..)` and evaluates to an `Option` holding the `value` of
/// `CachedDataItemValueRefMut::Key`.
///
/// `get_mut!(task, Key)` is shorthand for a key without fields (`Key {}`).
macro_rules! get_mut {
    ($task:ident, $key:ident $input:tt) => {{
        // Bring the trait into scope in case `$task` provides `get_mut` through it.
        #[allow(unused_imports)]
        use $crate::backend::operation::TaskGuard;
        if let Some($crate::data::CachedDataItemValueRefMut::$key {
            value,
        }) = $task.get_mut(&$crate::data::CachedDataItemKey::$key $input) {
            // Compile-time gate: only compiles when `allow_mut_access::$key` exists with
            // type `()`.
            // NOTE(review): presumably defined only for item types that permit mutable
            // access — confirm in `crate::data`.
            let () = $crate::data::allow_mut_access::$key;
            Some(value)
        } else {
            None
        }
    }};
    ($task:ident, $key:ident) => {
        $crate::backend::storage::get_mut!($task, $key {})
    };
}
942
/// `get_mut_or_insert_with!(task, Key { fields }, f)` calls `task.get_mut_or_insert_with(..)`
/// for the given key and evaluates to the `value` of `CachedDataItemValueRefMut::Key`.
///
/// `f` is an expression that is called without arguments to produce the typed value; the macro
/// wraps its result in `CachedDataItemValue::Key`. A returned reference of a different variant
/// hits `unreachable!()`.
///
/// `get_mut_or_insert_with!(task, Key, f)` is shorthand for a key without fields (`Key {}`).
macro_rules! get_mut_or_insert_with {
    ($task:ident, $key:ident $input:tt, $f:expr) => {{
        // Bring the trait into scope in case `$task` provides the method through it.
        #[allow(unused_imports)]
        use $crate::backend::operation::TaskGuard;
        // Compile-time gate: only compiles when `allow_mut_access::$key` exists with type `()`.
        let () = $crate::data::allow_mut_access::$key;
        // Evaluate `$f` once, outside the closure passed on below.
        let functor = $f;
        let $crate::data::CachedDataItemValueRefMut::$key {
            value,
        } = $task.get_mut_or_insert_with($crate::data::CachedDataItemKey::$key $input, move || $crate::data::CachedDataItemValue::$key { value: functor() }) else {
            unreachable!()
        };
        value
    }};
    ($task:ident, $key:ident, $f:expr) => {
        $crate::backend::storage::get_mut_or_insert_with!($task, $key {}, $f)
    };
}
960
/// Creates an iterator over all items of one type, filtered and mapped by a pattern.
///
/// - `iter_many!(task, Key key_pattern [if cond] => expr)` matches only the key of each item.
/// - `iter_many!(task, Key key_pattern value_pattern [if cond] => expr)` matches the key and
///   the `value` of each item.
///
/// Items that match the pattern (and the optional condition) yield `expr`; all others are
/// skipped.
macro_rules! iter_many {
    // Key-only form.
    ($task:ident, $key:ident $key_pattern:tt $(if $cond:expr)? => $iter_item:expr) => {{
        #[allow(unused_imports)]
        use $crate::backend::operation::TaskGuard;
        $task
            .iter($crate::data::CachedDataItemType::$key)
            .filter_map(|(key, _)| match key {
                $crate::data::CachedDataItemKey::$key $key_pattern $(if $cond)? => Some(
                    $iter_item
                ),
                _ => None,
            })
    }};
    // Key-and-value form.
    ($task:ident, $key:ident $input:tt $value_pattern:tt $(if $cond:expr)? => $iter_item:expr) => {{
        #[allow(unused_imports)]
        use $crate::backend::operation::TaskGuard;
        $task
            .iter($crate::data::CachedDataItemType::$key)
            .filter_map(|(key, value)| match (key, value) {
                (
                    $crate::data::CachedDataItemKey::$key $input,
                    $crate::data::CachedDataItemValueRef::$key { value: $value_pattern }
                ) $(if $cond)? => Some($iter_item),
                _ => None,
            })
    }};
}
993
/// Same arguments as `iter_many!`, but collects the resulting iterator with `.collect()`.
/// The target collection type is inferred at the call site.
macro_rules! get_many {
    ($($args:tt)*) => {
        $crate::backend::storage::iter_many!($($args)*).collect()
    };
}
1003
/// `update!(task, Key { fields }, f)` calls `task.update(..)` for the given key with a closure
/// that works on the typed value instead of `CachedDataItemValue`.
///
/// `f` receives `Option<value>` (`None` when there is no item, or when the stored item is of a
/// different variant) and returns the new `Option<value>`, which the macro wraps in
/// `CachedDataItemValue::Key` again.
///
/// `update!(task, Key, f)` is shorthand for a key without fields (`Key {}`).
macro_rules! update {
    ($task:ident, $key:ident $input:tt, $update:expr) => {{
        #[allow(unused_imports)]
        use $crate::backend::operation::TaskGuard;
        // `mut` is only needed when `$update` is an `FnMut` closure.
        #[allow(unused_mut)]
        let mut update = $update;
        $task.update($crate::data::CachedDataItemKey::$key $input, |old| {
            update(old.and_then(|old| {
                if let $crate::data::CachedDataItemValue::$key { value } = old {
                    Some(value)
                } else {
                    None
                }
            }))
            .map(|new| $crate::data::CachedDataItemValue::$key { value: new })
        })
    }};
    ($task:ident, $key:ident, $update:expr) => {
        $crate::backend::storage::update!($task, $key {}, $update)
    };
}
1025
/// Adds to (or, with a leading `-`, subtracts from) a counter item and reports whether the
/// counter crossed between "positive" and "not positive".
///
/// - `update_count!(task, Key { fields }, delta)` adds `delta`.
/// - `update_count!(task, Key { fields }, -delta)` subtracts `delta`.
/// - The forms without `{ fields }` use a key without fields (`Key {}`).
///
/// A missing item counts as zero, and an item whose new value is zero is removed (the update
/// closure returns `None`). The macro evaluates to `true` exactly when the sign state changed:
/// the old value was `<= 0` and the new one is `> 0`, or the old value was `> 0` and the new
/// one is `<= 0`.
macro_rules! update_count {
    // Subtracting form.
    ($task:ident, $key:ident $input:tt, -$update:expr) => {{
        let update = $update;
        let mut state_change = false;
        $crate::backend::storage::update!($task, $key $input, |old: Option<_>| {
            #[allow(unused_comparisons, reason = "type of update might be unsigned, where update < 0 is always false")]
            if let Some(old) = old {
                let new = old - update;
                state_change = old <= 0 && new > 0 || old > 0 && new <= 0;
                (new != 0).then_some(new)
            } else {
                // No item yet: the new value is `-update`, which is positive iff `update < 0`.
                state_change = update < 0;
                (update != 0).then_some(-update)
            }
        });
        state_change
    }};
    // Adding form.
    ($task:ident, $key:ident $input:tt, $update:expr) => {
        match $update {
            update => {
                let mut state_change = false;
                $crate::backend::storage::update!($task, $key $input, |old: Option<_>| {
                    if let Some(old) = old {
                        let new = old + update;
                        state_change = old <= 0 && new > 0 || old > 0 && new <= 0;
                        (new != 0).then_some(new)
                    } else {
                        // No item yet: the new value is `update` itself.
                        state_change = update > 0;
                        (update != 0).then_some(update)
                    }
                });
                state_change
            }
        }
    };
    // Shorthands for keys without fields.
    ($task:ident, $key:ident, -$update:expr) => {
        $crate::backend::storage::update_count!($task, $key {}, -$update)
    }; ($task:ident, $key:ident, $update:expr) => {
        $crate::backend::storage::update_count!($task, $key {}, $update)
    };
}
1067
/// `remove!(task, Key { fields })` removes the item with the given key via `task.remove(..)`
/// and evaluates to an `Option` holding the `value` of the removed `CachedDataItemValue::Key`.
///
/// `remove!(task, Key)` is shorthand for a key without fields (`Key {}`).
macro_rules! remove {
    ($task:ident, $key:ident $input:tt) => {{
        // Bring the trait into scope in case `$task` provides `remove` through it.
        #[allow(unused_imports)]
        use $crate::backend::operation::TaskGuard;
        if let Some($crate::data::CachedDataItemValue::$key { value }) = $task.remove(
            &$crate::data::CachedDataItemKey::$key $input
        ) {
            Some(value)
        } else {
            None
        }
    }};
    ($task:ident, $key:ident) => {
        $crate::backend::storage::remove!($task, $key {})
    };
}
1084
1085pub(crate) use count;
1086pub(crate) use get;
1087pub(crate) use get_many;
1088pub(crate) use get_mut;
1089pub(crate) use get_mut_or_insert_with;
1090pub(crate) use iter_many;
1091pub(crate) use remove;
1092pub(crate) use update;
1093pub(crate) use update_count;
1094
/// Ends the snapshot of `storage` when dropped (see the `Drop` impl).
///
/// `Storage::take_snapshot` shares one guard between all returned shards through an `Arc`.
pub struct SnapshotGuard<'l> {
    storage: &'l Storage,
}
1098
1099impl Drop for SnapshotGuard<'_> {
1100 fn drop(&mut self) {
1101 self.storage.end_snapshot();
1102 }
1103}
1104
/// Iterator over the snapshot results for one shard of the modified map, created by
/// `Storage::take_snapshot`.
pub struct SnapshotShard<'l, PP, P, PS> {
    // Stored snapshots that were taken out of the modified map up front.
    direct_snapshots: Vec<(TaskId, Box<InnerStorageSnapshot>)>,
    // Tasks that were in `ModifiedState::Modified` when the shard was scanned.
    modified: SmallVec<[TaskId; 4]>,
    storage: &'l Storage,
    // Shared snapshot guard; set to `None` once this shard is exhausted.
    guard: Option<Arc<SnapshotGuard<'l>>>,
    // Callbacks passed to `Storage::take_snapshot`.
    process: &'l P,
    preprocess: &'l PP,
    process_snapshot: &'l PS,
}
1114
impl<'l, T, R, PP, P, PS> Iterator for SnapshotShard<'l, PP, P, PS>
where
    PP: for<'a> Fn(TaskId, &'a InnerStorage) -> T + Sync,
    P: Fn(TaskId, T) -> R + Sync,
    PS: Fn(TaskId, Box<InnerStorageSnapshot>) -> R + Sync,
{
    type Item = R;

    /// Yields the next processed task of this shard.
    ///
    /// Stored snapshots are handled first (via `process_snapshot`), then the tasks that were
    /// marked as modified. When everything is consumed, this shard's reference to the snapshot
    /// guard is released.
    ///
    /// # Panics
    ///
    /// Panics when a task listed as modified is missing from the task map, or when a task with
    /// a snapshot bit set is missing from the modified map or not in `Snapshot` state there.
    fn next(&mut self) -> Option<Self::Item> {
        if let Some((task_id, snapshot)) = self.direct_snapshots.pop() {
            return Some((self.process_snapshot)(task_id, snapshot));
        }
        while let Some(task_id) = self.modified.pop() {
            let inner = self.storage.map.get(&task_id).unwrap();
            let state = inner.state();
            if !state.any_snapshot() {
                // No snapshot bit set: read the live storage. Only `preprocess` runs while the
                // map entry is held; `process` runs after it has been released.
                let preprocessed = (self.preprocess)(task_id, &inner);
                drop(inner);
                return Some((self.process)(task_id, preprocessed));
            } else {
                // A snapshot bit is set: use the stored copy from the modified map instead of
                // the live storage. Release the task map entry first.
                drop(inner);
                let maybe_snapshot = {
                    let mut modified_state = self.storage.modified.get_mut(&task_id).unwrap();
                    let ModifiedState::Snapshot(snapshot) = &mut *modified_state else {
                        unreachable!("The snapshot bit was set, so it must be in Snapshot state");
                    };
                    snapshot.take()
                };
                if let Some(snapshot) = maybe_snapshot {
                    return Some((self.process_snapshot)(task_id, snapshot));
                }
                // No stored copy: nothing to yield for this task, continue with the next one.
            }
        }
        // Exhausted: release the guard reference. `end_snapshot` runs once the last reference
        // to the guard is gone.
        self.guard = None;
        None
    }
}