1use std::{
2 hash::Hash,
3 ops::{Deref, DerefMut},
4 sync::{Arc, atomic::AtomicBool},
5};
6
7use bitfield::bitfield;
8use smallvec::SmallVec;
9use turbo_tasks::{FxDashMap, TaskId, parallel};
10
11use crate::{
12 backend::dynamic_storage::DynamicStorage,
13 data::{
14 AggregationNumber, CachedDataItem, CachedDataItemKey, CachedDataItemType,
15 CachedDataItemValue, CachedDataItemValueRef, CachedDataItemValueRefMut, OutputValue,
16 },
17 data_storage::{AutoMapStorage, OptionStorage},
18 utils::{
19 dash_map_drop_contents::drop_contents,
20 dash_map_multi::{RefMut, get_multiple_mut},
21 },
22};
23
/// Selects which part of a task's stored items an operation refers to.
///
/// `All` stands for both `Meta` and `Data`; iterating a category (see the
/// [`IntoIterator`] impl) expands it into the concrete categories it covers.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TaskDataCategory {
    Meta,
    Data,
    All,
}

impl TaskDataCategory {
    /// Converts the category into its [`SpecificTaskDataCategory`] counterpart.
    ///
    /// # Panics
    ///
    /// Panics when called on [`TaskDataCategory::All`], which has no single
    /// specific counterpart. Callers must only pass `Meta` or `Data`.
    pub fn into_specific(self) -> SpecificTaskDataCategory {
        match self {
            TaskDataCategory::Meta => SpecificTaskDataCategory::Meta,
            TaskDataCategory::Data => SpecificTaskDataCategory::Data,
            TaskDataCategory::All => unreachable!(
                "TaskDataCategory::All has no SpecificTaskDataCategory counterpart"
            ),
        }
    }
}

/// A [`TaskDataCategory`] that is guaranteed to name exactly one category.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum SpecificTaskDataCategory {
    Meta,
    Data,
}

impl IntoIterator for TaskDataCategory {
    type Item = TaskDataCategory;

    type IntoIter = TaskDataCategoryIterator;

    /// Expands the category: `Meta` and `Data` yield themselves once, `All`
    /// yields `Meta` followed by `Data`.
    fn into_iter(self) -> Self::IntoIter {
        match self {
            TaskDataCategory::Meta => TaskDataCategoryIterator::Meta,
            TaskDataCategory::Data => TaskDataCategoryIterator::Data,
            TaskDataCategory::All => TaskDataCategoryIterator::All,
        }
    }
}

/// Iterator state for expanding a [`TaskDataCategory`].
///
/// Each variant names what is still left to yield: `All` (meta, then data),
/// only `Meta`, only `Data`, or nothing (`None`).
#[derive(Debug, Clone)]
pub enum TaskDataCategoryIterator {
    All,
    Meta,
    Data,
    None,
}

impl Iterator for TaskDataCategoryIterator {
    type Item = TaskDataCategory;

    fn next(&mut self) -> Option<Self::Item> {
        // Determine the item to yield together with the state that follows it.
        let (item, rest) = match self {
            Self::All => (TaskDataCategory::Meta, Self::Data),
            Self::Meta => (TaskDataCategory::Meta, Self::None),
            Self::Data => (TaskDataCategory::Data, Self::None),
            Self::None => return None,
        };
        *self = rest;
        Some(item)
    }

    /// The number of remaining items is fully determined by the state.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = match self {
            Self::All => 2,
            Self::Meta | Self::Data => 1,
            Self::None => 0,
        };
        (remaining, Some(remaining))
    }
}

// `size_hint` is exact, and once the `None` state is reached `next` keeps
// returning `None`, so both marker traits hold.
impl ExactSizeIterator for TaskDataCategoryIterator {}

impl std::iter::FusedIterator for TaskDataCategoryIterator {}
89
// Per-task flag set packed into a single `u32` via the `bitfield!` macro.
// Each `getter, setter: bit` line below generates a `bool` getter and setter
// for the given bit index.
bitfield! {
    #[derive(Clone, Default)]
    pub struct InnerStorageState(u32);
    impl Debug;
    // Bits 0/1: set through `InnerStorageState::set_restored`.
    // NOTE(review): presumably marks that the category was loaded from
    // persisted data; confirm against the callers.
    pub meta_restored, set_meta_restored: 0;
    pub data_restored, set_data_restored: 1;
    // Bits 2/3: set by `StorageWriteGuard::track_modification` when the
    // category is modified while the storage is NOT in snapshot mode.
    pub meta_modified, set_meta_modified: 2;
    pub data_modified, set_data_modified: 3;
    // Bits 4/5: set by `StorageWriteGuard::track_modification` when the
    // category is modified while the storage IS in snapshot mode;
    // `Storage::end_snapshot` converts them back into the modified bits.
    pub meta_snapshot, set_meta_snapshot: 4;
    pub data_snapshot, set_data_snapshot: 5;
    // Bit 6: not read or written anywhere in this file.
    // NOTE(review): meaning must be taken from the callers.
    pub prefetched, set_prefetched: 6;
}
106
107impl InnerStorageState {
108 pub fn set_restored(&mut self, category: TaskDataCategory) {
109 match category {
110 TaskDataCategory::Meta => {
111 self.set_meta_restored(true);
112 }
113 TaskDataCategory::Data => {
114 self.set_data_restored(true);
115 }
116 TaskDataCategory::All => {
117 self.set_meta_restored(true);
118 self.set_data_restored(true);
119 }
120 }
121 }
122
123 pub fn is_restored(&self, category: TaskDataCategory) -> bool {
124 match category {
125 TaskDataCategory::Meta => self.meta_restored(),
126 TaskDataCategory::Data => self.data_restored(),
127 TaskDataCategory::All => self.meta_restored() && self.data_restored(),
128 }
129 }
130
131 pub fn any_snapshot(&self) -> bool {
132 self.meta_snapshot() || self.data_snapshot()
133 }
134
135 pub fn any_modified(&self) -> bool {
136 self.meta_modified() || self.data_modified()
137 }
138}
139
/// A detached copy of an [`InnerStorage`]'s items.
///
/// Created through `From<&InnerStorage>`: the dedicated fields are cloned and
/// the dynamic storage is copied via `snapshot_for_persisting`. Instead of the
/// full state bitfield, only the two modified flags at the time of the copy
/// are retained.
pub struct InnerStorageSnapshot {
    aggregation_number: OptionStorage<AggregationNumber>,
    output_dependent: AutoMapStorage<TaskId, ()>,
    output: OptionStorage<OutputValue>,
    upper: AutoMapStorage<TaskId, u32>,
    dynamic: DynamicStorage,
    /// Value of `meta_modified` in the source storage when the snapshot was created.
    pub meta_modified: bool,
    /// Value of `data_modified` in the source storage when the snapshot was created.
    pub data_modified: bool,
}
149
150impl From<&InnerStorage> for InnerStorageSnapshot {
151 fn from(inner: &InnerStorage) -> Self {
152 Self {
153 aggregation_number: inner.aggregation_number.clone(),
154 output_dependent: inner.output_dependent.clone(),
155 output: inner.output.clone(),
156 upper: inner.upper.clone(),
157 dynamic: inner.dynamic.snapshot_for_persisting(),
158 meta_modified: inner.state.meta_modified(),
159 data_modified: inner.state.data_modified(),
160 }
161 }
162}
163
164impl InnerStorageSnapshot {
165 pub fn iter_all(
166 &self,
167 ) -> impl Iterator<Item = (CachedDataItemKey, CachedDataItemValueRef<'_>)> {
168 use crate::data_storage::Storage;
169 self.dynamic
170 .iter_all()
171 .chain(self.aggregation_number.iter().map(|(_, value)| {
172 (
173 CachedDataItemKey::AggregationNumber {},
174 CachedDataItemValueRef::AggregationNumber { value },
175 )
176 }))
177 .chain(self.output.iter().map(|(_, value)| {
178 (
179 CachedDataItemKey::Output {},
180 CachedDataItemValueRef::Output { value },
181 )
182 }))
183 .chain(self.upper.iter().map(|(k, value)| {
184 (
185 CachedDataItemKey::Upper { task: *k },
186 CachedDataItemValueRef::Upper { value },
187 )
188 }))
189 .chain(self.output_dependent.iter().map(|(k, value)| {
190 (
191 CachedDataItemKey::OutputDependent { task: *k },
192 CachedDataItemValueRef::OutputDependent { value },
193 )
194 }))
195 }
196
197 pub fn len(&self) -> usize {
198 use crate::data_storage::Storage;
199 self.dynamic.len()
200 + self.aggregation_number.len()
201 + self.output.len()
202 + self.upper.len()
203 + self.output_dependent.len()
204 }
205}
206
/// All stored items of a single task plus its state flags.
///
/// Four item types have a dedicated field (see the `generate_inner_storage!`
/// invocation in this file); every other item type is kept in `dynamic`.
#[derive(Debug, Clone)]
pub struct InnerStorage {
    // Dedicated storage for `CachedDataItem::AggregationNumber`.
    aggregation_number: OptionStorage<AggregationNumber>,
    // Dedicated storage for `CachedDataItem::OutputDependent`, keyed by task.
    output_dependent: AutoMapStorage<TaskId, ()>,
    // Dedicated storage for `CachedDataItem::Output`.
    output: OptionStorage<OutputValue>,
    // Dedicated storage for `CachedDataItem::Upper`, keyed by task.
    upper: AutoMapStorage<TaskId, u32>,
    // Fallback storage for all item types without a dedicated field.
    dynamic: DynamicStorage,
    // Restored / modified / snapshot / prefetched flags.
    state: InnerStorageState,
}
216
217impl InnerStorage {
218 fn new() -> Self {
219 Self {
220 aggregation_number: Default::default(),
221 output_dependent: Default::default(),
222 output: Default::default(),
223 upper: Default::default(),
224 dynamic: DynamicStorage::new(),
225 state: InnerStorageState::default(),
226 }
227 }
228
229 pub fn state(&self) -> &InnerStorageState {
230 &self.state
231 }
232
233 pub fn state_mut(&mut self) -> &mut InnerStorageState {
234 &mut self.state
235 }
236}
237
/// Implementation detail of `generate_inner_storage!`.
///
/// The macro is driven by a config list of the form
/// `Tag [key_field] => field, Tag [key_field] => field, ...`. For every
/// entry it emits an `if let ... { ...; return ...; }` statement that handles
/// the item type `Tag` through the dedicated field `field` and returns early
/// from the surrounding function. When no entry matches, control falls
/// through to whatever code follows the macro invocation.
///
/// Every operation has three kinds of arms: one for a single entry with a
/// key field, one for a single entry without a key field (the storage is
/// then keyed by `()`), and a recursive arm that peels the first entry off
/// the list and recurses on the remainder.
#[macro_export]
macro_rules! generate_inner_storage_internal {
    // Dispatch on an owned `CachedDataItem`: calls `$fn(key, args...)` on the matching field.
    (CachedDataItem: $self:ident, $item:ident, $value:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident $key_field:ident => $field:ident,) => {
        if let CachedDataItem::$tag { $key_field, $value } = $item {
            let result = $self.$field.$fn($key_field, $($args)*);
            return $crate::generate_inner_storage_internal!(return_value: result, $return_ty: $tag $key_field => $field);
        }
    };
    (CachedDataItem: $self:ident, $item:ident, $value:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident => $field:ident,) => {
        if let CachedDataItem::$tag { $value } = $item {
            let result = $self.$field.$fn((), $($args)*);
            return $crate::generate_inner_storage_internal!(return_value: result, $return_ty: $tag => $field);
        }
    };
    (CachedDataItem: $self:ident, $item:ident, $value:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
        $crate::generate_inner_storage_internal!(CachedDataItem: $self, $item, $value, $return_ty, $fn($($args)*): $tag $($key_field)? => $field,);
        $crate::generate_inner_storage_internal!(CachedDataItem: $self, $item, $value, $return_ty, $fn($($args)*): $($config)+)
    };
    // Dispatch on a `CachedDataItemKey`: calls `$fn(key, args...)` on the matching field.
    // The keyless variant passes `&()` as the key.
    (CachedDataItemKey: $self:ident, $item:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident $key_field:ident => $field:ident,) => {
        if let CachedDataItemKey::$tag { $key_field } = $item {
            let result = $self.$field.$fn($key_field, $($args)*);
            return $crate::generate_inner_storage_internal!(return_value: result, $return_ty: $tag $key_field => $field);
        }
    };
    (CachedDataItemKey: $self:ident, $item:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident => $field:ident,) => {
        if let CachedDataItemKey::$tag { } = $item {
            let result = $self.$field.$fn(&(), $($args)*);
            return $crate::generate_inner_storage_internal!(return_value: result, $return_ty: $tag => $field);
        }
    };
    (CachedDataItemKey: $self:ident, $item:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
        $crate::generate_inner_storage_internal!(CachedDataItemKey: $self, $item, $return_ty, $fn($($args)*): $tag $($key_field)? => $field,);
        $crate::generate_inner_storage_internal!(CachedDataItemKey: $self, $item, $return_ty, $fn($($args)*): $($config)+)
    };
    // Dispatch on a `CachedDataItemType`: calls `$fn(args...)` on the matching field (no key involved).
    (CachedDataItemType: $self:ident, $item:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident $($key_field:ident)? => $field:ident,) => {
        if let CachedDataItemType::$tag = $item {
            let result = $self.$field.$fn($($args)*);
            return $crate::generate_inner_storage_internal!(return_value: result, $return_ty: $tag $($key_field)? => $field);
        }
    };
    (CachedDataItemType: $self:ident, $item:ident, $return_ty:tt, $fn:ident($($args:tt)*): $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
        $crate::generate_inner_storage_internal!(CachedDataItemType: $self, $item, $return_ty, $fn($($args)*): $tag $($key_field)? => $field,);
        $crate::generate_inner_storage_internal!(CachedDataItemType: $self, $item, $return_ty, $fn($($args)*): $($config)+)
    };

    // `update`: wraps the field's typed value into `CachedDataItemValue::$tag` before calling
    // the user closure and unwraps the closure's result again. A result of a different
    // variant hits `unreachable!()`.
    (update: $self:ident, $key:ident, $update:ident: $tag:ident $key_field:ident => $field:ident,) => {
        if let CachedDataItemKey::$tag { $key_field } = $key {
            $self.$field.update($key_field, |old| {
                let old = old.map(|old| CachedDataItemValue::$tag { value: old });
                let new = $update(old);
                new.map(|new| if let CachedDataItemValue::$tag { value } = new {
                    value
                } else {
                    unreachable!()
                })
            });
            return;
        }
    };
    (update: $self:ident, $key:ident, $update:ident: $tag:ident => $field:ident,) => {
        if let CachedDataItemKey::$tag { } = $key {
            $self.$field.update((), |old| {
                let old = old.map(|old| CachedDataItemValue::$tag { value: old });
                let new = $update(old);
                new.map(|new| if let CachedDataItemValue::$tag { value } = new {
                    value
                } else {
                    unreachable!()
                })
            });
            return;
        }
    };
    (update: $self:ident, $key:ident, $update:ident: $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
        $crate::generate_inner_storage_internal!(update: $self, $key, $update: $tag $($key_field)? => $field,);
        $crate::generate_inner_storage_internal!(update: $self, $key, $update: $($config)+)
    };

    // `extend`: splits every item into key and value and forwards the typed pairs to the
    // field. All items are expected to be of type `$tag`; anything else hits `unreachable!()`.
    (extend: $self:ident, $ty:ident, $items:ident: $tag:ident $key_field:ident => $field:ident,) => {
        if let CachedDataItemType::$tag = $ty {
            return $self.$field.extend($items.map(|item| {
                let pair = turbo_tasks::KeyValuePair::into_key_and_value(item);
                if let (CachedDataItemKey::$tag { $key_field }, CachedDataItemValue::$tag { value }) = pair {
                    ($key_field, value)
                } else {
                    unreachable!()
                }
            }));
        }
    };
    (extend: $self:ident, $ty:ident, $items:ident: $tag:ident => $field:ident,) => {
        if let CachedDataItemType::$tag = $ty {
            return $self.$field.extend($items.map(|item| {
                let pair = turbo_tasks::KeyValuePair::into_key_and_value(item);
                if let (_, CachedDataItemValue::$tag { value }) = pair {
                    ((), value)
                } else {
                    unreachable!()
                }
            }));
        }
    };
    (extend: $self:ident, $ty:ident, $items:ident: $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
        $crate::generate_inner_storage_internal!(extend: $self, $ty, $items: $tag $($key_field)? => $field,);
        $crate::generate_inner_storage_internal!(extend: $self, $ty, $items: $($config)+)
    };

    // `get_mut_or_insert_with`: unwraps the value produced by the insert closure into the
    // field's typed value and wraps the resulting mutable reference into
    // `CachedDataItemValueRefMut::$tag`.
    (get_mut_or_insert_with: $self:ident, $key:ident, $insert_with:ident: $tag:ident $key_field:ident => $field:ident,) => {
        if let CachedDataItemKey::$tag { $key_field } = $key {
            let value = $self.$field.get_mut_or_insert_with($key_field, || {
                let value = $insert_with();
                if let CachedDataItemValue::$tag { value } = value {
                    value
                } else {
                    unreachable!()
                }
            });
            return CachedDataItemValueRefMut::$tag { value };
        }
    };
    (get_mut_or_insert_with: $self:ident, $key:ident, $insert_with:ident: $tag:ident => $field:ident,) => {
        if let CachedDataItemKey::$tag { } = $key {
            let value = $self.$field.get_mut_or_insert_with((), || {
                let value = $insert_with();
                if let CachedDataItemValue::$tag { value } = value {
                    value
                } else {
                    unreachable!()
                }
            });
            return CachedDataItemValueRefMut::$tag { value };
        }
    };
    (get_mut_or_insert_with: $self:ident, $key:ident, $insert_with:ident: $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
        $crate::generate_inner_storage_internal!(get_mut_or_insert_with: $self, $key, $insert_with: $tag $($key_field)? => $field,);
        $crate::generate_inner_storage_internal!(get_mut_or_insert_with: $self, $key, $insert_with: $($config)+)
    };

    // `extract_if`: adapts the generic predicate to the field's typed key/value, maps the
    // extracted pairs back into `CachedDataItem`s and wraps the iterator in the matching
    // `InnerStorageIter` variant.
    (extract_if: $self:ident, $ty:ident, $f:ident: $tag:ident $key_field:ident => $field:ident,) => {
        if let CachedDataItemType::$tag = $ty {
            let iter = $self.$field.extract_if(move |key, value| {
                $f(CachedDataItemKey::$tag { $key_field: *key }, CachedDataItemValueRef::$tag { value })
            }).map(|($key_field, value)| CachedDataItem::$tag { $key_field, value });
            return InnerStorageIter::$tag(iter);
        }
    };
    (extract_if: $self:ident, $ty:ident, $f:ident: $tag:ident => $field:ident,) => {
        if let CachedDataItemType::$tag = $ty {
            let iter = $self.$field.extract_if(move |_, value| {
                $f(CachedDataItemKey::$tag { }, CachedDataItemValueRef::$tag { value })
            }).map(|(_, value)| CachedDataItem::$tag { value });
            return InnerStorageIter::$tag(iter);
        }
    };
    (extract_if: $self:ident, $ty:ident, $f:ident: $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
        $crate::generate_inner_storage_internal!(extract_if: $self, $ty, $f: $tag $($key_field)? => $field,);
        $crate::generate_inner_storage_internal!(extract_if: $self, $ty, $f: $($config)+)
    };

    // `iter`: maps the field's typed entries to generic key / value-reference pairs and wraps
    // the iterator in the matching `InnerStorageIter` variant.
    (iter: $self:ident, $ty:ident: $tag:ident $key_field:ident => $field:ident,) => {
        if let CachedDataItemType::$tag = $ty {
            let iter = $self.$field.iter().map(|($key_field, value)| (CachedDataItemKey::$tag { $key_field: *$key_field }, CachedDataItemValueRef::$tag { value }));
            return InnerStorageIter::$tag(iter);
        }
    };
    (iter: $self:ident, $ty:ident: $tag:ident => $field:ident,) => {
        if let CachedDataItemType::$tag = $ty {
            let iter = $self.$field.iter().map(|(_, value)| (CachedDataItemKey::$tag { }, CachedDataItemValueRef::$tag { value }));
            return InnerStorageIter::$tag(iter);
        }
    };
    (iter: $self:ident, $ty:ident: $tag:ident $($key_field:ident)? => $field:ident, $($config:tt)+) => {
        $crate::generate_inner_storage_internal!(iter: $self, $ty: $tag $($key_field)? => $field,);
        $crate::generate_inner_storage_internal!(iter: $self, $ty: $($config)+)
    };


    // `return_value`: converts the typed result of a field operation into the generic return
    // type selected by `$return_ty` (`none` passes the result through unchanged).
    (return_value: $result:ident, none: $($more:tt)*) => {
        $result
    };
    (return_value: $result:ident, option_value: $tag:ident $($more:tt)*) => {
        $result.map(|value| CachedDataItemValue::$tag { value })
    };
    (return_value: $result:ident, option_ref: $tag:ident $($more:tt)*) => {
        $result.map(|value| CachedDataItemValueRef::$tag { value })
    };
    (return_value: $result:ident, option_ref_mut: $tag:ident $($more:tt)*) => {
        $result.map(|value| CachedDataItemValueRefMut::$tag { value })
    };

    // `input_value`: unwraps an optional generic value into the field's typed value.
    // Not referenced by any other arm or by `generate_inner_storage!` in this file.
    (input_value: $input:ident, option_value: $tag:ident $($more:tt)*) => {
        $input.map(|value| {
            if let CachedDataItemValue::$tag { value } = value {
                value
            } else {
                unreachable!()
            }
        })
    };

}
449
/// Generates the item-level API of `InnerStorage` from a config list of the
/// form `Tag [key_field] => field, ...`.
///
/// Every generated method has the same shape: the
/// `generate_inner_storage_internal!` invocation expands to one early-return
/// check per configured item type, which routes the call to the dedicated
/// field. If none of them matches, the call falls through to the last
/// expression and is served by `self.dynamic`.
macro_rules! generate_inner_storage {
    ($($config:tt)*) => {
        impl InnerStorage {
            // Forwards to the `add` method of the storage responsible for the item's type.
            pub fn add(&mut self, item: CachedDataItem) -> bool {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(CachedDataItem: self, item, value, none, add(value): $($config)*);
                self.dynamic.add(item)
            }

            // Forwards all `items` to the storage responsible for `ty`. The dedicated-field
            // arms treat an item of a different type as unreachable.
            pub fn extend(&mut self, ty: CachedDataItemType, items: impl Iterator<Item = CachedDataItem>) -> bool {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(extend: self, ty, items: $($config)*);
                self.dynamic.extend(ty, items)
            }

            // Forwards to `insert`; an optional value returned by the storage is handed back
            // as a generic `CachedDataItemValue`.
            pub fn insert(&mut self, item: CachedDataItem) -> Option<CachedDataItemValue> {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(CachedDataItem: self, item, value, option_value, insert(value): $($config)*);
                self.dynamic.insert(item)
            }

            // Forwards to `remove`; an optional value returned by the storage is handed back
            // as a generic `CachedDataItemValue`.
            pub fn remove(&mut self, key: &CachedDataItemKey) -> Option<CachedDataItemValue> {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(CachedDataItemKey: self, key, option_value, remove(): $($config)*);
                self.dynamic.remove(key)
            }

            // Number of items of type `ty` (`len()` of the dedicated field, or the dynamic
            // storage's `count(ty)`).
            pub fn count(&self, ty: CachedDataItemType) -> usize {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(CachedDataItemType: self, ty, none, len(): $($config)*);
                self.dynamic.count(ty)
            }

            // Looks up the value stored for `key`.
            pub fn get(&self, key: &CachedDataItemKey) -> Option<CachedDataItemValueRef<'_>> {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(CachedDataItemKey: self, key, option_ref, get(): $($config)*);
                self.dynamic.get(key)
            }

            // Forwards to the `contains_key` method of the responsible storage.
            pub fn contains_key(&self, key: &CachedDataItemKey) -> bool {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(CachedDataItemKey: self, key, none, contains_key(): $($config)*);
                self.dynamic.contains_key(key)
            }

            // Mutable variant of `get`.
            pub fn get_mut(&mut self, key: &CachedDataItemKey) -> Option<CachedDataItemValueRefMut<'_>> {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(CachedDataItemKey: self, key, option_ref_mut, get_mut(): $($config)*);
                self.dynamic.get_mut(key)
            }

            // Forwards `shrink_to_fit` to the storage responsible for `ty`.
            pub fn shrink_to_fit(&mut self, ty: CachedDataItemType) {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(CachedDataItemType: self, ty, none, shrink_to_fit(): $($config)*);
                self.dynamic.shrink_to_fit(ty)
            }

            // Calls `update` with the current value (if any) for `key`; the closure's result
            // is handed to the underlying storage's `update`.
            pub fn update(
                &mut self,
                key: CachedDataItemKey,
                update: impl FnOnce(Option<CachedDataItemValue>) -> Option<CachedDataItemValue>,
            ) {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(update: self, key, update: $($config)*);
                self.dynamic.update(key, update)
            }

            // Returns an iterator over the items of type `ty` extracted via the predicate `f`.
            // The concrete iterator type differs per storage, hence the `InnerStorageIter` wrapper.
            pub fn extract_if<'l, F>(
                &'l mut self,
                ty: CachedDataItemType,
                mut f: F,
            ) -> impl Iterator<Item = CachedDataItem> + use<'l, F>
            where
                F: for<'a> FnMut(CachedDataItemKey, CachedDataItemValueRef<'a>) -> bool + 'l,
            {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(extract_if: self, ty, f: $($config)*);
                InnerStorageIter::Dynamic(self.dynamic.extract_if(ty, f))
            }

            // Returns a mutable reference to the value for `key`; `f` supplies the value to
            // insert (see the underlying storage's `get_mut_or_insert_with`).
            pub fn get_mut_or_insert_with(
                &mut self,
                key: CachedDataItemKey,
                f: impl FnOnce() -> CachedDataItemValue,
            ) -> CachedDataItemValueRefMut<'_>
            {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(get_mut_or_insert_with: self, key, f: $($config)*);
                self.dynamic.get_mut_or_insert_with(key, f)
            }

            // Iterates over all items of type `ty`.
            pub fn iter(
                &self,
                ty: CachedDataItemType,
            ) -> impl Iterator<Item = (CachedDataItemKey, CachedDataItemValueRef<'_>)>
            {
                use crate::data_storage::Storage;
                $crate::generate_inner_storage_internal!(iter: self, ty: $($config)*);
                InnerStorageIter::Dynamic(self.dynamic.iter(ty))
            }

        }
    };
}
554
// Item types with a dedicated `InnerStorage` field, in the form
// `Tag [key_field] => field`. Entries without a key field hold at most one
// value. Item types not listed here are served by `InnerStorage::dynamic`.
// The tags must stay in sync with the variants of `InnerStorageIter`.
generate_inner_storage!(
    AggregationNumber => aggregation_number,
    OutputDependent task => output_dependent,
    Output => output,
    Upper task => upper,
);
561
/// Unifies the different iterator types produced by the individual storages
/// of `InnerStorage` into one nameable type.
///
/// There is one variant per dedicated field (named after the item type, as
/// required by `generate_inner_storage_internal!`) plus `Dynamic` for the
/// fallback storage. All wrapped iterators must yield the same item type.
enum InnerStorageIter<A, B, C, D, E> {
    AggregationNumber(A),
    OutputDependent(B),
    Output(C),
    Upper(D),
    Dynamic(E),
}

impl<T, A, B, C, D, E> Iterator for InnerStorageIter<A, B, C, D, E>
where
    A: Iterator<Item = T>,
    B: Iterator<Item = T>,
    C: Iterator<Item = T>,
    D: Iterator<Item = T>,
    E: Iterator<Item = T>,
{
    type Item = T;

    /// Delegates to the wrapped iterator.
    fn next(&mut self) -> Option<Self::Item> {
        match self {
            InnerStorageIter::AggregationNumber(iter) => iter.next(),
            InnerStorageIter::OutputDependent(iter) => iter.next(),
            InnerStorageIter::Output(iter) => iter.next(),
            InnerStorageIter::Upper(iter) => iter.next(),
            InnerStorageIter::Dynamic(iter) => iter.next(),
        }
    }

    /// Delegates to the wrapped iterator so that consumers such as `collect`
    /// can preallocate instead of falling back to the default `(0, None)`.
    fn size_hint(&self) -> (usize, Option<usize>) {
        match self {
            InnerStorageIter::AggregationNumber(iter) => iter.size_hint(),
            InnerStorageIter::OutputDependent(iter) => iter.size_hint(),
            InnerStorageIter::Output(iter) => iter.size_hint(),
            InnerStorageIter::Upper(iter) => iter.size_hint(),
            InnerStorageIter::Dynamic(iter) => iter.size_hint(),
        }
    }
}
590
591impl InnerStorage {
592 pub fn iter_all(
593 &self,
594 ) -> impl Iterator<Item = (CachedDataItemKey, CachedDataItemValueRef<'_>)> {
595 use crate::data_storage::Storage;
596 self.dynamic
597 .iter_all()
598 .chain(self.aggregation_number.iter().map(|(_, value)| {
599 (
600 CachedDataItemKey::AggregationNumber {},
601 CachedDataItemValueRef::AggregationNumber { value },
602 )
603 }))
604 .chain(self.output.iter().map(|(_, value)| {
605 (
606 CachedDataItemKey::Output {},
607 CachedDataItemValueRef::Output { value },
608 )
609 }))
610 .chain(self.upper.iter().map(|(k, value)| {
611 (
612 CachedDataItemKey::Upper { task: *k },
613 CachedDataItemValueRef::Upper { value },
614 )
615 }))
616 .chain(self.output_dependent.iter().map(|(k, value)| {
617 (
618 CachedDataItemKey::OutputDependent { task: *k },
619 CachedDataItemValueRef::OutputDependent { value },
620 )
621 }))
622 }
623
624 pub fn len(&self) -> usize {
625 use crate::data_storage::Storage;
626 self.dynamic.len()
627 + self.aggregation_number.len()
628 + self.output.len()
629 + self.upper.len()
630 + self.output_dependent.len()
631 }
632}
633
/// Entry of `Storage::modified`, describing how a task was modified.
enum ModifiedState {
    /// The task was modified while the storage was not in snapshot mode.
    /// Its current `InnerStorage` content is what a snapshot has to process.
    Modified,
    /// The task was modified while the storage was in snapshot mode.
    ///
    /// `track_modification` stores `Some(copy)` when the task already carried
    /// a modified flag at that point (the copy is taken before the flags are
    /// changed), and `None` when it did not. `take_snapshot` and
    /// `SnapshotShard::next` `take()` the copy, leaving `None` behind.
    Snapshot(Option<Box<InnerStorageSnapshot>>),
}
646
/// Concurrent storage for the items of all tasks, with modification tracking
/// and a snapshot mode.
pub struct Storage {
    // `true` while a snapshot is in progress (between `start_snapshot` and `end_snapshot`).
    snapshot_mode: AtomicBool,
    // Tasks with tracked modifications; maintained by `track_modification` and `end_snapshot`.
    modified: FxDashMap<TaskId, ModifiedState>,
    // Per-task item storage; entries are created on first mutable access.
    map: FxDashMap<TaskId, Box<InnerStorage>>,
}
652
653impl Storage {
654 pub fn new(shard_amount: usize, small_preallocation: bool) -> Self {
655 let map_capacity: usize = if small_preallocation {
656 1024
657 } else {
658 1024 * 1024
659 };
660 let modified_capacity: usize = if small_preallocation { 0 } else { 1024 };
661
662 Self {
663 snapshot_mode: AtomicBool::new(false),
664 modified: FxDashMap::with_capacity_and_hasher_and_shard_amount(
665 modified_capacity,
666 Default::default(),
667 shard_amount,
668 ),
669 map: FxDashMap::with_capacity_and_hasher_and_shard_amount(
670 map_capacity,
671 Default::default(),
672 shard_amount,
673 ),
674 }
675 }
676
    /// Collects the tracked modifications into one [`SnapshotShard`] per shard
    /// of the `modified` map.
    ///
    /// Snapshot mode is entered here unless it is already active. The returned
    /// shards are lazy iterators: the callbacks only run while a shard is
    /// iterated (see `SnapshotShard::next`).
    ///
    /// * `preprocess` is called with a task's live `InnerStorage`.
    /// * `process` turns the preprocessed value into the final result.
    /// * `process_snapshot` turns a stored `InnerStorageSnapshot` into the
    ///   final result.
    ///
    /// All shards share one `SnapshotGuard`. A shard releases its reference
    /// once it is exhausted (or dropped); dropping the last reference runs
    /// `end_snapshot`.
    pub fn take_snapshot<
        'l,
        T,
        R,
        PP: for<'a> Fn(TaskId, &'a InnerStorage) -> T + Sync,
        P: Fn(TaskId, T) -> R + Sync,
        PS: Fn(TaskId, Box<InnerStorageSnapshot>) -> R + Sync,
    >(
        &'l self,
        preprocess: &'l PP,
        process: &'l P,
        process_snapshot: &'l PS,
    ) -> Vec<SnapshotShard<'l, PP, P, PS>> {
        if !self.snapshot_mode() {
            self.start_snapshot();
        }

        // Shared by all shards; ends the snapshot when the last clone is dropped.
        let guard = Arc::new(SnapshotGuard { storage: self });

        parallel::map_collect::<_, _, Vec<_>>(self.modified.shards(), |shard| {
            // Snapshots that already exist as a stored copy.
            let mut direct_snapshots: Vec<(TaskId, Box<InnerStorageSnapshot>)> = Vec::new();
            // Tasks whose live storage has to be looked at during iteration.
            let mut modified: SmallVec<[TaskId; 4]> = SmallVec::new();
            {
                // Hold the shard's write lock while walking its raw table.
                let guard = shard.write();
                // SAFETY: `guard` is kept alive until after the loop (it is dropped explicitly
                // below), so the iterator does not outlive the table it borrows from.
                for bucket in unsafe { guard.iter() } {
                    // SAFETY: the write lock is held for the whole loop.
                    // NOTE(review): this relies on the lock giving exclusive access to the
                    // bucket and keeping it from being removed; confirm against the dashmap
                    // raw shard API.
                    let (key, shared_value) = unsafe { bucket.as_mut() };
                    let modified_state = shared_value.get_mut();
                    match modified_state {
                        ModifiedState::Modified => {
                            modified.push(*key);
                        }
                        ModifiedState::Snapshot(snapshot) => {
                            // Move the stored copy out; the entry stays as `Snapshot(None)`.
                            if let Some(snapshot) = snapshot.take() {
                                direct_snapshots.push((*key, snapshot));
                            }
                        }
                    }
                }
                // Release the lock only after the iterator is gone.
                drop(guard);
            }

            SnapshotShard {
                direct_snapshots,
                modified,
                storage: self,
                guard: Some(guard.clone()),
                process,
                preprocess,
                process_snapshot,
            }
        })
    }
740
741 pub fn start_snapshot(&self) {
743 self.snapshot_mode
744 .store(true, std::sync::atomic::Ordering::Release);
745 }
746
    /// Leaves snapshot mode and resets the modification tracking.
    ///
    /// Called from `SnapshotGuard::drop`. Entries that were `Modified` are
    /// removed and their modified flags cleared; entries that were modified
    /// during the snapshot (`Snapshot`) become the new `Modified` entries.
    fn end_snapshot(&self) {
        // Snapshot mode is still active here, so `track_modification` cannot insert new
        // `Modified` entries concurrently: every `Modified` entry can be removed.
        let mut removed_modified = Vec::new();
        self.modified.retain(|key, inner| {
            if matches!(inner, ModifiedState::Modified) {
                removed_modified.push(*key);
                false
            } else {
                true
            }
        });

        // Clear the modified flags of the tasks that were just removed.
        for key in removed_modified {
            if let Some(mut inner) = self.map.get_mut(&key) {
                let state = inner.state_mut();
                state.set_data_modified(false);
                state.set_meta_modified(false);
            }
        }

        // Leave snapshot mode. From here on, `track_modification` records modifications as
        // `Modified` again.
        self.snapshot_mode
            .store(false, std::sync::atomic::Ordering::Release);

        // Turn all remaining `Snapshot` entries into `Modified` entries.
        let mut removed_snapshots = Vec::new();
        for mut item in self.modified.iter_mut() {
            match item.value() {
                ModifiedState::Snapshot(_) => {
                    removed_snapshots.push(*item.key());
                    *item.value_mut() = ModifiedState::Modified;
                }
                ModifiedState::Modified => {
                    // Inserted after snapshot mode was left above; already in the right state.
                }
            }
        }

        // Mirror that change in the per-task flags: snapshot bits become modified bits.
        for key in removed_snapshots {
            if let Some(mut inner) = self.map.get_mut(&key) {
                let state = inner.state_mut();
                if state.meta_snapshot() {
                    state.set_meta_snapshot(false);
                    state.set_meta_modified(true);
                }
                if state.data_snapshot() {
                    state.set_data_snapshot(false);
                    state.set_data_modified(true);
                }
            }
        }

        // Release capacity that is no longer needed.
        self.modified.shrink_to_fit();
    }
811
812 fn snapshot_mode(&self) -> bool {
813 self.snapshot_mode
814 .load(std::sync::atomic::Ordering::Acquire)
815 }
816
817 pub fn access_mut(&self, key: TaskId) -> StorageWriteGuard<'_> {
818 let inner = match self.map.entry(key) {
819 dashmap::mapref::entry::Entry::Occupied(e) => e.into_ref(),
820 dashmap::mapref::entry::Entry::Vacant(e) => e.insert(Box::new(InnerStorage::new())),
821 };
822 StorageWriteGuard {
823 storage: self,
824 inner: inner.into(),
825 }
826 }
827
828 pub fn access_pair_mut(
829 &self,
830 key1: TaskId,
831 key2: TaskId,
832 ) -> (StorageWriteGuard<'_>, StorageWriteGuard<'_>) {
833 let (a, b) = get_multiple_mut(&self.map, key1, key2, || Box::new(InnerStorage::new()));
834 (
835 StorageWriteGuard {
836 storage: self,
837 inner: a,
838 },
839 StorageWriteGuard {
840 storage: self,
841 inner: b,
842 },
843 )
844 }
845
    /// Drops the contents of both the task map and the modified map by
    /// passing them to the `drop_contents` helper.
    // NOTE(review): the helper lives in `utils::dash_map_drop_contents`; whether it drops
    // in parallel or keeps the allocated capacity is not visible from here.
    pub fn drop_contents(&self) {
        drop_contents(&self.map);
        drop_contents(&self.modified);
    }
850}
851
/// Mutable access to the `InnerStorage` of a single task.
///
/// Dereferences to `InnerStorage`. Modifications are not tracked
/// automatically; `track_modification` is the method that records them.
pub struct StorageWriteGuard<'a> {
    // Owning storage; needed for the snapshot mode flag and the `modified` map.
    storage: &'a Storage,
    // Reference to the task's entry in `Storage::map`.
    inner: RefMut<'a, TaskId, Box<InnerStorage>>,
}
856
impl StorageWriteGuard<'_> {
    /// Records that `category` of this task is modified.
    ///
    /// Updates the task's state flags and, where needed, the storage-wide
    /// `modified` map. What is recorded depends on whether the storage is in
    /// snapshot mode and whether the category already carries a modified flag.
    pub fn track_modification(&mut self, category: SpecificTaskDataCategory) {
        let state = self.inner.state();
        let snapshot = match category {
            SpecificTaskDataCategory::Meta => state.meta_snapshot(),
            SpecificTaskDataCategory::Data => state.data_snapshot(),
        };
        // A set snapshot flag means this category was already recorded during the current
        // snapshot; nothing more to do.
        if !snapshot {
            let modified = match category {
                SpecificTaskDataCategory::Meta => state.meta_modified(),
                SpecificTaskDataCategory::Data => state.data_modified(),
            };
            match (self.storage.snapshot_mode(), modified) {
                (false, false) => {
                    // Not in snapshot mode and the category is not yet marked modified.
                    // The task only needs a map entry if it has no flag set for either category.
                    if !state.any_snapshot() && !state.any_modified() {
                        self.storage
                            .modified
                            .insert(*self.inner.key(), ModifiedState::Modified);
                    }
                    let state = self.inner.state_mut();
                    match category {
                        SpecificTaskDataCategory::Meta => state.set_meta_modified(true),
                        SpecificTaskDataCategory::Data => state.set_data_modified(true),
                    }
                }
                (false, true) => {
                    // Not in snapshot mode and already marked modified: nothing to do.
                }
                (true, false) => {
                    // In snapshot mode and the category carries no modified flag: no copy of
                    // the previous content is stored.
                    if !state.any_snapshot() {
                        self.storage
                            .modified
                            .insert(*self.inner.key(), ModifiedState::Snapshot(None));
                    }
                    let state = self.inner.state_mut();
                    match category {
                        SpecificTaskDataCategory::Meta => state.set_meta_snapshot(true),
                        SpecificTaskDataCategory::Data => state.set_data_snapshot(true),
                    }
                }
                (true, true) => {
                    // In snapshot mode and the category carries a modified flag: store a copy
                    // of the current content in the map entry before the snapshot flag is set.
                    // The copy is only taken the first time (no snapshot flag set yet).
                    if !state.any_snapshot() {
                        self.storage.modified.insert(
                            *self.inner.key(),
                            ModifiedState::Snapshot(Some(Box::new((&**self.inner).into()))),
                        );
                    }
                    let state = self.inner.state_mut();
                    match category {
                        SpecificTaskDataCategory::Meta => state.set_meta_snapshot(true),
                        SpecificTaskDataCategory::Data => state.set_data_snapshot(true),
                    }
                }
            }
        }
    }
}
920
921impl Deref for StorageWriteGuard<'_> {
922 type Target = InnerStorage;
923
924 fn deref(&self) -> &Self::Target {
925 &self.inner
926 }
927}
928
929impl DerefMut for StorageWriteGuard<'_> {
930 fn deref_mut(&mut self) -> &mut Self::Target {
931 &mut self.inner
932 }
933}
934
/// `count!(task, Key)` expands to `task.count(CachedDataItemType::Key)`.
macro_rules! count {
    ($task:ident, $key:ident) => {{ $task.count($crate::data::CachedDataItemType::$key) }};
}
938
/// `get!(task, Key { fields.. })` looks up an item and evaluates to
/// `Option<value reference>` for item type `Key`.
///
/// `get!(task, Key)` is shorthand for a key without fields (`Key {}`).
macro_rules! get {
    ($task:ident, $key:ident $input:tt) => {{
        // Brings the trait providing `get` into scope for callers using a `TaskGuard`.
        #[allow(unused_imports)]
        use $crate::backend::operation::TaskGuard;
        if let Some($crate::data::CachedDataItemValueRef::$key {
            value,
        }) = $task.get(&$crate::data::CachedDataItemKey::$key $input) {
            Some(value)
        } else {
            None
        }
    }};
    ($task:ident, $key:ident) => {
        $crate::backend::storage::get!($task, $key {})
    };
}
955
/// `get_mut!(task, Key { fields.. })` looks up an item and evaluates to
/// `Option<mutable value reference>` for item type `Key`.
///
/// `get_mut!(task, Key)` is shorthand for a key without fields (`Key {}`).
macro_rules! get_mut {
    ($task:ident, $key:ident $input:tt) => {{
        #[allow(unused_imports)]
        use $crate::backend::operation::TaskGuard;
        if let Some($crate::data::CachedDataItemValueRefMut::$key {
            value,
        }) = $task.get_mut(&$crate::data::CachedDataItemKey::$key $input) {
            // Only compiles if `data::allow_mut_access::Key` exists and has type `()`.
            // NOTE(review): presumably a compile-time allow-list of item types that may be
            // mutated in place; confirm in `crate::data`.
            let () = $crate::data::allow_mut_access::$key;
            Some(value)
        } else {
            None
        }
    }};
    ($task:ident, $key:ident) => {
        $crate::backend::storage::get_mut!($task, $key {})
    };
}
973
/// `get_mut_or_insert_with!(task, Key { fields.. }, f)` evaluates to a mutable
/// reference to the value of the item; `f` produces the typed value handed to
/// `task.get_mut_or_insert_with` for insertion.
///
/// `get_mut_or_insert_with!(task, Key, f)` is shorthand for a key without
/// fields (`Key {}`).
macro_rules! get_mut_or_insert_with {
    ($task:ident, $key:ident $input:tt, $f:expr) => {{
        #[allow(unused_imports)]
        use $crate::backend::operation::TaskGuard;
        // Only compiles if `data::allow_mut_access::Key` exists and has type `()`.
        let () = $crate::data::allow_mut_access::$key;
        // Evaluate the `$f` expression exactly once, before the call.
        let functor = $f;
        let $crate::data::CachedDataItemValueRefMut::$key {
            value,
        } = $task.get_mut_or_insert_with($crate::data::CachedDataItemKey::$key $input, move || $crate::data::CachedDataItemValue::$key { value: functor() }) else {
            // The returned reference is expected to have the same variant as the key.
            unreachable!()
        };
        value
    }};
    ($task:ident, $key:ident, $f:expr) => {
        $crate::backend::storage::get_mut_or_insert_with!($task, $key {}, $f)
    };
}
991
/// Iterates over all items of one type, filtered and mapped by a pattern.
///
/// Two forms are supported:
///
/// * `iter_many!(task, Key key_pattern [if cond] => expr)` matches on the key
///   only.
/// * `iter_many!(task, Key key_pattern value_pattern [if cond] => expr)`
///   matches on the key and on the value.
///
/// Items that do not match the pattern (or the optional guard) are skipped;
/// for all others `expr` is yielded.
macro_rules! iter_many {
    // Key-only form.
    ($task:ident, $key:ident $key_pattern:tt $(if $cond:expr)? => $iter_item:expr) => {{
        #[allow(unused_imports)]
        use $crate::backend::operation::TaskGuard;
        $task
            .iter($crate::data::CachedDataItemType::$key)
            .filter_map(|(key, _)| match key {
                $crate::data::CachedDataItemKey::$key $key_pattern $(if $cond)? => Some(
                    $iter_item
                ),
                _ => None,
            })
    }};
    // Key + value form.
    ($task:ident, $key:ident $input:tt $value_pattern:tt $(if $cond:expr)? => $iter_item:expr) => {{
        #[allow(unused_imports)]
        use $crate::backend::operation::TaskGuard;
        $task
            .iter($crate::data::CachedDataItemType::$key)
            .filter_map(|(key, value)| match (key, value) {
                (
                    $crate::data::CachedDataItemKey::$key $input,
                    $crate::data::CachedDataItemValueRef::$key { value: $value_pattern }
                ) $(if $cond)? => Some($iter_item),
                _ => None,
            })
    }};
}
1024
/// Same arguments as `iter_many!`, but collects the results via `.collect()`.
/// The target collection type is inferred at the call site.
macro_rules! get_many {
    ($($args:tt)*) => {
        $crate::backend::storage::iter_many!($($args)*).collect()
    };
}
1034
/// `update!(task, Key { fields.. }, f)` updates an item through a closure
/// working on the typed value: `f` receives `Option<old value>` and returns
/// `Option<new value>`, which is passed on to `task.update`.
///
/// `update!(task, Key, f)` is shorthand for a key without fields (`Key {}`).
macro_rules! update {
    ($task:ident, $key:ident $input:tt, $update:expr) => {{
        #[allow(unused_imports)]
        use $crate::backend::operation::TaskGuard;
        // `mut` is only needed when `$update` is an `FnMut` closure.
        #[allow(unused_mut)]
        let mut update = $update;
        $task.update($crate::data::CachedDataItemKey::$key $input, |old| {
            // Unwrap the generic value; a value of another variant is treated as absent.
            update(old.and_then(|old| {
                if let $crate::data::CachedDataItemValue::$key { value } = old {
                    Some(value)
                } else {
                    None
                }
            }))
            .map(|new| $crate::data::CachedDataItemValue::$key { value: new })
        })
    }};
    ($task:ident, $key:ident, $update:expr) => {
        $crate::backend::storage::update!($task, $key {}, $update)
    };
}
1056
/// Adds to (or, with a leading `-`, subtracts from) a counter item.
///
/// * `update_count!(task, Key { fields.. }, n)` adds `n`.
/// * `update_count!(task, Key { fields.. }, -n)` subtracts `n`.
/// * Both forms also accept `Key` without fields as shorthand for `Key {}`.
///
/// A missing item counts as 0, and the update closure returns `None` instead
/// of a new count of 0. The macro evaluates to a `bool` that is `true` when
/// the count moved between "not positive" (`<= 0`) and "positive" (`> 0`).
macro_rules! update_count {
    // Subtracting form.
    ($task:ident, $key:ident $input:tt, -$update:expr) => {{
        let update = $update;
        let mut state_change = false;
        $crate::backend::storage::update!($task, $key $input, |old: Option<_>| {
            #[allow(unused_comparisons, reason = "type of update might be unsigned, where update < 0 is always false")]
            if let Some(old) = old {
                let new = old - update;
                state_change = old <= 0 && new > 0 || old > 0 && new <= 0;
                (new != 0).then_some(new)
            } else {
                // Starting from 0: the result `-update` is positive only if `update` is negative.
                state_change = update < 0;
                (update != 0).then_some(-update)
            }
        });
        state_change
    }};
    // Adding form. The `match` binds the value of `$update` to the name `update`.
    ($task:ident, $key:ident $input:tt, $update:expr) => {
        match $update {
            update => {
                let mut state_change = false;
                $crate::backend::storage::update!($task, $key $input, |old: Option<_>| {
                    if let Some(old) = old {
                        let new = old + update;
                        state_change = old <= 0 && new > 0 || old > 0 && new <= 0;
                        (new != 0).then_some(new)
                    } else {
                        // Starting from 0: the result is positive only if `update` is.
                        state_change = update > 0;
                        (update != 0).then_some(update)
                    }
                });
                state_change
            }
        }
    };
    // Shorthands for keys without fields.
    ($task:ident, $key:ident, -$update:expr) => {
        $crate::backend::storage::update_count!($task, $key {}, -$update)
    };
    ($task:ident, $key:ident, $update:expr) => {
        $crate::backend::storage::update_count!($task, $key {}, $update)
    };
}
1098
/// `remove!(task, Key { fields.. })` calls `task.remove` for the key and
/// evaluates to `Option<typed value>`, i.e. the returned value unwrapped from
/// its `CachedDataItemValue::Key` variant.
///
/// `remove!(task, Key)` is shorthand for a key without fields (`Key {}`).
macro_rules! remove {
    ($task:ident, $key:ident $input:tt) => {{
        #[allow(unused_imports)]
        use $crate::backend::operation::TaskGuard;
        if let Some($crate::data::CachedDataItemValue::$key { value }) = $task.remove(
            &$crate::data::CachedDataItemKey::$key $input
        ) {
            Some(value)
        } else {
            None
        }
    }};
    ($task:ident, $key:ident) => {
        $crate::backend::storage::remove!($task, $key {})
    };
}
1115
1116pub(crate) use count;
1117pub(crate) use get;
1118pub(crate) use get_many;
1119pub(crate) use get_mut;
1120pub(crate) use get_mut_or_insert_with;
1121pub(crate) use iter_many;
1122pub(crate) use remove;
1123pub(crate) use update;
1124pub(crate) use update_count;
1125
/// Ends the snapshot of its `Storage` when dropped.
///
/// `Storage::take_snapshot` shares one guard between all returned shards via
/// an `Arc`, so the snapshot ends when the last shard releases it.
pub struct SnapshotGuard<'l> {
    storage: &'l Storage,
}

impl Drop for SnapshotGuard<'_> {
    /// Leaves snapshot mode by calling `Storage::end_snapshot`.
    fn drop(&mut self) {
        self.storage.end_snapshot();
    }
}
1135
/// The pending snapshot work for one shard of `Storage::modified`.
///
/// Produced by `Storage::take_snapshot`; iterating it yields one result per
/// processed task.
pub struct SnapshotShard<'l, PP, P, PS> {
    // Stored copies collected up front by `take_snapshot`.
    direct_snapshots: Vec<(TaskId, Box<InnerStorageSnapshot>)>,
    // Tasks that were in `ModifiedState::Modified` when the snapshot was taken.
    modified: SmallVec<[TaskId; 4]>,
    storage: &'l Storage,
    // Keeps the snapshot alive; set to `None` once the shard is exhausted.
    guard: Option<Arc<SnapshotGuard<'l>>>,
    // Callbacks, see `Storage::take_snapshot`.
    process: &'l P,
    preprocess: &'l PP,
    process_snapshot: &'l PS,
}
1145
impl<'l, T, R, PP, P, PS> Iterator for SnapshotShard<'l, PP, P, PS>
where
    PP: for<'a> Fn(TaskId, &'a InnerStorage) -> T + Sync,
    P: Fn(TaskId, T) -> R + Sync,
    PS: Fn(TaskId, Box<InnerStorageSnapshot>) -> R + Sync,
{
    type Item = R;

    /// Produces the result for the next task of this shard.
    ///
    /// Stored copies are processed first, then the tasks from `modified`.
    /// Once everything is processed, the shard releases its reference to the
    /// shared snapshot guard and returns `None`.
    ///
    /// # Panics
    ///
    /// Panics if a task listed in `modified` has no entry in the task map, or
    /// if its snapshot flag is set but it has no entry in the `modified` map.
    fn next(&mut self) -> Option<Self::Item> {
        if let Some((task_id, snapshot)) = self.direct_snapshots.pop() {
            return Some((self.process_snapshot)(task_id, snapshot));
        }
        while let Some(task_id) = self.modified.pop() {
            let inner = self.storage.map.get(&task_id).unwrap();
            let state = inner.state();
            if !state.any_snapshot() {
                // No snapshot flag is set, so the live storage is used. Only `preprocess`
                // runs while the map reference is held; `process` runs after it is released.
                let preprocessed = (self.preprocess)(task_id, &inner);
                drop(inner);
                return Some((self.process)(task_id, preprocessed));
            } else {
                // A snapshot flag is set, so use the copy stored in the `modified` map (if
                // any) instead of the live storage. Release the task map reference before
                // accessing the other map.
                drop(inner);
                let maybe_snapshot = {
                    let mut modified_state = self.storage.modified.get_mut(&task_id).unwrap();
                    let ModifiedState::Snapshot(snapshot) = &mut *modified_state else {
                        unreachable!("The snapshot bit was set, so it must be in Snapshot state");
                    };
                    snapshot.take()
                };
                if let Some(snapshot) = maybe_snapshot {
                    return Some((self.process_snapshot)(task_id, snapshot));
                }
                // No stored copy: nothing to emit for this task, continue with the next one.
            }
        }
        // Exhausted: release the guard so the snapshot can end once all shards are done.
        self.guard = None;
        None
    }
}