1use std::ops::RangeInclusive;
2
3use smallvec::{SmallVec, smallvec};
4
5use crate::compaction::interval_map::IntervalMap;
6
/// An item that can participate in range-based compaction (e.g. a storage
/// segment covering a contiguous key range).
pub trait Compactable {
    /// The inclusive key range covered by this item.
    fn range(&self) -> RangeInclusive<u64>;

    /// The size of this item in bytes.
    fn size(&self) -> u64;
}
16
/// Returns `true` when the two inclusive ranges share at least one key.
fn is_overlapping(a: &RangeInclusive<u64>, b: &RangeInclusive<u64>) -> bool {
    // Two inclusive ranges are disjoint exactly when one ends before the
    // other starts; overlapping is the negation of that.
    !(a.end() < b.start() || b.end() < a.start())
}
20
/// Number of distinct keys in the inclusive range. Returned as `u128`
/// because the full range `0..=u64::MAX` contains `u64::MAX + 1` keys,
/// which does not fit in a `u64`.
fn spread(range: &RangeInclusive<u64>) -> u128 {
    let width = range.end() - range.start();
    u128::from(width) + 1
}
25
/// Grows `a` so that it also covers `b`. Returns `true` when `a` actually
/// changed (i.e. `b` extended it on at least one side).
fn extend_range(a: &mut RangeInclusive<u64>, b: &RangeInclusive<u64>) -> bool {
    let start = *a.start().min(b.start());
    let end = *a.end().max(b.end());
    let merged = start..=end;
    let changed = merged != *a;
    *a = merged;
    changed
}
39
/// Aggregate health metrics for a set of [`Compactable`]s relative to a
/// full key range, as computed by `compute_metrics`.
#[derive(Debug)]
pub struct CompactableMetrics {
    /// Sum of all compactable spreads divided by the full range's spread.
    /// Values above 1.0 indicate overlapping ranges.
    pub coverage: f32,

    /// Extra coverage beyond the first compactable per key interval
    /// (spread × (count − 1), summed), relative to the full range's spread.
    pub overlap: f32,

    /// Estimated number of duplicated bytes across overlapping compactables.
    pub duplicated_size: u64,

    /// Ratio of duplicated bytes to estimated total bytes (0.0 when empty).
    pub duplication: f32,
}
54
55pub fn compute_metrics<T: Compactable>(
57 compactables: &[T],
58 full_range: RangeInclusive<u64>,
59) -> CompactableMetrics {
60 let mut interval_map = IntervalMap::<Option<(DuplicationInfo, usize)>>::new();
61 let mut coverage = 0.0f32;
62 for c in compactables {
63 let range = c.range();
64 coverage += spread(&range) as f32;
65 interval_map.update(range.clone(), |value| {
66 let (dup_info, count) = value.get_or_insert_default();
67 dup_info.add(c.size(), &range);
68 *count += 1;
69 });
70 }
71 let full_spread = spread(&full_range) as f32;
72
73 let (duplicated_size, duplication, overlap) = interval_map
74 .iter()
75 .flat_map(|(range, value)| Some((range, value.as_ref()?)))
76 .map(|(range, (dup_info, count))| {
77 let duplicated_size = dup_info.duplication(&range);
78 let total_size = dup_info.size(&range);
79 let overlap = spread(&range) as f32 * count.saturating_sub(1) as f32;
80 (duplicated_size, total_size, overlap)
81 })
82 .reduce(|(dup1, total1, overlap1), (dup2, total2, overlap2)| {
83 (dup1 + dup2, total1 + total2, overlap1 + overlap2)
84 })
85 .map(|(duplicated_size, total_size, overlap)| {
86 (
87 duplicated_size,
88 if total_size > 0 {
89 duplicated_size as f32 / total_size as f32
90 } else {
91 0.0
92 },
93 overlap,
94 )
95 })
96 .unwrap_or((0, 0.0, 0.0));
97
98 CompactableMetrics {
99 coverage: coverage / full_spread,
100 overlap: overlap / full_spread,
101 duplicated_size,
102 duplication,
103 }
104}
105
/// Tuning knobs for `get_merge_segments`.
pub struct CompactConfig {
    /// Minimum number of compactables in a merge job for it to be emitted.
    pub min_merge_count: usize,

    /// Once a job reaches this many compactables (and enough duplication),
    /// it is considered optimal and emitted immediately.
    pub optimal_merge_count: usize,

    /// Hard upper bound on the number of compactables in one merge job.
    pub max_merge_count: usize,

    /// Hard upper bound on the total byte size of one merge job.
    pub max_merge_bytes: u64,

    /// Minimum estimated duplicated bytes for a merge job to be worthwhile.
    pub min_merge_duplication_bytes: u64,

    /// Duplicated-bytes threshold at which a job is considered optimal.
    pub optimal_merge_duplication_bytes: u64,

    /// Maximum number of real (multi-item) merge segments per invocation.
    pub max_merge_segment_count: usize,
}
129
130impl Default for CompactConfig {
131 fn default() -> Self {
132 const MB: u64 = 1024 * 1024;
133 Self {
134 min_merge_count: 2,
135 optimal_merge_count: 8,
136 max_merge_count: 32,
137 max_merge_bytes: 500 * MB,
138 min_merge_duplication_bytes: MB,
139 optimal_merge_duplication_bytes: 10 * MB,
140 max_merge_segment_count: 8,
141 }
142 }
143}
144
/// Fixed-point accumulator for sizes over a key interval.
///
/// Sizes are stored scaled by `(u64::MAX + 1) / spread(range)` (see `add`),
/// turning them into per-key densities so that compactables with different
/// range widths become comparable.
#[derive(Clone, Default, Eq, PartialEq)]
struct DuplicationInfo {
    // Sum of all scaled sizes added for this interval.
    total_size: u64,
    // Largest single scaled size added for this interval.
    max_size: u64,
}
152
153impl DuplicationInfo {
154 fn duplication(&self, range: &RangeInclusive<u64>) -> u64 {
157 if self.total_size == 0 {
158 return 0;
159 }
160 u64::try_from(
162 u128::from(self.total_size - self.max_size) * spread(range)
163 / (u128::from(u64::MAX) + 1),
164 )
165 .expect("should not overflow, denominator was `u64::MAX+1`")
166 }
167
168 fn size(&self, range: &RangeInclusive<u64>) -> u64 {
170 if self.total_size == 0 {
171 return 0;
172 }
173 u64::try_from(u128::from(self.total_size) * spread(range) / (u128::from(u64::MAX) + 1))
175 .expect("should not overflow, denominator was `u64::MAX+1`")
176 }
177
178 fn add(&mut self, size: u64, range: &RangeInclusive<u64>) {
179 let scaled_size =
185 u64::try_from(u128::from(size) * (u128::from(u64::MAX) + 1) / spread(range))
186 .unwrap_or(u64::MAX);
187 self.total_size = self.total_size.saturating_add(scaled_size);
188 self.max_size = self.max_size.max(scaled_size);
189 }
190}
191
192fn total_duplication_size(duplication: &IntervalMap<Option<DuplicationInfo>>) -> u64 {
193 duplication
194 .iter()
195 .flat_map(|(range, info)| Some((range, info.as_ref()?)))
196 .map(|(range, info)| info.duplication(&range))
197 .sum()
198}
199
/// A list of merge jobs; each job is a list of indices into the compactables
/// slice. The inline capacity of 1 matches the single-index placeholder
/// segments produced by `get_merge_segments`.
type MergeSegments = Vec<SmallVec<[usize; 1]>>;
201
202pub fn get_merge_segments<T: Compactable>(
203 compactables: &[T],
204 config: &CompactConfig,
205) -> MergeSegments {
206 let mut unused_compactables = compactables.iter().collect::<Vec<_>>();
219 let mut used_compactables = vec![false; compactables.len()];
220
221 let mut merge_segments: MergeSegments = Vec::new();
222 let mut real_merge_segments = 0;
223
224 'outer: while let Some(start_compactable) = unused_compactables.pop() {
227 let start_index = unused_compactables.len();
228 if used_compactables[start_index] {
229 continue;
230 }
231 if real_merge_segments >= config.max_merge_segment_count {
232 break;
234 }
235 let mut current_range = start_compactable.range();
236
237 'search: loop {
239 let mut current_set = smallvec![start_index];
240 let mut current_size = start_compactable.size();
241 let mut duplication = IntervalMap::<Option<DuplicationInfo>>::new();
242 let mut current_skip = 0;
243
244 loop {
247 let duplication_size = total_duplication_size(&duplication);
249 let optimal_merge_job = current_set.len() >= config.optimal_merge_count
250 && duplication_size >= config.optimal_merge_duplication_bytes;
251 if optimal_merge_job {
252 for &i in current_set.iter() {
253 used_compactables[i] = true;
254 }
255 current_set.reverse();
256 merge_segments.push(current_set);
257 real_merge_segments += 1;
258 continue 'outer;
259 }
260
261 let valid_merge_job = current_set.len() >= config.min_merge_count
264 && duplication_size >= config.min_merge_duplication_bytes;
265 let mut end_job =
266 |mut current_set: SmallVec<[usize; 1]>, used_compactables: &mut Vec<bool>| {
267 if valid_merge_job {
268 for &i in current_set.iter() {
269 used_compactables[i] = true;
270 }
271 current_set.reverse();
272 merge_segments.push(current_set);
273 real_merge_segments += 1;
274 } else {
275 merge_segments.push(smallvec![start_index]);
276 }
277 };
278
279 if current_set.len() >= config.max_merge_count
281 || current_size >= config.max_merge_bytes
282 {
283 end_job(current_set, &mut used_compactables);
285 continue 'outer;
286 }
287
288 let Some((next_index, compactable)) = unused_compactables
290 .iter()
291 .enumerate()
292 .rev()
293 .skip(current_skip)
294 .find(|(i, compactable)| {
295 if used_compactables[*i] {
296 return false;
297 }
298 let range = compactable.range();
299 is_overlapping(¤t_range, &range)
300 })
301 else {
302 end_job(current_set, &mut used_compactables);
304 continue 'outer;
305 };
306 current_skip = unused_compactables.len() - next_index;
307
308 let size = compactable.size();
310 if current_size + size > config.max_merge_bytes {
311 end_job(current_set, &mut used_compactables);
313 continue 'outer;
314 }
315
316 let range = compactable.range();
320 if extend_range(&mut current_range, &range) {
321 continue 'search;
323 }
324
325 current_set.push(next_index);
328 current_size += size;
329 duplication.update(range.clone(), |dup_info| {
330 dup_info.get_or_insert_default().add(size, &range);
331 });
332 }
333 }
334 }
335
336 while merge_segments.last().is_some_and(|s| s.len() == 1) {
337 merge_segments.pop();
339 }
340
341 merge_segments.reverse();
343
344 let mut used_ranges = IntervalMap::<bool>::new();
347 merge_segments.retain(|segment| {
348 if segment.len() == 1 {
350 let range = compactables[segment[0]].range();
351 if !used_ranges.iter_intersecting(range).any(|(_, v)| *v) {
352 return false;
353 }
354 }
355 for i in segment {
357 let range = compactables[*i].range();
358 used_ranges.replace(range, true);
359 }
360 true
361 });
362
363 merge_segments
364}
365
366#[cfg(test)]
367mod tests {
368 use std::{
369 fmt::Debug,
370 mem::{replace, swap},
371 };
372
373 use rand::{Rng, SeedableRng, seq::SliceRandom};
374
375 use super::*;
376
    /// Minimal [`Compactable`] backed by a fixed range and size, for tests.
    struct TestCompactable {
        range: RangeInclusive<u64>,
        size: u64,
    }

    impl Compactable for TestCompactable {
        fn range(&self) -> RangeInclusive<u64> {
            self.range.clone()
        }

        fn size(&self) -> u64 {
            self.size
        }
    }
391
392 fn compact<const N: usize>(
393 ranges: [RangeInclusive<u64>; N],
394 config: &CompactConfig,
395 ) -> Vec<Vec<usize>> {
396 let compactables = ranges
397 .into_iter()
398 .map(|range| TestCompactable { range, size: 100 })
399 .collect::<Vec<_>>();
400 let jobs = get_merge_segments(&compactables, config);
401 jobs.into_iter()
402 .map(|job| job.into_iter().collect())
403 .collect()
404 }
405
    /// With tight count limits (optimal 3, max 4) and no duplication
    /// thresholds, jobs are split purely by item count.
    #[test]
    fn test_compaction_jobs_by_count() {
        let merge_jobs = compact(
            [
                0..=10,
                10..=30,
                9..=13,
                0..=30,
                40..=44,
                41..=42,
                41..=47,
                90..=100,
                30..=40,
            ],
            &CompactConfig {
                min_merge_count: 2,
                optimal_merge_count: 3,
                max_merge_count: 4,
                max_merge_bytes: u64::MAX,
                min_merge_duplication_bytes: 0,
                optimal_merge_duplication_bytes: 0,
                max_merge_segment_count: usize::MAX,
            },
        );
        assert_eq!(merge_jobs, vec![vec![1, 2, 3], vec![5, 6, 8]]);
    }
432
    /// With `max_merge_bytes: 300` and 100-byte items, jobs are capped at
    /// three items by total byte size rather than by count.
    #[test]
    fn test_compaction_jobs_by_size() {
        let merge_jobs = compact(
            [
                0..=10,
                10..=30,
                9..=13,
                0..=30,
                40..=44,
                41..=42,
                41..=47,
                90..=100,
                30..=40,
            ],
            &CompactConfig {
                min_merge_count: 2,
                optimal_merge_count: 2,
                max_merge_count: usize::MAX,
                max_merge_bytes: 300,
                min_merge_duplication_bytes: 0,
                optimal_merge_duplication_bytes: u64::MAX,
                max_merge_segment_count: usize::MAX,
            },
        );
        assert_eq!(merge_jobs, vec![vec![1, 2, 3], vec![5, 6, 8]]);
    }
459
    /// With unreachable "optimal" thresholds and no hard limits, everything
    /// that transitively overlaps merges into one big job; the disjoint
    /// `90..=100` item (index 7) stays out.
    #[test]
    fn test_compaction_jobs_full() {
        let merge_jobs = compact(
            [
                0..=10,
                10..=30,
                9..=13,
                0..=30,
                40..=44,
                41..=42,
                41..=47,
                90..=100,
                30..=40,
            ],
            &CompactConfig {
                min_merge_count: 2,
                optimal_merge_count: usize::MAX,
                max_merge_count: usize::MAX,
                max_merge_bytes: u64::MAX,
                min_merge_duplication_bytes: 0,
                optimal_merge_duplication_bytes: u64::MAX,
                max_merge_segment_count: usize::MAX,
            },
        );
        assert_eq!(merge_jobs, vec![vec![0, 1, 2, 3, 4, 5, 6, 8]]);
    }
486
    /// With `optimal_merge_count: 7`, a single seven-item job is emitted as
    /// soon as it becomes optimal.
    #[test]
    fn test_compaction_jobs_big() {
        let merge_jobs = compact(
            [
                0..=10,
                10..=30,
                9..=13,
                0..=30,
                40..=44,
                41..=42,
                41..=47,
                90..=100,
                30..=40,
            ],
            &CompactConfig {
                min_merge_count: 2,
                optimal_merge_count: 7,
                max_merge_count: usize::MAX,
                max_merge_bytes: u64::MAX,
                min_merge_duplication_bytes: 0,
                optimal_merge_duplication_bytes: 0,
                max_merge_segment_count: usize::MAX,
            },
        );
        assert_eq!(merge_jobs, vec![vec![1, 2, 3, 4, 5, 6, 8]]);
    }
513
    /// With `optimal_merge_count: 2`, jobs are emitted as minimal pairs.
    #[test]
    fn test_compaction_jobs_small() {
        let merge_jobs = compact(
            [
                0..=10,
                10..=30,
                9..=13,
                0..=30,
                40..=44,
                41..=42,
                41..=47,
                90..=100,
                30..=40,
            ],
            &CompactConfig {
                min_merge_count: 2,
                optimal_merge_count: 2,
                max_merge_count: usize::MAX,
                max_merge_bytes: u64::MAX,
                min_merge_duplication_bytes: 0,
                optimal_merge_duplication_bytes: 0,
                max_merge_segment_count: usize::MAX,
            },
        );
        assert_eq!(
            merge_jobs,
            vec![vec![0, 1], vec![2, 3], vec![4, 5], vec![6, 8]]
        );
    }
543
544 pub fn debug_print_compactables<T: Compactable>(compactables: &[T], max_key: u64) {
545 const WIDTH: usize = 128;
546 let char_width: u64 = max_key / WIDTH as u64;
547 for (i, c) in compactables.iter().enumerate() {
548 let range = c.range();
549 let size = c.size();
550 let start = usize::try_from(range.start() / char_width).unwrap();
551 let end = usize::try_from(range.end() / char_width).unwrap();
552 let mut line = format!("{i:>3} | ");
553 for j in 0..WIDTH {
554 if j >= start && j <= end {
555 line.push('█');
556 } else {
557 line.push(' ');
558 }
559 }
560 println!("{line} | {size:>6}");
561 }
562 }
563
    /// Long-running simulation: repeatedly writes batches of "warm" keys,
    /// runs compaction, and asserts the system converges — bounded container
    /// count, bounded number of compactions, and duplication below 0.5.
    #[test]
    fn simulate_compactions() {
        const KEY_RANGE: u64 = 10000;
        const WARM_KEY_COUNT: usize = 100;
        const INITIAL_CHUNK_SIZE: usize = 100;
        const ITERATIONS: usize = 100;

        // Fixed seed keeps the simulation deterministic and reproducible.
        let mut rnd = rand::rngs::SmallRng::from_seed([0; 32]);
        let mut keys = (0..KEY_RANGE).collect::<Vec<_>>();
        keys.shuffle(&mut rnd);

        let mut batch_index = 0;
        let mut containers = keys
            .chunks(INITIAL_CHUNK_SIZE)
            .map(|keys| Container::new(batch_index, keys.to_vec()))
            .collect::<Vec<_>>();

        // Keys that get rewritten on every iteration (moved out of `keys`).
        let mut warm_keys = (0..WARM_KEY_COUNT)
            .map(|_| {
                let i = rnd.random_range(0..keys.len());
                keys.swap_remove(i)
            })
            .collect::<Vec<_>>();

        let mut number_of_compactions = 0;

        for _ in 0..ITERATIONS {
            let total_size = containers.iter().map(|c| c.keys.len()).sum::<usize>();
            let metrics = compute_metrics(&containers, 0..=KEY_RANGE);
            debug_print_compactables(&containers, KEY_RANGE);
            println!(
                "size: {}, coverage: {}, overlap: {}, duplication: {}, items: {}",
                total_size,
                metrics.coverage,
                metrics.overlap,
                metrics.duplication,
                containers.len()
            );

            // Guard against unbounded growth of the container count.
            assert!(containers.len() < 400);
            let config = CompactConfig {
                max_merge_count: 16,
                min_merge_count: 2,
                optimal_merge_count: 4,
                max_merge_bytes: 5000,
                min_merge_duplication_bytes: 200,
                optimal_merge_duplication_bytes: 500,
                max_merge_segment_count: 4,
            };
            let jobs = get_merge_segments(&containers, &config);
            if !jobs.is_empty() {
                println!("{jobs:?}");

                batch_index += 1;
                do_compact(&mut containers, jobs, batch_index);
                number_of_compactions += 1;

                // Report how the metrics moved due to this compaction.
                let new_metrics = compute_metrics(&containers, 0..=KEY_RANGE);
                println!(
                    "Compaction done: coverage: {} ({}), overlap: {} ({}), duplication: {} ({})",
                    new_metrics.coverage,
                    new_metrics.coverage - metrics.coverage,
                    new_metrics.overlap,
                    new_metrics.overlap - metrics.overlap,
                    new_metrics.duplication,
                    new_metrics.duplication - metrics.duplication
                );
            } else {
                println!("No compaction needed");
            }

            // Re-write the warm keys as 1 to 3 new containers.
            batch_index += 1;
            let pieces = rnd.random_range(1..4);
            for chunk in warm_keys.chunks(warm_keys.len().div_ceil(pieces)) {
                containers.push(Container::new(batch_index, chunk.to_vec()));
            }

            // Randomly exchange some warm keys with cold ones.
            let changes = rnd.random_range(0..100);
            for _ in 0..changes {
                let i = rnd.random_range(0..warm_keys.len());
                let j = rnd.random_range(0..keys.len());
                swap(&mut warm_keys[i], &mut keys[j]);
            }
        }
        println!("Number of compactions: {number_of_compactions}");

        let metrics = compute_metrics(&containers, 0..=KEY_RANGE);
        assert!(number_of_compactions < 40);
        assert!(containers.len() < 30);
        assert!(metrics.duplication < 0.5);
    }
659
    /// A sorted set of keys written by a particular batch (see
    /// `Container::new`), used as the simulation's [`Compactable`].
    struct Container {
        batch_index: usize,
        keys: Vec<u64>,
    }
664
665 impl Container {
666 fn new(batch_index: usize, mut keys: Vec<u64>) -> Self {
667 keys.sort_unstable();
668 Self { batch_index, keys }
669 }
670 }
671
    impl Compactable for Container {
        fn range(&self) -> RangeInclusive<u64> {
            // Keys are kept sorted (see `Container::new`), so the first and
            // last elements bound the covered range. Panics on empty keys.
            (self.keys[0])..=(*self.keys.last().unwrap())
        }

        fn size(&self) -> u64 {
            self.keys.len() as u64
        }
    }
681
    impl Debug for Container {
        // Formats as `#<batch> <len>b <start> - <end> (<spread>)`.
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            let (l, r) = self.range().into_inner();
            write!(
                f,
                "#{} {}b {l} - {r} ({})",
                self.batch_index,
                self.keys.len(),
                r - l
            )
        }
    }
694
    /// Applies `segments` to `containers`: single-index segments are moved
    /// to the end of the list, multi-index segments are merged, deduped and
    /// re-chunked into new containers tagged with `batch_index`.
    ///
    /// NOTE: segment indices refer to positions in `containers` as they were
    /// when the segments were computed, so sources are emptied in place
    /// (keeping indices stable) and only removed at the end.
    fn do_compact(containers: &mut Vec<Container>, segments: MergeSegments, batch_index: usize) {
        let total_size = containers.iter().map(|c| c.keys.len()).sum::<usize>();
        for merge_job in segments {
            if merge_job.len() < 2 {
                // Move the container to the end, leaving an empty placeholder
                // behind so later segment indices stay valid.
                let container = replace(
                    &mut containers[merge_job[0]],
                    Container {
                        batch_index: 0,
                        keys: Default::default(),
                    },
                );
                containers.push(container);
            } else {
                // Merge all keys of the job (draining the sources), drop
                // duplicates, and split into containers of at most 1000 keys.
                let mut keys = Vec::new();
                for i in merge_job {
                    keys.append(&mut containers[i].keys);
                }
                keys.sort_unstable();
                keys.dedup();
                containers.extend(keys.chunks(1000).map(|keys| Container {
                    batch_index,
                    keys: keys.to_vec(),
                }));
            }
        }

        // Remove the drained / placeholder containers.
        containers.retain(|c| !c.keys.is_empty());
        let total_size2 = containers.iter().map(|c| c.keys.len()).sum::<usize>();
        println!("Compaction done: {total_size} -> {total_size2}",);
    }
725}