turbo_persistence/compaction/selector.rs

use std::ops::RangeInclusive;

use smallvec::{SmallVec, smallvec};

use crate::compaction::interval_map::IntervalMap;

/// Represents part of a database (i.e. an SST file) with a range of keys (i.e. hashes) and a size
/// of that data in bytes.
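///
/// # Example
///
/// A minimal sketch of an implementer (hypothetical; a real implementer would return the metadata
/// of an actual SST file):
///
/// ```ignore
/// struct FileMeta {
///     first_key: u64,
///     last_key: u64,
///     bytes: u64,
/// }
///
/// impl Compactable for FileMeta {
///     fn range(&self) -> std::ops::RangeInclusive<u64> {
///         self.first_key..=self.last_key
///     }
///
///     fn size(&self) -> u64 {
///         self.bytes
///     }
/// }
/// ```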
pub trait Compactable {
    /// The range of keys stored in this database segment.
    fn range(&self) -> RangeInclusive<u64>;

    /// The size of the compactable database segment in bytes.
    fn size(&self) -> u64;
}

fn is_overlapping(a: &RangeInclusive<u64>, b: &RangeInclusive<u64>) -> bool {
    a.start() <= b.end() && b.start() <= a.end()
}

fn spread(range: &RangeInclusive<u64>) -> u128 {
    // The spread of `0..=u64::MAX` is `u64::MAX + 1`, which would overflow as a u64.
    u128::from(range.end() - range.start()) + 1
}

/// Extends the range `a` to include the range `b`; returns `true` if the range was extended.
fn extend_range(a: &mut RangeInclusive<u64>, b: &RangeInclusive<u64>) -> bool {
    let mut extended = false;
    if b.start() < a.start() {
        *a = (*b.start())..=(*a.end());
        extended = true;
    }
    if b.end() > a.end() {
        *a = (*a.start())..=(*b.end());
        extended = true;
    }
    extended
}

#[derive(Debug)]
pub struct CompactableMetrics {
    /// The total coverage of the compactables: the sum of their key-range spreads relative to
    /// the full range.
    pub coverage: f32,

    /// The overlap of the compactables: how much extra coverage (beyond the first) exists,
    /// relative to the full range.
    pub overlap: f32,

    /// The possible duplication across the compactables, in bytes.
    pub duplicated_size: u64,

    /// The possible duplication of the compactables as a factor of the total size.
    pub duplication: f32,
}

/// Computes metrics about the compactables.
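///
/// For intuition, a small hypothetical example: two segments covering `0..=9` and `5..=14` over a
/// full range of `0..=19` give `coverage = (10 + 10) / 20 = 1.0`, and the `5..=9` intersection is
/// covered twice, so `overlap = 5 / 20 = 0.25`.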
pub fn compute_metrics<T: Compactable>(
    compactables: &[T],
    full_range: RangeInclusive<u64>,
) -> CompactableMetrics {
    let mut interval_map = IntervalMap::<Option<(DuplicationInfo, usize)>>::new();
    let mut coverage = 0.0f32;
    for c in compactables {
        let range = c.range();
        coverage += spread(&range) as f32;
        interval_map.update(range.clone(), |value| {
            let (dup_info, count) = value.get_or_insert_default();
            dup_info.add(c.size(), &range);
            *count += 1;
        });
    }
    let full_spread = spread(&full_range) as f32;

    let (duplicated_size, duplication, overlap) = interval_map
        .iter()
        .flat_map(|(range, value)| Some((range, value.as_ref()?)))
        .map(|(range, (dup_info, count))| {
            let duplicated_size = dup_info.duplication(&range);
            let total_size = dup_info.size(&range);
            let overlap = spread(&range) as f32 * count.saturating_sub(1) as f32;
            (duplicated_size, total_size, overlap)
        })
        .reduce(|(dup1, total1, overlap1), (dup2, total2, overlap2)| {
            (dup1 + dup2, total1 + total2, overlap1 + overlap2)
        })
        .map(|(duplicated_size, total_size, overlap)| {
            (
                duplicated_size,
                if total_size > 0 {
                    duplicated_size as f32 / total_size as f32
                } else {
                    0.0
                },
                overlap,
            )
        })
        .unwrap_or((0, 0.0, 0.0));

    CompactableMetrics {
        coverage: coverage / full_spread,
        overlap: overlap / full_spread,
        duplicated_size,
        duplication,
    }
}

/// Configuration for the compaction algorithm.
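///
/// A hedged usage sketch (the override below is illustrative, not a recommendation): start from
/// the defaults and adjust individual limits with struct-update syntax.
///
/// ```ignore
/// let config = CompactConfig {
///     max_merge_segment_count: 4,
///     ..CompactConfig::default()
/// };
/// ```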
#[derive(Clone)]
pub struct CompactConfig {
    /// The minimum number of files to merge at once.
    pub min_merge_count: usize,

    /// The optimal number of files to merge at once.
    pub optimal_merge_count: usize,

    /// The maximum number of files to merge at once.
    pub max_merge_count: usize,

    /// The maximum total size of all files to merge at once, in bytes.
    pub max_merge_bytes: u64,

    /// The amount of duplication that needs to be in a merge job for it to be considered for
    /// merging.
    pub min_merge_duplication_bytes: u64,

    /// The optimal duplication size for merging.
    pub optimal_merge_duplication_bytes: u64,

    /// The maximum number of merge segments to determine.
    pub max_merge_segment_count: usize,
}

impl Default for CompactConfig {
    fn default() -> Self {
        const MB: u64 = 1024 * 1024;
        Self {
            min_merge_count: 2,
            optimal_merge_count: 8,
            max_merge_count: 32,
            max_merge_bytes: 500 * MB,
            min_merge_duplication_bytes: 50 * MB,
            optimal_merge_duplication_bytes: 100 * MB,
            max_merge_segment_count: 8,
        }
    }
}

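/// Tracks sizes scaled to the full `u64` key space so that segments covering different ranges can
/// be compared. For intuition (hypothetical numbers): a 100-byte segment spanning half of the key
/// space is recorded with a scaled size of 200, as if it covered the whole key space at the same
/// density.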
#[derive(Clone, Default, Eq, PartialEq)]
struct DuplicationInfo {
    /// The sum of all encountered scaled sizes.
    total_size: u64,
    /// The largest encountered single scaled size.
    max_size: u64,
}

impl DuplicationInfo {
    /// Get a value in the range `0..=u64::MAX` that represents the estimated amount of
    /// duplication across the given range. The units are arbitrary, but linear.
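    ///
    /// For intuition (hypothetical numbers): two 100-byte segments that each fully cover the same
    /// half of the key space are recorded with scaled sizes of 200 each, so `total_size = 400`
    /// and `max_size = 200`, and the duplication across that half-range is
    /// `(400 - 200) * 2^63 / 2^64 = 100`.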
    fn duplication(&self, range: &RangeInclusive<u64>) -> u64 {
        if self.total_size == 0 {
            return 0;
        }
        // `spread(range)` is at most `u64::MAX + 1`, so after dividing by `u64::MAX + 1` the
        // result fits into a u64 again.
        u64::try_from(
            u128::from(self.total_size - self.max_size) * spread(range)
                / (u128::from(u64::MAX) + 1),
        )
        .expect("should not overflow, denominator was `u64::MAX+1`")
    }

    /// The estimated size (in bytes) of a database segment containing `range` keys.
    fn size(&self, range: &RangeInclusive<u64>) -> u64 {
        if self.total_size == 0 {
            return 0;
        }
        // `spread(range)` is at most `u64::MAX + 1`, so after dividing by `u64::MAX + 1` the
        // result fits into a u64 again.
        u64::try_from(u128::from(self.total_size) * spread(range) / (u128::from(u64::MAX) + 1))
            .expect("should not overflow, denominator was `u64::MAX+1`")
    }

    fn add(&mut self, size: u64, range: &RangeInclusive<u64>) {
        // Assumption: `size` is typically much smaller than `spread(range)`. The spread is some
        // fraction of `u64` (the full possible key-space), but no SST file is anywhere close to
        // `u64::MAX` bytes.

        // Scale the size up to the full key range:
        let scaled_size =
            u64::try_from(u128::from(size) * (u128::from(u64::MAX) + 1) / spread(range))
                .unwrap_or(u64::MAX);
        self.total_size = self.total_size.saturating_add(scaled_size);
        self.max_size = self.max_size.max(scaled_size);
    }
}

fn total_duplication_size(duplication: &IntervalMap<Option<DuplicationInfo>>) -> u64 {
    duplication
        .iter()
        .flat_map(|(range, info)| Some((range, info.as_ref()?)))
        .map(|(range, info)| info.duplication(&range))
        .sum()
}

/// A list of merge segments, where each segment lists the indices of the compactables to merge.
type MergeSegments = Vec<SmallVec<[usize; 1]>>;

/// Computes the set of merge segments that should be run to compact the given compactables.
///
/// Returns both the mergeable segments and the actual number of merge segments that were created.
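///
/// A hedged usage sketch (mirroring the tests below; `compactables` is assumed to be a slice of
/// `Compactable` implementers):
///
/// ```ignore
/// let (segments, real_count) = get_merge_segments(&compactables, &CompactConfig::default());
/// for segment in &segments {
///     // `segment` holds indices into `compactables`, in their original order.
/// }
/// ```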
pub fn get_merge_segments<T: Compactable>(
    compactables: &[T],
    config: &CompactConfig,
) -> (MergeSegments, usize) {
    // Process all compactables in reverse order.
    // For each compactable, find the smallest set of compactables that overlaps with it and
    // matches the conditions.
    // To find the set:
    // - Set the current range to the range of the first unused compactable.
    // - When the set matches the conditions, add the set as a merge job, mark all used
    //   compactables, and continue.
    // - Find the next unused compactable that overlaps with the current range.
    // - If the range needs to be extended, restart the search with the new range.
    // - If the compactable is within the range, add it to the current set.
    // - If the set is too large, mark the starting compactable as used and continue with the
    //   next one.

    let mut unused_compactables = compactables.iter().collect::<Vec<_>>();
    let mut used_compactables = vec![false; compactables.len()];

    let mut merge_segments: MergeSegments = Vec::new();
    let mut real_merge_segments = 0;

    // Iterate in reverse order to process the compactables from the end.
    // That's the order in which compactables are read, so we need to keep that order.
    'outer: while let Some(start_compactable) = unused_compactables.pop() {
        let start_index = unused_compactables.len();
        if used_compactables[start_index] {
            continue;
        }
        if real_merge_segments >= config.max_merge_segment_count {
            // We have reached the maximum number of merge jobs, so we stop here.
            break;
        }
        let start_compactable_range = start_compactable.range();
        let start_compactable_size = start_compactable.size();
        let mut current_range = start_compactable_range.clone();

        // We might need to restart the search if we need to extend the range.
        'search: loop {
            let mut current_set = smallvec![start_index];
            let mut current_size = start_compactable_size;
            let mut duplication = IntervalMap::<Option<DuplicationInfo>>::new();
            duplication.update(start_compactable_range.clone(), |dup_info| {
                dup_info
                    .get_or_insert_default()
                    .add(start_compactable_size, &start_compactable_range);
            });
            let mut current_skip = 0;

            // We will capture compactables in the current_range until we find an optimal merge
            // segment or are limited by size or count.
            loop {
                // Early exit if we have found an optimal merge segment.
                let duplication_size = total_duplication_size(&duplication);
                let optimal_merge_job = current_set.len() >= config.optimal_merge_count
                    && duplication_size >= config.optimal_merge_duplication_bytes;
                if optimal_merge_job {
                    for &i in current_set.iter() {
                        used_compactables[i] = true;
                    }
                    current_set.reverse();
                    merge_segments.push(current_set);
                    real_merge_segments += 1;
                    continue 'outer;
                }

                // If we are limited by size or count, we might also create a merge segment if
                // it's within the limits.
                let valid_merge_job = current_set.len() >= config.min_merge_count
                    && duplication_size >= config.min_merge_duplication_bytes;
                let mut end_job =
                    |mut current_set: SmallVec<[usize; 1]>, used_compactables: &mut Vec<bool>| {
                        if valid_merge_job {
                            for &i in current_set.iter() {
                                used_compactables[i] = true;
                            }
                            current_set.reverse();
                            merge_segments.push(current_set);
                            real_merge_segments += 1;
                        } else {
                            // Not a valid merge job: push the start compactable as a
                            // single-element placeholder segment. These are pruned later unless
                            // they overlap an earlier merge segment.
                            merge_segments.push(smallvec![start_index]);
                        }
                    };

                // Check if we run into the count or size limit.
                if current_set.len() >= config.max_merge_count
                    || current_size >= config.max_merge_bytes
                {
                    // The set is so large that we can't add more compactables to it.
                    end_job(current_set, &mut used_compactables);
                    continue 'outer;
                }

                // Find the next compactable that overlaps with the current range.
                let Some((next_index, compactable)) = unused_compactables
                    .iter()
                    .enumerate()
                    .rev()
                    .skip(current_skip)
                    .find(|(i, compactable)| {
                        if used_compactables[*i] {
                            return false;
                        }
                        let range = compactable.range();
                        is_overlapping(&current_range, &range)
                    })
                else {
                    // There are no more compactables that overlap with the current range.
                    end_job(current_set, &mut used_compactables);
                    continue 'outer;
                };
                current_skip = unused_compactables.len() - next_index;

                // Check if we run into the size limit.
                let size = compactable.size();
                if current_size + size > config.max_merge_bytes {
                    // The next compactable is too large to be added to the current set.
                    end_job(current_set, &mut used_compactables);
                    continue 'outer;
                }

                // Check if the next compactable is larger than the current range. We need to
                // restart the search from the beginning here, as there could be previously
                // skipped compactables that are within the larger range.
                let range = compactable.range();
                if extend_range(&mut current_range, &range) {
                    // The range was extended, so we need to restart the search.
                    continue 'search;
                }

                // The next compactable is within the current range, so we can add it to the
                // current set.
                current_set.push(next_index);
                current_size += size;
                duplication.update(range.clone(), |dup_info| {
                    dup_info.get_or_insert_default().add(size, &range);
                });
            }
        }
    }

    while merge_segments.last().is_some_and(|s| s.len() == 1) {
        // Remove trailing segments that only contain a single compactable.
        merge_segments.pop();
    }

    // Reverse it since we processed in reverse order.
    merge_segments.reverse();

    // Remove single-compactable segments that don't overlap with previous segments. We don't
    // need to touch them.
    let mut used_ranges = IntervalMap::<bool>::new();
    merge_segments.retain(|segment| {
        // Remove single-element segments that don't overlap with previously used ranges.
        if segment.len() == 1 {
            let range = compactables[segment[0]].range();
            if !used_ranges.iter_intersecting(range).any(|(_, v)| *v) {
                return false;
            }
        }
        // Mark the ranges of the segment as used.
        for i in segment {
            let range = compactables[*i].range();
            used_ranges.replace(range, true);
        }
        true
    });

    (merge_segments, real_merge_segments)
}

#[cfg(test)]
mod tests {
    use std::{
        fmt::Debug,
        mem::{replace, swap},
    };

    use rand::{Rng, SeedableRng, seq::SliceRandom};

    use super::*;

    struct TestCompactable {
        range: RangeInclusive<u64>,
        size: u64,
    }

    impl Compactable for TestCompactable {
        fn range(&self) -> RangeInclusive<u64> {
            self.range.clone()
        }

        fn size(&self) -> u64 {
            self.size
        }
    }

    fn compact<const N: usize>(
        ranges: [RangeInclusive<u64>; N],
        config: &CompactConfig,
    ) -> Vec<Vec<usize>> {
        let compactables = ranges
            .into_iter()
            .map(|range| TestCompactable { range, size: 100 })
            .collect::<Vec<_>>();
        let (jobs, _) = get_merge_segments(&compactables, config);
        jobs.into_iter()
            .map(|job| job.into_iter().collect())
            .collect()
    }

    #[test]
    fn test_compaction_jobs_by_count() {
        let merge_jobs = compact(
            [
                0..=10,
                10..=30,
                9..=13,
                0..=30,
                40..=44,
                41..=42,
                41..=47,
                90..=100,
                30..=40,
            ],
            &CompactConfig {
                min_merge_count: 2,
                optimal_merge_count: 3,
                max_merge_count: 4,
                max_merge_bytes: u64::MAX,
                min_merge_duplication_bytes: 0,
                optimal_merge_duplication_bytes: 0,
                max_merge_segment_count: usize::MAX,
            },
        );
        assert_eq!(merge_jobs, vec![vec![1, 2, 3], vec![5, 6, 8]]);
    }

    #[test]
    fn test_compaction_jobs_by_size() {
        let merge_jobs = compact(
            [
                0..=10,
                10..=30,
                9..=13,
                0..=30,
                40..=44,
                41..=42,
                41..=47,
                90..=100,
                30..=40,
            ],
            &CompactConfig {
                min_merge_count: 2,
                optimal_merge_count: 2,
                max_merge_count: usize::MAX,
                max_merge_bytes: 300,
                min_merge_duplication_bytes: 0,
                optimal_merge_duplication_bytes: u64::MAX,
                max_merge_segment_count: usize::MAX,
            },
        );
        assert_eq!(merge_jobs, vec![vec![1, 2, 3], vec![5, 6, 8]]);
    }

    #[test]
    fn test_compaction_jobs_full() {
        let merge_jobs = compact(
            [
                0..=10,
                10..=30,
                9..=13,
                0..=30,
                40..=44,
                41..=42,
                41..=47,
                90..=100,
                30..=40,
            ],
            &CompactConfig {
                min_merge_count: 2,
                optimal_merge_count: usize::MAX,
                max_merge_count: usize::MAX,
                max_merge_bytes: u64::MAX,
                min_merge_duplication_bytes: 0,
                optimal_merge_duplication_bytes: u64::MAX,
                max_merge_segment_count: usize::MAX,
            },
        );
        assert_eq!(merge_jobs, vec![vec![0, 1, 2, 3, 4, 5, 6, 8]]);
    }

    #[test]
    fn test_compaction_jobs_big() {
        let merge_jobs = compact(
            [
                0..=10,
                10..=30,
                9..=13,
                0..=30,
                40..=44,
                41..=42,
                41..=47,
                90..=100,
                30..=40,
            ],
            &CompactConfig {
                min_merge_count: 2,
                optimal_merge_count: 7,
                max_merge_count: usize::MAX,
                max_merge_bytes: u64::MAX,
                min_merge_duplication_bytes: 0,
                optimal_merge_duplication_bytes: 0,
                max_merge_segment_count: usize::MAX,
            },
        );
        assert_eq!(merge_jobs, vec![vec![1, 2, 3, 4, 5, 6, 8]]);
    }

    #[test]
    fn test_compaction_jobs_small() {
        let merge_jobs = compact(
            [
                0..=10,
                10..=30,
                9..=13,
                0..=30,
                40..=44,
                41..=42,
                41..=47,
                90..=100,
                30..=40,
            ],
            &CompactConfig {
                min_merge_count: 2,
                optimal_merge_count: 2,
                max_merge_count: usize::MAX,
                max_merge_bytes: u64::MAX,
                min_merge_duplication_bytes: 0,
                optimal_merge_duplication_bytes: 0,
                max_merge_segment_count: usize::MAX,
            },
        );
        assert_eq!(
            merge_jobs,
            vec![vec![0, 1], vec![2, 3], vec![4, 5], vec![6, 8]]
        );
    }

    pub fn debug_print_compactables<T: Compactable>(compactables: &[T], max_key: u64) {
        const WIDTH: usize = 128;
        // Guard against division by zero when `max_key` is smaller than `WIDTH`.
        let char_width: u64 = (max_key / WIDTH as u64).max(1);
        for (i, c) in compactables.iter().enumerate() {
            let range = c.range();
            let size = c.size();
            let start = usize::try_from(range.start() / char_width).unwrap();
            let end = usize::try_from(range.end() / char_width).unwrap();
            let mut line = format!("{i:>3} | ");
            for j in 0..WIDTH {
                if j >= start && j <= end {
                    line.push('█');
                } else {
                    line.push(' ');
                }
            }
            println!("{line} | {size:>6}");
        }
    }

    #[test]
    fn simulate_compactions() {
        const KEY_RANGE: u64 = 10000;
        const WARM_KEY_COUNT: usize = 100;
        const INITIAL_CHUNK_SIZE: usize = 100;
        const ITERATIONS: usize = 100;

        let mut rnd = rand::rngs::SmallRng::from_seed([0; 32]);
        let mut keys = (0..KEY_RANGE).collect::<Vec<_>>();
        keys.shuffle(&mut rnd);

        let mut batch_index = 0;
        let mut containers = keys
            .chunks(INITIAL_CHUNK_SIZE)
            .map(|keys| Container::new(batch_index, keys.to_vec()))
            .collect::<Vec<_>>();

        let mut warm_keys = (0..WARM_KEY_COUNT)
            .map(|_| {
                let i = rnd.random_range(0..keys.len());
                keys.swap_remove(i)
            })
            .collect::<Vec<_>>();

        let mut number_of_compactions = 0;

        for _ in 0..ITERATIONS {
            let total_size = containers.iter().map(|c| c.keys.len()).sum::<usize>();
            let metrics = compute_metrics(&containers, 0..=KEY_RANGE);
            debug_print_compactables(&containers, KEY_RANGE);
            println!(
                "size: {}, coverage: {}, overlap: {}, duplication: {}, items: {}",
                total_size,
                metrics.coverage,
                metrics.overlap,
                metrics.duplication,
                containers.len()
            );

            assert!(containers.len() < 400);
            // assert!(metrics.duplication < 4.0);

            let config = CompactConfig {
                max_merge_count: 16,
                min_merge_count: 2,
                optimal_merge_count: 4,
                max_merge_bytes: 5000,
                min_merge_duplication_bytes: 500,
                optimal_merge_duplication_bytes: 1000,
                max_merge_segment_count: 4,
            };
            let (jobs, _) = get_merge_segments(&containers, &config);
            if !jobs.is_empty() {
                println!("{jobs:?}");

                batch_index += 1;
                do_compact(&mut containers, jobs, batch_index);
                number_of_compactions += 1;

                let new_metrics = compute_metrics(&containers, 0..=KEY_RANGE);
                println!(
                    "Compaction done: coverage: {} ({}), overlap: {} ({}), duplication: {} ({})",
                    new_metrics.coverage,
                    new_metrics.coverage - metrics.coverage,
                    new_metrics.overlap,
                    new_metrics.overlap - metrics.overlap,
                    new_metrics.duplication,
                    new_metrics.duplication - metrics.duplication
                );
            } else {
                println!("No compaction needed");
            }

            // Write the warm keys as new containers
            batch_index += 1;
            let pieces = rnd.random_range(1..4);
            for chunk in warm_keys.chunks(warm_keys.len().div_ceil(pieces)) {
                containers.push(Container::new(batch_index, chunk.to_vec()));
            }

            // Change some warm keys
            let changes = rnd.random_range(0..100);
            for _ in 0..changes {
                let i = rnd.random_range(0..warm_keys.len());
                let j = rnd.random_range(0..keys.len());
                swap(&mut warm_keys[i], &mut keys[j]);
            }
        }
        println!("Number of compactions: {number_of_compactions}");

        let metrics = compute_metrics(&containers, 0..=KEY_RANGE);
        assert!(number_of_compactions < 30);
        assert!(containers.len() < 30);
        assert!(metrics.duplication < 0.5);
    }

    struct Container {
        batch_index: usize,
        keys: Vec<u64>,
    }

    impl Container {
        fn new(batch_index: usize, mut keys: Vec<u64>) -> Self {
            keys.sort_unstable();
            Self { batch_index, keys }
        }
    }

    impl Compactable for Container {
        fn range(&self) -> RangeInclusive<u64> {
            (self.keys[0])..=(*self.keys.last().unwrap())
        }

        fn size(&self) -> u64 {
            self.keys.len() as u64
        }
    }

    impl Debug for Container {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            let (l, r) = self.range().into_inner();
            write!(
                f,
                "#{} {}b {l} - {r} ({})",
                self.batch_index,
                self.keys.len(),
                r - l
            )
        }
    }

    fn do_compact(containers: &mut Vec<Container>, segments: MergeSegments, batch_index: usize) {
        let total_size = containers.iter().map(|c| c.keys.len()).sum::<usize>();
        for merge_job in segments {
            if merge_job.len() < 2 {
                let container = replace(
                    &mut containers[merge_job[0]],
                    Container {
                        batch_index: 0,
                        keys: Default::default(),
                    },
                );
                containers.push(container);
            } else {
                let mut keys = Vec::new();
                for i in merge_job {
                    keys.append(&mut containers[i].keys);
                }
                keys.sort_unstable();
                keys.dedup();
                containers.extend(keys.chunks(1000).map(|keys| Container {
                    batch_index,
                    keys: keys.to_vec(),
                }));
            }
        }

        containers.retain(|c| !c.keys.is_empty());
        let total_size2 = containers.iter().map(|c| c.keys.len()).sum::<usize>();
        println!("Compaction done: {total_size} -> {total_size2}");
    }
}