1use std::ops::RangeInclusive;
2
3use rustc_hash::FxHashMap;
4use smallvec::{SmallVec, smallvec};
5
6use crate::compaction::interval_map::IntervalMap;
7
/// An item that can participate in compaction: it covers a key range,
/// has a size, and belongs to a category.
pub trait Compactable {
    /// The inclusive key range covered by this item.
    fn range(&self) -> RangeInclusive<u64>;

    /// The size of this item in bytes.
    fn size(&self) -> u64;

    /// The item's category. Duplication is tracked per category, so
    /// only items of the same category count as duplicating each
    /// other. Defaults to a single shared category.
    fn category(&self) -> u8 {
        0
    }
}
23
/// Returns `true` when the two inclusive ranges share at least one key.
fn is_overlapping(a: &RangeInclusive<u64>, b: &RangeInclusive<u64>) -> bool {
    // Two inclusive ranges are disjoint iff one ends before the other
    // starts; overlapping is the negation of that.
    !(a.end() < b.start() || b.end() < a.start())
}
27
/// Number of keys in the inclusive range, widened to `u128` so even
/// the full `u64` range (`u64::MAX + 1` keys) is representable.
fn spread(range: &RangeInclusive<u64>) -> u128 {
    let len = range.end() - range.start();
    u128::from(len) + 1
}
32
/// Grows `a` to the smallest range containing both `a` and `b`,
/// returning whether `a` actually changed.
fn extend_range(a: &mut RangeInclusive<u64>, b: &RangeInclusive<u64>) -> bool {
    let start = *a.start().min(b.start());
    let end = *a.end().max(b.end());
    let extended = start < *a.start() || end > *a.end();
    *a = start..=end;
    extended
}
46
/// Aggregate quality metrics for a set of compactables (test-only).
#[cfg(test)]
#[derive(Debug)]
pub struct CompactableMetrics {
    /// Sum of all item spreads relative to the full range's spread;
    /// can exceed 1.0 when items overlap.
    pub coverage: f32,

    /// Extra layers of covering items (beyond the first) per interval,
    /// weighted by interval spread and scaled by the full spread.
    pub overlap: f32,

    /// Estimated number of duplicated bytes across all intervals.
    pub duplicated_size: u64,

    /// `duplicated_size` relative to the estimated total size (0..=1).
    pub duplication: f32,
}
62
/// Computes coverage/overlap/duplication metrics for `compactables`
/// relative to `full_range` (test-only helper).
///
/// Coverage sums every item's spread; overlap weights each covered
/// interval by the number of extra items covering it; duplication is
/// estimated via `DuplicationInfo`'s fixed-point size accounting.
#[cfg(test)]
pub fn compute_metrics<T: Compactable>(
    compactables: &[T],
    full_range: RangeInclusive<u64>,
) -> CompactableMetrics {
    // Per interval: duplication accounting plus the number of items
    // covering it.
    let mut interval_map = IntervalMap::<Option<(DuplicationInfo, usize)>>::new();
    let mut coverage = 0.0f32;
    for c in compactables {
        let range = c.range();
        coverage += spread(&range) as f32;
        interval_map.update(range.clone(), |value| {
            let (dup_info, count) = value.get_or_insert_default();
            dup_info.add(c.size(), &range);
            *count += 1;
        });
    }
    let full_spread = spread(&full_range) as f32;

    let (duplicated_size, duplication, overlap) = interval_map
        .iter()
        // `filter_map` (rather than `flat_map` over an `Option`) is the
        // idiomatic way to skip intervals no compactable touched.
        .filter_map(|(range, value)| Some((range, value.as_ref()?)))
        .map(|(range, (dup_info, count))| {
            let duplicated_size = dup_info.duplication(&range);
            let total_size = dup_info.size(&range);
            // An interval covered by `count` items has `count - 1`
            // extra layers of overlap.
            let overlap = spread(&range) as f32 * count.saturating_sub(1) as f32;
            (duplicated_size, total_size, overlap)
        })
        .reduce(|(dup1, total1, overlap1), (dup2, total2, overlap2)| {
            (dup1 + dup2, total1 + total2, overlap1 + overlap2)
        })
        .map(|(duplicated_size, total_size, overlap)| {
            (
                duplicated_size,
                if total_size > 0 {
                    duplicated_size as f32 / total_size as f32
                } else {
                    0.0
                },
                overlap,
            )
        })
        .unwrap_or((0, 0.0, 0.0));

    CompactableMetrics {
        coverage: coverage / full_spread,
        overlap: overlap / full_spread,
        duplicated_size,
        duplication,
    }
}
114
/// Tuning knobs for [`get_merge_segments`].
#[derive(Clone)]
pub struct CompactConfig {
    /// Minimum number of items for a segment to count as a valid merge
    /// (together with `min_merge_duplication_bytes`).
    pub min_merge_count: usize,

    /// Once a segment has this many items (and at least
    /// `optimal_merge_duplication_bytes` of duplication) it is
    /// committed immediately without growing further.
    pub optimal_merge_count: usize,

    /// Hard cap on the number of items in one merge segment.
    pub max_merge_count: usize,

    /// Hard cap on the combined byte size of one merge segment.
    pub max_merge_bytes: u64,

    /// Minimum estimated duplication (bytes) for a segment to be worth
    /// merging at all.
    pub min_merge_duplication_bytes: u64,

    /// Duplication level (bytes) at which a segment is considered
    /// optimal and committed immediately.
    pub optimal_merge_duplication_bytes: u64,

    /// Maximum number of real (multi-item) merge segments produced per
    /// call.
    pub max_merge_segment_count: usize,
}
139
140impl Default for CompactConfig {
141 fn default() -> Self {
142 const MB: u64 = 1024 * 1024;
143 Self {
144 min_merge_count: 2,
145 optimal_merge_count: 8,
146 max_merge_count: 32,
147 max_merge_bytes: 500 * MB,
148 min_merge_duplication_bytes: 50 * MB,
149 optimal_merge_duplication_bytes: 100 * MB,
150 max_merge_segment_count: 8,
151 }
152 }
153}
154
/// Fixed-point accounting of how much data covers an interval.
///
/// Contributions are stored scaled by `2^64 / spread(range)` (see
/// `add`), so items covering ranges of different widths are
/// comparable.
#[derive(Clone, Default, Eq, PartialEq)]
struct DuplicationInfo {
    // Saturating sum of all scaled contributions.
    total_size: u64,
    // Largest single scaled contribution; everything above it is
    // considered duplicated.
    max_size: u64,
}
162
163impl DuplicationInfo {
164 fn duplication(&self, range: &RangeInclusive<u64>) -> u64 {
167 if self.max_size == self.total_size {
168 return 0;
169 }
170 u64::try_from(
172 u128::from(self.total_size - self.max_size) * spread(range)
173 / (u128::from(u64::MAX) + 1),
174 )
175 .expect("should not overflow, denominator was `u64::MAX+1`")
176 }
177
178 #[cfg(test)]
180 fn size(&self, range: &RangeInclusive<u64>) -> u64 {
181 if self.total_size == 0 {
182 return 0;
183 }
184 u64::try_from(u128::from(self.total_size) * spread(range) / (u128::from(u64::MAX) + 1))
186 .expect("should not overflow, denominator was `u64::MAX+1`")
187 }
188
189 fn add(&mut self, size: u64, range: &RangeInclusive<u64>) {
190 let scaled_size =
196 u64::try_from(u128::from(size) * (u128::from(u64::MAX) + 1) / spread(range))
197 .unwrap_or(u64::MAX);
198 self.total_size = self.total_size.saturating_add(scaled_size);
199 self.max_size = self.max_size.max(scaled_size);
200 }
201}
202
203fn total_duplication_size(duplication: &IntervalMap<FxHashMap<u8, DuplicationInfo>>) -> u64 {
204 duplication
205 .iter()
206 .map(|(range, info)| {
207 info.values()
208 .map(|info| info.duplication(&range))
209 .sum::<u64>()
210 })
211 .sum()
212}
213
/// Segments of indices into the compactables slice; the inline
/// capacity of 1 keeps single-item placeholder segments off the heap.
type MergeSegments = Vec<SmallVec<[usize; 1]>>;
215
/// Greedily partitions `compactables` into merge segments.
///
/// Scans from the last compactable backwards, growing a set of items
/// whose ranges overlap the start item's (hull) range until the set is
/// "optimal", hits a hard limit, or no further overlapping item
/// exists. Returns the segments (indices into `compactables`, each in
/// ascending order) and the number of real (multi-item) merge jobs.
pub fn get_merge_segments<T: Compactable>(
    compactables: &[T],
    config: &CompactConfig,
) -> (MergeSegments, usize) {
    // Items are popped from the back; after a pop, the vector's length
    // equals the popped item's index in `compactables`.
    let mut unused_compactables = compactables.iter().collect::<Vec<_>>();
    // Marks items already placed into a committed merge segment.
    let mut used_compactables = vec![false; compactables.len()];

    let mut merge_segments: MergeSegments = Vec::new();
    let mut real_merge_segments = 0;

    'outer: while let Some(start_compactable) = unused_compactables.pop() {
        let start_index = unused_compactables.len();
        if used_compactables[start_index] {
            continue;
        }
        if real_merge_segments >= config.max_merge_segment_count {
            break;
        }
        let start_compactable_range = start_compactable.range();
        let start_compactable_size = start_compactable.size();
        let start_compactable_category = start_compactable.category();
        // Hull of all ranges in the current set; whenever it grows the
        // whole search restarts (see `continue 'search` below).
        let mut current_range = start_compactable_range.clone();

        'search: loop {
            let mut current_set = smallvec![start_index];
            let mut current_size = start_compactable_size;
            // Per-category duplication accounting over covered
            // intervals, seeded with the start item.
            let mut duplication = IntervalMap::<FxHashMap<u8, DuplicationInfo>>::new();
            duplication.update(start_compactable_range.clone(), |dup_info| {
                dup_info
                    .entry(start_compactable_category)
                    .or_default()
                    .add(start_compactable_size, &start_compactable_range);
            });
            let mut current_skip = 0;

            loop {
                let duplication_size = total_duplication_size(&duplication);
                // Optimal: enough items and enough duplication —
                // commit the segment immediately.
                let optimal_merge_job = current_set.len() >= config.optimal_merge_count
                    && duplication_size >= config.optimal_merge_duplication_bytes;
                if optimal_merge_job {
                    for &i in current_set.iter() {
                        used_compactables[i] = true;
                    }
                    // Indices were collected from high to low; emit
                    // them in ascending order.
                    current_set.reverse();
                    merge_segments.push(current_set);
                    real_merge_segments += 1;
                    continue 'outer;
                }

                // Valid (but not optimal): committed only when the set
                // cannot grow any further.
                let valid_merge_job = current_set.len() >= config.min_merge_count
                    && duplication_size >= config.min_merge_duplication_bytes;
                let mut end_job =
                    |mut current_set: SmallVec<[usize; 1]>, used_compactables: &mut Vec<bool>| {
                        if valid_merge_job {
                            for &i in current_set.iter() {
                                used_compactables[i] = true;
                            }
                            current_set.reverse();
                            merge_segments.push(current_set);
                            real_merge_segments += 1;
                        } else {
                            // Not worth merging: record a single-item
                            // placeholder for the start item only.
                            merge_segments.push(smallvec![start_index]);
                        }
                    };

                if current_set.len() >= config.max_merge_count
                    || current_size >= config.max_merge_bytes
                {
                    end_job(current_set, &mut used_compactables);
                    continue 'outer;
                }

                // Next unused compactable (scanning backwards, resuming
                // after `current_skip` already-checked items) whose
                // range overlaps the current hull.
                let Some((next_index, compactable)) = unused_compactables
                    .iter()
                    .enumerate()
                    .rev()
                    .skip(current_skip)
                    .find(|(i, compactable)| {
                        if used_compactables[*i] {
                            return false;
                        }
                        let range = compactable.range();
                        is_overlapping(&current_range, &range)
                    })
                else {
                    end_job(current_set, &mut used_compactables);
                    continue 'outer;
                };
                current_skip = unused_compactables.len() - next_index;

                let size = compactable.size();
                if current_size + size > config.max_merge_bytes {
                    end_job(current_set, &mut used_compactables);
                    continue 'outer;
                }

                let range = compactable.range();
                // If this item widens the hull, restart the search so
                // previously skipped items are reconsidered against the
                // larger range.
                if extend_range(&mut current_range, &range) {
                    continue 'search;
                }

                current_set.push(next_index);
                current_size += size;
                let category = compactable.category();
                duplication.update(range.clone(), |dup_info| {
                    dup_info.entry(category).or_default().add(size, &range);
                });
            }
        }
    }

    // Trailing single-item placeholders carry no work; drop them.
    while merge_segments.last().is_some_and(|s| s.len() == 1) {
        merge_segments.pop();
    }

    // Segments were produced newest-first; return them oldest-first.
    merge_segments.reverse();

    // Keep a single-item segment only if its range intersects a range
    // already touched by an earlier kept segment — presumably so it is
    // rewritten alongside the items it overlaps; TODO confirm intent.
    let mut used_ranges = IntervalMap::<bool>::new();
    merge_segments.retain(|segment| {
        if segment.len() == 1 {
            let range = compactables[segment[0]].range();
            if !used_ranges.iter_intersecting(range).any(|(_, v)| *v) {
                return false;
            }
        }
        for i in segment {
            let range = compactables[*i].range();
            used_ranges.replace(range, true);
        }
        true
    });

    (merge_segments, real_merge_segments)
}
392
393#[cfg(test)]
394mod tests {
395 use std::{
396 fmt::Debug,
397 mem::{replace, swap},
398 };
399
400 use rand::{Rng, SeedableRng, seq::SliceRandom};
401
402 use super::*;
403
    /// Minimal [`Compactable`] for unit tests: a fixed range and size.
    struct TestCompactable {
        range: RangeInclusive<u64>,
        size: u64,
    }
408
    impl Compactable for TestCompactable {
        /// Returns the fixed test range.
        fn range(&self) -> RangeInclusive<u64> {
            self.range.clone()
        }

        /// Returns the fixed test size.
        fn size(&self) -> u64 {
            self.size
        }
    }
418
419 fn compact<const N: usize>(
420 ranges: [RangeInclusive<u64>; N],
421 config: &CompactConfig,
422 ) -> Vec<Vec<usize>> {
423 let compactables = ranges
424 .into_iter()
425 .map(|range| TestCompactable { range, size: 100 })
426 .collect::<Vec<_>>();
427 let (jobs, _) = get_merge_segments(&compactables, config);
428 jobs.into_iter()
429 .map(|job| job.into_iter().collect())
430 .collect()
431 }
432
    /// Segment boundaries are driven purely by the count limits
    /// (`optimal_merge_count: 3`, `max_merge_count: 4`); duplication
    /// thresholds are disabled.
    #[test]
    fn test_compaction_jobs_by_count() {
        let merge_jobs = compact(
            [
                0..=10,
                10..=30,
                9..=13,
                0..=30,
                40..=44,
                41..=42,
                41..=47,
                90..=100,
                30..=40,
            ],
            &CompactConfig {
                min_merge_count: 2,
                optimal_merge_count: 3,
                max_merge_count: 4,
                max_merge_bytes: u64::MAX,
                min_merge_duplication_bytes: 0,
                optimal_merge_duplication_bytes: 0,
                max_merge_segment_count: usize::MAX,
            },
        );
        assert_eq!(merge_jobs, vec![vec![1, 2, 3], vec![5, 6, 8]]);
    }
459
    /// Segment boundaries are driven by the byte limit
    /// (`max_merge_bytes: 300` with 100-byte items); count limits are
    /// effectively disabled.
    #[test]
    fn test_compaction_jobs_by_size() {
        let merge_jobs = compact(
            [
                0..=10,
                10..=30,
                9..=13,
                0..=30,
                40..=44,
                41..=42,
                41..=47,
                90..=100,
                30..=40,
            ],
            &CompactConfig {
                min_merge_count: 2,
                optimal_merge_count: 2,
                max_merge_count: usize::MAX,
                max_merge_bytes: 300,
                min_merge_duplication_bytes: 0,
                optimal_merge_duplication_bytes: u64::MAX,
                max_merge_segment_count: usize::MAX,
            },
        );
        assert_eq!(merge_jobs, vec![vec![1, 2, 3], vec![5, 6, 8]]);
    }
486
    /// With no effective limits, all transitively overlapping items
    /// collapse into a single segment (index 7 is disjoint).
    #[test]
    fn test_compaction_jobs_full() {
        let merge_jobs = compact(
            [
                0..=10,
                10..=30,
                9..=13,
                0..=30,
                40..=44,
                41..=42,
                41..=47,
                90..=100,
                30..=40,
            ],
            &CompactConfig {
                min_merge_count: 2,
                optimal_merge_count: usize::MAX,
                max_merge_count: usize::MAX,
                max_merge_bytes: u64::MAX,
                min_merge_duplication_bytes: 0,
                optimal_merge_duplication_bytes: u64::MAX,
                max_merge_segment_count: usize::MAX,
            },
        );
        assert_eq!(merge_jobs, vec![vec![0, 1, 2, 3, 4, 5, 6, 8]]);
    }
513
    /// `optimal_merge_count: 7` commits one large segment covering the
    /// overlapping items.
    #[test]
    fn test_compaction_jobs_big() {
        let merge_jobs = compact(
            [
                0..=10,
                10..=30,
                9..=13,
                0..=30,
                40..=44,
                41..=42,
                41..=47,
                90..=100,
                30..=40,
            ],
            &CompactConfig {
                min_merge_count: 2,
                optimal_merge_count: 7,
                max_merge_count: usize::MAX,
                max_merge_bytes: u64::MAX,
                min_merge_duplication_bytes: 0,
                optimal_merge_duplication_bytes: 0,
                max_merge_segment_count: usize::MAX,
            },
        );
        assert_eq!(merge_jobs, vec![vec![1, 2, 3, 4, 5, 6, 8]]);
    }
540
    /// `optimal_merge_count: 2` commits a segment as soon as it holds
    /// two items, producing many small pair-wise merges.
    #[test]
    fn test_compaction_jobs_small() {
        let merge_jobs = compact(
            [
                0..=10,
                10..=30,
                9..=13,
                0..=30,
                40..=44,
                41..=42,
                41..=47,
                90..=100,
                30..=40,
            ],
            &CompactConfig {
                min_merge_count: 2,
                optimal_merge_count: 2,
                max_merge_count: usize::MAX,
                max_merge_bytes: u64::MAX,
                min_merge_duplication_bytes: 0,
                optimal_merge_duplication_bytes: 0,
                max_merge_segment_count: usize::MAX,
            },
        );
        assert_eq!(
            merge_jobs,
            vec![vec![0, 1], vec![2, 3], vec![4, 5], vec![6, 8]]
        );
    }
570
571 pub fn debug_print_compactables<T: Compactable>(compactables: &[T], max_key: u64) {
572 const WIDTH: usize = 128;
573 let char_width: u64 = max_key / WIDTH as u64;
574 for (i, c) in compactables.iter().enumerate() {
575 let range = c.range();
576 let size = c.size();
577 let start = usize::try_from(range.start() / char_width).unwrap();
578 let end = usize::try_from(range.end() / char_width).unwrap();
579 let mut line = format!("{i:>3} | ");
580 for j in 0..WIDTH {
581 if j >= start && j <= end {
582 line.push('█');
583 } else {
584 line.push(' ');
585 }
586 }
587 println!("{line} | {size:>6}");
588 }
589 }
590
    /// Long-running simulation: repeatedly rewrites a batch of "warm"
    /// keys, runs segment selection and compaction, and asserts the
    /// system converges (bounded compactions, container count, and
    /// duplication).
    #[test]
    fn simulate_compactions() {
        const KEY_RANGE: u64 = 10000;
        const WARM_KEY_COUNT: usize = 100;
        const INITIAL_CHUNK_SIZE: usize = 100;
        const ITERATIONS: usize = 100;

        // Fixed seed keeps the simulation reproducible.
        let mut rnd = rand::rngs::SmallRng::from_seed([0; 32]);
        let mut keys = (0..KEY_RANGE).collect::<Vec<_>>();
        keys.shuffle(&mut rnd);

        let mut batch_index = 0;
        let mut containers = keys
            .chunks(INITIAL_CHUNK_SIZE)
            .map(|keys| Container::new(batch_index, keys.to_vec()))
            .collect::<Vec<_>>();

        // Keys rewritten every iteration; removed from the cold set.
        let mut warm_keys = (0..WARM_KEY_COUNT)
            .map(|_| {
                let i = rnd.random_range(0..keys.len());
                keys.swap_remove(i)
            })
            .collect::<Vec<_>>();

        let mut number_of_compactions = 0;

        for _ in 0..ITERATIONS {
            let total_size = containers.iter().map(|c| c.keys.len()).sum::<usize>();
            let metrics = compute_metrics(&containers, 0..=KEY_RANGE);
            debug_print_compactables(&containers, KEY_RANGE);
            println!(
                "size: {}, coverage: {}, overlap: {}, duplication: {}, items: {}",
                total_size,
                metrics.coverage,
                metrics.overlap,
                metrics.duplication,
                containers.len()
            );

            // Sanity bound: the container count must not blow up.
            assert!(containers.len() < 400);
            let config = CompactConfig {
                max_merge_count: 16,
                min_merge_count: 2,
                optimal_merge_count: 4,
                max_merge_bytes: 5000,
                min_merge_duplication_bytes: 500,
                optimal_merge_duplication_bytes: 1000,
                max_merge_segment_count: 4,
            };
            let (jobs, _) = get_merge_segments(&containers, &config);
            if !jobs.is_empty() {
                println!("{jobs:?}");

                batch_index += 1;
                do_compact(&mut containers, jobs, batch_index);
                number_of_compactions += 1;

                // Log the metric deltas produced by this compaction.
                let new_metrics = compute_metrics(&containers, 0..=KEY_RANGE);
                println!(
                    "Compaction done: coverage: {} ({}), overlap: {} ({}), duplication: {} ({}), \
                     duplicated_size: {} ({})",
                    new_metrics.coverage,
                    new_metrics.coverage - metrics.coverage,
                    new_metrics.overlap,
                    new_metrics.overlap - metrics.overlap,
                    new_metrics.duplication,
                    new_metrics.duplication - metrics.duplication,
                    new_metrics.duplicated_size,
                    (new_metrics.duplicated_size as f32) - metrics.duplicated_size as f32,
                );
            } else {
                println!("No compaction needed");
            }

            // Write the warm keys back as 1-3 new containers.
            batch_index += 1;
            let pieces = rnd.random_range(1..4);
            for chunk in warm_keys.chunks(warm_keys.len().div_ceil(pieces)) {
                containers.push(Container::new(batch_index, chunk.to_vec()));
            }

            // Rotate part of the warm set so the workload drifts.
            let changes = rnd.random_range(0..100);
            for _ in 0..changes {
                let i = rnd.random_range(0..warm_keys.len());
                let j = rnd.random_range(0..keys.len());
                swap(&mut warm_keys[i], &mut keys[j]);
            }
        }
        println!("Number of compactions: {number_of_compactions}");

        let metrics = compute_metrics(&containers, 0..=KEY_RANGE);
        assert!(number_of_compactions < 30);
        assert!(containers.len() < 30);
        assert!(metrics.duplication < 0.5);
    }
689
    /// Test container: a sorted set of keys written in one batch.
    struct Container {
        batch_index: usize,
        keys: Vec<u64>,
    }
694
    impl Container {
        /// Creates a container, sorting `keys` so `range()` can read
        /// the first and last elements directly.
        fn new(batch_index: usize, mut keys: Vec<u64>) -> Self {
            keys.sort_unstable();
            Self { batch_index, keys }
        }
    }
701
    impl Compactable for Container {
        /// Range from the first to the last (sorted) key. Panics on an
        /// empty container; callers keep containers non-empty.
        fn range(&self) -> RangeInclusive<u64> {
            (self.keys[0])..=(*self.keys.last().unwrap())
        }

        /// Number of keys, used as the container's size.
        fn size(&self) -> u64 {
            self.keys.len() as u64
        }
    }
711
    impl Debug for Container {
        // Formats as `#batch <len>b <start> - <end> (<spread>)`.
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            let (l, r) = self.range().into_inner();
            write!(
                f,
                "#{} {}b {l} - {r} ({})",
                self.batch_index,
                self.keys.len(),
                r - l
            )
        }
    }
724
    /// Applies `segments` to `containers`: multi-item segments are
    /// merged (keys sorted, deduped, re-chunked into containers of up
    /// to 1000 keys); single-item segments are moved to the end of the
    /// list.
    fn do_compact(containers: &mut Vec<Container>, segments: MergeSegments, batch_index: usize) {
        let total_size = containers.iter().map(|c| c.keys.len()).sum::<usize>();
        for merge_job in segments {
            if merge_job.len() < 2 {
                // Move-only job: swap the container out, leaving an
                // empty placeholder that is removed by `retain` below.
                let container = replace(
                    &mut containers[merge_job[0]],
                    Container {
                        batch_index: 0,
                        keys: Default::default(),
                    },
                );
                containers.push(container);
            } else {
                // Merge job: drain every member's keys (leaving empty
                // placeholders), dedup, and append fresh containers.
                let mut keys = Vec::new();
                for i in merge_job {
                    keys.append(&mut containers[i].keys);
                }
                keys.sort_unstable();
                keys.dedup();
                containers.extend(keys.chunks(1000).map(|keys| Container {
                    batch_index,
                    keys: keys.to_vec(),
                }));
            }
        }

        // Drop the now-empty source containers.
        containers.retain(|c| !c.keys.is_empty());
        let total_size2 = containers.iter().map(|c| c.keys.len()).sum::<usize>();
        println!("Compaction done: {total_size} -> {total_size2}",);
    }
755}