risingwave_storage/hummock/
block_stream.rs1use std::collections::VecDeque;
15use std::sync::Arc;
16use std::sync::atomic::{AtomicUsize, Ordering};
17
18use bytes::{Bytes, BytesMut};
19use fail::fail_point;
20use risingwave_object_store::object::{MonitoredStreamingReader, ObjectError};
21
22use super::{Block, BlockMeta};
23use crate::hummock::{BlockHolder, HummockResult};
24
25#[async_trait::async_trait]
26pub trait BlockStream: Send + Sync + 'static {
27 async fn next_block(&mut self) -> HummockResult<Option<BlockHolder>>;
30 fn next_block_index(&self) -> usize;
31}
32
/// Guard that adds `usage` to a shared counter when created and subtracts the same amount
/// again when dropped.
pub struct MemoryUsageTracker {
    /// Counter shared between all trackers that account against the same total.
    total_usage: Arc<AtomicUsize>,
    /// Amount this tracker added to `total_usage` and will subtract on drop.
    usage: usize,
}

impl MemoryUsageTracker {
    /// Creates a tracker and immediately adds `usage` to `total_usage`.
    pub fn new(total_usage: Arc<AtomicUsize>, usage: usize) -> Self {
        let tracker = Self { total_usage, usage };
        tracker
            .total_usage
            .fetch_add(tracker.usage, Ordering::SeqCst);
        tracker
    }
}

impl Drop for MemoryUsageTracker {
    /// Gives back exactly what `new` added to the shared counter.
    fn drop(&mut self) {
        let released = self.usage;
        self.total_usage.fetch_sub(released, Ordering::SeqCst);
    }
}
50
51pub struct BlockDataStream {
53 buf_reader: MonitoredStreamingReader,
54
55 block_idx: usize,
60
61 block_metas: Vec<BlockMeta>,
67
68 buf: Bytes,
69
70 buff_offset: usize,
71}
72
73impl BlockDataStream {
74 pub fn new(
80 byte_stream: MonitoredStreamingReader,
82 block_metas: Vec<BlockMeta>,
84 ) -> Self {
85 Self {
86 buf_reader: byte_stream,
87 block_idx: 0,
88 block_metas,
89 buf: Bytes::default(),
90 buff_offset: 0,
91 }
92 }
93
94 pub async fn next_block_impl(&mut self) -> HummockResult<Option<(Bytes, usize)>> {
97 if self.block_idx >= self.block_metas.len() {
98 return Ok(None);
99 }
100
101 let block_meta = &self.block_metas[self.block_idx];
102 fail_point!("stream_read_err", |_| Err(ObjectError::internal(
103 "stream read error"
104 )
105 .into()));
106 let uncompressed_size = block_meta.uncompressed_size as usize;
107 let end = self.buff_offset + block_meta.len as usize;
108 let data = if end > self.buf.len() {
109 let current_block = self.read_next_buf(block_meta.len as usize).await?;
110 self.buff_offset = 0;
111 current_block
112 } else {
113 let data = self.buf.slice(self.buff_offset..end);
114 self.buff_offset = end;
115 data
116 };
117
118 self.block_idx += 1;
119 Ok(Some((data, uncompressed_size)))
120 }
121
122 async fn read_next_buf(&mut self, read_size: usize) -> HummockResult<Bytes> {
123 let mut read_buf = BytesMut::with_capacity(read_size);
124 let start_pos = if self.buff_offset < self.buf.len() {
125 read_buf.extend_from_slice(&self.buf[self.buff_offset..]);
126 self.buf.len() - self.buff_offset
127 } else {
128 0
129 };
130 let mut rest = read_size - start_pos;
131 while rest > 0 {
132 let next_packet = self
133 .buf_reader
134 .read_bytes()
135 .await
136 .unwrap_or_else(|| Err(ObjectError::internal("read unexpected EOF")))?;
137 let read_len = std::cmp::min(next_packet.len(), rest);
138 read_buf.extend_from_slice(&next_packet[..read_len]);
139 rest -= read_len;
140 if rest == 0 {
141 self.buf = next_packet.slice(read_len..);
142 return Ok(read_buf.freeze());
143 }
144 }
145 self.buf = Bytes::default();
146 Ok(read_buf.freeze())
147 }
148
149 pub fn next_block_index(&self) -> usize {
150 self.block_idx
151 }
152
153 pub async fn next_block(&mut self) -> HummockResult<Option<Box<Block>>> {
154 match self.next_block_impl().await? {
155 None => Ok(None),
156 Some((buf, uncompressed_size)) => {
157 Ok(Some(Box::new(Block::decode(buf, uncompressed_size)?)))
158 }
159 }
160 }
161}
162
163pub struct PrefetchBlockStream {
164 blocks: VecDeque<BlockHolder>,
165 block_index: usize,
166 _tracker: Option<MemoryUsageTracker>,
167}
168
169impl PrefetchBlockStream {
170 pub fn new(
171 blocks: VecDeque<BlockHolder>,
172 block_index: usize,
173 _tracker: Option<MemoryUsageTracker>,
174 ) -> Self {
175 Self {
176 blocks,
177 block_index,
178 _tracker,
179 }
180 }
181}
182
183#[async_trait::async_trait]
184impl BlockStream for PrefetchBlockStream {
185 fn next_block_index(&self) -> usize {
186 self.block_index
187 }
188
189 async fn next_block(&mut self) -> HummockResult<Option<BlockHolder>> {
190 if let Some(block) = self.blocks.pop_front() {
191 self.block_index += 1;
192 return Ok(Some(block));
193 }
194 Ok(None)
195 }
196}