risingwave_storage/hummock/
block_stream.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14use std::collections::VecDeque;
15use std::sync::Arc;
16use std::sync::atomic::{AtomicUsize, Ordering};
17
18use bytes::{Bytes, BytesMut};
19use fail::fail_point;
20use risingwave_object_store::object::{MonitoredStreamingReader, ObjectError};
21
22use super::{Block, BlockMeta};
23use crate::hummock::{BlockHolder, HummockResult};
24
25#[async_trait::async_trait]
26pub trait BlockStream: Send + Sync + 'static {
27    /// Reads the next block from the stream and returns it. Returns `None` if there are no blocks
28    /// left to read.
29    async fn next_block(&mut self) -> HummockResult<Option<BlockHolder>>;
30    fn next_block_index(&self) -> usize;
31}
32
/// Guard that charges `usage` against a shared counter for as long as the guard is alive.
///
/// Constructing the guard adds `usage` to `total_usage`. Dropping it subtracts exactly the same
/// amount again, so the counter reflects the sum over all live guards.
pub struct MemoryUsageTracker {
    /// Shared counter that this guard contributes to.
    total_usage: Arc<AtomicUsize>,
    /// Amount this guard added to `total_usage`; removed again on drop.
    usage: usize,
}

impl MemoryUsageTracker {
    /// Adds `usage` to `total_usage` and returns a guard that removes it again when dropped.
    pub fn new(total_usage: Arc<AtomicUsize>, usage: usize) -> Self {
        let tracker = Self { total_usage, usage };
        tracker
            .total_usage
            .fetch_add(tracker.usage, Ordering::SeqCst);
        tracker
    }
}

impl Drop for MemoryUsageTracker {
    fn drop(&mut self) {
        let Self { total_usage, usage } = self;
        total_usage.fetch_sub(*usage, Ordering::SeqCst);
    }
}
50
51/// An iterator that reads the blocks of an SST step by step from a given stream of bytes.
52pub struct BlockDataStream {
53    buf_reader: MonitoredStreamingReader,
54
55    /// The index of the next block. Note that `block_idx` is relative to the start index of the
56    /// stream (and is compatible with `block_size_vec`); it is not relative to the corresponding
57    /// SST. That is, if streaming starts at block 2 of a given SST `T`, then `block_idx = 0`
58    /// refers to the third block of `T`.
59    block_idx: usize,
60
61    /// The sizes of each block which the stream reads. The first number states the compressed size
62    /// in the stream. The second number is the block's uncompressed size.  Note that the list does
63    /// not contain the size of blocks which precede the first streamed block. That is, if
64    /// streaming starts at block 2 of a given SST, then the list does not contain information
65    /// about block 0 and block 1.
66    block_metas: Vec<BlockMeta>,
67
68    buf: Bytes,
69
70    buff_offset: usize,
71}
72
73impl BlockDataStream {
74    /// Constructs a new `BlockStream` object that reads from the given `byte_stream` and interprets
75    /// the data as blocks of the SST described in `sst_meta`, starting at block `block_index`.
76    ///
77    /// If `block_index >= sst_meta.block_metas.len()`, then `BlockStream` will not read any data
78    /// from `byte_stream`.
79    pub fn new(
80        // The stream that provides raw data.
81        byte_stream: MonitoredStreamingReader,
82        // Meta data of the SST that is streamed.
83        block_metas: Vec<BlockMeta>,
84    ) -> Self {
85        Self {
86            buf_reader: byte_stream,
87            block_idx: 0,
88            block_metas,
89            buf: Bytes::default(),
90            buff_offset: 0,
91        }
92    }
93
94    /// Reads the next block from the stream and returns it. Returns `None` if there are no blocks
95    /// left to read.
96    pub async fn next_block_impl(&mut self) -> HummockResult<Option<(Bytes, usize)>> {
97        if self.block_idx >= self.block_metas.len() {
98            return Ok(None);
99        }
100
101        let block_meta = &self.block_metas[self.block_idx];
102        fail_point!("stream_read_err", |_| Err(ObjectError::internal(
103            "stream read error"
104        )
105        .into()));
106        let uncompressed_size = block_meta.uncompressed_size as usize;
107        let end = self.buff_offset + block_meta.len as usize;
108        let data = if end > self.buf.len() {
109            let current_block = self.read_next_buf(block_meta.len as usize).await?;
110            self.buff_offset = 0;
111            current_block
112        } else {
113            let data = self.buf.slice(self.buff_offset..end);
114            self.buff_offset = end;
115            data
116        };
117
118        self.block_idx += 1;
119        Ok(Some((data, uncompressed_size)))
120    }
121
122    async fn read_next_buf(&mut self, read_size: usize) -> HummockResult<Bytes> {
123        let mut read_buf = BytesMut::with_capacity(read_size);
124        let start_pos = if self.buff_offset < self.buf.len() {
125            read_buf.extend_from_slice(&self.buf[self.buff_offset..]);
126            self.buf.len() - self.buff_offset
127        } else {
128            0
129        };
130        let mut rest = read_size - start_pos;
131        while rest > 0 {
132            let next_packet = self
133                .buf_reader
134                .read_bytes()
135                .await
136                .unwrap_or_else(|| Err(ObjectError::internal("read unexpected EOF")))?;
137            let read_len = std::cmp::min(next_packet.len(), rest);
138            read_buf.extend_from_slice(&next_packet[..read_len]);
139            rest -= read_len;
140            if rest == 0 {
141                self.buf = next_packet.slice(read_len..);
142                return Ok(read_buf.freeze());
143            }
144        }
145        self.buf = Bytes::default();
146        Ok(read_buf.freeze())
147    }
148
149    pub fn next_block_index(&self) -> usize {
150        self.block_idx
151    }
152
153    pub async fn next_block(&mut self) -> HummockResult<Option<Box<Block>>> {
154        match self.next_block_impl().await? {
155            None => Ok(None),
156            Some((buf, uncompressed_size)) => {
157                Ok(Some(Box::new(Block::decode(buf, uncompressed_size)?)))
158            }
159        }
160    }
161}
162
163pub struct PrefetchBlockStream {
164    blocks: VecDeque<BlockHolder>,
165    block_index: usize,
166    _tracker: Option<MemoryUsageTracker>,
167}
168
169impl PrefetchBlockStream {
170    pub fn new(
171        blocks: VecDeque<BlockHolder>,
172        block_index: usize,
173        _tracker: Option<MemoryUsageTracker>,
174    ) -> Self {
175        Self {
176            blocks,
177            block_index,
178            _tracker,
179        }
180    }
181}
182
183#[async_trait::async_trait]
184impl BlockStream for PrefetchBlockStream {
185    fn next_block_index(&self) -> usize {
186        self.block_index
187    }
188
189    async fn next_block(&mut self) -> HummockResult<Option<BlockHolder>> {
190        if let Some(block) = self.blocks.pop_front() {
191            self.block_index += 1;
192            return Ok(Some(block));
193        }
194        Ok(None)
195    }
196}