turbo_tasks/no_move_vec.rs

use std::{
    ptr::null_mut,
    slice::from_raw_parts_mut,
    sync::{
        Mutex,
        atomic::{AtomicPtr, Ordering},
    },
};

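// One bucket slot per possible bit length of an index (0..=usize::BITS). With
// a non-zero `INITIAL_CAPACITY_BITS`, only the first
// `usize::BITS + 1 - INITIAL_CAPACITY_BITS` slots are ever used; `Drop` relies
// on this bound.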
const BUCKETS: usize = (usize::BITS + 1) as usize;

/// An `Option`-like type that guarantees that a fully zeroed value is a valid
/// `None` variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(u8)]
enum COption<T> {
    // TODO(alexkirsz) We need a way to guarantee that a fully zeroed value is a
    // valid `None` variant. This is theoretically possible when the wrapped
    // type has no valid value that can be represented by all zeros, but there
    // is no way to enforce this at the type level. For now, we just use a
    // custom option type with an explicit discriminant for the `None` variant.
    // The downside of this implementation is that it disables the niche
    // optimization.
    None = 0,
    Some(T),
}

impl<T> Default for COption<T> {
    fn default() -> Self {
        Self::None
    }
}

impl<T> COption<T> {
    /// Returns a slice of the given size filled with the `None` variant.
    fn new_none_slice(size: usize) -> Box<[Self]> {
        let slice = Box::<[COption<T>]>::new_zeroed_slice(size);
        // Safety:
        // We know that a zeroed `COption<T>` is a valid `COption::None` value.
        unsafe { slice.assume_init() }
    }

    /// Returns a reference to the contained value, or `None` if it is `None`.
    fn as_option_ref(&self) -> Option<&T> {
        match self {
            COption::None => None,
            COption::Some(t) => Some(t),
        }
    }

    /// Takes the value out of the option, leaving a `None` in its place.
    fn take(&mut self) -> Option<T> {
        match std::mem::take(self) {
            COption::None => None,
            COption::Some(t) => Some(t),
        }
    }
}

impl<T: Default> COption<T> {
    /// Returns a slice of the given size with every element initialized to
    /// `Some(T::default())`.
    fn new_default_slice(size: usize) -> Box<[Self]> {
        (0..size)
            .map(|_| COption::Some(Default::default()))
            .collect::<Vec<_>>()
            .into_boxed_slice()
    }
}

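/// A growable vector whose elements never move once inserted: storage is a
/// fixed array of geometrically sized buckets that are allocated on demand and
/// never reallocated, so a reference obtained from `get` or `insert` stays
/// valid as long as the vector (and the slot) lives. Reads are lock-free; the
/// per-bucket mutex is only taken to allocate a missing bucket.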
pub struct NoMoveVec<T, const INITIAL_CAPACITY_BITS: u32 = 6> {
    buckets: [(AtomicPtr<COption<T>>, Mutex<()>); BUCKETS],
}

fn get_bucket_index<const INITIAL_CAPACITY_BITS: u32>(idx: usize) -> u32 {
    (usize::BITS - idx.leading_zeros()).saturating_sub(INITIAL_CAPACITY_BITS)
}

fn get_bucket_size<const INITIAL_CAPACITY_BITS: u32>(bucket_index: u32) -> usize {
    if bucket_index != 0 {
        1 << (bucket_index + INITIAL_CAPACITY_BITS - 1)
    } else {
        1 << INITIAL_CAPACITY_BITS
    }
}

fn get_index_in_bucket<const INITIAL_CAPACITY_BITS: u32>(idx: usize, bucket_index: u32) -> usize {
    if bucket_index != 0 {
        idx ^ (1 << (bucket_index + INITIAL_CAPACITY_BITS - 1))
    } else {
        idx
    }
}
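
// Worked example of the index math above, for the default
// `INITIAL_CAPACITY_BITS = 6`:
//
//   idx 0..=63    -> bucket 0, size 64,  index_in_bucket = idx
//   idx 64..=127  -> bucket 1, size 64,  index_in_bucket = idx ^ 64
//   idx 128..=255 -> bucket 2, size 128, index_in_bucket = idx ^ 128
//   idx 256..=511 -> bucket 3, size 256, index_in_bucket = idx ^ 256
//
// Bucket `n > 0` holds exactly the indices whose highest set bit is bit
// `n + INITIAL_CAPACITY_BITS - 1`, so each bucket after the first doubles the
// total capacity.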

/// Allocates a new bucket of `COption<T>`s, all initialized to `None`.
fn allocate_bucket<const INITIAL_CAPACITY_BITS: u32, T>(bucket_index: u32) -> *mut COption<T> {
    let size = get_bucket_size::<INITIAL_CAPACITY_BITS>(bucket_index);
    let slice = COption::<T>::new_none_slice(size);
    Box::into_raw(slice) as *mut COption<T>
}

/// Allocates a new bucket of `COption<T>`s, all initialized to
/// `Some(T::default())`.
fn allocate_default_bucket<const INITIAL_CAPACITY_BITS: u32, T: Default>(
    bucket_index: u32,
) -> *mut COption<T> {
    let size = get_bucket_size::<INITIAL_CAPACITY_BITS>(bucket_index);
    let slice = COption::<T>::new_default_slice(size);
    Box::into_raw(slice) as *mut COption<T>
}

impl<T, const INITIAL_CAPACITY_BITS: u32> Default for NoMoveVec<T, INITIAL_CAPACITY_BITS> {
    fn default() -> Self {
        Self::new()
    }
}

impl<T, const INITIAL_CAPACITY_BITS: u32> NoMoveVec<T, INITIAL_CAPACITY_BITS> {
    pub fn new() -> Self {
        let mut buckets = [null_mut(); BUCKETS];
        buckets[0] = allocate_bucket::<INITIAL_CAPACITY_BITS, T>(0);
        let buckets = buckets.map(|p| (AtomicPtr::new(p), Mutex::new(())));
        NoMoveVec { buckets }
    }

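    /// Returns a reference to the value at `idx`, or `None` if the slot (or
    /// its whole bucket) has not been populated. Lock-free: the acquire load
    /// of the bucket pointer pairs with the release stores in
    /// `insert`/`take`/`remove`.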
    pub fn get(&self, idx: usize) -> Option<&T> {
        let bucket_idx = get_bucket_index::<INITIAL_CAPACITY_BITS>(idx);
        let bucket_ptr = unsafe { self.buckets.get_unchecked(bucket_idx as usize) }
            .0
            .load(Ordering::Acquire);
        if bucket_ptr.is_null() {
            return None;
        }
        let index = get_index_in_bucket::<INITIAL_CAPACITY_BITS>(idx, bucket_idx);
        unsafe { &*bucket_ptr.add(index) }.as_option_ref()
    }

    /// # Safety
    /// There must not be a concurrent operation on this `idx`.
    pub unsafe fn take(&self, idx: usize) -> Option<T> {
        let bucket_idx = get_bucket_index::<INITIAL_CAPACITY_BITS>(idx);
        let bucket = unsafe { self.buckets.get_unchecked(bucket_idx as usize) };
        let bucket_ptr = bucket.0.load(Ordering::Acquire);
        if bucket_ptr.is_null() {
            return None;
        }
        let index = get_index_in_bucket::<INITIAL_CAPACITY_BITS>(idx, bucket_idx);
        let item = unsafe { &mut *bucket_ptr.add(index) };
        let item = item.take();
        // This release store pairs with any acquire load of the bucket
        // pointer, publishing the modified slot.
        bucket.0.store(bucket_ptr, Ordering::Release);
        item
    }

    /// # Safety
    /// There must not be a concurrent operation on this `idx`.
    pub unsafe fn insert(&self, idx: usize, value: T) -> &T {
        let bucket_idx = get_bucket_index::<INITIAL_CAPACITY_BITS>(idx);
        let bucket = unsafe { self.buckets.get_unchecked(bucket_idx as usize) };
        // SAFETY: A relaxed load is enough here because the bucket pointer
        // never becomes null again once it has been set. We fall back to an
        // acquire load when it reads as null.
        let mut bucket_ptr = bucket.0.load(Ordering::Relaxed);
        if bucket_ptr.is_null() {
            bucket_ptr = bucket.0.load(Ordering::Acquire);
            if bucket_ptr.is_null() {
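                // Double-checked locking: serialize allocation on the bucket
                // mutex, then re-check the pointer before allocating.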
                let lock = bucket.1.lock();
                let guarded_bucket_ptr = bucket.0.load(Ordering::Acquire);
                if guarded_bucket_ptr.is_null() {
                    let new_bucket = allocate_bucket::<INITIAL_CAPACITY_BITS, T>(bucket_idx);
                    bucket_ptr = match bucket.0.compare_exchange(
                        null_mut(),
                        new_bucket,
                        Ordering::AcqRel,
                        Ordering::Relaxed,
                    ) {
                        Ok(_) => new_bucket,
                        Err(current_bucket) => {
                            // The bucket was created from a boxed slice, so it
                            // must also be freed as a slice.
                            let size = get_bucket_size::<INITIAL_CAPACITY_BITS>(bucket_idx);
                            drop(unsafe { Box::from_raw(from_raw_parts_mut(new_bucket, size)) });
                            current_bucket
                        }
                    };
                    drop(lock);
                } else {
                    bucket_ptr = guarded_bucket_ptr;
                }
            }
        }
        let index = get_index_in_bucket::<INITIAL_CAPACITY_BITS>(idx, bucket_idx);
        let item = unsafe { &mut *bucket_ptr.add(index) };
        *item = COption::Some(value);
        // This release store pairs with any acquire load of the bucket
        // pointer, publishing the newly written slot.
        bucket.0.store(bucket_ptr, Ordering::Release);
        item.as_option_ref().unwrap()
    }

    /// # Safety
    /// There must not be a concurrent operation on this `idx`.
    pub unsafe fn remove(&self, idx: usize) {
        let bucket_idx = get_bucket_index::<INITIAL_CAPACITY_BITS>(idx);
        let bucket = unsafe { self.buckets.get_unchecked(bucket_idx as usize) };
        let bucket_ptr = bucket.0.load(Ordering::Acquire);
        if bucket_ptr.is_null() {
            return;
        }
        let index = get_index_in_bucket::<INITIAL_CAPACITY_BITS>(idx, bucket_idx);
        let item = unsafe { &mut *bucket_ptr.add(index) };
        *item = COption::None;
        // This release store pairs with any acquire load of the bucket
        // pointer, publishing the cleared slot.
        bucket.0.store(bucket_ptr, Ordering::Release);
    }
}

impl<T: Default, const INITIAL_CAPACITY_BITS: u32> NoMoveVec<T, INITIAL_CAPACITY_BITS> {
    pub fn new_init_default() -> Self {
        let mut buckets = [null_mut(); BUCKETS];
        buckets[0] = allocate_default_bucket::<INITIAL_CAPACITY_BITS, T>(0);
        let buckets = buckets.map(|p| (AtomicPtr::new(p), Mutex::new(())));
        NoMoveVec { buckets }
    }

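    /// Returns a reference to the value at `idx`, allocating the containing
    /// bucket filled with `T::default()` on demand. As the `expect` below
    /// states, this must not be combined with normal `insert`, which allocates
    /// buckets whose slots start out as `None`.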
    pub fn get_init_default(&self, idx: usize) -> &T {
        let bucket_idx = get_bucket_index::<INITIAL_CAPACITY_BITS>(idx);
        let bucket = unsafe { self.buckets.get_unchecked(bucket_idx as usize) };
        // SAFETY: A relaxed load is enough here because the bucket pointer
        // never becomes null again once it has been set. We fall back to an
        // acquire load when it reads as null.
        let mut bucket_ptr = bucket.0.load(Ordering::Relaxed);
        if bucket_ptr.is_null() {
            bucket_ptr = bucket.0.load(Ordering::Acquire);
            if bucket_ptr.is_null() {
                // Double-checked locking, as in `insert` above.
                let lock = bucket.1.lock();
                let guarded_bucket_ptr = bucket.0.load(Ordering::Acquire);
                if guarded_bucket_ptr.is_null() {
                    let new_bucket =
                        allocate_default_bucket::<INITIAL_CAPACITY_BITS, T>(bucket_idx);
                    bucket_ptr = match bucket.0.compare_exchange(
                        null_mut(),
                        new_bucket,
                        Ordering::AcqRel,
                        Ordering::Relaxed,
                    ) {
                        Ok(_) => new_bucket,
                        Err(current_bucket) => {
                            // The bucket was created from a boxed slice, so it
                            // must also be freed as a slice.
                            let size = get_bucket_size::<INITIAL_CAPACITY_BITS>(bucket_idx);
                            drop(unsafe { Box::from_raw(from_raw_parts_mut(new_bucket, size)) });
                            current_bucket
                        }
                    };
                    drop(lock);
                } else {
                    bucket_ptr = guarded_bucket_ptr;
                }
            }
        }
        let index = get_index_in_bucket::<INITIAL_CAPACITY_BITS>(idx, bucket_idx);
        let value = unsafe { &*bucket_ptr.add(index) }.as_option_ref();
        value.expect("get_init_default must not be combined with normal insert")
    }
}

impl<T, const INITIAL_CAPACITY_BITS: u32> Drop for NoMoveVec<T, INITIAL_CAPACITY_BITS> {
    fn drop(&mut self) {
        for (bucket_index, (bucket, _)) in self.buckets.iter_mut().enumerate() {
            if bucket_index < (usize::BITS + 1 - INITIAL_CAPACITY_BITS) as usize {
                let bucket_size = get_bucket_size::<INITIAL_CAPACITY_BITS>(bucket_index as u32);
                let bucket_ptr = *bucket.get_mut();

                if !bucket_ptr.is_null() {
                    drop(unsafe { Box::from_raw(from_raw_parts_mut(bucket_ptr, bucket_size)) });
                }
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::NoMoveVec;

    #[test]
    fn basic_operations() {
        let v = NoMoveVec::<(usize, usize)>::new();
        assert_eq!(v.get(0), None);
        assert_eq!(v.get(1), None);
        assert_eq!(v.get(8), None);
        assert_eq!(v.get(9), None);
        assert_eq!(v.get(15), None);
        assert_eq!(v.get(16), None);
        assert_eq!(v.get(100), None);
        assert_eq!(v.get(1000), None);

        for i in 0..1000 {
            unsafe {
                v.insert(i, (i, i));
            }
            assert_eq!(v.get(i), Some(&(i, i)));
        }
        for i in 0..1000 {
            assert_eq!(v.get(i), Some(&(i, i)));
        }
        assert_eq!(v.get(1001), None);

        unsafe {
            v.insert(1000000, (0, 0));
        }
        assert_eq!(v.get(1000000), Some(&(0, 0)));
        assert_eq!(v.get(10000), None);
    }
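
    // Illustrative additions, not part of the original suite: these exercise
    // `take`/`remove` and the zeroed-is-`None` invariant that
    // `new_none_slice` relies on.

    #[test]
    fn take_and_remove() {
        let v = NoMoveVec::<usize>::new();
        unsafe {
            v.insert(5, 42);
        }
        assert_eq!(v.get(5), Some(&42));
        // `take` moves the value out and leaves `None` behind.
        assert_eq!(unsafe { v.take(5) }, Some(42));
        assert_eq!(v.get(5), None);
        // `remove` clears the slot without returning the value.
        unsafe {
            v.insert(6, 7);
            v.remove(6);
        }
        assert_eq!(v.get(6), None);
    }

    #[test]
    fn zeroed_coption_is_none() {
        // A zero-initialized slice must read back as all `None`.
        let slice = super::COption::<usize>::new_none_slice(16);
        assert!(slice.iter().all(|v| v.as_option_ref().is_none()));
    }

    #[test]
    fn init_default() {
        // Buckets are lazily filled with `T::default()`, even for indices far
        // beyond the initial bucket.
        let v = NoMoveVec::<usize>::new_init_default();
        assert_eq!(v.get_init_default(0), &0);
        assert_eq!(v.get_init_default(100000), &0);
    }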
}