turbo_tasks_fs/
rope.rs

1use std::{
2    borrow::Cow,
3    cmp::{Ordering, min},
4    fmt,
5    io::{BufRead, Read, Result as IoResult, Write},
6    mem,
7    ops::{AddAssign, Deref},
8    pin::Pin,
9    task::{Context as TaskContext, Poll},
10};
11
12use RopeElem::{Local, Shared};
13use anyhow::{Context, Result};
14use bincode::{
15    Decode, Encode,
16    de::{Decoder, read::Reader as _},
17    enc::{Encoder, write::Writer as _},
18    error::{DecodeError, EncodeError},
19    impl_borrow_decode,
20};
21use bytes::Bytes;
22use futures::Stream;
23use tokio::io::{AsyncRead, ReadBuf};
24use triomphe::Arc;
25use turbo_tasks_hash::{DeterministicHash, DeterministicHasher};
26
27static EMPTY_BUF: &[u8] = &[];
28
/// An efficient structure for sharing bytes/strings between multiple sources.
///
/// Cloning a Rope is extremely cheap (Arc and usize), and
/// sharing the contents of one Rope can be done by just cloning an Arc.
///
/// Ropes are immutable. To construct one, use [RopeBuilder] or one of the
/// `From` conversions implemented for `Rope`.
///
/// Equality and ordering are implemented manually (see the `PartialEq` and
/// `Ord` impls) and compare the byte contents rather than the internal
/// structure.
#[turbo_tasks::value(shared, serialization = "custom", eq = "manual", operation)]
#[derive(Clone, Debug, Default)]
pub struct Rope {
    /// Total length, in bytes, of everything reachable through `data`.
    length: usize,

    /// A shareable container holding the rope's bytes.
    #[turbo_tasks(debug_ignore, trace_ignore)]
    data: InnerRope,
}
45
/// An Arc container for ropes. This indirection allows for easily sharing the
/// contents between Ropes (and also RopeBuilders/RopeReaders).
///
/// The wrapped vector holds the rope's sections in order. Code in this module
/// maintains the invariant that no section is empty (checked with debug
/// assertions in `From<Vec<RopeElem>>`).
#[derive(Clone, Debug)]
struct InnerRope(Arc<Vec<RopeElem>>);
50
/// Differentiates the types of stored bytes in a rope.
///
/// A rope is a tree: `Local` sections are the leaves, `Shared` sections are
/// nested ropes whose sections are read in place.
#[derive(Clone, Debug)]
enum RopeElem {
    /// Local bytes are owned directly by this rope.
    Local(Bytes),

    /// Shared holds the Arc container of another rope.
    Shared(InnerRope),
}
60
/// RopeBuilder provides a mutable container to append bytes/strings. This can
/// also append _other_ Rope instances cheaply, allowing efficient sharing of
/// the contents without a full clone of the bytes.
///
/// Call `build` to turn the builder into an immutable [Rope].
#[derive(Default, Debug)]
pub struct RopeBuilder {
    /// Total length of all previously committed bytes. Bytes that are still
    /// pending in `uncommitted` are not counted here.
    length: usize,

    /// Immutable bytes references that have been appended to this builder. The
    /// rope is the combination of all these committed bytes.
    committed: Vec<RopeElem>,

    /// Stores bytes that have been pushed, but are not yet committed. This is
    /// either an attempt to push a static lifetime, or a push of owned bytes.
    /// When the builder is flushed, we will commit these bytes into a real
    /// Bytes instance.
    uncommitted: Uncommitted,
}
79
/// Stores any bytes which have been pushed, but we haven't decided to commit
/// yet. Uncommitted bytes allow us to build larger buffers out of possibly
/// small pushes.
#[derive(Default)]
enum Uncommitted {
    /// Nothing is pending.
    #[default]
    None,

    /// Stores our attempt to push static lifetime bytes into the rope. If we
    /// build the Rope or concatenate another Rope, we can commit a static
    /// Bytes reference and save memory. If not, we'll concatenate this into
    /// writable bytes to be committed later.
    Static(&'static [u8]),

    /// Mutable bytes collection where non-static/non-shared bytes are written.
    /// This builds until the next time a static or shared bytes is
    /// appended, in which case we split the buffer and commit. Finishing
    /// the builder also commits these bytes.
    Owned(Vec<u8>),
}
100
101impl Rope {
102    pub fn len(&self) -> usize {
103        self.length
104    }
105
106    pub fn is_empty(&self) -> bool {
107        self.length == 0
108    }
109
110    /// Returns a [Read]/[AsyncRead]/[Iterator] instance over all bytes.
111    pub fn read(&self) -> RopeReader<'_> {
112        RopeReader::new(&self.data, 0)
113    }
114
115    /// Returns a String instance of all bytes.
116    pub fn to_str(&self) -> Result<Cow<'_, str>> {
117        self.data.to_str(self.length)
118    }
119
120    /// Returns a slice of all bytes
121    pub fn to_bytes(&self) -> Cow<'_, [u8]> {
122        self.data.to_bytes(self.length)
123    }
124
125    pub fn into_bytes(self) -> Bytes {
126        self.data.into_bytes(self.length)
127    }
128}
129
130impl From<Vec<u8>> for Rope {
131    fn from(mut bytes: Vec<u8>) -> Self {
132        bytes.shrink_to_fit();
133        Rope::from(Bytes::from(bytes))
134    }
135}
136
137impl From<String> for Rope {
138    fn from(mut bytes: String) -> Self {
139        bytes.shrink_to_fit();
140        Rope::from(Bytes::from(bytes))
141    }
142}
143
144impl<T: Into<Bytes>> From<T> for Rope {
145    default fn from(bytes: T) -> Self {
146        let bytes = bytes.into();
147        // We can't have an InnerRope which contains an empty Local section.
148        if bytes.is_empty() {
149            Default::default()
150        } else {
151            Rope {
152                length: bytes.len(),
153                data: InnerRope(Arc::from(vec![Local(bytes)])),
154            }
155        }
156    }
157}
158
159impl RopeBuilder {
160    /// Push owned bytes into the Rope.
161    ///
162    /// If possible use [push_static_bytes] or `+=` operation instead, as they
163    /// will create a reference to shared memory instead of cloning the bytes.
164    pub fn push_bytes(&mut self, bytes: &[u8]) {
165        if bytes.is_empty() {
166            return;
167        }
168
169        self.uncommitted.push_bytes(bytes);
170    }
171
172    /// Reserve additional capacity for owned bytes in the Rope.
173    ///
174    /// This is useful to call before multiple `push_bytes` calls to avoid
175    /// multiple allocations.
176    pub fn reserve_bytes(&mut self, additional: usize) {
177        self.uncommitted.reserve_bytes(additional);
178    }
179
180    /// Push static lifetime bytes into the Rope.
181    ///
182    /// This is more efficient than pushing owned bytes, because the internal
183    /// data does not need to be copied when the rope is read.
184    pub fn push_static_bytes(&mut self, bytes: &'static [u8]) {
185        if bytes.is_empty() {
186            return;
187        }
188
189        // If the string is smaller than the cost of a Bytes reference (4 usizes), then
190        // it's more efficient to own the bytes in a new buffer. We may be able to reuse
191        // that buffer when more bytes are pushed.
192        if bytes.len() < mem::size_of::<Bytes>() {
193            return self.uncommitted.push_static_bytes(bytes);
194        }
195
196        // We may have pending bytes from a prior push.
197        self.finish();
198
199        self.length += bytes.len();
200        self.committed.push(Local(Bytes::from_static(bytes)));
201    }
202
203    /// Concatenate another Rope instance into our builder.
204    ///
205    /// This is much more efficient than pushing actual bytes, since we can
206    /// share the other Rope's references without copying the underlying data.
207    pub fn concat(&mut self, other: &Rope) {
208        if other.is_empty() {
209            return;
210        }
211
212        // We may have pending bytes from a prior push.
213        self.finish();
214
215        self.length += other.len();
216        self.committed.push(Shared(other.data.clone()));
217    }
218
219    /// Writes any pending bytes into our committed queue. This is called automatically by other
220    /// `RopeBuilder` methods.
221    ///
222    /// This may be called multiple times without issue.
223    fn finish(&mut self) {
224        if let Some(b) = self.uncommitted.finish() {
225            debug_assert!(!b.is_empty(), "must not have empty uncommitted bytes");
226            self.length += b.len();
227            self.committed.push(Local(b));
228        }
229    }
230
231    pub fn len(&self) -> usize {
232        self.length + self.uncommitted.len()
233    }
234
235    pub fn is_empty(&self) -> bool {
236        self.len() == 0
237    }
238
239    /// Constructs our final, immutable Rope instance.
240    pub fn build(mut self) -> Rope {
241        self.finish();
242        Rope {
243            length: self.length,
244            data: InnerRope::from(self.committed),
245        }
246    }
247}
248
249impl From<&'static str> for RopeBuilder {
250    default fn from(bytes: &'static str) -> Self {
251        let mut r = RopeBuilder::default();
252        r += bytes;
253        r
254    }
255}
256
257impl From<Vec<u8>> for RopeBuilder {
258    fn from(bytes: Vec<u8>) -> Self {
259        RopeBuilder {
260            // Directly constructing the Uncommitted allows us to skip copying the bytes.
261            uncommitted: Uncommitted::from(bytes),
262            ..Default::default()
263        }
264    }
265}
266
267impl Write for RopeBuilder {
268    fn write(&mut self, bytes: &[u8]) -> IoResult<usize> {
269        self.push_bytes(bytes);
270        Ok(bytes.len())
271    }
272
273    fn flush(&mut self) -> IoResult<()> {
274        self.finish();
275        Ok(())
276    }
277}
278
279impl AddAssign<&'static str> for RopeBuilder {
280    /// Pushes a reference to static memory onto the rope.
281    ///
282    /// This is more efficient than pushing owned bytes, because the internal
283    /// data does not need to be copied when the rope is read.
284    fn add_assign(&mut self, rhs: &'static str) {
285        self.push_static_bytes(rhs.as_bytes());
286    }
287}
288
289impl AddAssign<&Rope> for RopeBuilder {
290    fn add_assign(&mut self, rhs: &Rope) {
291        self.concat(rhs);
292    }
293}
294
295impl Uncommitted {
296    fn len(&self) -> usize {
297        match self {
298            Uncommitted::None => 0,
299            Uncommitted::Static(s) => s.len(),
300            Uncommitted::Owned(v) => v.len(),
301        }
302    }
303
304    /// Pushes owned bytes, converting the current representation to an Owned if
305    /// it's not already.
306    fn push_bytes(&mut self, bytes: &[u8]) {
307        debug_assert!(!bytes.is_empty(), "must not push empty uncommitted bytes");
308        match self {
309            Self::None => *self = Self::Owned(bytes.to_vec()),
310            Self::Static(s) => {
311                // If we'd previously pushed static bytes, we instead concatenate those bytes
312                // with the new bytes in an attempt to use less memory rather than committing 2
313                // Bytes references (2 * 4 usizes).
314                let v = [s, bytes].concat();
315                *self = Self::Owned(v);
316            }
317            Self::Owned(v) => v.extend(bytes),
318        }
319    }
320
321    /// Reserves additional capacity for owned bytes, converting the current
322    /// representation to an Owned if it's not already.
323    fn reserve_bytes(&mut self, additional: usize) {
324        match self {
325            Self::None => {
326                *self = Self::Owned(Vec::with_capacity(additional));
327            }
328            Self::Static(s) => {
329                let mut v = Vec::with_capacity(s.len() + additional);
330                v.extend_from_slice(s);
331                *self = Self::Owned(v);
332            }
333            Self::Owned(v) => {
334                v.reserve(additional);
335            }
336        }
337    }
338
339    /// Pushes static lifetime bytes, but only if the current representation is
340    /// None. Else, it coverts to an Owned.
341    fn push_static_bytes(&mut self, bytes: &'static [u8]) {
342        debug_assert!(!bytes.is_empty(), "must not push empty uncommitted bytes");
343        match self {
344            // If we've not already pushed static bytes, we attempt to store the bytes for later. If
345            // we push owned bytes or another static bytes, then this attempt will fail and we'll
346            // instead concatenate into a single owned Bytes. But if we don't push anything (build
347            // the Rope), or concatenate another Rope (we can't join our bytes with the InnerRope of
348            // another Rope), we'll be able to commit a static Bytes reference and save overall
349            // memory (a small static Bytes reference is better than a small owned Bytes reference).
350            Self::None => *self = Self::Static(bytes),
351            _ => self.push_bytes(bytes),
352        }
353    }
354
355    /// Converts the current uncommitted bytes into a Bytes, resetting our
356    /// representation to None.
357    fn finish(&mut self) -> Option<Bytes> {
358        match mem::take(self) {
359            Self::None => None,
360            Self::Static(s) => Some(Bytes::from_static(s)),
361            Self::Owned(mut v) => {
362                v.shrink_to_fit();
363                Some(v.into())
364            }
365        }
366    }
367}
368
369impl fmt::Debug for Uncommitted {
370    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
371        match self {
372            Uncommitted::None => f.write_str("None"),
373            Uncommitted::Static(s) => f
374                .debug_tuple("Static")
375                .field(&Bytes::from_static(s))
376                .finish(),
377            Uncommitted::Owned(v) => f
378                .debug_tuple("Owned")
379                .field(&Bytes::from(v.clone()))
380                .finish(),
381        }
382    }
383}
384
385impl DeterministicHash for Rope {
386    /// Ropes with similar contents hash the same, regardless of their
387    /// structure.
388    fn deterministic_hash<H: DeterministicHasher>(&self, state: &mut H) {
389        state.write_usize(self.len());
390        self.data.deterministic_hash(state);
391    }
392}
393
394/// Encode as a len + raw bytes format using the encoder's [`bincode::enc::write::Writer`]. Encoding
395/// [`Rope::to_bytes`] instead would be easier, but would require copying to an intermediate buffer.
396///
397/// This len + bytes format is similar to how bincode would normally encode a `&[u8]`:
398/// https://docs.rs/bincode/latest/bincode/spec/index.html#collections
399impl Encode for Rope {
400    fn encode<E: Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
401        self.length.encode(encoder)?;
402        let mut reader = self.read();
403        for chunk in &mut reader {
404            encoder.writer().write(chunk)?;
405        }
406
407        Ok(())
408    }
409}
410
411impl<Context> Decode<Context> for Rope {
412    #[allow(clippy::uninit_vec)]
413    fn decode<D: Decoder<Context = Context>>(decoder: &mut D) -> Result<Self, DecodeError> {
414        let length = usize::decode(decoder)?;
415        let mut bytes = Vec::with_capacity(length);
416
417        // SAFETY:
418        // - `bytes` has capacity of `length` already
419        // - `read` writes to (does not read) `bytes` and will return an error if exactly `length`
420        //   bytes is not written, so no uninitialized memory ever escapes this function.
421        // We can't use `MaybeUninit` here because `read` doesn't support it.
422        unsafe {
423            bytes.set_len(length);
424        }
425        // the decoder API requires that we claim a length *before* reading (not after)
426        decoder.claim_bytes_read(length)?;
427        decoder.reader().read(&mut bytes)?;
428
429        Ok(Rope::from(bytes))
430    }
431}
432
433impl_borrow_decode!(Rope);
434
435pub mod ser_as_string {
436    use serde::{Serializer, ser::Error};
437
438    use super::Rope;
439
440    /// Serializes a Rope into a string.
441    pub fn serialize<S: Serializer>(rope: &Rope, serializer: S) -> Result<S::Ok, S::Error> {
442        let s = rope.to_str().map_err(Error::custom)?;
443        serializer.serialize_str(&s)
444    }
445}
446
447pub mod ser_option_as_string {
448    use serde::{Serializer, ser::Error};
449
450    use super::Rope;
451
452    /// Serializes a Rope into a string.
453    pub fn serialize<S: Serializer>(rope: &Option<Rope>, serializer: S) -> Result<S::Ok, S::Error> {
454        if let Some(rope) = rope {
455            let s = rope.to_str().map_err(Error::custom)?;
456            serializer.serialize_some(&s)
457        } else {
458            serializer.serialize_none()
459        }
460    }
461}
462
463impl PartialEq for Rope {
464    // Ropes with similar contents are equals, regardless of their structure.
465    fn eq(&self, other: &Self) -> bool {
466        if self.len() != other.len() {
467            return false;
468        }
469        self.cmp(other) == Ordering::Equal
470    }
471}
472
// Marker impl: equality is the content comparison defined by `PartialEq`.
impl Eq for Rope {}
474
475impl Ord for Rope {
476    fn cmp(&self, other: &Self) -> Ordering {
477        if Arc::ptr_eq(&self.data, &other.data) {
478            return Ordering::Equal;
479        }
480
481        // Fast path for structurally equal Ropes. With this, we can do memory reference
482        // checks and skip some contents equality.
483        let left = &self.data;
484        let right = &other.data;
485        let len = min(left.len(), right.len());
486        let mut index = 0;
487        while index < len {
488            let a = &left[index];
489            let b = &right[index];
490
491            match a.maybe_cmp(b) {
492                // Bytes or InnerRope point to the same memory, or Bytes are contents equal.
493                Some(Ordering::Equal) => index += 1,
494                // Bytes are not contents equal.
495                Some(ordering) => return ordering,
496                // InnerRopes point to different memory, or the Ropes weren't structurally equal.
497                None => break,
498            }
499        }
500        // If we reach the end of iteration without finding a mismatch (or early
501        // breaking), then we know the ropes are either equal or not equal.
502        if index == len {
503            // We know that any remaining RopeElem in the InnerRope must contain content, so
504            // if either one contains more RopeElem than they cannot be equal.
505            return left.len().cmp(&right.len());
506        }
507
508        // At this point, we need to do slower contents equality. It's possible we'll
509        // still get some memory reference equality for Bytes.
510        let mut left = RopeReader::new(left, index);
511        let mut right = RopeReader::new(right, index);
512        loop {
513            match (left.fill_buf(), right.fill_buf()) {
514                // fill_buf should always return Ok, with either some number of bytes or 0 bytes
515                // when consumed.
516                (Ok(a), Ok(b)) => {
517                    let len = min(a.len(), b.len());
518
519                    // When one buffer is consumed, both must be consumed.
520                    if len == 0 {
521                        return a.len().cmp(&b.len());
522                    }
523
524                    match a[0..len].cmp(&b[0..len]) {
525                        Ordering::Equal => {
526                            left.consume(len);
527                            right.consume(len);
528                        }
529                        ordering => return ordering,
530                    }
531                }
532
533                // If an error is ever returned (which shouldn't happen for us) for either/both,
534                // then we can't prove equality.
535                _ => unreachable!(),
536            }
537        }
538    }
539}
540
541impl PartialOrd for Rope {
542    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
543        Some(self.cmp(other))
544    }
545}
546
547impl From<Vec<u8>> for Uncommitted {
548    fn from(bytes: Vec<u8>) -> Self {
549        if bytes.is_empty() {
550            Uncommitted::None
551        } else {
552            Uncommitted::Owned(bytes)
553        }
554    }
555}
556
557impl InnerRope {
558    /// Returns a String instance of all bytes.
559    fn to_str(&self, len: usize) -> Result<Cow<'_, str>> {
560        match &self[..] {
561            [] => Ok(Cow::Borrowed("")),
562            [Shared(inner)] => inner.to_str(len),
563            [Local(bytes)] => {
564                let utf8 = std::str::from_utf8(bytes);
565                utf8.context("failed to convert rope into string")
566                    .map(Cow::Borrowed)
567            }
568            _ => {
569                let mut read = RopeReader::new(self, 0);
570                let mut string = String::with_capacity(len);
571                let res = read.read_to_string(&mut string);
572                res.context("failed to convert rope into string")?;
573                Ok(Cow::Owned(string))
574            }
575        }
576    }
577
578    /// Returns a slice of all bytes.
579    fn to_bytes(&self, len: usize) -> Cow<'_, [u8]> {
580        match &self[..] {
581            [] => Cow::Borrowed(EMPTY_BUF),
582            [Shared(inner)] => inner.to_bytes(len),
583            [Local(bytes)] => Cow::Borrowed(bytes),
584            _ => {
585                let mut read = RopeReader::new(self, 0);
586                let mut buf = Vec::with_capacity(len);
587                read.read_to_end(&mut buf)
588                    .expect("rope reader should not fail");
589                buf.into()
590            }
591        }
592    }
593
594    fn into_bytes(mut self, len: usize) -> Bytes {
595        if self.0.is_empty() {
596            return Bytes::default();
597        } else if self.0.len() == 1 {
598            let data = Arc::try_unwrap(self.0);
599            match data {
600                Ok(data) => {
601                    return data.into_iter().next().unwrap().into_bytes(len);
602                }
603                Err(data) => {
604                    // If we have a single element, we can return it directly.
605                    if let Local(bytes) = &data[0] {
606                        return bytes.clone();
607                    }
608                    self.0 = data;
609                }
610            }
611        }
612
613        let mut read = RopeReader::new(&self, 0);
614        let mut buf = Vec::with_capacity(len);
615        read.read_to_end(&mut buf)
616            .expect("read of rope cannot fail");
617        buf.into()
618    }
619}
620
621impl Default for InnerRope {
622    fn default() -> Self {
623        InnerRope(Arc::new(vec![]))
624    }
625}
626
627impl DeterministicHash for InnerRope {
628    /// Ropes with similar contents hash the same, regardless of their
629    /// structure. Notice the InnerRope does not contain a length (and any
630    /// shared InnerRopes won't either), so the exact structure isn't
631    /// relevant at this point.
632    fn deterministic_hash<H: DeterministicHasher>(&self, state: &mut H) {
633        for v in self.0.iter() {
634            v.deterministic_hash(state);
635        }
636    }
637}
638
639impl From<Vec<RopeElem>> for InnerRope {
640    fn from(mut els: Vec<RopeElem>) -> Self {
641        if cfg!(debug_assertions) {
642            // It's important that an InnerRope never contain an empty Bytes section.
643            for el in els.iter() {
644                match el {
645                    Local(b) => debug_assert!(!b.is_empty(), "must not have empty Bytes"),
646                    Shared(s) => {
647                        // We check whether the shared slice is empty, and not its elements. The
648                        // only way to construct the Shared's InnerRope is
649                        // in this mod, and we have already checked that
650                        // none of its elements are empty.
651                        debug_assert!(!s.is_empty(), "must not have empty InnerRope");
652                    }
653                }
654            }
655        }
656        els.shrink_to_fit();
657        InnerRope(Arc::from(els))
658    }
659}
660
/// Gives read access to the wrapped `Arc` (and, through it, the vector of
/// sections), so an `InnerRope` can be indexed and iterated directly.
impl Deref for InnerRope {
    type Target = Arc<Vec<RopeElem>>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
668
669impl RopeElem {
670    fn maybe_cmp(&self, other: &Self) -> Option<Ordering> {
671        match (self, other) {
672            (Local(a), Local(b)) => {
673                if a.len() == b.len() {
674                    return Some(a.cmp(b));
675                }
676
677                // But if not, the rope may still be contents equal if a following section
678                // contains the missing bytes.
679                None
680            }
681            (Shared(a), Shared(b)) => {
682                if Arc::ptr_eq(&a.0, &b.0) {
683                    return Some(Ordering::Equal);
684                }
685
686                // But if not, they might still be equal and we need to fallback to slower
687                // equality.
688                None
689            }
690            _ => None,
691        }
692    }
693
694    fn into_bytes(self, len: usize) -> Bytes {
695        match self {
696            Local(bytes) => bytes,
697            Shared(inner) => inner.into_bytes(len),
698        }
699    }
700}
701
702impl DeterministicHash for RopeElem {
703    /// Ropes with similar contents hash the same, regardless of their
704    /// structure. Notice the Bytes length is not hashed, and shared InnerRopes
705    /// do not contain a length.
706    fn deterministic_hash<H: DeterministicHasher>(&self, state: &mut H) {
707        match self {
708            Local(bytes) => state.write_bytes(bytes),
709            Shared(inner) => inner.deterministic_hash(state),
710        }
711    }
712}
713
#[derive(Debug, Default)]
/// Implements the [Read]/[AsyncRead]/[Iterator] trait over a [Rope].
///
/// [BufRead] and [Stream] are implemented as well. The default value is an
/// exhausted reader (empty stack).
pub struct RopeReader<'a> {
    /// The Rope's tree is kept as a stack, allowing us to accomplish incremental yielding.
    stack: Vec<StackElem<'a>>,
    /// An offset in the current buffer, used by the `read` implementation
    /// (and by `fill_buf`/`consume`).
    offset: usize,
}
722
/// A StackElem holds the current index into either a Bytes or a shared Rope.
/// When the index reaches the end of the associated data, it is removed and we
/// continue onto the next item in the stack.
#[derive(Debug)]
enum StackElem<'a> {
    /// A leaf section that is about to be (or is partially) read.
    Local(&'a Bytes),
    /// A rope together with the index of its next unvisited section.
    Shared(&'a InnerRope, usize),
}
731
732impl<'a> RopeReader<'a> {
733    fn new(inner: &'a InnerRope, index: usize) -> Self {
734        if index >= inner.len() {
735            Default::default()
736        } else {
737            RopeReader {
738                stack: vec![StackElem::Shared(inner, index)],
739                offset: 0,
740            }
741        }
742    }
743
744    /// A shared implementation for reading bytes. This takes the basic
745    /// operations needed for both Read and AsyncRead.
746    fn read_internal(&mut self, want: usize, buf: &mut ReadBuf<'_>) -> usize {
747        let mut remaining = want;
748
749        while remaining > 0 {
750            let bytes = match self.next_internal() {
751                None => break,
752                Some(b) => b,
753            };
754
755            let lower = self.offset;
756            let upper = min(bytes.len(), lower + remaining);
757
758            buf.put_slice(&bytes[self.offset..upper]);
759
760            if upper < bytes.len() {
761                self.offset = upper;
762                self.stack.push(StackElem::Local(bytes))
763            } else {
764                self.offset = 0;
765            }
766            remaining -= upper - lower;
767        }
768
769        want - remaining
770    }
771
772    /// Returns the next item in the iterator without modifying `self.offset`.
773    fn next_internal(&mut self) -> Option<&'a Bytes> {
774        // Iterates the rope's elements recursively until we find the next Local
775        // section, returning its Bytes.
776        loop {
777            let (inner, mut index) = match self.stack.pop() {
778                None => return None,
779                Some(StackElem::Local(b)) => {
780                    debug_assert!(!b.is_empty(), "must not have empty Bytes section");
781                    return Some(b);
782                }
783                Some(StackElem::Shared(r, i)) => (r, i),
784            };
785
786            let el = &inner[index];
787            index += 1;
788            if index < inner.len() {
789                self.stack.push(StackElem::Shared(inner, index));
790            }
791
792            self.stack.push(StackElem::from(el));
793        }
794    }
795}
796
/// Iterates over the rope's `Bytes` sections without copying them.
impl<'a> Iterator for RopeReader<'a> {
    type Item = &'a Bytes;

    /// Returns the next section, or `None` once the reader is exhausted.
    fn next(&mut self) -> Option<Self::Item> {
        // Iteration yields whole sections, so any offset left behind by a
        // partial `read`/`consume` is discarded first.
        self.offset = 0;
        self.next_internal()
    }
}
805
806impl Read for RopeReader<'_> {
807    fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> {
808        Ok(self.read_internal(buf.len(), &mut ReadBuf::new(buf)))
809    }
810}
811
812impl AsyncRead for RopeReader<'_> {
813    fn poll_read(
814        self: Pin<&mut Self>,
815        _cx: &mut TaskContext<'_>,
816        buf: &mut ReadBuf<'_>,
817    ) -> Poll<IoResult<()>> {
818        let this = self.get_mut();
819        this.read_internal(buf.remaining(), buf);
820        Poll::Ready(Ok(()))
821    }
822}
823
824impl BufRead for RopeReader<'_> {
825    /// Never returns an error.
826    fn fill_buf(&mut self) -> IoResult<&[u8]> {
827        // Returns the full buffer without coping any data. The same bytes will
828        // continue to be returned until [consume] is called.
829        let bytes = match self.next_internal() {
830            None => return Ok(EMPTY_BUF),
831            Some(b) => b,
832        };
833
834        // This is just so we can get a reference to the asset that is kept alive by the
835        // RopeReader itself. We can then auto-convert that reference into the needed u8
836        // slice reference.
837        self.stack.push(StackElem::Local(bytes));
838        let Some(StackElem::Local(bytes)) = self.stack.last() else {
839            unreachable!()
840        };
841
842        Ok(&bytes[self.offset..])
843    }
844
845    fn consume(&mut self, amt: usize) {
846        if let Some(StackElem::Local(b)) = self.stack.last_mut() {
847            // https://doc.rust-lang.org/std/io/trait.BufRead.html#tymethod.consume
848            debug_assert!(
849                self.offset + amt <= b.len(),
850                "It is a logic error if `amount` exceeds the number of unread bytes in the \
851                 internal buffer, which is returned by `fill_buf`."
852            );
853            // Consume some amount of bytes from the current Bytes instance, ensuring those bytes
854            // are not returned on the next call to `fill_buf`.
855            self.offset += amt;
856            if self.offset == b.len() {
857                // whole Bytes instance was consumed
858                self.stack.pop();
859                self.offset = 0;
860            }
861        }
862    }
863}
864
865impl<'a> Stream for RopeReader<'a> {
866    /// This is efficiently streamable into a [`Hyper::Body`] if each item is cloned into an owned
867    /// `Bytes` instance.
868    type Item = Result<&'a Bytes>;
869
870    /// Returns a "result" of reading the next shared bytes reference. This
871    /// differs from [`Read::read`] by not copying any memory.
872    fn poll_next(self: Pin<&mut Self>, _cx: &mut TaskContext<'_>) -> Poll<Option<Self::Item>> {
873        let this = self.get_mut();
874        Poll::Ready(this.next().map(Ok))
875    }
876}
877
878impl<'a> From<&'a RopeElem> for StackElem<'a> {
879    fn from(el: &'a RopeElem) -> Self {
880        match el {
881            Local(bytes) => Self::Local(bytes),
882            Shared(inner) => Self::Shared(inner, 0),
883        }
884    }
885}
886
#[cfg(test)]
mod test {
    use std::{
        borrow::Cow,
        cmp::min,
        io::{BufRead, Read},
    };

    use anyhow::Result;

    use super::{InnerRope, Rope, RopeBuilder, RopeElem};

    // Test-only conversions and helpers. They are deliberately not exposed outside of tests
    // because they convert inefficiently; they exist so tests can build arbitrary rope shapes.
    impl From<&str> for RopeElem {
        fn from(value: &str) -> Self {
            let owned = String::from(value);
            Self::Local(owned.into())
        }
    }
    impl From<Vec<RopeElem>> for RopeElem {
        fn from(value: Vec<RopeElem>) -> Self {
            let inner = InnerRope::from(value);
            Self::Shared(inner)
        }
    }
    impl From<Rope> for RopeElem {
        fn from(value: Rope) -> Self {
            let Rope { data, .. } = value;
            Self::Shared(data)
        }
    }
    impl Rope {
        /// Builds a rope directly from elements, computing the length by walking them.
        fn new(value: Vec<RopeElem>) -> Self {
            let data = InnerRope::from(value);
            let length = data.len();
            Self { length, data }
        }
    }
    impl InnerRope {
        /// Sums the byte lengths of every contained element.
        fn len(&self) -> usize {
            self.iter().fold(0, |total, elem| total + elem.len())
        }
    }
    impl RopeElem {
        /// Byte length of this element, recursing into shared ropes.
        fn len(&self) -> usize {
            match self {
                Self::Local(bytes) => bytes.len(),
                Self::Shared(inner) => inner.len(),
            }
        }
    }

    #[test]
    fn empty_build_without_pushes() {
        let rope = RopeBuilder::default().build();
        assert!(rope.read().next().is_none());
    }

    #[test]
    fn empty_build_with_empty_static_push() {
        let mut builder = RopeBuilder::default();
        builder += "";
        let rope = builder.build();

        assert!(rope.read().next().is_none());
    }

    #[test]
    fn empty_build_with_empty_bytes_push() {
        let mut builder = RopeBuilder::default();
        builder.push_bytes(&[]);
        let rope = builder.build();

        assert!(rope.read().next().is_none());
    }

    #[test]
    fn empty_build_with_empty_concat() {
        let nothing = RopeBuilder::default().build();
        let mut builder = RopeBuilder::default();
        builder += &nothing;
        let rope = builder.build();

        assert!(rope.read().next().is_none());
    }

    #[test]
    fn empty_from_empty_static_str() {
        let rope = Rope::from("");
        assert!(rope.read().next().is_none());
    }

    #[test]
    fn empty_from_empty_string() {
        let rope = Rope::from(String::new());
        assert!(rope.read().next().is_none());
    }

    #[test]
    fn empty_equality() {
        let lhs = Rope::from("");
        let rhs = Rope::from("");

        assert_eq!(lhs, rhs);
    }

    #[test]
    fn cloned_equality() {
        let original = Rope::from("abc");
        let copy = original.clone();

        assert_eq!(original, copy);
    }

    #[test]
    fn value_equality() {
        let lhs = Rope::from(String::from("abc"));
        let rhs = Rope::from(String::from("abc"));

        assert_eq!(lhs, rhs);
    }

    #[test]
    fn value_inequality() {
        let lhs = Rope::from(String::from("abc"));
        let rhs = Rope::from(String::from("def"));

        assert_ne!(lhs, rhs);
    }

    #[test]
    fn value_equality_shared_1() {
        let shared = Rope::from("def");
        let lhs = Rope::new(vec![
            RopeElem::from("abc"),
            RopeElem::from(shared.clone()),
            RopeElem::from("ghi"),
        ]);
        let rhs = Rope::new(vec![
            RopeElem::from("abc"),
            RopeElem::from(shared),
            RopeElem::from("ghi"),
        ]);

        assert_eq!(lhs, rhs);
    }

    #[test]
    fn value_equality_shared_2() {
        let lhs = Rope::new(vec![
            RopeElem::from("abc"),
            RopeElem::from(vec![RopeElem::from("def")]),
            RopeElem::from("ghi"),
        ]);
        let rhs = Rope::new(vec![
            RopeElem::from("abc"),
            RopeElem::from(vec![RopeElem::from("def")]),
            RopeElem::from("ghi"),
        ]);

        assert_eq!(lhs, rhs);
    }

    #[test]
    fn value_equality_splits_1() {
        let lhs = Rope::new(vec![RopeElem::from("a"), RopeElem::from("aa")]);
        let rhs = Rope::new(vec![RopeElem::from("aa"), RopeElem::from("a")]);

        assert_eq!(lhs, rhs);
    }

    #[test]
    fn value_equality_splits_2() {
        let lhs = Rope::new(vec![
            RopeElem::from(vec![RopeElem::from("a")]),
            RopeElem::from("aa"),
        ]);
        let rhs = Rope::new(vec![
            RopeElem::from(vec![RopeElem::from("aa")]),
            RopeElem::from("a"),
        ]);

        assert_eq!(lhs, rhs);
    }

    #[test]
    fn value_inequality_shared_1() {
        let shared = Rope::from("def");
        let lhs = Rope::new(vec![
            RopeElem::from("aaa"),
            RopeElem::from(shared.clone()),
            RopeElem::from("ghi"),
        ]);
        let rhs = Rope::new(vec![
            RopeElem::from("bbb"),
            RopeElem::from(shared),
            RopeElem::from("ghi"),
        ]);

        assert_ne!(lhs, rhs);
    }

    #[test]
    fn value_inequality_shared_2() {
        let lhs = Rope::new(vec![
            RopeElem::from("abc"),
            RopeElem::from(vec![RopeElem::from("ddd")]),
            RopeElem::from("ghi"),
        ]);
        let rhs = Rope::new(vec![
            RopeElem::from("abc"),
            RopeElem::from(vec![RopeElem::from("eee")]),
            RopeElem::from("ghi"),
        ]);

        assert_ne!(lhs, rhs);
    }

    #[test]
    fn value_inequality_shared_3() {
        let shared = Rope::from("def");
        let lhs = Rope::new(vec![
            RopeElem::from("abc"),
            RopeElem::from(shared.clone()),
            RopeElem::from("ggg"),
        ]);
        let rhs = Rope::new(vec![
            RopeElem::from("abc"),
            RopeElem::from(shared),
            RopeElem::from("hhh"),
        ]);

        assert_ne!(lhs, rhs);
    }

    #[test]
    fn iteration() {
        let shared = Rope::from("def");
        let rope = Rope::new(vec![
            RopeElem::from("abc"),
            RopeElem::from(shared),
            RopeElem::from("ghi"),
        ]);

        let chunks: Vec<_> = rope.read().collect();

        assert_eq!(chunks, vec!["abc", "def", "ghi"]);
    }

    #[test]
    fn read() {
        let shared = Rope::from("def");
        let rope = Rope::new(vec![
            RopeElem::from("abc"),
            RopeElem::from(shared),
            RopeElem::from("ghi"),
        ]);

        let mut reader = rope.read();
        let mut buf = [0_u8; 2];
        let mut chunks = vec![];
        // Keep pulling two-byte windows until the reader reports end of input.
        loop {
            match reader.read(&mut buf).unwrap() {
                0 => break,
                filled => chunks.push(buf[..filled].to_vec()),
            }
        }

        assert_eq!(
            chunks,
            vec![
                b"ab".to_vec(),
                b"cd".to_vec(),
                b"ef".to_vec(),
                b"gh".to_vec(),
                b"i".to_vec()
            ]
        );
    }

    #[test]
    fn fill_buf() {
        let shared = Rope::from("def");
        let rope = Rope::new(vec![
            RopeElem::from("abc"),
            RopeElem::from(shared),
            RopeElem::from("ghi"),
        ]);

        let mut reader = rope.read();
        let mut chunks = vec![];
        loop {
            let available = reader.fill_buf().unwrap();
            if available.is_empty() {
                break;
            }
            let taken = min(2, available.len());
            chunks.push(available.to_vec());
            reader.consume(taken);
        }

        assert_eq!(
            chunks,
            // Every call hands back the full remaining chunk, but at most 2 bytes are consumed
            // each time, so the third byte of each chunk is returned again on the next call.
            vec![
                b"abc".to_vec(),
                b"c".to_vec(),
                b"def".to_vec(),
                b"f".to_vec(),
                b"ghi".to_vec(),
                b"i".to_vec()
            ]
        );
    }

    #[test]
    fn test_to_bytes() -> Result<()> {
        let rope = Rope::from("abc");
        let expected: Cow<'_, [u8]> = Cow::Borrowed(&b"abc"[..]);
        assert_eq!(rope.to_bytes(), expected);
        Ok(())
    }
}