1use std::ops::RangeInclusive;
2
3use smallvec::{SmallVec, smallvec};
4
5use crate::compaction::interval_map::IntervalMap;
6
/// An item that can participate in compaction: it covers an inclusive key
/// range and reports a size that is compared against the byte limits in
/// `CompactConfig`.
pub trait Compactable {
    /// The inclusive key range covered by this compactable.
    fn range(&self) -> RangeInclusive<u64>;

    /// The size of this compactable (compared against `max_merge_bytes` etc.).
    fn size(&self) -> u64;
}
16
/// Returns `true` when the two inclusive ranges share at least one key.
fn is_overlapping(a: &RangeInclusive<u64>, b: &RangeInclusive<u64>) -> bool {
    // Equivalent to `a.start() <= b.end() && b.start() <= a.end()`:
    // the ranges are disjoint iff one ends before the other starts.
    !(a.end() < b.start() || b.end() < a.start())
}
20
/// Number of keys in the inclusive range, as `u128` because a full
/// `0..=u64::MAX` range contains `2^64` keys, which overflows `u64`.
fn spread(range: &RangeInclusive<u64>) -> u128 {
    let width = range.end() - range.start();
    u128::from(width) + 1
}
25
/// Grows `a` so that it covers `b` as well.
///
/// Returns `true` when `a` was actually widened on either end.
fn extend_range(a: &mut RangeInclusive<u64>, b: &RangeInclusive<u64>) -> bool {
    let new_start = (*a.start()).min(*b.start());
    let new_end = (*a.end()).max(*b.end());
    let extended = new_start != *a.start() || new_end != *a.end();
    if extended {
        *a = new_start..=new_end;
    }
    extended
}
39
/// Aggregate metrics over a set of compactables, as computed by
/// `compute_metrics`.
#[derive(Debug)]
pub struct CompactableMetrics {
    /// Sum of the key-range spreads of all compactables, divided by the
    /// spread of the full range. Values > 1.0 indicate overlapping ranges.
    pub coverage: f32,

    /// Amount of key-range overlap (intervals counted once per extra
    /// compactable covering them), divided by the spread of the full range.
    pub overlap: f32,

    /// Estimated number of duplicated bytes across all overlapping intervals.
    pub duplicated_size: u64,

    /// Ratio of duplicated size to total size (0.0 when there is no data).
    pub duplication: f32,
}
54
55pub fn compute_metrics<T: Compactable>(
57 compactables: &[T],
58 full_range: RangeInclusive<u64>,
59) -> CompactableMetrics {
60 let mut interval_map = IntervalMap::<Option<(DuplicationInfo, usize)>>::new();
61 let mut coverage = 0.0f32;
62 for c in compactables {
63 let range = c.range();
64 coverage += spread(&range) as f32;
65 interval_map.update(range.clone(), |value| {
66 let (dup_info, count) = value.get_or_insert_default();
67 dup_info.add(c.size(), &range);
68 *count += 1;
69 });
70 }
71 let full_spread = spread(&full_range) as f32;
72
73 let (duplicated_size, duplication, overlap) = interval_map
74 .iter()
75 .flat_map(|(range, value)| Some((range, value.as_ref()?)))
76 .map(|(range, (dup_info, count))| {
77 let duplicated_size = dup_info.duplication(&range);
78 let total_size = dup_info.size(&range);
79 let overlap = spread(&range) as f32 * count.saturating_sub(1) as f32;
80 (duplicated_size, total_size, overlap)
81 })
82 .reduce(|(dup1, total1, overlap1), (dup2, total2, overlap2)| {
83 (dup1 + dup2, total1 + total2, overlap1 + overlap2)
84 })
85 .map(|(duplicated_size, total_size, overlap)| {
86 (
87 duplicated_size,
88 if total_size > 0 {
89 duplicated_size as f32 / total_size as f32
90 } else {
91 0.0
92 },
93 overlap,
94 )
95 })
96 .unwrap_or((0, 0.0, 0.0));
97
98 CompactableMetrics {
99 coverage: coverage / full_spread,
100 overlap: overlap / full_spread,
101 duplicated_size,
102 duplication,
103 }
104}
105
/// Tuning knobs for `get_merge_segments`.
#[derive(Clone)]
pub struct CompactConfig {
    /// Minimum number of compactables required for a candidate set to be
    /// emitted as a merge job.
    pub min_merge_count: usize,

    /// Once a candidate set reaches this count (and
    /// `optimal_merge_duplication_bytes`), the job is emitted immediately.
    pub optimal_merge_count: usize,

    /// Hard cap on the number of compactables in a single merge job.
    pub max_merge_count: usize,

    /// Hard cap on the combined size of a single merge job.
    pub max_merge_bytes: u64,

    /// Minimum estimated duplicated bytes required for a candidate set to be
    /// considered a valid merge job.
    pub min_merge_duplication_bytes: u64,

    /// Estimated duplicated bytes at which a candidate set is considered
    /// optimal and emitted immediately.
    pub optimal_merge_duplication_bytes: u64,

    /// Maximum number of real (multi-item) merge segments produced per call.
    pub max_merge_segment_count: usize,
}
130
impl Default for CompactConfig {
    /// Default tuning aimed at medium-sized merges (up to 500 MiB per job).
    fn default() -> Self {
        const MB: u64 = 1024 * 1024;
        Self {
            min_merge_count: 2,
            optimal_merge_count: 8,
            max_merge_count: 32,
            max_merge_bytes: 500 * MB,
            min_merge_duplication_bytes: 50 * MB,
            optimal_merge_duplication_bytes: 100 * MB,
            max_merge_segment_count: 8,
        }
    }
}
145
/// Fixed-point accumulator for the duplicated data within one key interval.
///
/// Sizes are stored scaled by `2^64 / spread(range)` (see `add`) so that
/// contributions from intervals of different widths are comparable.
#[derive(Clone, Default, Eq, PartialEq)]
struct DuplicationInfo {
    // Sum of all scaled contributions (saturating).
    total_size: u64,
    // Largest single scaled contribution.
    max_size: u64,
}
153
154impl DuplicationInfo {
155 fn duplication(&self, range: &RangeInclusive<u64>) -> u64 {
158 if self.total_size == 0 {
159 return 0;
160 }
161 u64::try_from(
163 u128::from(self.total_size - self.max_size) * spread(range)
164 / (u128::from(u64::MAX) + 1),
165 )
166 .expect("should not overflow, denominator was `u64::MAX+1`")
167 }
168
169 fn size(&self, range: &RangeInclusive<u64>) -> u64 {
171 if self.total_size == 0 {
172 return 0;
173 }
174 u64::try_from(u128::from(self.total_size) * spread(range) / (u128::from(u64::MAX) + 1))
176 .expect("should not overflow, denominator was `u64::MAX+1`")
177 }
178
179 fn add(&mut self, size: u64, range: &RangeInclusive<u64>) {
180 let scaled_size =
186 u64::try_from(u128::from(size) * (u128::from(u64::MAX) + 1) / spread(range))
187 .unwrap_or(u64::MAX);
188 self.total_size = self.total_size.saturating_add(scaled_size);
189 self.max_size = self.max_size.max(scaled_size);
190 }
191}
192
193fn total_duplication_size(duplication: &IntervalMap<Option<DuplicationInfo>>) -> u64 {
194 duplication
195 .iter()
196 .flat_map(|(range, info)| Some((range, info.as_ref()?)))
197 .map(|(range, info)| info.duplication(&range))
198 .sum()
199}
200
/// A list of merge segments; each segment holds indices into the compactables
/// slice. Segments of length one represent items to keep/move as-is rather
/// than merge (see how `do_compact` in the tests treats them).
type MergeSegments = Vec<SmallVec<[usize; 1]>>;
202
203pub fn get_merge_segments<T: Compactable>(
207 compactables: &[T],
208 config: &CompactConfig,
209) -> (MergeSegments, usize) {
210 let mut unused_compactables = compactables.iter().collect::<Vec<_>>();
223 let mut used_compactables = vec![false; compactables.len()];
224
225 let mut merge_segments: MergeSegments = Vec::new();
226 let mut real_merge_segments = 0;
227
228 'outer: while let Some(start_compactable) = unused_compactables.pop() {
231 let start_index = unused_compactables.len();
232 if used_compactables[start_index] {
233 continue;
234 }
235 if real_merge_segments >= config.max_merge_segment_count {
236 break;
238 }
239 let start_compactable_range = start_compactable.range();
240 let start_compactable_size = start_compactable.size();
241 let mut current_range = start_compactable_range.clone();
242
243 'search: loop {
245 let mut current_set = smallvec![start_index];
246 let mut current_size = start_compactable_size;
247 let mut duplication = IntervalMap::<Option<DuplicationInfo>>::new();
248 duplication.update(start_compactable_range.clone(), |dup_info| {
249 dup_info
250 .get_or_insert_default()
251 .add(start_compactable_size, &start_compactable_range);
252 });
253 let mut current_skip = 0;
254
255 loop {
258 let duplication_size = total_duplication_size(&duplication);
260 let optimal_merge_job = current_set.len() >= config.optimal_merge_count
261 && duplication_size >= config.optimal_merge_duplication_bytes;
262 if optimal_merge_job {
263 for &i in current_set.iter() {
264 used_compactables[i] = true;
265 }
266 current_set.reverse();
267 merge_segments.push(current_set);
268 real_merge_segments += 1;
269 continue 'outer;
270 }
271
272 let valid_merge_job = current_set.len() >= config.min_merge_count
275 && duplication_size >= config.min_merge_duplication_bytes;
276 let mut end_job =
277 |mut current_set: SmallVec<[usize; 1]>, used_compactables: &mut Vec<bool>| {
278 if valid_merge_job {
279 for &i in current_set.iter() {
280 used_compactables[i] = true;
281 }
282 current_set.reverse();
283 merge_segments.push(current_set);
284 real_merge_segments += 1;
285 } else {
286 merge_segments.push(smallvec![start_index]);
287 }
288 };
289
290 if current_set.len() >= config.max_merge_count
292 || current_size >= config.max_merge_bytes
293 {
294 end_job(current_set, &mut used_compactables);
296 continue 'outer;
297 }
298
299 let Some((next_index, compactable)) = unused_compactables
301 .iter()
302 .enumerate()
303 .rev()
304 .skip(current_skip)
305 .find(|(i, compactable)| {
306 if used_compactables[*i] {
307 return false;
308 }
309 let range = compactable.range();
310 is_overlapping(¤t_range, &range)
311 })
312 else {
313 end_job(current_set, &mut used_compactables);
315 continue 'outer;
316 };
317 current_skip = unused_compactables.len() - next_index;
318
319 let size = compactable.size();
321 if current_size + size > config.max_merge_bytes {
322 end_job(current_set, &mut used_compactables);
324 continue 'outer;
325 }
326
327 let range = compactable.range();
331 if extend_range(&mut current_range, &range) {
332 continue 'search;
334 }
335
336 current_set.push(next_index);
339 current_size += size;
340 duplication.update(range.clone(), |dup_info| {
341 dup_info.get_or_insert_default().add(size, &range);
342 });
343 }
344 }
345 }
346
347 while merge_segments.last().is_some_and(|s| s.len() == 1) {
348 merge_segments.pop();
350 }
351
352 merge_segments.reverse();
354
355 let mut used_ranges = IntervalMap::<bool>::new();
358 merge_segments.retain(|segment| {
359 if segment.len() == 1 {
361 let range = compactables[segment[0]].range();
362 if !used_ranges.iter_intersecting(range).any(|(_, v)| *v) {
363 return false;
364 }
365 }
366 for i in segment {
368 let range = compactables[*i].range();
369 used_ranges.replace(range, true);
370 }
371 true
372 });
373
374 (merge_segments, real_merge_segments)
375}
376
377#[cfg(test)]
378mod tests {
379 use std::{
380 fmt::Debug,
381 mem::{replace, swap},
382 };
383
384 use rand::{Rng, SeedableRng, seq::SliceRandom};
385
386 use super::*;
387
    /// Minimal `Compactable` implementation backed by fixed values.
    struct TestCompactable {
        range: RangeInclusive<u64>,
        size: u64,
    }
392
    impl Compactable for TestCompactable {
        fn range(&self) -> RangeInclusive<u64> {
            self.range.clone()
        }

        fn size(&self) -> u64 {
            self.size
        }
    }
402
403 fn compact<const N: usize>(
404 ranges: [RangeInclusive<u64>; N],
405 config: &CompactConfig,
406 ) -> Vec<Vec<usize>> {
407 let compactables = ranges
408 .into_iter()
409 .map(|range| TestCompactable { range, size: 100 })
410 .collect::<Vec<_>>();
411 let (jobs, _) = get_merge_segments(&compactables, config);
412 jobs.into_iter()
413 .map(|job| job.into_iter().collect())
414 .collect()
415 }
416
    /// Jobs are emitted as soon as `optimal_merge_count` (3) items are
    /// collected; `max_merge_count` (4) is never exceeded.
    #[test]
    fn test_compaction_jobs_by_count() {
        let merge_jobs = compact(
            [
                0..=10,
                10..=30,
                9..=13,
                0..=30,
                40..=44,
                41..=42,
                41..=47,
                90..=100,
                30..=40,
            ],
            &CompactConfig {
                min_merge_count: 2,
                optimal_merge_count: 3,
                max_merge_count: 4,
                max_merge_bytes: u64::MAX,
                min_merge_duplication_bytes: 0,
                optimal_merge_duplication_bytes: 0,
                max_merge_segment_count: usize::MAX,
            },
        );
        assert_eq!(merge_jobs, vec![vec![1, 2, 3], vec![5, 6, 8]]);
    }
443
    /// With counts effectively unlimited, jobs are cut by the
    /// `max_merge_bytes` (300 = three items of size 100) limit instead.
    #[test]
    fn test_compaction_jobs_by_size() {
        let merge_jobs = compact(
            [
                0..=10,
                10..=30,
                9..=13,
                0..=30,
                40..=44,
                41..=42,
                41..=47,
                90..=100,
                30..=40,
            ],
            &CompactConfig {
                min_merge_count: 2,
                optimal_merge_count: 2,
                max_merge_count: usize::MAX,
                max_merge_bytes: 300,
                min_merge_duplication_bytes: 0,
                optimal_merge_duplication_bytes: u64::MAX,
                max_merge_segment_count: usize::MAX,
            },
        );
        assert_eq!(merge_jobs, vec![vec![1, 2, 3], vec![5, 6, 8]]);
    }
470
    /// With no count or size limits, everything overlapping is merged into a
    /// single job; only the isolated range (index 7, 90..=100) stays out.
    #[test]
    fn test_compaction_jobs_full() {
        let merge_jobs = compact(
            [
                0..=10,
                10..=30,
                9..=13,
                0..=30,
                40..=44,
                41..=42,
                41..=47,
                90..=100,
                30..=40,
            ],
            &CompactConfig {
                min_merge_count: 2,
                optimal_merge_count: usize::MAX,
                max_merge_count: usize::MAX,
                max_merge_bytes: u64::MAX,
                min_merge_duplication_bytes: 0,
                optimal_merge_duplication_bytes: u64::MAX,
                max_merge_segment_count: usize::MAX,
            },
        );
        assert_eq!(merge_jobs, vec![vec![0, 1, 2, 3, 4, 5, 6, 8]]);
    }
497
    /// `optimal_merge_count` of 7 produces one large seven-item job.
    #[test]
    fn test_compaction_jobs_big() {
        let merge_jobs = compact(
            [
                0..=10,
                10..=30,
                9..=13,
                0..=30,
                40..=44,
                41..=42,
                41..=47,
                90..=100,
                30..=40,
            ],
            &CompactConfig {
                min_merge_count: 2,
                optimal_merge_count: 7,
                max_merge_count: usize::MAX,
                max_merge_bytes: u64::MAX,
                min_merge_duplication_bytes: 0,
                optimal_merge_duplication_bytes: 0,
                max_merge_segment_count: usize::MAX,
            },
        );
        assert_eq!(merge_jobs, vec![vec![1, 2, 3, 4, 5, 6, 8]]);
    }
524
    /// `optimal_merge_count` of 2 makes every overlapping pair a job of its
    /// own.
    #[test]
    fn test_compaction_jobs_small() {
        let merge_jobs = compact(
            [
                0..=10,
                10..=30,
                9..=13,
                0..=30,
                40..=44,
                41..=42,
                41..=47,
                90..=100,
                30..=40,
            ],
            &CompactConfig {
                min_merge_count: 2,
                optimal_merge_count: 2,
                max_merge_count: usize::MAX,
                max_merge_bytes: u64::MAX,
                min_merge_duplication_bytes: 0,
                optimal_merge_duplication_bytes: 0,
                max_merge_segment_count: usize::MAX,
            },
        );
        assert_eq!(
            merge_jobs,
            vec![vec![0, 1], vec![2, 3], vec![4, 5], vec![6, 8]]
        );
    }
554
555 pub fn debug_print_compactables<T: Compactable>(compactables: &[T], max_key: u64) {
556 const WIDTH: usize = 128;
557 let char_width: u64 = max_key / WIDTH as u64;
558 for (i, c) in compactables.iter().enumerate() {
559 let range = c.range();
560 let size = c.size();
561 let start = usize::try_from(range.start() / char_width).unwrap();
562 let end = usize::try_from(range.end() / char_width).unwrap();
563 let mut line = format!("{i:>3} | ");
564 for j in 0..WIDTH {
565 if j >= start && j <= end {
566 line.push('█');
567 } else {
568 line.push(' ');
569 }
570 }
571 println!("{line} | {size:>6}");
572 }
573 }
574
    /// Long-running simulation: repeatedly inserts batches of "warm" keys and
    /// compacts, asserting that container count, compaction count and
    /// duplication stay bounded.
    ///
    /// NOTE: the assertions depend on the exact RNG call sequence (fixed
    /// seed), so statements must not be reordered.
    #[test]
    fn simulate_compactions() {
        const KEY_RANGE: u64 = 10000;
        const WARM_KEY_COUNT: usize = 100;
        const INITIAL_CHUNK_SIZE: usize = 100;
        const ITERATIONS: usize = 100;

        // Fixed seed keeps the run deterministic.
        let mut rnd = rand::rngs::SmallRng::from_seed([0; 32]);
        let mut keys = (0..KEY_RANGE).collect::<Vec<_>>();
        keys.shuffle(&mut rnd);

        let mut batch_index = 0;
        let mut containers = keys
            .chunks(INITIAL_CHUNK_SIZE)
            .map(|keys| Container::new(batch_index, keys.to_vec()))
            .collect::<Vec<_>>();

        // Pull a working set of "warm" keys that get rewritten every round.
        let mut warm_keys = (0..WARM_KEY_COUNT)
            .map(|_| {
                let i = rnd.random_range(0..keys.len());
                keys.swap_remove(i)
            })
            .collect::<Vec<_>>();

        let mut number_of_compactions = 0;

        for _ in 0..ITERATIONS {
            let total_size = containers.iter().map(|c| c.keys.len()).sum::<usize>();
            let metrics = compute_metrics(&containers, 0..=KEY_RANGE);
            debug_print_compactables(&containers, KEY_RANGE);
            println!(
                "size: {}, coverage: {}, overlap: {}, duplication: {}, items: {}",
                total_size,
                metrics.coverage,
                metrics.overlap,
                metrics.duplication,
                containers.len()
            );

            // Containers must not grow without bound.
            assert!(containers.len() < 400);
            let config = CompactConfig {
                max_merge_count: 16,
                min_merge_count: 2,
                optimal_merge_count: 4,
                max_merge_bytes: 5000,
                min_merge_duplication_bytes: 500,
                optimal_merge_duplication_bytes: 1000,
                max_merge_segment_count: 4,
            };
            let (jobs, _) = get_merge_segments(&containers, &config);
            if !jobs.is_empty() {
                println!("{jobs:?}");

                batch_index += 1;
                do_compact(&mut containers, jobs, batch_index);
                number_of_compactions += 1;

                let new_metrics = compute_metrics(&containers, 0..=KEY_RANGE);
                println!(
                    "Compaction done: coverage: {} ({}), overlap: {} ({}), duplication: {} ({})",
                    new_metrics.coverage,
                    new_metrics.coverage - metrics.coverage,
                    new_metrics.overlap,
                    new_metrics.overlap - metrics.overlap,
                    new_metrics.duplication,
                    new_metrics.duplication - metrics.duplication
                );
            } else {
                println!("No compaction needed");
            }

            // Rewrite the warm keys as one to three new containers.
            batch_index += 1;
            let pieces = rnd.random_range(1..4);
            for chunk in warm_keys.chunks(warm_keys.len().div_ceil(pieces)) {
                containers.push(Container::new(batch_index, chunk.to_vec()));
            }

            // Rotate some warm keys into/out of the cold set.
            let changes = rnd.random_range(0..100);
            for _ in 0..changes {
                let i = rnd.random_range(0..warm_keys.len());
                let j = rnd.random_range(0..keys.len());
                swap(&mut warm_keys[i], &mut keys[j]);
            }
        }
        println!("Number of compactions: {number_of_compactions}");

        let metrics = compute_metrics(&containers, 0..=KEY_RANGE);
        assert!(number_of_compactions < 30);
        assert!(containers.len() < 30);
        assert!(metrics.duplication < 0.5);
    }
670
    /// Test stand-in for an on-disk container: a batch tag plus sorted keys.
    struct Container {
        batch_index: usize,
        keys: Vec<u64>,
    }
675
    impl Container {
        /// Creates a container, sorting its keys so `range()` can read the
        /// first/last elements.
        fn new(batch_index: usize, mut keys: Vec<u64>) -> Self {
            keys.sort_unstable();
            Self { batch_index, keys }
        }
    }
682
    impl Compactable for Container {
        fn range(&self) -> RangeInclusive<u64> {
            // Keys are sorted, so first/last bound the covered range.
            // NOTE(review): panics on an empty container — the test keeps
            // containers non-empty via `retain` in `do_compact`.
            (self.keys[0])..=(*self.keys.last().unwrap())
        }

        fn size(&self) -> u64 {
            // Size is measured in number of keys for the simulation.
            self.keys.len() as u64
        }
    }
692
    impl Debug for Container {
        // Prints batch index, key count and the covered range with its width.
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            let (l, r) = self.range().into_inner();
            write!(
                f,
                "#{} {}b {l} - {r} ({})",
                self.batch_index,
                self.keys.len(),
                r - l
            )
        }
    }
705
706 fn do_compact(containers: &mut Vec<Container>, segments: MergeSegments, batch_index: usize) {
707 let total_size = containers.iter().map(|c| c.keys.len()).sum::<usize>();
708 for merge_job in segments {
709 if merge_job.len() < 2 {
710 let container = replace(
711 &mut containers[merge_job[0]],
712 Container {
713 batch_index: 0,
714 keys: Default::default(),
715 },
716 );
717 containers.push(container);
718 } else {
719 let mut keys = Vec::new();
720 for i in merge_job {
721 keys.append(&mut containers[i].keys);
722 }
723 keys.sort_unstable();
724 keys.dedup();
725 containers.extend(keys.chunks(1000).map(|keys| Container {
726 batch_index,
727 keys: keys.to_vec(),
728 }));
729 }
730 }
731
732 containers.retain(|c| !c.keys.is_empty());
733 let total_size2 = containers.iter().map(|c| c.keys.len()).sum::<usize>();
734 println!("Compaction done: {total_size} -> {total_size2}",);
735 }
736}