// turbo_persistence/compaction/selector.rs

1use std::ops::RangeInclusive;
2
3use smallvec::{SmallVec, smallvec};
4
5use crate::compaction::interval_map::IntervalMap;
6
/// Represents part of a database (i.e. an SST file) with a range of keys (i.e. hashes) and a size
/// of that data in bytes.
pub trait Compactable {
    /// The range of keys stored in this database segment, inclusive on both ends.
    fn range(&self) -> RangeInclusive<u64>;

    /// The size of the compactable database segment in bytes.
    fn size(&self) -> u64;
}
16
/// Returns `true` if the two inclusive ranges share at least one key.
fn is_overlapping(a: &RangeInclusive<u64>, b: &RangeInclusive<u64>) -> bool {
    // The ranges are disjoint iff one ends strictly before the other starts.
    !(a.end() < b.start() || b.end() < a.start())
}
20
/// Number of keys covered by the inclusive range.
///
/// The spread of `0..=u64::MAX` is `u64::MAX + 1`, which does not fit into a
/// `u64`; the subtraction stays in `u64`, only the `+ 1` is widened to `u128`.
fn spread(range: &RangeInclusive<u64>) -> u128 {
    1 + u128::from(range.end() - range.start())
}
25
/// Extends the range `a` to include the range `b`, returns `true` if the range was extended.
fn extend_range(a: &mut RangeInclusive<u64>, b: &RangeInclusive<u64>) -> bool {
    // The hull of both ranges; `a` only changes when `b` sticks out on a side.
    let start = (*a.start()).min(*b.start());
    let end = (*a.end()).max(*b.end());
    let extended = start != *a.start() || end != *a.end();
    if extended {
        *a = start..=end;
    }
    extended
}
39
/// Metrics describing a set of compactables, as computed by [`compute_metrics`].
#[derive(Debug)]
pub struct CompactableMetrics {
    /// The total coverage of the compactables: the sum of all key-range spreads
    /// relative to the full range's spread. Can exceed 1.0 when ranges overlap.
    pub coverage: f32,

    /// The maximum overlap of the compactables, relative to the full range's spread.
    pub overlap: f32,

    /// The possible duplication of the compactables (estimated bytes).
    pub duplicated_size: u64,

    /// The possible duplication of the compactables as factor to total size.
    pub duplication: f32,
}
54
55/// Computes metrics about the compactables.
56pub fn compute_metrics<T: Compactable>(
57    compactables: &[T],
58    full_range: RangeInclusive<u64>,
59) -> CompactableMetrics {
60    let mut interval_map = IntervalMap::<Option<(DuplicationInfo, usize)>>::new();
61    let mut coverage = 0.0f32;
62    for c in compactables {
63        let range = c.range();
64        coverage += spread(&range) as f32;
65        interval_map.update(range.clone(), |value| {
66            let (dup_info, count) = value.get_or_insert_default();
67            dup_info.add(c.size(), &range);
68            *count += 1;
69        });
70    }
71    let full_spread = spread(&full_range) as f32;
72
73    let (duplicated_size, duplication, overlap) = interval_map
74        .iter()
75        .flat_map(|(range, value)| Some((range, value.as_ref()?)))
76        .map(|(range, (dup_info, count))| {
77            let duplicated_size = dup_info.duplication(&range);
78            let total_size = dup_info.size(&range);
79            let overlap = spread(&range) as f32 * count.saturating_sub(1) as f32;
80            (duplicated_size, total_size, overlap)
81        })
82        .reduce(|(dup1, total1, overlap1), (dup2, total2, overlap2)| {
83            (dup1 + dup2, total1 + total2, overlap1 + overlap2)
84        })
85        .map(|(duplicated_size, total_size, overlap)| {
86            (
87                duplicated_size,
88                if total_size > 0 {
89                    duplicated_size as f32 / total_size as f32
90                } else {
91                    0.0
92                },
93                overlap,
94            )
95        })
96        .unwrap_or((0, 0.0, 0.0));
97
98    CompactableMetrics {
99        coverage: coverage / full_spread,
100        overlap: overlap / full_spread,
101        duplicated_size,
102        duplication,
103    }
104}
105
/// Configuration for the compaction algorithm.
pub struct CompactConfig {
    /// The minimum number of files to merge at once.
    pub min_merge_count: usize,

    /// The optimal number of files to merge at once.
    pub optimal_merge_count: usize,

    /// The maximum number of files to merge at once.
    pub max_merge_count: usize,

    /// The maximum size of all files to merge at once.
    pub max_merge_bytes: u64,

    /// The amount of duplication that needs to be in a merge job to be considered for merging.
    pub min_merge_duplication_bytes: u64,

    /// The optimal duplication size for merging.
    pub optimal_merge_duplication_bytes: u64,

    /// The maximum number of merge segments to determine.
    pub max_merge_segment_count: usize,
}
129
130impl Default for CompactConfig {
131    fn default() -> Self {
132        const MB: u64 = 1024 * 1024;
133        Self {
134            min_merge_count: 2,
135            optimal_merge_count: 8,
136            max_merge_count: 32,
137            max_merge_bytes: 500 * MB,
138            min_merge_duplication_bytes: MB,
139            optimal_merge_duplication_bytes: 10 * MB,
140            max_merge_segment_count: 8,
141        }
142    }
143}
144
/// Accumulated, key-space-normalized size information for one interval, used to
/// estimate how much duplicated data overlapping segments contain.
#[derive(Clone, Default, Eq, PartialEq)]
struct DuplicationInfo {
    /// The sum of all encountered scaled sizes.
    total_size: u64,
    /// The largest encountered single scaled size.
    max_size: u64,
}
152
153impl DuplicationInfo {
154    /// Get a value in the range `0..=u64` that represents the estimated amount of duplication
155    /// across the given range. The units are arbitrary, but linear.
156    fn duplication(&self, range: &RangeInclusive<u64>) -> u64 {
157        if self.total_size == 0 {
158            return 0;
159        }
160        // the maximum numerator value is `u64::MAX + 1`
161        u64::try_from(
162            u128::from(self.total_size - self.max_size) * spread(range)
163                / (u128::from(u64::MAX) + 1),
164        )
165        .expect("should not overflow, denominator was `u64::MAX+1`")
166    }
167
168    /// The estimated size (in bytes) of a database segment containing `range` keys.
169    fn size(&self, range: &RangeInclusive<u64>) -> u64 {
170        if self.total_size == 0 {
171            return 0;
172        }
173        // the maximum numerator value is `u64::MAX + 1`
174        u64::try_from(u128::from(self.total_size) * spread(range) / (u128::from(u64::MAX) + 1))
175            .expect("should not overflow, denominator was `u64::MAX+1`")
176    }
177
178    fn add(&mut self, size: u64, range: &RangeInclusive<u64>) {
179        // Assumption: `size` is typically much smaller than `spread(range)`. The spread is some
180        // fraction of `u64` (the full possible key-space), but no SST file is anywhere close to
181        // `u64::MAX` bytes.
182
183        // Scale size to full range:
184        let scaled_size =
185            u64::try_from(u128::from(size) * (u128::from(u64::MAX) + 1) / spread(range))
186                .unwrap_or(u64::MAX);
187        self.total_size = self.total_size.saturating_add(scaled_size);
188        self.max_size = self.max_size.max(scaled_size);
189    }
190}
191
192fn total_duplication_size(duplication: &IntervalMap<Option<DuplicationInfo>>) -> u64 {
193    duplication
194        .iter()
195        .flat_map(|(range, info)| Some((range, info.as_ref()?)))
196        .map(|(range, info)| info.duplication(&range))
197        .sum()
198}
199
/// A list of merge jobs; each job holds indices into the compactables slice.
/// Most jobs are expected to reference a single file, hence the inline capacity of 1.
type MergeSegments = Vec<SmallVec<[usize; 1]>>;
201
/// Determines which compactables should be merged together.
///
/// Returns a list of segments; each segment holds the indices (into
/// `compactables`) of files to merge as one job. Single-index segments appear
/// to act as position markers for files that must keep their order relative to
/// surrounding merge jobs — NOTE(review): confirm against the caller.
pub fn get_merge_segments<T: Compactable>(
    compactables: &[T],
    config: &CompactConfig,
) -> MergeSegments {
    // Process all compactables in reverse order.
    // For each compactable, find the smallest set of compactables that overlaps with it and matches
    // the conditions.
    // To find the set:
    // - Set the current range to the range of the first unused compactable.
    // - When the set matches the conditions, add the set as merge job, mark all used compactables
    //   and continue.
    // - Find the next unused compactable that overlaps with the current range.
    // - If the range needs to be extended, restart the search with the new range.
    // - If the compactable is within the range, add it to the current set.
    // - If the set is too large, mark the starting compactable as used and continue with the next

    let mut unused_compactables = compactables.iter().collect::<Vec<_>>();
    // `used_compactables[i]` is set once compactable `i` was placed in a real merge job.
    let mut used_compactables = vec![false; compactables.len()];

    let mut merge_segments: MergeSegments = Vec::new();
    // Count of multi-file merge jobs (single-index marker segments excluded).
    let mut real_merge_segments = 0;

    // Iterate in reverse order to process the compactables from the end.
    // That's the order in which compactables are read, so we need to keep that order.
    'outer: while let Some(start_compactable) = unused_compactables.pop() {
        // After `pop`, the popped element's original index equals the remaining length.
        let start_index = unused_compactables.len();
        if used_compactables[start_index] {
            continue;
        }
        if real_merge_segments >= config.max_merge_segment_count {
            // We have reached the maximum number of merge jobs, so we stop here.
            break;
        }
        let mut current_range = start_compactable.range();

        // We might need to restart the search if we need to extend the range.
        'search: loop {
            let mut current_set = smallvec![start_index];
            let mut current_size = start_compactable.size();
            let mut duplication = IntervalMap::<Option<DuplicationInfo>>::new();
            // Number of tail elements already examined; avoids re-scanning on each pass.
            let mut current_skip = 0;

            // We will capture compactables in the current_range until we find a optimal merge
            // segment or are limited by size or count.
            loop {
                // Early exit if we have found an optimal merge segment.
                let duplication_size = total_duplication_size(&duplication);
                let optimal_merge_job = current_set.len() >= config.optimal_merge_count
                    && duplication_size >= config.optimal_merge_duplication_bytes;
                if optimal_merge_job {
                    for &i in current_set.iter() {
                        used_compactables[i] = true;
                    }
                    // Indices were collected back-to-front; restore read order.
                    current_set.reverse();
                    merge_segments.push(current_set);
                    real_merge_segments += 1;
                    continue 'outer;
                }

                // If we are limited by size or count, we might also create a merge segment if it's
                // within the limits.
                let valid_merge_job = current_set.len() >= config.min_merge_count
                    && duplication_size >= config.min_merge_duplication_bytes;
                // Finish the current job: emit it as a real merge job when valid, otherwise emit
                // only the starting index as a single-element marker segment.
                let mut end_job =
                    |mut current_set: SmallVec<[usize; 1]>, used_compactables: &mut Vec<bool>| {
                        if valid_merge_job {
                            for &i in current_set.iter() {
                                used_compactables[i] = true;
                            }
                            current_set.reverse();
                            merge_segments.push(current_set);
                            real_merge_segments += 1;
                        } else {
                            merge_segments.push(smallvec![start_index]);
                        }
                    };

                // Check if we run into the count or size limit.
                if current_set.len() >= config.max_merge_count
                    || current_size >= config.max_merge_bytes
                {
                    // The set is so large so we can't add more compactables to it.
                    end_job(current_set, &mut used_compactables);
                    continue 'outer;
                }

                // Find the next compactable that overlaps with the current range.
                let Some((next_index, compactable)) = unused_compactables
                    .iter()
                    .enumerate()
                    .rev()
                    .skip(current_skip)
                    .find(|(i, compactable)| {
                        if used_compactables[*i] {
                            return false;
                        }
                        let range = compactable.range();
                        is_overlapping(&current_range, &range)
                    })
                else {
                    // There are no more compactables that overlap with the current range.
                    end_job(current_set, &mut used_compactables);
                    continue 'outer;
                };
                current_skip = unused_compactables.len() - next_index;

                // Check if we run into the size limit.
                let size = compactable.size();
                if current_size + size > config.max_merge_bytes {
                    // The next compactable is too large to be added to the current set.
                    end_job(current_set, &mut used_compactables);
                    continue 'outer;
                }

                // Check if the next compactable is larger than the current range. We need to
                // restart from beginning here as there could be previously skipped compactables
                // that are within the larger range.
                let range = compactable.range();
                if extend_range(&mut current_range, &range) {
                    // The range was extended, so we need to restart the search.
                    continue 'search;
                }

                // The next compactable is within the current range, so we can add it to the current
                // set.
                current_set.push(next_index);
                current_size += size;
                duplication.update(range.clone(), |dup_info| {
                    dup_info.get_or_insert_default().add(size, &range);
                });
            }
        }
    }

    while merge_segments.last().is_some_and(|s| s.len() == 1) {
        // Remove trailing segments that only contain a single compactable.
        merge_segments.pop();
    }

    // Reverse it since we processed in reverse order.
    merge_segments.reverse();

    // Remove single-compactable segments that don't overlap with previous segments. We don't need
    // to touch them.
    let mut used_ranges = IntervalMap::<bool>::new();
    merge_segments.retain(|segment| {
        // Remove a single-element segment which doesn't overlap with previously used ranges.
        if segment.len() == 1 {
            let range = compactables[segment[0]].range();
            if !used_ranges.iter_intersecting(range).any(|(_, v)| *v) {
                return false;
            }
        }
        // Mark the ranges of the segment as used.
        for i in segment {
            let range = compactables[*i].range();
            used_ranges.replace(range, true);
        }
        true
    });

    merge_segments
}
365
#[cfg(test)]
mod tests {
    use std::{
        fmt::Debug,
        mem::{replace, swap},
    };

    use rand::{Rng, SeedableRng, seq::SliceRandom};

    use super::*;

    /// Minimal [`Compactable`] with a fixed key range and size.
    struct TestCompactable {
        range: RangeInclusive<u64>,
        size: u64,
    }

    impl Compactable for TestCompactable {
        fn range(&self) -> RangeInclusive<u64> {
            self.range.clone()
        }

        fn size(&self) -> u64 {
            self.size
        }
    }

    /// Runs [`get_merge_segments`] over compactables with the given key ranges
    /// (each 100 bytes large) and returns the merge jobs as plain `Vec`s.
    fn compact<const N: usize>(
        ranges: [RangeInclusive<u64>; N],
        config: &CompactConfig,
    ) -> Vec<Vec<usize>> {
        let compactables = ranges
            .into_iter()
            .map(|range| TestCompactable { range, size: 100 })
            .collect::<Vec<_>>();
        let jobs = get_merge_segments(&compactables, config);
        jobs.into_iter()
            .map(|job| job.into_iter().collect())
            .collect()
    }

    /// Merge-count limits drive job formation (byte limits effectively disabled).
    #[test]
    fn test_compaction_jobs_by_count() {
        let merge_jobs = compact(
            [
                0..=10,
                10..=30,
                9..=13,
                0..=30,
                40..=44,
                41..=42,
                41..=47,
                90..=100,
                30..=40,
            ],
            &CompactConfig {
                min_merge_count: 2,
                optimal_merge_count: 3,
                max_merge_count: 4,
                max_merge_bytes: u64::MAX,
                min_merge_duplication_bytes: 0,
                optimal_merge_duplication_bytes: 0,
                max_merge_segment_count: usize::MAX,
            },
        );
        assert_eq!(merge_jobs, vec![vec![1, 2, 3], vec![5, 6, 8]]);
    }

    /// Byte-size limit drives job formation (count limits effectively disabled).
    #[test]
    fn test_compaction_jobs_by_size() {
        let merge_jobs = compact(
            [
                0..=10,
                10..=30,
                9..=13,
                0..=30,
                40..=44,
                41..=42,
                41..=47,
                90..=100,
                30..=40,
            ],
            &CompactConfig {
                min_merge_count: 2,
                optimal_merge_count: 2,
                max_merge_count: usize::MAX,
                max_merge_bytes: 300,
                min_merge_duplication_bytes: 0,
                optimal_merge_duplication_bytes: u64::MAX,
                max_merge_segment_count: usize::MAX,
            },
        );
        assert_eq!(merge_jobs, vec![vec![1, 2, 3], vec![5, 6, 8]]);
    }

    /// With no effective limits, all overlapping compactables end up in one job.
    #[test]
    fn test_compaction_jobs_full() {
        let merge_jobs = compact(
            [
                0..=10,
                10..=30,
                9..=13,
                0..=30,
                40..=44,
                41..=42,
                41..=47,
                90..=100,
                30..=40,
            ],
            &CompactConfig {
                min_merge_count: 2,
                optimal_merge_count: usize::MAX,
                max_merge_count: usize::MAX,
                max_merge_bytes: u64::MAX,
                min_merge_duplication_bytes: 0,
                optimal_merge_duplication_bytes: u64::MAX,
                max_merge_segment_count: usize::MAX,
            },
        );
        assert_eq!(merge_jobs, vec![vec![0, 1, 2, 3, 4, 5, 6, 8]]);
    }

    /// A high optimal count produces one large job.
    #[test]
    fn test_compaction_jobs_big() {
        let merge_jobs = compact(
            [
                0..=10,
                10..=30,
                9..=13,
                0..=30,
                40..=44,
                41..=42,
                41..=47,
                90..=100,
                30..=40,
            ],
            &CompactConfig {
                min_merge_count: 2,
                optimal_merge_count: 7,
                max_merge_count: usize::MAX,
                max_merge_bytes: u64::MAX,
                min_merge_duplication_bytes: 0,
                optimal_merge_duplication_bytes: 0,
                max_merge_segment_count: usize::MAX,
            },
        );
        assert_eq!(merge_jobs, vec![vec![1, 2, 3, 4, 5, 6, 8]]);
    }

    /// A low optimal count produces many small pairwise jobs.
    #[test]
    fn test_compaction_jobs_small() {
        let merge_jobs = compact(
            [
                0..=10,
                10..=30,
                9..=13,
                0..=30,
                40..=44,
                41..=42,
                41..=47,
                90..=100,
                30..=40,
            ],
            &CompactConfig {
                min_merge_count: 2,
                optimal_merge_count: 2,
                max_merge_count: usize::MAX,
                max_merge_bytes: u64::MAX,
                min_merge_duplication_bytes: 0,
                optimal_merge_duplication_bytes: 0,
                max_merge_segment_count: usize::MAX,
            },
        );
        assert_eq!(
            merge_jobs,
            vec![vec![0, 1], vec![2, 3], vec![4, 5], vec![6, 8]]
        );
    }

    /// Prints an ASCII visualization of each compactable's key range and size.
    pub fn debug_print_compactables<T: Compactable>(compactables: &[T], max_key: u64) {
        const WIDTH: usize = 128;
        let char_width: u64 = max_key / WIDTH as u64;
        for (i, c) in compactables.iter().enumerate() {
            let range = c.range();
            let size = c.size();
            let start = usize::try_from(range.start() / char_width).unwrap();
            let end = usize::try_from(range.end() / char_width).unwrap();
            let mut line = format!("{i:>3} | ");
            for j in 0..WIDTH {
                if j >= start && j <= end {
                    line.push('█');
                } else {
                    line.push(' ');
                }
            }
            println!("{line} | {size:>6}");
        }
    }

    /// End-to-end simulation: repeatedly writes batches of "warm" keys and runs
    /// the selector + a model compaction, asserting the system stays bounded.
    #[test]
    fn simulate_compactions() {
        const KEY_RANGE: u64 = 10000;
        const WARM_KEY_COUNT: usize = 100;
        const INITIAL_CHUNK_SIZE: usize = 100;
        const ITERATIONS: usize = 100;

        // Fixed seed keeps the simulation deterministic.
        let mut rnd = rand::rngs::SmallRng::from_seed([0; 32]);
        let mut keys = (0..KEY_RANGE).collect::<Vec<_>>();
        keys.shuffle(&mut rnd);

        let mut batch_index = 0;
        let mut containers = keys
            .chunks(INITIAL_CHUNK_SIZE)
            .map(|keys| Container::new(batch_index, keys.to_vec()))
            .collect::<Vec<_>>();

        // Warm keys are rewritten on every iteration, creating duplication.
        let mut warm_keys = (0..WARM_KEY_COUNT)
            .map(|_| {
                let i = rnd.random_range(0..keys.len());
                keys.swap_remove(i)
            })
            .collect::<Vec<_>>();

        let mut number_of_compactions = 0;

        for _ in 0..ITERATIONS {
            let total_size = containers.iter().map(|c| c.keys.len()).sum::<usize>();
            let metrics = compute_metrics(&containers, 0..=KEY_RANGE);
            debug_print_compactables(&containers, KEY_RANGE);
            println!(
                "size: {}, coverage: {}, overlap: {}, duplication: {}, items: {}",
                total_size,
                metrics.coverage,
                metrics.overlap,
                metrics.duplication,
                containers.len()
            );

            // The container count must stay bounded across iterations.
            assert!(containers.len() < 400);
            // assert!(metrics.duplication < 4.0);

            let config = CompactConfig {
                max_merge_count: 16,
                min_merge_count: 2,
                optimal_merge_count: 4,
                max_merge_bytes: 5000,
                min_merge_duplication_bytes: 200,
                optimal_merge_duplication_bytes: 500,
                max_merge_segment_count: 4,
            };
            let jobs = get_merge_segments(&containers, &config);
            if !jobs.is_empty() {
                println!("{jobs:?}");

                batch_index += 1;
                do_compact(&mut containers, jobs, batch_index);
                number_of_compactions += 1;

                let new_metrics = compute_metrics(&containers, 0..=KEY_RANGE);
                println!(
                    "Compaction done: coverage: {} ({}), overlap: {} ({}), duplication: {} ({})",
                    new_metrics.coverage,
                    new_metrics.coverage - metrics.coverage,
                    new_metrics.overlap,
                    new_metrics.overlap - metrics.overlap,
                    new_metrics.duplication,
                    new_metrics.duplication - metrics.duplication
                );
            } else {
                println!("No compaction needed");
            }

            // Modify warm keys: write them back as 1-3 new containers.
            batch_index += 1;
            let pieces = rnd.random_range(1..4);
            for chunk in warm_keys.chunks(warm_keys.len().div_ceil(pieces)) {
                containers.push(Container::new(batch_index, chunk.to_vec()));
            }

            // Change some warm keys by swapping them with cold ones.
            let changes = rnd.random_range(0..100);
            for _ in 0..changes {
                let i = rnd.random_range(0..warm_keys.len());
                let j = rnd.random_range(0..keys.len());
                swap(&mut warm_keys[i], &mut keys[j]);
            }
        }
        println!("Number of compactions: {number_of_compactions}");

        let metrics = compute_metrics(&containers, 0..=KEY_RANGE);
        assert!(number_of_compactions < 40);
        assert!(containers.len() < 30);
        assert!(metrics.duplication < 0.5);
    }

    /// Model of an SST file: the batch it was written in and its sorted keys.
    struct Container {
        batch_index: usize,
        keys: Vec<u64>,
    }

    impl Container {
        fn new(batch_index: usize, mut keys: Vec<u64>) -> Self {
            keys.sort_unstable();
            Self { batch_index, keys }
        }
    }

    impl Compactable for Container {
        fn range(&self) -> RangeInclusive<u64> {
            // Keys are kept sorted, so first/last give the inclusive bounds.
            (self.keys[0])..=(*self.keys.last().unwrap())
        }

        fn size(&self) -> u64 {
            self.keys.len() as u64
        }
    }

    impl Debug for Container {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            let (l, r) = self.range().into_inner();
            write!(
                f,
                "#{} {}b {l} - {r} ({})",
                self.batch_index,
                self.keys.len(),
                r - l
            )
        }
    }

    /// Applies the selected merge segments to the model containers: merged jobs
    /// are deduplicated and rewritten in chunks; single-index jobs are moved to
    /// the end to preserve relative order.
    fn do_compact(containers: &mut Vec<Container>, segments: MergeSegments, batch_index: usize) {
        let total_size = containers.iter().map(|c| c.keys.len()).sum::<usize>();
        for merge_job in segments {
            if merge_job.len() < 2 {
                let container = replace(
                    &mut containers[merge_job[0]],
                    Container {
                        batch_index: 0,
                        keys: Default::default(),
                    },
                );
                containers.push(container);
            } else {
                let mut keys = Vec::new();
                for i in merge_job {
                    keys.append(&mut containers[i].keys);
                }
                keys.sort_unstable();
                keys.dedup();
                containers.extend(keys.chunks(1000).map(|keys| Container {
                    batch_index,
                    keys: keys.to_vec(),
                }));
            }
        }

        // Drained containers (emptied by the merge) are dropped.
        containers.retain(|c| !c.keys.is_empty());
        let total_size2 = containers.iter().map(|c| c.keys.len()).sum::<usize>();
        println!("Compaction done: {total_size} -> {total_size2}",);
    }
}