risingwave_storage/hummock/sstable/writer.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use bytes::Bytes;
18use fail::fail_point;
19use foyer::HybridCacheProperties;
20use risingwave_hummock_sdk::HummockSstableObjectId;
21use risingwave_object_store::object::ObjectStreamingUploader;
22use tokio::task::JoinHandle;
23use zstd::zstd_safe::WriteBuf;
24
25use super::multi_builder::UploadJoinHandle;
26use super::{Block, BlockMeta};
27use crate::hummock::utils::MemoryTracker;
28use crate::hummock::{
29    CachePolicy, HummockResult, SstableBlockIndex, SstableBuilderOptions, SstableMeta,
30    SstableStore, SstableStoreRef,
31};
32
/// A consumer of SST data.
#[async_trait::async_trait]
pub trait SstableWriter: Send {
    /// Result produced by [`Self::finish`], e.g. the encoded SST for in-memory
    /// writers or a join handle of the upload task for uploading writers.
    type Output;

    /// Write an SST block to the writer.
    async fn write_block(&mut self, block: &[u8], meta: &BlockMeta) -> HummockResult<()>;

    /// Write an SST block given as owned `Bytes`, letting implementations
    /// forward the buffer without an extra copy.
    async fn write_block_bytes(&mut self, block: Bytes, meta: &BlockMeta) -> HummockResult<()>;

    /// Finish writing the SST.
    async fn finish(self, meta: SstableMeta) -> HummockResult<Self::Output>;

    /// Get the length of data that has already been written.
    fn data_len(&self) -> usize;
}
49
/// Append SST data to a buffer. Used for tests and benchmarks.
pub struct InMemWriter {
    // Accumulates raw block bytes; the encoded meta is appended on `finish`.
    buf: Vec<u8>,
}
54
55impl InMemWriter {
56    pub fn new(capacity: usize) -> Self {
57        Self {
58            buf: Vec::with_capacity(capacity),
59        }
60    }
61}
62
63impl From<&SstableBuilderOptions> for InMemWriter {
64    fn from(options: &SstableBuilderOptions) -> Self {
65        Self::new(options.capacity + options.block_capacity)
66    }
67}
68
69#[async_trait::async_trait]
70impl SstableWriter for InMemWriter {
71    type Output = (Bytes, SstableMeta);
72
73    async fn write_block(&mut self, block: &[u8], _meta: &BlockMeta) -> HummockResult<()> {
74        self.buf.extend_from_slice(block);
75        Ok(())
76    }
77
78    async fn write_block_bytes(&mut self, block: Bytes, _meta: &BlockMeta) -> HummockResult<()> {
79        self.buf.extend_from_slice(&block);
80        Ok(())
81    }
82
83    async fn finish(mut self, meta: SstableMeta) -> HummockResult<Self::Output> {
84        meta.encode_to(&mut self.buf);
85        Ok((Bytes::from(self.buf), meta))
86    }
87
88    fn data_len(&self) -> usize {
89        self.buf.len()
90    }
91}
92
/// Options shared by all `SstableWriter` implementations.
pub struct SstableWriterOptions {
    /// Total length of SST data.
    pub capacity_hint: Option<usize>,
    /// Optional memory-quota tracker charged for the buffered/uploaded bytes.
    pub tracker: Option<MemoryTracker>,
    /// Whether written blocks should also be inserted into the block cache.
    pub policy: CachePolicy,
}
99
100impl Default for SstableWriterOptions {
101    fn default() -> Self {
102        Self {
103            capacity_hint: None,
104            tracker: None,
105            policy: CachePolicy::NotFill,
106        }
107    }
108}
/// Factory that creates an [`SstableWriter`] for a given SST object id.
#[async_trait::async_trait]
pub trait SstableWriterFactory: Send {
    /// Concrete writer type; its `finish` output is an upload join handle.
    type Writer: SstableWriter<Output = UploadJoinHandle>;

    /// Create a writer for the object identified by `object_id`.
    async fn create_sst_writer(
        &mut self,
        object_id: impl Into<HummockSstableObjectId> + Send,
        options: SstableWriterOptions,
    ) -> HummockResult<Self::Writer>;
}
119
/// Factory for [`BatchUploadWriter`]s, which buffer the whole SST in memory
/// and upload it in one shot on `finish`.
pub struct BatchSstableWriterFactory {
    sstable_store: SstableStoreRef,
}
123
124impl BatchSstableWriterFactory {
125    pub fn new(sstable_store: SstableStoreRef) -> Self {
126        BatchSstableWriterFactory { sstable_store }
127    }
128}
129
130#[async_trait::async_trait]
131impl SstableWriterFactory for BatchSstableWriterFactory {
132    type Writer = BatchUploadWriter;
133
134    async fn create_sst_writer(
135        &mut self,
136        object_id: impl Into<HummockSstableObjectId> + Send,
137        options: SstableWriterOptions,
138    ) -> HummockResult<Self::Writer> {
139        Ok(BatchUploadWriter::new(
140            object_id,
141            self.sstable_store.clone(),
142            options,
143        ))
144    }
145}
146
/// Buffer SST data and upload it as a whole on `finish`.
/// The upload is finished when the returned `JoinHandle` is joined.
pub struct BatchUploadWriter {
    object_id: HummockSstableObjectId,
    sstable_store: SstableStoreRef,
    // Decides whether decoded blocks are kept for block-cache refill.
    policy: CachePolicy,
    // The entire SST (blocks, then encoded meta) accumulates here until `finish`.
    buf: Vec<u8>,
    // Decoded blocks retained only under `CachePolicy::Fill`.
    block_info: Vec<Block>,
    // Optional memory-quota tracker charged when the upload task starts.
    tracker: Option<MemoryTracker>,
}
157
158impl BatchUploadWriter {
159    pub fn new(
160        object_id: impl Into<HummockSstableObjectId>,
161        sstable_store: Arc<SstableStore>,
162        options: SstableWriterOptions,
163    ) -> Self {
164        Self {
165            object_id: object_id.into(),
166            sstable_store,
167            policy: options.policy,
168            buf: Vec::with_capacity(options.capacity_hint.unwrap_or(0)),
169            block_info: Vec::new(),
170            tracker: options.tracker,
171        }
172    }
173}
174
#[async_trait::async_trait]
impl SstableWriter for BatchUploadWriter {
    /// Handle of the spawned upload task; join it to learn the upload result.
    type Output = JoinHandle<HummockResult<()>>;

    async fn write_block(&mut self, block: &[u8], meta: &BlockMeta) -> HummockResult<()> {
        // Buffer only; the whole SST is uploaded at once in `finish`.
        self.buf.extend_from_slice(block);
        if let CachePolicy::Fill(_) = self.policy {
            // Keep a decoded copy so the block cache can be refilled after upload.
            self.block_info.push(Block::decode(
                Bytes::from(block.to_vec()),
                meta.uncompressed_size as usize,
            )?);
        }
        Ok(())
    }

    async fn write_block_bytes(&mut self, block: Bytes, meta: &BlockMeta) -> HummockResult<()> {
        self.buf.extend_from_slice(&block);
        if let CachePolicy::Fill(_) = self.policy {
            // `block` is owned here, so decoding avoids the extra copy taken
            // by the slice-based `write_block`.
            self.block_info
                .push(Block::decode(block, meta.uncompressed_size as usize)?);
        }
        Ok(())
    }

    /// Encode the meta footer into the buffer and spawn a task that uploads
    /// the complete SST, then fills meta cache, recent filter and block cache.
    async fn finish(mut self, meta: SstableMeta) -> HummockResult<Self::Output> {
        fail_point!("data_upload_err");
        let join_handle = tokio::spawn(async move {
            meta.encode_to(&mut self.buf);
            let data = Bytes::from(self.buf);
            // Charge the memory quota for the buffered file; `_tracker` lives
            // until this task ends, keeping the bytes accounted during upload.
            // On quota failure we only log and proceed (best-effort tracking).
            // NOTE(review): `data.capacity()` appears to resolve via the
            // imported `zstd_safe::WriteBuf` trait — confirm this is intended.
            let _tracker = self.tracker.map(|mut t| {
                if !t.try_increase_memory(data.capacity() as u64) {
                    tracing::debug!("failed to allocate increase memory for data file, sst object id: {}, file size: {}",
                                    self.object_id, data.capacity());
                }
                t
            });

            // Upload data to object store.
            self.sstable_store
                .clone()
                .put_sst_data(self.object_id, data)
                .await?;
            self.sstable_store.insert_meta_cache(self.object_id, meta);

            // Only update recent filter with sst obj id is okay here, for l0 is only filter by sst obj id with recent filter.
            if let Some(filter) = self.sstable_store.recent_filter() {
                filter.insert((self.object_id, usize::MAX));
            }

            // Add block cache.
            if let CachePolicy::Fill(hint) = self.policy {
                // The `block_info` may be empty when there is only range-tombstones, because we
                //  store them in meta-block.
                for (block_idx, block) in self.block_info.into_iter().enumerate() {
                    self.sstable_store.block_cache().insert_with_properties(
                        SstableBlockIndex {
                            sst_id: self.object_id,
                            block_idx: block_idx as _,
                        },
                        Box::new(block),
                        HybridCacheProperties::default().with_hint(hint),
                    );
                }
            }
            Ok(())
        });
        Ok(join_handle)
    }

    fn data_len(&self) -> usize {
        self.buf.len()
    }
}
248
/// Upload SST data block by block through a streaming uploader; the meta
/// footer is streamed last and the upload is completed on `finish`.
pub struct StreamingUploadWriter {
    object_id: HummockSstableObjectId,
    sstable_store: SstableStoreRef,
    // Decides whether decoded blocks are kept for block-cache refill.
    policy: CachePolicy,
    /// Data are uploaded block by block, except for the size footer.
    object_uploader: ObjectStreamingUploader,
    /// Compressed blocks to refill block or meta cache. Keep the uncompressed capacity for decode.
    blocks: Vec<Block>,
    // Total bytes of block data written so far (excludes the meta footer).
    data_len: usize,
    // Optional memory-quota tracker charged when the finish task starts.
    tracker: Option<MemoryTracker>,
}
260
261impl StreamingUploadWriter {
262    pub fn new(
263        object_id: HummockSstableObjectId,
264        sstable_store: SstableStoreRef,
265        object_uploader: ObjectStreamingUploader,
266        options: SstableWriterOptions,
267    ) -> Self {
268        Self {
269            object_id,
270            sstable_store,
271            policy: options.policy,
272            object_uploader,
273            blocks: Vec::new(),
274            data_len: 0,
275            tracker: options.tracker,
276        }
277    }
278}
279
/// Writer that is either streaming or batch, chosen at creation time based on
/// whether the object store supports streaming upload.
pub enum UnifiedSstableWriter {
    StreamingSstableWriter(StreamingUploadWriter),
    BatchSstableWriter(BatchUploadWriter),
}
284
#[async_trait::async_trait]
impl SstableWriter for StreamingUploadWriter {
    /// Handle of the spawned finalization task; join it for the upload result.
    type Output = JoinHandle<HummockResult<()>>;

    async fn write_block(&mut self, block_data: &[u8], meta: &BlockMeta) -> HummockResult<()> {
        self.data_len += block_data.len();
        // Copy into owned `Bytes` because the uploader takes ownership.
        let block_data = Bytes::from(block_data.to_vec());
        if let CachePolicy::Fill(_) = self.policy {
            // Keep a decoded copy so the block cache can be filled in `finish`;
            // the `Bytes` clone only bumps a refcount.
            let block = Block::decode(block_data.clone(), meta.uncompressed_size as usize)?;
            self.blocks.push(block);
        }
        self.object_uploader
            .write_bytes(block_data)
            .await
            .map_err(Into::into)
    }

    async fn write_block_bytes(&mut self, block: Bytes, meta: &BlockMeta) -> HummockResult<()> {
        self.data_len += block.len();
        if let CachePolicy::Fill(_) = self.policy {
            // Cheap `Bytes` clone — only the refcount is bumped, no copy.
            let block = Block::decode(block.clone(), meta.uncompressed_size as usize)?;
            self.blocks.push(block);
        }
        self.object_uploader
            .write_bytes(block)
            .await
            .map_err(Into::into)
    }

    /// Stream the encoded meta footer, then spawn a task that completes the
    /// upload and fills the meta/block caches.
    async fn finish(mut self, meta: SstableMeta) -> HummockResult<UploadJoinHandle> {
        let metadata = Bytes::from(meta.encode_to_bytes());

        self.object_uploader.write_bytes(metadata).await?;
        let join_handle = tokio::spawn(async move {
            let uploader_memory_usage = self.object_uploader.get_memory_usage();
            // Charge the memory quota for the uploader's buffered bytes;
            // `_tracker` lives until this task ends so they stay accounted.
            // On quota failure we only log and proceed (best-effort tracking).
            let _tracker = self.tracker.map(|mut t| {
                    if !t.try_increase_memory(uploader_memory_usage) {
                        tracing::debug!("failed to allocate increase memory for data file, sst object id: {}, file size: {}",
                                        self.object_id, uploader_memory_usage);
                    }
                    t
                });

            // An SST reaching `finish` must contain at least one block.
            assert!(!meta.block_metas.is_empty());

            // Upload data to object store.
            self.object_uploader.finish().await?;
            // Add meta cache.
            self.sstable_store.insert_meta_cache(self.object_id, meta);

            // Add block cache.
            if let CachePolicy::Fill(hint) = self.policy
                && !self.blocks.is_empty()
            {
                for (block_idx, block) in self.blocks.into_iter().enumerate() {
                    self.sstable_store.block_cache().insert_with_properties(
                        SstableBlockIndex {
                            sst_id: self.object_id,
                            block_idx: block_idx as _,
                        },
                        Box::new(block),
                        HybridCacheProperties::default().with_hint(hint),
                    );
                }
            }
            Ok(())
        });
        Ok(join_handle)
    }

    fn data_len(&self) -> usize {
        self.data_len
    }
}
359
/// Factory for [`StreamingUploadWriter`]s, which upload SST data block by
/// block as it is written.
pub struct StreamingSstableWriterFactory {
    sstable_store: SstableStoreRef,
}
363
364impl StreamingSstableWriterFactory {
365    pub fn new(sstable_store: SstableStoreRef) -> Self {
366        StreamingSstableWriterFactory { sstable_store }
367    }
368}
/// Factory that picks streaming or batch upload per the object store's
/// streaming-upload support.
pub struct UnifiedSstableWriterFactory {
    sstable_store: SstableStoreRef,
}
372
373impl UnifiedSstableWriterFactory {
374    pub fn new(sstable_store: SstableStoreRef) -> Self {
375        UnifiedSstableWriterFactory { sstable_store }
376    }
377}
378
379#[async_trait::async_trait]
380impl SstableWriterFactory for UnifiedSstableWriterFactory {
381    type Writer = UnifiedSstableWriter;
382
383    async fn create_sst_writer(
384        &mut self,
385        object_id: impl Into<HummockSstableObjectId> + Send,
386        options: SstableWriterOptions,
387    ) -> HummockResult<Self::Writer> {
388        let object_id = object_id.into();
389        if self.sstable_store.store().support_streaming_upload() {
390            let path = self.sstable_store.get_sst_data_path(object_id);
391            let uploader = self.sstable_store.create_streaming_uploader(&path).await?;
392            let streaming_uploader_writer = StreamingUploadWriter::new(
393                object_id,
394                self.sstable_store.clone(),
395                uploader,
396                options,
397            );
398
399            Ok(UnifiedSstableWriter::StreamingSstableWriter(
400                streaming_uploader_writer,
401            ))
402        } else {
403            let batch_uploader_writer =
404                BatchUploadWriter::new(object_id, self.sstable_store.clone(), options);
405            Ok(UnifiedSstableWriter::BatchSstableWriter(
406                batch_uploader_writer,
407            ))
408        }
409    }
410}
411
412#[async_trait::async_trait]
413impl SstableWriterFactory for StreamingSstableWriterFactory {
414    type Writer = StreamingUploadWriter;
415
416    async fn create_sst_writer(
417        &mut self,
418        object_id: impl Into<HummockSstableObjectId> + Send,
419        options: SstableWriterOptions,
420    ) -> HummockResult<Self::Writer> {
421        let object_id = object_id.into();
422        let path = self.sstable_store.get_sst_data_path(object_id);
423        let uploader = self.sstable_store.create_streaming_uploader(&path).await?;
424        Ok(StreamingUploadWriter::new(
425            object_id,
426            self.sstable_store.clone(),
427            uploader,
428            options,
429        ))
430    }
431}
432
433#[async_trait::async_trait]
434impl SstableWriter for UnifiedSstableWriter {
435    type Output = JoinHandle<HummockResult<()>>;
436
437    async fn write_block(&mut self, block_data: &[u8], meta: &BlockMeta) -> HummockResult<()> {
438        match self {
439            UnifiedSstableWriter::StreamingSstableWriter(stream) => {
440                stream.write_block(block_data, meta).await
441            }
442            UnifiedSstableWriter::BatchSstableWriter(batch) => {
443                batch.write_block(block_data, meta).await
444            }
445        }
446    }
447
448    async fn write_block_bytes(&mut self, block: Bytes, meta: &BlockMeta) -> HummockResult<()> {
449        match self {
450            UnifiedSstableWriter::StreamingSstableWriter(stream) => {
451                stream.write_block_bytes(block, meta).await
452            }
453            UnifiedSstableWriter::BatchSstableWriter(batch) => {
454                batch.write_block_bytes(block, meta).await
455            }
456        }
457    }
458
459    async fn finish(self, meta: SstableMeta) -> HummockResult<UploadJoinHandle> {
460        match self {
461            UnifiedSstableWriter::StreamingSstableWriter(stream) => stream.finish(meta).await,
462            UnifiedSstableWriter::BatchSstableWriter(batch) => batch.finish(meta).await,
463        }
464    }
465
466    fn data_len(&self) -> usize {
467        match self {
468            UnifiedSstableWriter::StreamingSstableWriter(stream) => stream.data_len(),
469            UnifiedSstableWriter::BatchSstableWriter(batch) => batch.data_len(),
470        }
471    }
472}
473
#[cfg(test)]
mod tests {

    use bytes::Bytes;
    use rand::{Rng, SeedableRng};
    use risingwave_common::util::iter_util::ZipEqFast;

    use crate::hummock::sstable::VERSION;
    use crate::hummock::{BlockMeta, InMemWriter, SstableMeta, SstableWriter};

    /// Build a deterministic SST fixture: 5000 seeded-random data bytes plus a
    /// 4-byte little-endian footer, five 1000-byte block slices with matching
    /// `BlockMeta`s, and an `SstableMeta` whose `meta_offset` is the data length.
    fn get_sst() -> (Bytes, Vec<Bytes>, SstableMeta) {
        let mut rng = rand::rngs::StdRng::seed_from_u64(0);
        let mut buffer: Vec<u8> = vec![0; 5000];
        rng.fill(&mut buffer[..]);
        // Footer value 5 — presumably the block count; TODO confirm encoding.
        buffer.extend((5_u32).to_le_bytes());
        let data = Bytes::from(buffer);

        let mut blocks = Vec::with_capacity(5);
        let mut block_metas = Vec::with_capacity(5);
        for i in 0..5 {
            block_metas.push(BlockMeta {
                smallest_key: Vec::new(),
                len: 1000,
                offset: i * 1000,
                ..Default::default()
            });
            blocks.push(data.slice((i * 1000) as usize..((i + 1) * 1000) as usize));
        }
        #[expect(deprecated)]
        let meta = SstableMeta {
            block_metas,
            bloom_filter: vec![],
            estimated_size: 0,
            key_count: 0,
            smallest_key: Vec::new(),
            largest_key: Vec::new(),
            meta_offset: data.len() as u64,
            monotonic_tombstone_events: vec![],
            version: VERSION,
        };

        (data, blocks, meta)
    }

    /// Writing all fixture blocks through `InMemWriter` must reproduce the
    /// original bytes up to `meta_offset` in the finished output.
    #[tokio::test]
    async fn test_in_mem_writer() {
        let (data, blocks, meta) = get_sst();
        let mut writer = Box::new(InMemWriter::new(0));
        for (block, meta) in blocks.iter().zip_eq_fast(meta.block_metas.iter()) {
            writer.write_block(&block[..], meta).await.unwrap();
        }

        let meta_offset = meta.meta_offset as usize;
        let (output_data, _) = writer.finish(meta).await.unwrap();
        assert_eq!(output_data.slice(0..meta_offset), data);
    }
}