use std::sync::Arc;

use bytes::Bytes;
use fail::fail_point;
use risingwave_hummock_sdk::HummockSstableObjectId;
use risingwave_object_store::object::ObjectStreamingUploader;
use tokio::task::JoinHandle;
use zstd::zstd_safe::WriteBuf;

use super::multi_builder::UploadJoinHandle;
use super::{Block, BlockMeta};
use crate::hummock::utils::MemoryTracker;
use crate::hummock::{
    CachePolicy, HummockResult, SstableBlockIndex, SstableBuilderOptions, SstableMeta,
    SstableStore, SstableStoreRef,
};

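/// A consumer of SST data. Implementations decide where the written bytes go
/// (an in-memory buffer, a batch upload, or a streaming upload) and what
/// `finish` ultimately yields via the associated `Output` type.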
#[async_trait::async_trait]
pub trait SstableWriter: Send {
    type Output;

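    /// Write an SST block to the writer.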
    async fn write_block(&mut self, block: &[u8], meta: &BlockMeta) -> HummockResult<()>;

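    /// Like [`Self::write_block`], but takes ownership of the block `Bytes`
    /// so implementations can forward them without copying.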
    async fn write_block_bytes(&mut self, block: Bytes, meta: &BlockMeta) -> HummockResult<()>;

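    /// Finish writing the SST, consuming the writer and producing the
    /// implementation-specific [`Self::Output`].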
    async fn finish(self, meta: SstableMeta) -> HummockResult<Self::Output>;

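    /// Get the length of data that has already been written.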
    fn data_len(&self) -> usize;
}

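/// Append SST data to an in-memory buffer; mainly useful for tests and
/// benchmarks.
///
/// A minimal usage sketch (not compiled as a doctest; `blocks`, `block_metas`
/// and `sstable_meta` are assumed to come from an SST builder):
///
/// ```ignore
/// let mut writer = InMemWriter::new(capacity);
/// for (block, block_meta) in blocks.iter().zip(block_metas.iter()) {
///     writer.write_block(block, block_meta).await?;
/// }
/// // The output is the full SST (data + encoded meta) and the meta itself.
/// let (data, meta) = writer.finish(sstable_meta).await?;
/// ```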
pub struct InMemWriter {
    buf: Vec<u8>,
}

impl InMemWriter {
    pub fn new(capacity: usize) -> Self {
        Self {
            buf: Vec::with_capacity(capacity),
        }
    }
}

impl From<&SstableBuilderOptions> for InMemWriter {
    fn from(options: &SstableBuilderOptions) -> Self {
        Self::new(options.capacity + options.block_capacity)
    }
}

#[async_trait::async_trait]
impl SstableWriter for InMemWriter {
    type Output = (Bytes, SstableMeta);

    async fn write_block(&mut self, block: &[u8], _meta: &BlockMeta) -> HummockResult<()> {
        self.buf.extend_from_slice(block);
        Ok(())
    }

    async fn write_block_bytes(&mut self, block: Bytes, _meta: &BlockMeta) -> HummockResult<()> {
        self.buf.extend_from_slice(&block);
        Ok(())
    }

    async fn finish(mut self, meta: SstableMeta) -> HummockResult<Self::Output> {
        meta.encode_to(&mut self.buf);
        Ok((Bytes::from(self.buf), meta))
    }

    fn data_len(&self) -> usize {
        self.buf.len()
    }
}

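/// Options shared by all SST writers: an optional capacity hint used to
/// pre-allocate the data buffer, an optional memory tracker, and the cache
/// policy that decides whether written blocks are also inserted into the
/// block cache.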
pub struct SstableWriterOptions {
    pub capacity_hint: Option<usize>,
    pub tracker: Option<MemoryTracker>,
    pub policy: CachePolicy,
}

impl Default for SstableWriterOptions {
    fn default() -> Self {
        Self {
            capacity_hint: None,
            tracker: None,
            policy: CachePolicy::NotFill,
        }
    }
}
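
/// A factory that creates one [`SstableWriter`] per SST object id.
///
/// A hedged sketch of typical driver code (the surrounding builder loop is
/// assumed, not part of this module):
///
/// ```ignore
/// let mut factory = BatchSstableWriterFactory::new(sstable_store.clone());
/// let writer = factory
///     .create_sst_writer(object_id, SstableWriterOptions::default())
///     .await?;
/// ```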
#[async_trait::async_trait]
pub trait SstableWriterFactory: Send {
    type Writer: SstableWriter<Output = UploadJoinHandle>;

    async fn create_sst_writer(
        &mut self,
        object_id: HummockSstableObjectId,
        options: SstableWriterOptions,
    ) -> HummockResult<Self::Writer>;
}

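/// Creates [`BatchUploadWriter`]s, which buffer a whole SST in memory and
/// upload it in a single batch request.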
pub struct BatchSstableWriterFactory {
    sstable_store: SstableStoreRef,
}

impl BatchSstableWriterFactory {
    pub fn new(sstable_store: SstableStoreRef) -> Self {
        BatchSstableWriterFactory { sstable_store }
    }
}

#[async_trait::async_trait]
impl SstableWriterFactory for BatchSstableWriterFactory {
    type Writer = BatchUploadWriter;

    async fn create_sst_writer(
        &mut self,
        object_id: HummockSstableObjectId,
        options: SstableWriterOptions,
    ) -> HummockResult<Self::Writer> {
        Ok(BatchUploadWriter::new(
            object_id,
            self.sstable_store.clone(),
            options,
        ))
    }
}

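/// Buffers the entire SST in memory and uploads it to the object store in one
/// batch when `finish` is called.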
pub struct BatchUploadWriter {
    object_id: HummockSstableObjectId,
    sstable_store: SstableStoreRef,
    policy: CachePolicy,
    buf: Vec<u8>,
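    /// Decoded blocks kept aside to refill the block cache after the upload,
    /// when the cache policy asks for it.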
    block_info: Vec<Block>,
    tracker: Option<MemoryTracker>,
}

impl BatchUploadWriter {
    pub fn new(
        object_id: HummockSstableObjectId,
        sstable_store: Arc<SstableStore>,
        options: SstableWriterOptions,
    ) -> Self {
        Self {
            object_id,
            sstable_store,
            policy: options.policy,
            buf: Vec::with_capacity(options.capacity_hint.unwrap_or(0)),
            block_info: Vec::new(),
            tracker: options.tracker,
        }
    }
}

#[async_trait::async_trait]
impl SstableWriter for BatchUploadWriter {
    type Output = JoinHandle<HummockResult<()>>;

    async fn write_block(&mut self, block: &[u8], meta: &BlockMeta) -> HummockResult<()> {
        self.buf.extend_from_slice(block);
        if let CachePolicy::Fill(_) = self.policy {
            self.block_info.push(Block::decode(
                Bytes::from(block.to_vec()),
                meta.uncompressed_size as usize,
            )?);
        }
        Ok(())
    }

    async fn write_block_bytes(&mut self, block: Bytes, meta: &BlockMeta) -> HummockResult<()> {
        self.buf.extend_from_slice(&block);
        if let CachePolicy::Fill(_) = self.policy {
            self.block_info
                .push(Block::decode(block, meta.uncompressed_size as usize)?);
        }
        Ok(())
    }

    async fn finish(mut self, meta: SstableMeta) -> HummockResult<Self::Output> {
        fail_point!("data_upload_err");
        let join_handle = tokio::spawn(async move {
            meta.encode_to(&mut self.buf);
            let data = Bytes::from(self.buf);
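            // Hold the tracker for the lifetime of the upload so the buffered
            // data stays accounted for; a failed budget increase is only
            // logged, not fatal.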
            let _tracker = self.tracker.map(|mut t| {
                if !t.try_increase_memory(data.capacity() as u64) {
                    tracing::debug!("failed to increase memory for data file, sst object id: {}, file size: {}",
                        self.object_id, data.capacity());
                }
                t
            });

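            // Upload the SST data to the object store and cache the meta.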
            self.sstable_store
                .clone()
                .put_sst_data(self.object_id, data)
                .await?;
            self.sstable_store.insert_meta_cache(self.object_id, meta);

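            // Updating the recent filter with just (object id, usize::MAX) is
            // enough here; it effectively marks all blocks of this SST as
            // recently used.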
            if let Some(filter) = self.sstable_store.recent_filter() {
                filter.insert((self.object_id, usize::MAX));
            }

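            // Refill the block cache with the blocks just uploaded.
            // `block_info` is only populated when the cache policy is `Fill`.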
            if let CachePolicy::Fill(fill_cache_priority) = self.policy {
                for (block_idx, block) in self.block_info.into_iter().enumerate() {
                    self.sstable_store.block_cache().insert_with_hint(
                        SstableBlockIndex {
                            sst_id: self.object_id,
                            block_idx: block_idx as _,
                        },
                        Box::new(block),
                        fill_cache_priority,
                    );
                }
            }
            Ok(())
        });
        Ok(join_handle)
    }

    fn data_len(&self) -> usize {
        self.buf.len()
    }
}

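/// Streams SST data to the object store block by block as it is written,
/// instead of buffering the whole SST in memory.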
pub struct StreamingUploadWriter {
    object_id: HummockSstableObjectId,
    sstable_store: SstableStoreRef,
    policy: CachePolicy,
    object_uploader: ObjectStreamingUploader,
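    /// Decoded blocks kept aside to refill the block cache once the upload
    /// finishes, when the cache policy asks for it.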
    blocks: Vec<Block>,
    data_len: usize,
    tracker: Option<MemoryTracker>,
}

impl StreamingUploadWriter {
    pub fn new(
        object_id: HummockSstableObjectId,
        sstable_store: SstableStoreRef,
        object_uploader: ObjectStreamingUploader,
        options: SstableWriterOptions,
    ) -> Self {
        Self {
            object_id,
            sstable_store,
            policy: options.policy,
            object_uploader,
            blocks: Vec::new(),
            data_len: 0,
            tracker: options.tracker,
        }
    }
}

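/// A writer that is either streaming or batch, chosen at creation time by
/// [`UnifiedSstableWriterFactory`] depending on object store support; all
/// [`SstableWriter`] calls are dispatched to the chosen variant.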
pub enum UnifiedSstableWriter {
    StreamingSstableWriter(StreamingUploadWriter),
    BatchSstableWriter(BatchUploadWriter),
}

#[async_trait::async_trait]
impl SstableWriter for StreamingUploadWriter {
    type Output = JoinHandle<HummockResult<()>>;

    async fn write_block(&mut self, block_data: &[u8], meta: &BlockMeta) -> HummockResult<()> {
        self.data_len += block_data.len();
        let block_data = Bytes::from(block_data.to_vec());
        if let CachePolicy::Fill(_) = self.policy {
            let block = Block::decode(block_data.clone(), meta.uncompressed_size as usize)?;
            self.blocks.push(block);
        }
        self.object_uploader
            .write_bytes(block_data)
            .await
            .map_err(Into::into)
    }

    async fn write_block_bytes(&mut self, block: Bytes, meta: &BlockMeta) -> HummockResult<()> {
        self.data_len += block.len();
        if let CachePolicy::Fill(_) = self.policy {
            let block = Block::decode(block.clone(), meta.uncompressed_size as usize)?;
            self.blocks.push(block);
        }
        self.object_uploader
            .write_bytes(block)
            .await
            .map_err(Into::into)
    }

    async fn finish(mut self, meta: SstableMeta) -> HummockResult<UploadJoinHandle> {
        let metadata = Bytes::from(meta.encode_to_bytes());

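        // The encoded metadata is appended to the object right after the data
        // blocks; the remaining steps run on a background task.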
        self.object_uploader.write_bytes(metadata).await?;
        let join_handle = tokio::spawn(async move {
            let uploader_memory_usage = self.object_uploader.get_memory_usage();
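            // As in the batch writer, keep the tracker alive for the duration
            // of the upload; a failed budget increase is only logged.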
            let _tracker = self.tracker.map(|mut t| {
                if !t.try_increase_memory(uploader_memory_usage) {
                    tracing::debug!("failed to increase memory for data file, sst object id: {}, file size: {}",
                        self.object_id, uploader_memory_usage);
                }
                t
            });

            assert!(!meta.block_metas.is_empty());

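            // Seal the streaming upload so the object becomes visible.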
            self.object_uploader.finish().await?;
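            // Add the metadata to the meta cache.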
            self.sstable_store.insert_meta_cache(self.object_id, meta);

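            // Refill the block cache with the blocks collected during writing.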
            if let CachePolicy::Fill(fill_high_priority_cache) = self.policy
                && !self.blocks.is_empty()
            {
                for (block_idx, block) in self.blocks.into_iter().enumerate() {
                    self.sstable_store.block_cache().insert_with_hint(
                        SstableBlockIndex {
                            sst_id: self.object_id,
                            block_idx: block_idx as _,
                        },
                        Box::new(block),
                        fill_high_priority_cache,
                    );
                }
            }
            Ok(())
        });
        Ok(join_handle)
    }

    fn data_len(&self) -> usize {
        self.data_len
    }
}

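/// Creates [`StreamingUploadWriter`]s; requires an object store that supports
/// streaming upload.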
pub struct StreamingSstableWriterFactory {
    sstable_store: SstableStoreRef,
}

impl StreamingSstableWriterFactory {
    pub fn new(sstable_store: SstableStoreRef) -> Self {
        StreamingSstableWriterFactory { sstable_store }
    }
}
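
/// Creates [`UnifiedSstableWriter`]s, picking streaming upload when the
/// backing object store supports it and falling back to batch upload
/// otherwise.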
pub struct UnifiedSstableWriterFactory {
    sstable_store: SstableStoreRef,
}

impl UnifiedSstableWriterFactory {
    pub fn new(sstable_store: SstableStoreRef) -> Self {
        UnifiedSstableWriterFactory { sstable_store }
    }
}

#[async_trait::async_trait]
impl SstableWriterFactory for UnifiedSstableWriterFactory {
    type Writer = UnifiedSstableWriter;

    async fn create_sst_writer(
        &mut self,
        object_id: HummockSstableObjectId,
        options: SstableWriterOptions,
    ) -> HummockResult<Self::Writer> {
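        // Prefer streaming upload when the object store supports it; otherwise
        // fall back to buffering the SST and uploading it in one batch.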
        if self.sstable_store.store().support_streaming_upload() {
            let path = self.sstable_store.get_sst_data_path(object_id);
            let uploader = self.sstable_store.create_streaming_uploader(&path).await?;
            let streaming_uploader_writer = StreamingUploadWriter::new(
                object_id,
                self.sstable_store.clone(),
                uploader,
                options,
            );

            Ok(UnifiedSstableWriter::StreamingSstableWriter(
                streaming_uploader_writer,
            ))
        } else {
            let batch_uploader_writer =
                BatchUploadWriter::new(object_id, self.sstable_store.clone(), options);
            Ok(UnifiedSstableWriter::BatchSstableWriter(
                batch_uploader_writer,
            ))
        }
    }
}

#[async_trait::async_trait]
impl SstableWriterFactory for StreamingSstableWriterFactory {
    type Writer = StreamingUploadWriter;

    async fn create_sst_writer(
        &mut self,
        object_id: HummockSstableObjectId,
        options: SstableWriterOptions,
    ) -> HummockResult<Self::Writer> {
        let path = self.sstable_store.get_sst_data_path(object_id);
        let uploader = self.sstable_store.create_streaming_uploader(&path).await?;
        Ok(StreamingUploadWriter::new(
            object_id,
            self.sstable_store.clone(),
            uploader,
            options,
        ))
    }
}

#[async_trait::async_trait]
impl SstableWriter for UnifiedSstableWriter {
    type Output = JoinHandle<HummockResult<()>>;

    async fn write_block(&mut self, block_data: &[u8], meta: &BlockMeta) -> HummockResult<()> {
        match self {
            UnifiedSstableWriter::StreamingSstableWriter(stream) => {
                stream.write_block(block_data, meta).await
            }
            UnifiedSstableWriter::BatchSstableWriter(batch) => {
                batch.write_block(block_data, meta).await
            }
        }
    }

    async fn write_block_bytes(&mut self, block: Bytes, meta: &BlockMeta) -> HummockResult<()> {
        match self {
            UnifiedSstableWriter::StreamingSstableWriter(stream) => {
                stream.write_block_bytes(block, meta).await
            }
            UnifiedSstableWriter::BatchSstableWriter(batch) => {
                batch.write_block_bytes(block, meta).await
            }
        }
    }

    async fn finish(self, meta: SstableMeta) -> HummockResult<UploadJoinHandle> {
        match self {
            UnifiedSstableWriter::StreamingSstableWriter(stream) => stream.finish(meta).await,
            UnifiedSstableWriter::BatchSstableWriter(batch) => batch.finish(meta).await,
        }
    }

    fn data_len(&self) -> usize {
        match self {
            UnifiedSstableWriter::StreamingSstableWriter(stream) => stream.data_len(),
            UnifiedSstableWriter::BatchSstableWriter(batch) => batch.data_len(),
        }
    }
}

#[cfg(test)]
mod tests {

    use bytes::Bytes;
    use rand::{Rng, SeedableRng};
    use risingwave_common::util::iter_util::ZipEqFast;

    use crate::hummock::sstable::VERSION;
    use crate::hummock::{BlockMeta, InMemWriter, SstableMeta, SstableWriter};

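    /// Builds five 1000-byte blocks of seeded random data, the matching
    /// `BlockMeta`s, and an `SstableMeta` whose `meta_offset` points at the
    /// end of the data.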
    fn get_sst() -> (Bytes, Vec<Bytes>, SstableMeta) {
        let mut rng = rand::rngs::StdRng::seed_from_u64(0);
        let mut buffer: Vec<u8> = vec![0; 5000];
        rng.fill(&mut buffer[..]);
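        // The encoded meta begins with the block count (5) as a little-endian
        // u32, so appending it here makes `data` line up with the first
        // `meta_offset` bytes of the writer's finished output.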
        buffer.extend((5_u32).to_le_bytes());
        let data = Bytes::from(buffer);

        let mut blocks = Vec::with_capacity(5);
        let mut block_metas = Vec::with_capacity(5);
        for i in 0..5 {
            block_metas.push(BlockMeta {
                smallest_key: Vec::new(),
                len: 1000,
                offset: i * 1000,
                ..Default::default()
            });
            blocks.push(data.slice((i * 1000) as usize..((i + 1) * 1000) as usize));
        }
        #[expect(deprecated)]
        let meta = SstableMeta {
            block_metas,
            bloom_filter: vec![],
            estimated_size: 0,
            key_count: 0,
            smallest_key: Vec::new(),
            largest_key: Vec::new(),
            meta_offset: data.len() as u64,
            monotonic_tombstone_events: vec![],
            version: VERSION,
        };

        (data, blocks, meta)
    }

    #[tokio::test]
    async fn test_in_mem_writer() {
        let (data, blocks, meta) = get_sst();
        let mut writer = Box::new(InMemWriter::new(0));
        for (block, meta) in blocks.iter().zip_eq_fast(meta.block_metas.iter()) {
            writer.write_block(&block[..], meta).await.unwrap();
        }

        let meta_offset = meta.meta_offset as usize;
        let (output_data, _) = writer.finish(meta).await.unwrap();
        assert_eq!(output_data.slice(0..meta_offset), data);
    }
}