Skip to main content

turbo_tasks_fs/
rope.rs

1use std::{
2    borrow::Cow,
3    cmp::{Ordering, min},
4    fmt,
5    io::{BufRead, Read, Result as IoResult, Write},
6    mem,
7    ops::{AddAssign, Deref},
8    pin::Pin,
9    task::{Context as TaskContext, Poll},
10};
11
12use RopeElem::{Local, Shared};
13use anyhow::{Context, Result};
14use bincode::{
15    Decode, Encode,
16    de::{Decoder, read::Reader as _},
17    enc::{Encoder, write::Writer as _},
18    error::{DecodeError, EncodeError},
19    impl_borrow_decode,
20};
21use bytes::Bytes;
22use futures::Stream;
23use tokio::io::{AsyncRead, ReadBuf};
24use triomphe::Arc;
25use turbo_tasks_hash::{DeterministicHash, DeterministicHasher};
26
/// Shared empty slice returned when a reader is exhausted (see `fill_buf`).
static EMPTY_BUF: &[u8] = &[];
28
/// An efficient structure for sharing bytes/strings between multiple sources.
///
/// Cloning a Rope is extremely cheap (Arc and usize), and
/// sharing the contents of one Rope can be done by just cloning an Arc.
///
/// Ropes are immutable, in order to construct one see [RopeBuilder].
#[turbo_tasks::value(shared, serialization = "custom", eq = "manual", operation)]
#[derive(Clone, Debug, Default)]
pub struct Rope {
    /// Total length of all held bytes. Cached at construction so `len()` is
    /// O(1) regardless of tree depth.
    length: usize,

    /// A shareable container holding the rope's bytes.
    #[turbo_tasks(debug_ignore, trace_ignore)]
    data: InnerRope,
}
45
/// An Arc container for ropes. This indirection allows for easily sharing the
/// contents between Ropes (and also RopeBuilders/RopeReaders).
///
/// Invariant: never contains an empty section (enforced by debug asserts in
/// `From<Vec<RopeElem>>`).
#[derive(Clone, Debug)]
struct InnerRope(Arc<Vec<RopeElem>>);
50
/// Differentiates the types of stored bytes in a rope.
#[derive(Clone, Debug)]
enum RopeElem {
    /// Local bytes are owned directly by this rope. Invariant: never empty.
    Local(Bytes),

    /// Shared holds the Arc container of another rope. Invariant: never empty.
    Shared(InnerRope),
}
60
/// RopeBuilder provides a mutable container to append bytes/strings. This can
/// also append _other_ Rope instances cheaply, allowing efficient sharing of
/// the contents without a full clone of the bytes.
#[derive(Default, Debug)]
pub struct RopeBuilder {
    /// Total length of all previously committed bytes. Does NOT include the
    /// uncommitted bytes; see `len()` for the full total.
    length: usize,

    /// Immutable bytes references that have been appended to this builder. The
    /// rope is the combination of all these committed bytes.
    committed: Vec<RopeElem>,

    /// Stores bytes that have been pushed, but are not yet committed. This is
    /// either an attempt to push a static lifetime, or a push of owned bytes.
    /// When the builder is flushed, we will commit these bytes into a real
    /// Bytes instance.
    uncommitted: Uncommitted,
}
79
/// Stores any bytes which have been pushed, but we haven't decided to commit
/// yet. Uncommitted bytes allow us to build larger buffers out of possibly
/// small pushes.
#[derive(Default)]
enum Uncommitted {
    /// Nothing pending (the default state, and the state after `finish`).
    #[default]
    None,

    /// Stores our attempt to push static lifetime bytes into the rope. If we
    /// build the Rope or concatenate another Rope, we can commit a static
    /// Bytes reference and save memory. If not, we'll concatenate this into
    /// writable bytes to be committed later.
    Static(&'static [u8]),

    /// Mutable bytes collection where non-static/non-shared bytes are written.
    /// This builds until the next time a static or shared bytes is
    /// appended, in which case we split the buffer and commit. Finishing
    /// the builder also commits these bytes.
    Owned(Vec<u8>),
}
100
impl Rope {
    /// Returns the total number of bytes held, in O(1) (the length is cached
    /// at construction).
    pub fn len(&self) -> usize {
        self.length
    }

    /// Returns true if the rope holds no bytes.
    pub fn is_empty(&self) -> bool {
        self.length == 0
    }

    /// Returns a [Read]/[AsyncRead]/[Iterator] instance over all bytes.
    pub fn read(&self) -> RopeReader<'_> {
        RopeReader::new(&self.data, 0)
    }

    /// Returns a String instance of all bytes. Borrows when possible; fails if
    /// the contents are not valid UTF-8.
    pub fn to_str(&self) -> Result<Cow<'_, str>> {
        self.data.to_str(self.length)
    }

    /// Returns a slice of all bytes
    pub fn to_bytes(&self) -> Cow<'_, [u8]> {
        self.data.to_bytes(self.length)
    }

    /// Consumes the rope, flattening it into a single contiguous Bytes buffer.
    pub fn into_bytes(self) -> Bytes {
        self.data.into_bytes(self.length)
    }
}
129
130impl From<Vec<u8>> for Rope {
131    fn from(mut bytes: Vec<u8>) -> Self {
132        bytes.shrink_to_fit();
133        Rope::from(Bytes::from(bytes))
134    }
135}
136
137impl From<String> for Rope {
138    fn from(mut bytes: String) -> Self {
139        bytes.shrink_to_fit();
140        Rope::from(Bytes::from(bytes))
141    }
142}
143
144impl<T: Into<Bytes>> From<T> for Rope {
145    default fn from(bytes: T) -> Self {
146        let bytes = bytes.into();
147        // We can't have an InnerRope which contains an empty Local section.
148        if bytes.is_empty() {
149            Default::default()
150        } else {
151            Rope {
152                length: bytes.len(),
153                data: InnerRope(Arc::from(vec![Local(bytes)])),
154            }
155        }
156    }
157}
158
impl RopeBuilder {
    /// Push owned bytes into the Rope.
    ///
    /// If possible use [push_static_bytes] or `+=` operation instead, as they
    /// will create a reference to shared memory instead of cloning the bytes.
    pub fn push_bytes(&mut self, bytes: &[u8]) {
        if bytes.is_empty() {
            return;
        }

        self.uncommitted.push_bytes(bytes);
    }

    /// Reserve additional capacity for owned bytes in the Rope.
    ///
    /// This is useful to call before multiple `push_bytes` calls to avoid
    /// multiple allocations.
    pub fn reserve_bytes(&mut self, additional: usize) {
        self.uncommitted.reserve_bytes(additional);
    }

    /// Push static lifetime bytes into the Rope.
    ///
    /// This is more efficient than pushing owned bytes, because the internal
    /// data does not need to be copied when the rope is read.
    pub fn push_static_bytes(&mut self, bytes: &'static [u8]) {
        if bytes.is_empty() {
            return;
        }

        // If the string is smaller than the cost of a Bytes reference (4 usizes), then
        // it's more efficient to own the bytes in a new buffer. We may be able to reuse
        // that buffer when more bytes are pushed.
        if bytes.len() < mem::size_of::<Bytes>() {
            return self.uncommitted.push_static_bytes(bytes);
        }

        // We may have pending bytes from a prior push.
        self.finish();

        self.length += bytes.len();
        self.committed.push(Local(Bytes::from_static(bytes)));
    }

    /// Concatenate another Rope instance into our builder.
    ///
    /// This is much more efficient than pushing actual bytes, since we can
    /// share the other Rope's references without copying the underlying data.
    pub fn concat(&mut self, other: &Rope) {
        if other.is_empty() {
            return;
        }

        // We may have pending bytes from a prior push.
        self.finish();

        self.length += other.len();
        self.committed.push(Shared(other.data.clone()));
    }

    /// Writes any pending bytes into our committed queue. This is called automatically by other
    /// `RopeBuilder` methods.
    ///
    /// This may be called multiple times without issue.
    fn finish(&mut self) {
        if let Some(b) = self.uncommitted.finish() {
            debug_assert!(!b.is_empty(), "must not have empty uncommitted bytes");
            // `self.length` only tracks committed bytes, so it is updated here
            // at commit time, not at push time.
            self.length += b.len();
            self.committed.push(Local(b));
        }
    }

    /// Total number of bytes pushed so far, committed and uncommitted.
    pub fn len(&self) -> usize {
        self.length + self.uncommitted.len()
    }

    /// Returns true if no bytes have been pushed yet.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Constructs our final, immutable Rope instance.
    pub fn build(mut self) -> Rope {
        self.finish();
        Rope {
            length: self.length,
            data: InnerRope::from(self.committed),
        }
    }
}
248
249impl From<&'static str> for RopeBuilder {
250    default fn from(bytes: &'static str) -> Self {
251        let mut r = RopeBuilder::default();
252        r += bytes;
253        r
254    }
255}
256
257impl From<Vec<u8>> for RopeBuilder {
258    fn from(bytes: Vec<u8>) -> Self {
259        RopeBuilder {
260            // Directly constructing the Uncommitted allows us to skip copying the bytes.
261            uncommitted: Uncommitted::from(bytes),
262            ..Default::default()
263        }
264    }
265}
266
267impl Write for RopeBuilder {
268    fn write(&mut self, bytes: &[u8]) -> IoResult<usize> {
269        self.push_bytes(bytes);
270        Ok(bytes.len())
271    }
272
273    fn flush(&mut self) -> IoResult<()> {
274        self.finish();
275        Ok(())
276    }
277}
278
279impl AddAssign<&'static str> for RopeBuilder {
280    /// Pushes a reference to static memory onto the rope.
281    ///
282    /// This is more efficient than pushing owned bytes, because the internal
283    /// data does not need to be copied when the rope is read.
284    fn add_assign(&mut self, rhs: &'static str) {
285        self.push_static_bytes(rhs.as_bytes());
286    }
287}
288
289impl AddAssign<&Rope> for RopeBuilder {
290    fn add_assign(&mut self, rhs: &Rope) {
291        self.concat(rhs);
292    }
293}
294
295impl Uncommitted {
296    fn len(&self) -> usize {
297        match self {
298            Uncommitted::None => 0,
299            Uncommitted::Static(s) => s.len(),
300            Uncommitted::Owned(v) => v.len(),
301        }
302    }
303
304    /// Pushes owned bytes, converting the current representation to an Owned if
305    /// it's not already.
306    fn push_bytes(&mut self, bytes: &[u8]) {
307        debug_assert!(!bytes.is_empty(), "must not push empty uncommitted bytes");
308        match self {
309            Self::None => *self = Self::Owned(bytes.to_vec()),
310            Self::Static(s) => {
311                // If we'd previously pushed static bytes, we instead concatenate those bytes
312                // with the new bytes in an attempt to use less memory rather than committing 2
313                // Bytes references (2 * 4 usizes).
314                let v = [s, bytes].concat();
315                *self = Self::Owned(v);
316            }
317            Self::Owned(v) => v.extend(bytes),
318        }
319    }
320
321    /// Reserves additional capacity for owned bytes, converting the current
322    /// representation to an Owned if it's not already.
323    fn reserve_bytes(&mut self, additional: usize) {
324        match self {
325            Self::None => {
326                *self = Self::Owned(Vec::with_capacity(additional));
327            }
328            Self::Static(s) => {
329                let mut v = Vec::with_capacity(s.len() + additional);
330                v.extend_from_slice(s);
331                *self = Self::Owned(v);
332            }
333            Self::Owned(v) => {
334                v.reserve(additional);
335            }
336        }
337    }
338
339    /// Pushes static lifetime bytes, but only if the current representation is
340    /// None. Else, it coverts to an Owned.
341    fn push_static_bytes(&mut self, bytes: &'static [u8]) {
342        debug_assert!(!bytes.is_empty(), "must not push empty uncommitted bytes");
343        match self {
344            // If we've not already pushed static bytes, we attempt to store the bytes for later. If
345            // we push owned bytes or another static bytes, then this attempt will fail and we'll
346            // instead concatenate into a single owned Bytes. But if we don't push anything (build
347            // the Rope), or concatenate another Rope (we can't join our bytes with the InnerRope of
348            // another Rope), we'll be able to commit a static Bytes reference and save overall
349            // memory (a small static Bytes reference is better than a small owned Bytes reference).
350            Self::None => *self = Self::Static(bytes),
351            _ => self.push_bytes(bytes),
352        }
353    }
354
355    /// Converts the current uncommitted bytes into a Bytes, resetting our
356    /// representation to None.
357    fn finish(&mut self) -> Option<Bytes> {
358        match mem::take(self) {
359            Self::None => None,
360            Self::Static(s) => Some(Bytes::from_static(s)),
361            Self::Owned(mut v) => {
362                v.shrink_to_fit();
363                Some(v.into())
364            }
365        }
366    }
367}
368
369impl fmt::Debug for Uncommitted {
370    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
371        match self {
372            Uncommitted::None => f.write_str("None"),
373            Uncommitted::Static(s) => f
374                .debug_tuple("Static")
375                .field(&Bytes::from_static(s))
376                .finish(),
377            Uncommitted::Owned(v) => f
378                .debug_tuple("Owned")
379                .field(&Bytes::from(v.clone()))
380                .finish(),
381        }
382    }
383}
384
impl DeterministicHash for Rope {
    /// Ropes with similar contents hash the same, regardless of their
    /// structure.
    ///
    /// Note the total length is hashed first; see [Rope::content_hash] for a
    /// bytes-only variant.
    fn deterministic_hash<H: DeterministicHasher>(&self, state: &mut H) {
        state.write_usize(self.len());
        self.data.deterministic_hash(state);
    }
}
393
impl Rope {
    /// Returns a DeterministicHash impl that only hashes the bytes of the rope (still regardless of
    /// their structure).
    ///
    /// The default (Deterministic)Hash implementation also includes the length of the rope. Be
    /// careful when using this, as it would cause `(Rope("abc"), Rope("def"))` and `(Rope("abcd"),
    /// Rope("ef"))` to have the same hash. The best usecase is when the rope is the _whole_
    /// datastructure being hashed and it isn't part of some other structure.
    pub fn content_hash(&self) -> impl DeterministicHash + '_ {
        RopeBytesOnlyHash(self)
    }
}
/// Hash wrapper returned by [Rope::content_hash]: hashes only the rope's
/// bytes, omitting the length prefix written by Rope's own impl.
pub struct RopeBytesOnlyHash<'a>(&'a Rope);
impl DeterministicHash for RopeBytesOnlyHash<'_> {
    fn deterministic_hash<H: DeterministicHasher>(&self, state: &mut H) {
        self.0.data.deterministic_hash(state);
    }
}
412
/// Encode as a len + raw bytes format using the encoder's [`bincode::enc::write::Writer`]. Encoding
/// [`Rope::to_bytes`] instead would be easier, but would require copying to an intermediate buffer.
///
/// This len + bytes format is similar to how bincode would normally encode a `&[u8]`:
/// https://docs.rs/bincode/latest/bincode/spec/index.html#collections
impl Encode for Rope {
    fn encode<E: Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
        self.length.encode(encoder)?;
        // Stream each Bytes section straight into the writer; no flattened
        // intermediate buffer is ever built.
        let mut reader = self.read();
        for chunk in &mut reader {
            encoder.writer().write(chunk)?;
        }

        Ok(())
    }
}
429
430impl<Context> Decode<Context> for Rope {
431    #[allow(clippy::uninit_vec)]
432    fn decode<D: Decoder<Context = Context>>(decoder: &mut D) -> Result<Self, DecodeError> {
433        let length = usize::decode(decoder)?;
434        let mut bytes = Vec::with_capacity(length);
435
436        // SAFETY:
437        // - `bytes` has capacity of `length` already
438        // - `read` writes to (does not read) `bytes` and will return an error if exactly `length`
439        //   bytes is not written, so no uninitialized memory ever escapes this function.
440        // We can't use `MaybeUninit` here because `read` doesn't support it.
441        unsafe {
442            bytes.set_len(length);
443        }
444        // the decoder API requires that we claim a length *before* reading (not after)
445        decoder.claim_bytes_read(length)?;
446        decoder.reader().read(&mut bytes)?;
447
448        Ok(Rope::from(bytes))
449    }
450}
451
// Derives `BorrowDecode` from the owned `Decode` impl above.
impl_borrow_decode!(Rope);
453
454pub mod ser_as_string {
455    use serde::{Serializer, ser::Error};
456
457    use super::Rope;
458
459    /// Serializes a Rope into a string.
460    pub fn serialize<S: Serializer>(rope: &Rope, serializer: S) -> Result<S::Ok, S::Error> {
461        let s = rope.to_str().map_err(Error::custom)?;
462        serializer.serialize_str(&s)
463    }
464}
465
466pub mod ser_option_as_string {
467    use serde::{Serializer, ser::Error};
468
469    use super::Rope;
470
471    /// Serializes a Rope into a string.
472    pub fn serialize<S: Serializer>(rope: &Option<Rope>, serializer: S) -> Result<S::Ok, S::Error> {
473        if let Some(rope) = rope {
474            let s = rope.to_str().map_err(Error::custom)?;
475            serializer.serialize_some(&s)
476        } else {
477            serializer.serialize_none()
478        }
479    }
480}
481
482impl PartialEq for Rope {
483    // Ropes with similar contents are equals, regardless of their structure.
484    fn eq(&self, other: &Self) -> bool {
485        if self.len() != other.len() {
486            return false;
487        }
488        self.cmp(other) == Ordering::Equal
489    }
490}
491
492impl Eq for Rope {}
493
impl Ord for Rope {
    /// Byte-order comparison, independent of rope structure: first a cheap
    /// structural pass over matching elements, then a byte-by-byte fallback.
    fn cmp(&self, other: &Self) -> Ordering {
        if Arc::ptr_eq(&self.data, &other.data) {
            return Ordering::Equal;
        }

        // Fast path for structurally equal Ropes. With this, we can do memory reference
        // checks and skip some contents equality.
        let left = &self.data;
        let right = &other.data;
        let len = min(left.len(), right.len());
        let mut index = 0;
        while index < len {
            let a = &left[index];
            let b = &right[index];

            match a.maybe_cmp(b) {
                // Bytes or InnerRope point to the same memory, or Bytes are contents equal.
                Some(Ordering::Equal) => index += 1,
                // Bytes are not contents equal.
                Some(ordering) => return ordering,
                // InnerRopes point to different memory, or the Ropes weren't structurally equal.
                None => break,
            }
        }
        // If we reach the end of iteration without finding a mismatch (or early
        // breaking), then we know the ropes are either equal or not equal.
        if index == len {
            // We know that any remaining RopeElem in the InnerRope must contain content, so
            // if either one contains more RopeElem than they cannot be equal.
            return left.len().cmp(&right.len());
        }

        // At this point, we need to do slower contents equality. It's possible we'll
        // still get some memory reference equality for Bytes.
        // Both readers resume from the first structurally-unequal element.
        let mut left = RopeReader::new(left, index);
        let mut right = RopeReader::new(right, index);
        loop {
            match (left.fill_buf(), right.fill_buf()) {
                // fill_buf should always return Ok, with either some number of bytes or 0 bytes
                // when consumed.
                (Ok(a), Ok(b)) => {
                    let len = min(a.len(), b.len());

                    // When one buffer is consumed, both must be consumed.
                    if len == 0 {
                        return a.len().cmp(&b.len());
                    }

                    match a[0..len].cmp(&b[0..len]) {
                        Ordering::Equal => {
                            left.consume(len);
                            right.consume(len);
                        }
                        ordering => return ordering,
                    }
                }

                // If an error is ever returned (which shouldn't happen for us) for either/both,
                // then we can't prove equality.
                _ => unreachable!(),
            }
        }
    }
}
559
impl PartialOrd for Rope {
    /// Ropes are totally ordered, so this always returns Some (delegating to [Ord::cmp]).
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
565
566impl From<Vec<u8>> for Uncommitted {
567    fn from(bytes: Vec<u8>) -> Self {
568        if bytes.is_empty() {
569            Uncommitted::None
570        } else {
571            Uncommitted::Owned(bytes)
572        }
573    }
574}
575
impl InnerRope {
    /// Returns a String instance of all bytes.
    ///
    /// `len` is the total byte length, used to presize the buffer in the
    /// multi-element case. Fails if the contents are not valid UTF-8.
    fn to_str(&self, len: usize) -> Result<Cow<'_, str>> {
        match &self[..] {
            [] => Ok(Cow::Borrowed("")),
            // A sole shared element: recurse so we can still borrow from it.
            [Shared(inner)] => inner.to_str(len),
            [Local(bytes)] => {
                let utf8 = std::str::from_utf8(bytes);
                utf8.context("failed to convert rope into string")
                    .map(Cow::Borrowed)
            }
            // Multiple elements: copy everything into one owned String.
            _ => {
                let mut read = RopeReader::new(self, 0);
                let mut string = String::with_capacity(len);
                let res = read.read_to_string(&mut string);
                res.context("failed to convert rope into string")?;
                Ok(Cow::Owned(string))
            }
        }
    }

    /// Returns a slice of all bytes.
    ///
    /// Borrows when empty or single-Local; otherwise flattens into an owned
    /// buffer presized to `len`.
    fn to_bytes(&self, len: usize) -> Cow<'_, [u8]> {
        match &self[..] {
            [] => Cow::Borrowed(EMPTY_BUF),
            [Shared(inner)] => inner.to_bytes(len),
            [Local(bytes)] => Cow::Borrowed(bytes),
            _ => {
                let mut read = RopeReader::new(self, 0);
                let mut buf = Vec::with_capacity(len);
                read.read_to_end(&mut buf)
                    .expect("rope reader should not fail");
                buf.into()
            }
        }
    }

    /// Consumes the rope, returning a single contiguous Bytes buffer of `len`
    /// bytes. Avoids copying when there is exactly one element.
    fn into_bytes(mut self, len: usize) -> Bytes {
        if self.0.is_empty() {
            return Bytes::default();
        } else if self.0.len() == 1 {
            // Sole owner of the Arc: take the element out by value.
            let data = Arc::try_unwrap(self.0);
            match data {
                Ok(data) => {
                    return data.into_iter().next().unwrap().into_bytes(len);
                }
                Err(data) => {
                    // If we have a single element, we can return it directly.
                    if let Local(bytes) = &data[0] {
                        return bytes.clone();
                    }
                    // A single Shared element behind a still-shared Arc: put
                    // the Arc back and fall through to the copy path below.
                    self.0 = data;
                }
            }
        }

        let mut read = RopeReader::new(&self, 0);
        let mut buf = Vec::with_capacity(len);
        read.read_to_end(&mut buf)
            .expect("read of rope cannot fail");
        buf.into()
    }
}
639
640impl Default for InnerRope {
641    fn default() -> Self {
642        InnerRope(Arc::new(vec![]))
643    }
644}
645
646impl DeterministicHash for InnerRope {
647    /// Ropes with similar contents hash the same, regardless of their
648    /// structure. Notice the InnerRope does not contain a length (and any
649    /// shared InnerRopes won't either), so the exact structure isn't
650    /// relevant at this point.
651    fn deterministic_hash<H: DeterministicHasher>(&self, state: &mut H) {
652        for v in self.0.iter() {
653            v.deterministic_hash(state);
654        }
655    }
656}
657
658impl From<Vec<RopeElem>> for InnerRope {
659    fn from(mut els: Vec<RopeElem>) -> Self {
660        if cfg!(debug_assertions) {
661            // It's important that an InnerRope never contain an empty Bytes section.
662            for el in els.iter() {
663                match el {
664                    Local(b) => debug_assert!(!b.is_empty(), "must not have empty Bytes"),
665                    Shared(s) => {
666                        // We check whether the shared slice is empty, and not its elements. The
667                        // only way to construct the Shared's InnerRope is
668                        // in this mod, and we have already checked that
669                        // none of its elements are empty.
670                        debug_assert!(!s.is_empty(), "must not have empty InnerRope");
671                    }
672                }
673            }
674        }
675        els.shrink_to_fit();
676        InnerRope(Arc::from(els))
677    }
678}
679
impl Deref for InnerRope {
    type Target = Arc<Vec<RopeElem>>;

    /// Exposes the inner Arc (and, transitively, the element vec) directly.
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
687
688impl RopeElem {
689    fn maybe_cmp(&self, other: &Self) -> Option<Ordering> {
690        match (self, other) {
691            (Local(a), Local(b)) => {
692                if a.len() == b.len() {
693                    return Some(a.cmp(b));
694                }
695
696                // But if not, the rope may still be contents equal if a following section
697                // contains the missing bytes.
698                None
699            }
700            (Shared(a), Shared(b)) => {
701                if Arc::ptr_eq(&a.0, &b.0) {
702                    return Some(Ordering::Equal);
703                }
704
705                // But if not, they might still be equal and we need to fallback to slower
706                // equality.
707                None
708            }
709            _ => None,
710        }
711    }
712
713    fn into_bytes(self, len: usize) -> Bytes {
714        match self {
715            Local(bytes) => bytes,
716            Shared(inner) => inner.into_bytes(len),
717        }
718    }
719}
720
impl DeterministicHash for RopeElem {
    /// Ropes with similar contents hash the same, regardless of their
    /// structure. Notice the Bytes length is not hashed, and shared InnerRopes
    /// do not contain a length.
    fn deterministic_hash<H: DeterministicHasher>(&self, state: &mut H) {
        match self {
            Local(bytes) => state.write_bytes(bytes),
            Shared(inner) => inner.deterministic_hash(state),
        }
    }
}
732
#[derive(Debug, Default)]
/// Implements the [Read]/[AsyncRead]/[Iterator] trait over a [Rope].
pub struct RopeReader<'a> {
    /// The Rope's tree is kept as a stack, allowing us to accomplish incremental yielding.
    stack: Vec<StackElem<'a>>,
    /// An offset in the current buffer, used by the `read` implementation.
    /// Nonzero only while a partially-read Bytes sits on top of the stack
    /// (pushed back by `read_internal`/`fill_buf`).
    offset: usize,
}
741
/// A StackElem holds the current index into either a Bytes or a shared Rope.
/// When the index reaches the end of the associated data, it is removed and we
/// continue onto the next item in the stack.
#[derive(Debug)]
enum StackElem<'a> {
    /// A leaf Bytes section currently being read.
    Local(&'a Bytes),
    /// A shared rope plus the index of the next element to visit within it.
    Shared(&'a InnerRope, usize),
}
750
impl<'a> RopeReader<'a> {
    /// Creates a reader over `inner` starting at element `index`; an
    /// out-of-bounds index yields an empty (immediately exhausted) reader.
    fn new(inner: &'a InnerRope, index: usize) -> Self {
        if index >= inner.len() {
            Default::default()
        } else {
            RopeReader {
                stack: vec![StackElem::Shared(inner, index)],
                offset: 0,
            }
        }
    }

    /// A shared implementation for reading bytes. This takes the basic
    /// operations needed for both Read and AsyncRead.
    ///
    /// Copies up to `want` bytes into `buf`, returning the number copied.
    fn read_internal(&mut self, want: usize, buf: &mut ReadBuf<'_>) -> usize {
        let mut remaining = want;

        while remaining > 0 {
            let bytes = match self.next_internal() {
                None => break,
                Some(b) => b,
            };

            let lower = self.offset;
            let upper = min(bytes.len(), lower + remaining);

            buf.put_slice(&bytes[self.offset..upper]);

            if upper < bytes.len() {
                // Partially consumed: record where we stopped and push the
                // Bytes back so the next read resumes from `offset`.
                self.offset = upper;
                self.stack.push(StackElem::Local(bytes))
            } else {
                // Fully consumed: the next Bytes starts fresh at offset 0.
                self.offset = 0;
            }
            remaining -= upper - lower;
        }

        want - remaining
    }

    /// Returns the next item in the iterator without modifying `self.offset`.
    fn next_internal(&mut self) -> Option<&'a Bytes> {
        // Iterates the rope's elements recursively until we find the next Local
        // section, returning its Bytes.
        loop {
            let (inner, mut index) = match self.stack.pop() {
                None => return None,
                Some(StackElem::Local(b)) => {
                    debug_assert!(!b.is_empty(), "must not have empty Bytes section");
                    return Some(b);
                }
                Some(StackElem::Shared(r, i)) => (r, i),
            };

            let el = &inner[index];
            index += 1;
            if index < inner.len() {
                // More elements remain in this shared rope: revisit it after
                // the current element is exhausted.
                self.stack.push(StackElem::Shared(inner, index));
            }

            self.stack.push(StackElem::from(el));
        }
    }
}
815
impl<'a> Iterator for RopeReader<'a> {
    type Item = &'a Bytes;

    /// Yields whole Bytes sections in order. Any partial-read `offset` left by
    /// the Read impl is deliberately discarded.
    fn next(&mut self) -> Option<Self::Item> {
        self.offset = 0;
        self.next_internal()
    }
}
824
impl Read for RopeReader<'_> {
    /// Never errors; a short read simply means the rope is exhausted.
    fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> {
        Ok(self.read_internal(buf.len(), &mut ReadBuf::new(buf)))
    }
}
830
impl AsyncRead for RopeReader<'_> {
    /// Fills `buf` synchronously from the in-memory rope: always completes
    /// immediately with Ok, never returns Pending.
    fn poll_read(
        self: Pin<&mut Self>,
        _cx: &mut TaskContext<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<IoResult<()>> {
        let this = self.get_mut();
        this.read_internal(buf.remaining(), buf);
        Poll::Ready(Ok(()))
    }
}
842
impl BufRead for RopeReader<'_> {
    /// Never returns an error.
    fn fill_buf(&mut self) -> IoResult<&[u8]> {
        // Returns the full buffer without copying any data. The same bytes will
        // continue to be returned until [consume] is called.
        let bytes = match self.next_internal() {
            None => return Ok(EMPTY_BUF),
            Some(b) => b,
        };

        // This is just so we can get a reference to the Bytes that is kept alive by the
        // RopeReader itself. We can then auto-convert that reference into the needed u8
        // slice reference.
        self.stack.push(StackElem::Local(bytes));
        let Some(StackElem::Local(bytes)) = self.stack.last() else {
            unreachable!()
        };

        Ok(&bytes[self.offset..])
    }

    fn consume(&mut self, amt: usize) {
        if let Some(StackElem::Local(b)) = self.stack.last_mut() {
            // https://doc.rust-lang.org/std/io/trait.BufRead.html#tymethod.consume
            debug_assert!(
                self.offset + amt <= b.len(),
                "It is a logic error if `amount` exceeds the number of unread bytes in the \
                 internal buffer, which is returned by `fill_buf`."
            );
            // Consume some amount of bytes from the current Bytes instance, ensuring those bytes
            // are not returned on the next call to `fill_buf`.
            self.offset += amt;
            if self.offset == b.len() {
                // whole Bytes instance was consumed
                self.stack.pop();
                self.offset = 0;
            }
        }
    }
}
883
impl<'a> Stream for RopeReader<'a> {
    /// This is efficiently streamable into a [`Hyper::Body`] if each item is cloned into an owned
    /// `Bytes` instance.
    type Item = Result<&'a Bytes>;

    /// Returns a "result" of reading the next shared bytes reference. This
    /// differs from [`Read::read`] by not copying any memory.
    fn poll_next(self: Pin<&mut Self>, _cx: &mut TaskContext<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        // Always Ready (the data is in memory) and items are always Ok.
        Poll::Ready(this.next().map(Ok))
    }
}
896
897impl<'a> From<&'a RopeElem> for StackElem<'a> {
898    fn from(el: &'a RopeElem) -> Self {
899        match el {
900            Local(bytes) => Self::Local(bytes),
901            Shared(inner) => Self::Shared(inner, 0),
902        }
903    }
904}
905
#[cfg(test)]
/// Unit tests for [`Rope`]: construction, equality, hashing, and the various
/// read APIs ([`Read`], [`BufRead`], chunk iteration).
mod test {
    use std::{
        borrow::Cow,
        cmp::min,
        io::{BufRead, Read},
    };

    use anyhow::Result;
    use turbo_tasks_hash::{DeterministicHasher, Xxh3Hash64Hasher, hash_xxh3_hash64};

    use super::{InnerRope, Rope, RopeBuilder, RopeElem};

    // These are intentionally not exposed, because they do inefficient conversions
    // in order to fully test cases.
    impl From<&str> for RopeElem {
        fn from(value: &str) -> Self {
            RopeElem::Local(value.to_string().into())
        }
    }
    impl From<Vec<RopeElem>> for RopeElem {
        fn from(value: Vec<RopeElem>) -> Self {
            RopeElem::Shared(InnerRope::from(value))
        }
    }
    impl From<Rope> for RopeElem {
        fn from(value: Rope) -> Self {
            RopeElem::Shared(value.data)
        }
    }
    impl Rope {
        // Test-only constructor that builds a rope directly from elements,
        // computing the total length by walking the (possibly nested) tree.
        fn new(value: Vec<RopeElem>) -> Self {
            let data = InnerRope::from(value);
            Rope {
                length: data.len(),
                data,
            }
        }
    }
    impl InnerRope {
        fn len(&self) -> usize {
            self.iter().map(|v| v.len()).sum()
        }
    }
    impl RopeElem {
        fn len(&self) -> usize {
            match self {
                RopeElem::Local(b) => b.len(),
                RopeElem::Shared(r) => r.len(),
            }
        }
    }

    // Empty ropes, however constructed, must yield no chunks when read.

    #[test]
    fn empty_build_without_pushes() {
        let empty = RopeBuilder::default().build();
        let mut reader = empty.read();
        assert!(reader.next().is_none());
    }

    #[test]
    fn empty_build_with_empty_static_push() {
        let mut builder = RopeBuilder::default();
        builder += "";

        let empty = builder.build();
        let mut reader = empty.read();
        assert!(reader.next().is_none());
    }

    #[test]
    fn empty_build_with_empty_bytes_push() {
        let mut builder = RopeBuilder::default();
        builder.push_bytes(&[]);

        let empty = builder.build();
        let mut reader = empty.read();
        assert!(reader.next().is_none());
    }

    #[test]
    fn empty_build_with_empty_concat() {
        let mut builder = RopeBuilder::default();
        builder += &RopeBuilder::default().build();

        let empty = builder.build();
        let mut reader = empty.read();
        assert!(reader.next().is_none());
    }

    #[test]
    fn empty_from_empty_static_str() {
        let empty = Rope::from("");
        let mut reader = empty.read();
        assert!(reader.next().is_none());
    }

    #[test]
    fn empty_from_empty_string() {
        let empty = Rope::from("".to_string());
        let mut reader = empty.read();
        assert!(reader.next().is_none());
    }

    // Equality is defined over byte content; these cases check it holds across
    // clones, owned values, shared sub-ropes, and differing chunk splits.

    #[test]
    fn empty_equality() {
        let a = Rope::from("");
        let b = Rope::from("");

        assert_eq!(a, b);
    }

    #[test]
    fn cloned_equality() {
        let a = Rope::from("abc");
        let b = a.clone();

        assert_eq!(a, b);
    }

    #[test]
    fn value_equality() {
        let a = Rope::from("abc".to_string());
        let b = Rope::from("abc".to_string());

        assert_eq!(a, b);
    }

    #[test]
    fn value_inequality() {
        let a = Rope::from("abc".to_string());
        let b = Rope::from("def".to_string());

        assert_ne!(a, b);
    }

    #[test]
    fn value_equality_shared_1() {
        let shared = Rope::from("def");
        let a = Rope::new(vec!["abc".into(), shared.clone().into(), "ghi".into()]);
        let b = Rope::new(vec!["abc".into(), shared.into(), "ghi".into()]);

        assert_eq!(a, b);
    }

    #[test]
    fn value_equality_shared_2() {
        let a = Rope::new(vec!["abc".into(), vec!["def".into()].into(), "ghi".into()]);
        let b = Rope::new(vec!["abc".into(), vec!["def".into()].into(), "ghi".into()]);

        assert_eq!(a, b);
    }

    // Same bytes split at different chunk boundaries must still compare equal.

    #[test]
    fn value_equality_splits_1() {
        let a = Rope::new(vec!["a".into(), "aa".into()]);
        let b = Rope::new(vec!["aa".into(), "a".into()]);

        assert_eq!(a, b);
    }

    #[test]
    fn value_equality_splits_2() {
        let a = Rope::new(vec![vec!["a".into()].into(), "aa".into()]);
        let b = Rope::new(vec![vec!["aa".into()].into(), "a".into()]);

        assert_eq!(a, b);
    }

    #[test]
    fn value_inequality_shared_1() {
        let shared = Rope::from("def");
        let a = Rope::new(vec!["aaa".into(), shared.clone().into(), "ghi".into()]);
        let b = Rope::new(vec!["bbb".into(), shared.into(), "ghi".into()]);

        assert_ne!(a, b);
    }

    #[test]
    fn value_inequality_shared_2() {
        let a = Rope::new(vec!["abc".into(), vec!["ddd".into()].into(), "ghi".into()]);
        let b = Rope::new(vec!["abc".into(), vec!["eee".into()].into(), "ghi".into()]);

        assert_ne!(a, b);
    }

    #[test]
    fn value_inequality_shared_3() {
        let shared = Rope::from("def");
        let a = Rope::new(vec!["abc".into(), shared.clone().into(), "ggg".into()]);
        let b = Rope::new(vec!["abc".into(), shared.into(), "hhh".into()]);

        assert_ne!(a, b);
    }

    // The deterministic hash must depend only on byte content, not on how the
    // bytes are chunked or shared between ropes.
    #[test]
    fn hash_structure_invariance() {
        let shared = Rope::from("def");
        let a = Rope::new(vec!["abc".into(), shared.clone().into(), "ggg".into()]);
        let b = Rope::new(vec![
            "ab".into(),
            "c".into(),
            shared.into(),
            "g".into(),
            "gg".into(),
        ]);

        assert_eq!(hash_xxh3_hash64(a), hash_xxh3_hash64(b));
    }

    // The content hash of a chunked rope must equal hashing the flat bytes.
    #[test]
    fn content_hash() {
        let rope = Rope::new(vec!["abc".into(), "def".into()]);

        let string = "abcdef";
        let mut hasher = Xxh3Hash64Hasher::default();
        hasher.write_bytes(string.as_bytes());

        assert_eq!(hash_xxh3_hash64(rope.content_hash()), hasher.finish());
    }

    // Iterating a reader yields one chunk per leaf, with shared ropes flattened.
    #[test]
    fn iteration() {
        let shared = Rope::from("def");
        let rope = Rope::new(vec!["abc".into(), shared.into(), "ghi".into()]);

        let chunks = rope.read().collect::<Vec<_>>();

        assert_eq!(chunks, vec!["abc", "def", "ghi"]);
    }

    // Read::read with a small buffer must span chunk boundaries seamlessly.
    #[test]
    fn read() {
        let shared = Rope::from("def");
        let rope = Rope::new(vec!["abc".into(), shared.into(), "ghi".into()]);

        let mut chunks = vec![];
        let mut buf = [0_u8; 2];
        let mut reader = rope.read();
        loop {
            let amt = reader.read(&mut buf).unwrap();
            if amt == 0 {
                break;
            }
            chunks.push(Vec::from(&buf[0..amt]));
        }

        assert_eq!(
            chunks,
            vec![
                Vec::from(*b"ab"),
                Vec::from(*b"cd"),
                Vec::from(*b"ef"),
                Vec::from(*b"gh"),
                Vec::from(*b"i")
            ]
        );
    }

    // fill_buf must keep returning the unread remainder of a chunk until
    // consume() has claimed all of it.
    #[test]
    fn fill_buf() {
        let shared = Rope::from("def");
        let rope = Rope::new(vec!["abc".into(), shared.into(), "ghi".into()]);

        let mut chunks = vec![];
        let mut reader = rope.read();
        loop {
            let buf = reader.fill_buf().unwrap();
            if buf.is_empty() {
                break;
            }
            let c = min(2, buf.len());
            chunks.push(Vec::from(buf));
            reader.consume(c);
        }

        assert_eq!(
            chunks,
            // We're receiving a full buf, then only consuming 2 bytes, so we'll still get the
            // third.
            vec![
                Vec::from(*b"abc"),
                Vec::from(*b"c"),
                Vec::from(*b"def"),
                Vec::from(*b"f"),
                Vec::from(*b"ghi"),
                Vec::from(*b"i")
            ]
        );
    }

    #[test]
    fn test_to_bytes() -> Result<()> {
        let rope = Rope::from("abc");
        assert_eq!(rope.to_bytes(), Cow::Borrowed::<[u8]>(&[0x61, 0x62, 0x63]));
        Ok(())
    }
}