risingwave_storage/hummock/
block_stream.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::VecDeque;
16use std::sync::Arc;
17use std::sync::atomic::{AtomicUsize, Ordering};
18
19use bytes::{Bytes, BytesMut};
20use fail::fail_point;
21use risingwave_object_store::object::{MonitoredStreamingReader, ObjectError};
22
23use super::{Block, BlockMeta};
24use crate::hummock::{BlockHolder, HummockResult};
25
26#[async_trait::async_trait]
27pub trait BlockStream: Send + Sync + 'static {
28    /// Reads the next block from the stream and returns it. Returns `None` if there are no blocks
29    /// left to read.
30    async fn next_block(&mut self) -> HummockResult<Option<BlockHolder>>;
31    fn next_block_index(&self) -> usize;
32}
33
/// Guard that adds `usage` to a shared counter when created and subtracts the same amount again
/// when dropped, so the counter reflects the usage of all trackers that are currently alive.
///
/// The unit of `usage` is whatever the caller uses for the shared counter.
#[derive(Debug)]
#[must_use = "the tracked usage is released as soon as the tracker is dropped"]
pub struct MemoryUsageTracker {
    /// Shared counter aggregating the usage of every live tracker.
    total_usage: Arc<AtomicUsize>,
    /// Amount this tracker added to `total_usage`; subtracted again on drop.
    usage: usize,
}

impl MemoryUsageTracker {
    /// Adds `usage` to `total_usage` and returns a guard that reverts the addition when dropped.
    pub fn new(total_usage: Arc<AtomicUsize>, usage: usize) -> Self {
        total_usage.fetch_add(usage, Ordering::SeqCst);
        Self { total_usage, usage }
    }
}

impl Drop for MemoryUsageTracker {
    fn drop(&mut self) {
        // Give back exactly what `new` added so the shared total returns to its previous value.
        self.total_usage.fetch_sub(self.usage, Ordering::SeqCst);
    }
}
51
52/// An iterator that reads the blocks of an SST step by step from a given stream of bytes.
53pub struct BlockDataStream {
54    buf_reader: MonitoredStreamingReader,
55
56    /// The index of the next block. Note that `block_idx` is relative to the start index of the
57    /// stream (and is compatible with `block_size_vec`); it is not relative to the corresponding
58    /// SST. That is, if streaming starts at block 2 of a given SST `T`, then `block_idx = 0`
59    /// refers to the third block of `T`.
60    block_idx: usize,
61
62    /// The sizes of each block which the stream reads. The first number states the compressed size
63    /// in the stream. The second number is the block's uncompressed size.  Note that the list does
64    /// not contain the size of blocks which precede the first streamed block. That is, if
65    /// streaming starts at block 2 of a given SST, then the list does not contain information
66    /// about block 0 and block 1.
67    block_metas: Vec<BlockMeta>,
68
69    buf: Bytes,
70
71    buff_offset: usize,
72}
73
74impl BlockDataStream {
75    /// Constructs a new `BlockStream` object that reads from the given `byte_stream` and interprets
76    /// the data as blocks of the SST described in `sst_meta`, starting at block `block_index`.
77    ///
78    /// If `block_index >= sst_meta.block_metas.len()`, then `BlockStream` will not read any data
79    /// from `byte_stream`.
80    pub fn new(
81        // The stream that provides raw data.
82        byte_stream: MonitoredStreamingReader,
83        // Meta data of the SST that is streamed.
84        block_metas: Vec<BlockMeta>,
85    ) -> Self {
86        Self {
87            buf_reader: byte_stream,
88            block_idx: 0,
89            block_metas,
90            buf: Bytes::default(),
91            buff_offset: 0,
92        }
93    }
94
95    /// Reads the next block from the stream and returns it. Returns `None` if there are no blocks
96    /// left to read.
97    pub async fn next_block_impl(&mut self) -> HummockResult<Option<(Bytes, usize)>> {
98        if self.block_idx >= self.block_metas.len() {
99            return Ok(None);
100        }
101
102        let block_meta = &self.block_metas[self.block_idx];
103        fail_point!("stream_read_err", |_| Err(ObjectError::internal(
104            "stream read error"
105        )
106        .into()));
107        let uncompressed_size = block_meta.uncompressed_size as usize;
108        let end = self.buff_offset + block_meta.len as usize;
109        let data = if end > self.buf.len() {
110            let current_block = self.read_next_buf(block_meta.len as usize).await?;
111            self.buff_offset = 0;
112            current_block
113        } else {
114            let data = self.buf.slice(self.buff_offset..end);
115            self.buff_offset = end;
116            data
117        };
118
119        self.block_idx += 1;
120        Ok(Some((data, uncompressed_size)))
121    }
122
123    async fn read_next_buf(&mut self, read_size: usize) -> HummockResult<Bytes> {
124        let mut read_buf = BytesMut::with_capacity(read_size);
125        let start_pos = if self.buff_offset < self.buf.len() {
126            read_buf.extend_from_slice(&self.buf[self.buff_offset..]);
127            self.buf.len() - self.buff_offset
128        } else {
129            0
130        };
131        let mut rest = read_size - start_pos;
132        while rest > 0 {
133            let next_packet = self
134                .buf_reader
135                .read_bytes()
136                .await
137                .unwrap_or_else(|| Err(ObjectError::internal("read unexpected EOF")))?;
138            let read_len = std::cmp::min(next_packet.len(), rest);
139            read_buf.extend_from_slice(&next_packet[..read_len]);
140            rest -= read_len;
141            if rest == 0 {
142                self.buf = next_packet.slice(read_len..);
143                return Ok(read_buf.freeze());
144            }
145        }
146        self.buf = Bytes::default();
147        Ok(read_buf.freeze())
148    }
149
150    pub fn next_block_index(&self) -> usize {
151        self.block_idx
152    }
153
154    pub async fn next_block(&mut self) -> HummockResult<Option<Box<Block>>> {
155        match self.next_block_impl().await? {
156            None => Ok(None),
157            Some((buf, uncompressed_size)) => {
158                Ok(Some(Box::new(Block::decode(buf, uncompressed_size)?)))
159            }
160        }
161    }
162}
163
164pub struct PrefetchBlockStream {
165    blocks: VecDeque<BlockHolder>,
166    block_index: usize,
167    _tracker: Option<MemoryUsageTracker>,
168}
169
170impl PrefetchBlockStream {
171    pub fn new(
172        blocks: VecDeque<BlockHolder>,
173        block_index: usize,
174        _tracker: Option<MemoryUsageTracker>,
175    ) -> Self {
176        Self {
177            blocks,
178            block_index,
179            _tracker,
180        }
181    }
182}
183
184#[async_trait::async_trait]
185impl BlockStream for PrefetchBlockStream {
186    fn next_block_index(&self) -> usize {
187        self.block_index
188    }
189
190    async fn next_block(&mut self) -> HummockResult<Option<BlockHolder>> {
191        if let Some(block) = self.blocks.pop_front() {
192            self.block_index += 1;
193            return Ok(Some(block));
194        }
195        Ok(None)
196    }
197}