risingwave_storage/hummock/
block_stream.rs1use std::collections::VecDeque;
16use std::sync::Arc;
17use std::sync::atomic::{AtomicUsize, Ordering};
18
19use bytes::{Bytes, BytesMut};
20use fail::fail_point;
21use risingwave_object_store::object::{MonitoredStreamingReader, ObjectError};
22
23use super::{Block, BlockMeta};
24use crate::hummock::{BlockHolder, HummockResult};
25
26#[async_trait::async_trait]
27pub trait BlockStream: Send + Sync + 'static {
28 async fn next_block(&mut self) -> HummockResult<Option<BlockHolder>>;
31 fn next_block_index(&self) -> usize;
32}
33
/// RAII guard that adds `usage` to a shared counter when created and subtracts it again when
/// dropped.
///
/// The accounting only lasts as long as the guard is alive. Dropping the value returned by
/// [`MemoryUsageTracker::new`] right away undoes it immediately, so the type is `#[must_use]`
/// to let the compiler catch that mistake.
#[must_use = "dropping a MemoryUsageTracker immediately releases the usage it just recorded"]
pub struct MemoryUsageTracker {
    /// Counter this guard contributes to; it is shared (via `Arc`) with whoever else holds it.
    total_usage: Arc<AtomicUsize>,
    /// Amount added to `total_usage` in `new` and removed again in `drop`.
    usage: usize,
}

impl MemoryUsageTracker {
    /// Adds `usage` to `total_usage` and returns a guard that subtracts it on drop.
    pub fn new(total_usage: Arc<AtomicUsize>, usage: usize) -> Self {
        total_usage.fetch_add(usage, Ordering::SeqCst);
        Self { total_usage, usage }
    }
}

impl Drop for MemoryUsageTracker {
    fn drop(&mut self) {
        // Give back exactly what `new` recorded.
        self.total_usage.fetch_sub(self.usage, Ordering::SeqCst);
    }
}
51
52pub struct BlockDataStream {
54 buf_reader: MonitoredStreamingReader,
55
56 block_idx: usize,
61
62 block_metas: Vec<BlockMeta>,
68
69 buf: Bytes,
70
71 buff_offset: usize,
72}
73
74impl BlockDataStream {
75 pub fn new(
81 byte_stream: MonitoredStreamingReader,
83 block_metas: Vec<BlockMeta>,
85 ) -> Self {
86 Self {
87 buf_reader: byte_stream,
88 block_idx: 0,
89 block_metas,
90 buf: Bytes::default(),
91 buff_offset: 0,
92 }
93 }
94
95 pub async fn next_block_impl(&mut self) -> HummockResult<Option<(Bytes, usize)>> {
98 if self.block_idx >= self.block_metas.len() {
99 return Ok(None);
100 }
101
102 let block_meta = &self.block_metas[self.block_idx];
103 fail_point!("stream_read_err", |_| Err(ObjectError::internal(
104 "stream read error"
105 )
106 .into()));
107 let uncompressed_size = block_meta.uncompressed_size as usize;
108 let end = self.buff_offset + block_meta.len as usize;
109 let data = if end > self.buf.len() {
110 let current_block = self.read_next_buf(block_meta.len as usize).await?;
111 self.buff_offset = 0;
112 current_block
113 } else {
114 let data = self.buf.slice(self.buff_offset..end);
115 self.buff_offset = end;
116 data
117 };
118
119 self.block_idx += 1;
120 Ok(Some((data, uncompressed_size)))
121 }
122
123 async fn read_next_buf(&mut self, read_size: usize) -> HummockResult<Bytes> {
124 let mut read_buf = BytesMut::with_capacity(read_size);
125 let start_pos = if self.buff_offset < self.buf.len() {
126 read_buf.extend_from_slice(&self.buf[self.buff_offset..]);
127 self.buf.len() - self.buff_offset
128 } else {
129 0
130 };
131 let mut rest = read_size - start_pos;
132 while rest > 0 {
133 let next_packet = self
134 .buf_reader
135 .read_bytes()
136 .await
137 .unwrap_or_else(|| Err(ObjectError::internal("read unexpected EOF")))?;
138 let read_len = std::cmp::min(next_packet.len(), rest);
139 read_buf.extend_from_slice(&next_packet[..read_len]);
140 rest -= read_len;
141 if rest == 0 {
142 self.buf = next_packet.slice(read_len..);
143 return Ok(read_buf.freeze());
144 }
145 }
146 self.buf = Bytes::default();
147 Ok(read_buf.freeze())
148 }
149
150 pub fn next_block_index(&self) -> usize {
151 self.block_idx
152 }
153
154 pub async fn next_block(&mut self) -> HummockResult<Option<Box<Block>>> {
155 match self.next_block_impl().await? {
156 None => Ok(None),
157 Some((buf, uncompressed_size)) => {
158 Ok(Some(Box::new(Block::decode(buf, uncompressed_size)?)))
159 }
160 }
161 }
162}
163
164pub struct PrefetchBlockStream {
165 blocks: VecDeque<BlockHolder>,
166 block_index: usize,
167 _tracker: Option<MemoryUsageTracker>,
168}
169
170impl PrefetchBlockStream {
171 pub fn new(
172 blocks: VecDeque<BlockHolder>,
173 block_index: usize,
174 _tracker: Option<MemoryUsageTracker>,
175 ) -> Self {
176 Self {
177 blocks,
178 block_index,
179 _tracker,
180 }
181 }
182}
183
184#[async_trait::async_trait]
185impl BlockStream for PrefetchBlockStream {
186 fn next_block_index(&self) -> usize {
187 self.block_index
188 }
189
190 async fn next_block(&mut self) -> HummockResult<Option<BlockHolder>> {
191 if let Some(block) = self.blocks.pop_front() {
192 self.block_index += 1;
193 return Ok(Some(block));
194 }
195 Ok(None)
196 }
197}