risingwave_storage/hummock/sstable/
writer.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use bytes::Bytes;
18use fail::fail_point;
19use risingwave_hummock_sdk::HummockSstableObjectId;
20use risingwave_object_store::object::ObjectStreamingUploader;
21use tokio::task::JoinHandle;
22use zstd::zstd_safe::WriteBuf;
23
24use super::multi_builder::UploadJoinHandle;
25use super::{Block, BlockMeta};
26use crate::hummock::utils::MemoryTracker;
27use crate::hummock::{
28    CachePolicy, HummockResult, SstableBlockIndex, SstableBuilderOptions, SstableMeta,
29    SstableStore, SstableStoreRef,
30};
31
/// A consumer of SST data.
///
/// Blocks are handed over one at a time through `write_block` or `write_block_bytes`;
/// `finish` then consumes the writer together with the SST meta.
#[async_trait::async_trait]
pub trait SstableWriter: Send {
    /// Value returned by `finish` once the writer has been consumed.
    type Output;

    /// Write an SST block to the writer.
    async fn write_block(&mut self, block: &[u8], meta: &BlockMeta) -> HummockResult<()>;

    /// Write an SST block that the caller already holds as an owned `Bytes` buffer.
    async fn write_block_bytes(&mut self, block: Bytes, meta: &BlockMeta) -> HummockResult<()>;

    /// Finish writing the SST.
    async fn finish(self, meta: SstableMeta) -> HummockResult<Self::Output>;

    /// Get the length of data that has already been written.
    fn data_len(&self) -> usize;
}
48
/// An `SstableWriter` that accumulates all SST bytes in a single in-memory buffer.
///
/// Used for tests and benchmarks.
pub struct InMemWriter {
    /// Everything written so far, in write order.
    buf: Vec<u8>,
}

impl InMemWriter {
    /// Creates an empty writer whose buffer is pre-allocated for `capacity` bytes.
    pub fn new(capacity: usize) -> Self {
        let buf = Vec::with_capacity(capacity);
        Self { buf }
    }
}
61
62impl From<&SstableBuilderOptions> for InMemWriter {
63    fn from(options: &SstableBuilderOptions) -> Self {
64        Self::new(options.capacity + options.block_capacity)
65    }
66}
67
68#[async_trait::async_trait]
69impl SstableWriter for InMemWriter {
70    type Output = (Bytes, SstableMeta);
71
72    async fn write_block(&mut self, block: &[u8], _meta: &BlockMeta) -> HummockResult<()> {
73        self.buf.extend_from_slice(block);
74        Ok(())
75    }
76
77    async fn write_block_bytes(&mut self, block: Bytes, _meta: &BlockMeta) -> HummockResult<()> {
78        self.buf.extend_from_slice(&block);
79        Ok(())
80    }
81
82    async fn finish(mut self, meta: SstableMeta) -> HummockResult<Self::Output> {
83        meta.encode_to(&mut self.buf);
84        Ok((Bytes::from(self.buf), meta))
85    }
86
87    fn data_len(&self) -> usize {
88        self.buf.len()
89    }
90}
91
/// Per-SST settings handed to an `SstableWriterFactory` when creating a writer.
pub struct SstableWriterOptions {
    /// Total length of SST data.
    /// `BatchUploadWriter` uses it to pre-size its buffer (0 when `None`).
    pub capacity_hint: Option<usize>,
    /// Optional memory tracker. The upload writers in this file try to increase it by the
    /// upload's memory footprint in `finish` and keep it for the duration of the upload task.
    pub tracker: Option<MemoryTracker>,
    /// Cache policy. Under `CachePolicy::Fill` the upload writers in this file decode the
    /// written blocks and insert them into the block cache once the upload has succeeded.
    pub policy: CachePolicy,
}
98
99impl Default for SstableWriterOptions {
100    fn default() -> Self {
101        Self {
102            capacity_hint: None,
103            tracker: None,
104            policy: CachePolicy::NotFill,
105        }
106    }
107}
/// Creates `SstableWriter`s whose `finish` yields an `UploadJoinHandle`.
#[async_trait::async_trait]
pub trait SstableWriterFactory: Send {
    /// The concrete writer type produced by this factory.
    type Writer: SstableWriter<Output = UploadJoinHandle>;

    /// Create a writer for the SST object identified by `object_id`, configured by `options`.
    async fn create_sst_writer(
        &mut self,
        object_id: HummockSstableObjectId,
        options: SstableWriterOptions,
    ) -> HummockResult<Self::Writer>;
}
118
119pub struct BatchSstableWriterFactory {
120    sstable_store: SstableStoreRef,
121}
122
123impl BatchSstableWriterFactory {
124    pub fn new(sstable_store: SstableStoreRef) -> Self {
125        BatchSstableWriterFactory { sstable_store }
126    }
127}
128
129#[async_trait::async_trait]
130impl SstableWriterFactory for BatchSstableWriterFactory {
131    type Writer = BatchUploadWriter;
132
133    async fn create_sst_writer(
134        &mut self,
135        object_id: HummockSstableObjectId,
136        options: SstableWriterOptions,
137    ) -> HummockResult<Self::Writer> {
138        Ok(BatchUploadWriter::new(
139            object_id,
140            self.sstable_store.clone(),
141            options,
142        ))
143    }
144}
145
146/// Buffer SST data and upload it as a whole on `finish`.
147/// The upload is finished when the returned `JoinHandle` is joined.
148pub struct BatchUploadWriter {
149    object_id: HummockSstableObjectId,
150    sstable_store: SstableStoreRef,
151    policy: CachePolicy,
152    buf: Vec<u8>,
153    block_info: Vec<Block>,
154    tracker: Option<MemoryTracker>,
155}
156
157impl BatchUploadWriter {
158    pub fn new(
159        object_id: HummockSstableObjectId,
160        sstable_store: Arc<SstableStore>,
161        options: SstableWriterOptions,
162    ) -> Self {
163        Self {
164            object_id,
165            sstable_store,
166            policy: options.policy,
167            buf: Vec::with_capacity(options.capacity_hint.unwrap_or(0)),
168            block_info: Vec::new(),
169            tracker: options.tracker,
170        }
171    }
172}
173
174#[async_trait::async_trait]
175impl SstableWriter for BatchUploadWriter {
176    type Output = JoinHandle<HummockResult<()>>;
177
178    async fn write_block(&mut self, block: &[u8], meta: &BlockMeta) -> HummockResult<()> {
179        self.buf.extend_from_slice(block);
180        if let CachePolicy::Fill(_) = self.policy {
181            self.block_info.push(Block::decode(
182                Bytes::from(block.to_vec()),
183                meta.uncompressed_size as usize,
184            )?);
185        }
186        Ok(())
187    }
188
189    async fn write_block_bytes(&mut self, block: Bytes, meta: &BlockMeta) -> HummockResult<()> {
190        self.buf.extend_from_slice(&block);
191        if let CachePolicy::Fill(_) = self.policy {
192            self.block_info
193                .push(Block::decode(block, meta.uncompressed_size as usize)?);
194        }
195        Ok(())
196    }
197
198    async fn finish(mut self, meta: SstableMeta) -> HummockResult<Self::Output> {
199        fail_point!("data_upload_err");
200        let join_handle = tokio::spawn(async move {
201            meta.encode_to(&mut self.buf);
202            let data = Bytes::from(self.buf);
203            let _tracker = self.tracker.map(|mut t| {
204                if !t.try_increase_memory(data.capacity() as u64) {
205                    tracing::debug!("failed to allocate increase memory for data file, sst object id: {}, file size: {}",
206                                    self.object_id, data.capacity());
207                }
208                t
209            });
210
211            // Upload data to object store.
212            self.sstable_store
213                .clone()
214                .put_sst_data(self.object_id, data)
215                .await?;
216            self.sstable_store.insert_meta_cache(self.object_id, meta);
217
218            // Only update recent filter with sst obj id is okay here, for l0 is only filter by sst obj id with recent filter.
219            if let Some(filter) = self.sstable_store.recent_filter() {
220                filter.insert((self.object_id, usize::MAX));
221            }
222
223            // Add block cache.
224            if let CachePolicy::Fill(fill_cache_priority) = self.policy {
225                // The `block_info` may be empty when there is only range-tombstones, because we
226                //  store them in meta-block.
227                for (block_idx, block) in self.block_info.into_iter().enumerate() {
228                    self.sstable_store.block_cache().insert_with_hint(
229                        SstableBlockIndex {
230                            sst_id: self.object_id,
231                            block_idx: block_idx as _,
232                        },
233                        Box::new(block),
234                        fill_cache_priority,
235                    );
236                }
237            }
238            Ok(())
239        });
240        Ok(join_handle)
241    }
242
243    fn data_len(&self) -> usize {
244        self.buf.len()
245    }
246}
247
248pub struct StreamingUploadWriter {
249    object_id: HummockSstableObjectId,
250    sstable_store: SstableStoreRef,
251    policy: CachePolicy,
252    /// Data are uploaded block by block, except for the size footer.
253    object_uploader: ObjectStreamingUploader,
254    /// Compressed blocks to refill block or meta cache. Keep the uncompressed capacity for decode.
255    blocks: Vec<Block>,
256    data_len: usize,
257    tracker: Option<MemoryTracker>,
258}
259
260impl StreamingUploadWriter {
261    pub fn new(
262        object_id: HummockSstableObjectId,
263        sstable_store: SstableStoreRef,
264        object_uploader: ObjectStreamingUploader,
265        options: SstableWriterOptions,
266    ) -> Self {
267        Self {
268            object_id,
269            sstable_store,
270            policy: options.policy,
271            object_uploader,
272            blocks: Vec::new(),
273            data_len: 0,
274            tracker: options.tracker,
275        }
276    }
277}
278
/// A writer that is either a `StreamingUploadWriter` or a `BatchUploadWriter`.
///
/// Its `SstableWriter` implementation forwards every call to the wrapped writer.
pub enum UnifiedSstableWriter {
    /// Wraps a writer that streams blocks to the object store.
    StreamingSstableWriter(StreamingUploadWriter),
    /// Wraps a writer that buffers the SST and uploads it as a whole.
    BatchSstableWriter(BatchUploadWriter),
}
283
284#[async_trait::async_trait]
285impl SstableWriter for StreamingUploadWriter {
286    type Output = JoinHandle<HummockResult<()>>;
287
288    async fn write_block(&mut self, block_data: &[u8], meta: &BlockMeta) -> HummockResult<()> {
289        self.data_len += block_data.len();
290        let block_data = Bytes::from(block_data.to_vec());
291        if let CachePolicy::Fill(_) = self.policy {
292            let block = Block::decode(block_data.clone(), meta.uncompressed_size as usize)?;
293            self.blocks.push(block);
294        }
295        self.object_uploader
296            .write_bytes(block_data)
297            .await
298            .map_err(Into::into)
299    }
300
301    async fn write_block_bytes(&mut self, block: Bytes, meta: &BlockMeta) -> HummockResult<()> {
302        self.data_len += block.len();
303        if let CachePolicy::Fill(_) = self.policy {
304            let block = Block::decode(block.clone(), meta.uncompressed_size as usize)?;
305            self.blocks.push(block);
306        }
307        self.object_uploader
308            .write_bytes(block)
309            .await
310            .map_err(Into::into)
311    }
312
313    async fn finish(mut self, meta: SstableMeta) -> HummockResult<UploadJoinHandle> {
314        let metadata = Bytes::from(meta.encode_to_bytes());
315
316        self.object_uploader.write_bytes(metadata).await?;
317        let join_handle = tokio::spawn(async move {
318            let uploader_memory_usage = self.object_uploader.get_memory_usage();
319            let _tracker = self.tracker.map(|mut t| {
320                    if !t.try_increase_memory(uploader_memory_usage) {
321                        tracing::debug!("failed to allocate increase memory for data file, sst object id: {}, file size: {}",
322                                        self.object_id, uploader_memory_usage);
323                    }
324                    t
325                });
326
327            assert!(!meta.block_metas.is_empty());
328
329            // Upload data to object store.
330            self.object_uploader.finish().await?;
331            // Add meta cache.
332            self.sstable_store.insert_meta_cache(self.object_id, meta);
333
334            // Add block cache.
335            if let CachePolicy::Fill(fill_high_priority_cache) = self.policy
336                && !self.blocks.is_empty()
337            {
338                for (block_idx, block) in self.blocks.into_iter().enumerate() {
339                    self.sstable_store.block_cache().insert_with_hint(
340                        SstableBlockIndex {
341                            sst_id: self.object_id,
342                            block_idx: block_idx as _,
343                        },
344                        Box::new(block),
345                        fill_high_priority_cache,
346                    );
347                }
348            }
349            Ok(())
350        });
351        Ok(join_handle)
352    }
353
354    fn data_len(&self) -> usize {
355        self.data_len
356    }
357}
358
359pub struct StreamingSstableWriterFactory {
360    sstable_store: SstableStoreRef,
361}
362
363impl StreamingSstableWriterFactory {
364    pub fn new(sstable_store: SstableStoreRef) -> Self {
365        StreamingSstableWriterFactory { sstable_store }
366    }
367}
368pub struct UnifiedSstableWriterFactory {
369    sstable_store: SstableStoreRef,
370}
371
372impl UnifiedSstableWriterFactory {
373    pub fn new(sstable_store: SstableStoreRef) -> Self {
374        UnifiedSstableWriterFactory { sstable_store }
375    }
376}
377
378#[async_trait::async_trait]
379impl SstableWriterFactory for UnifiedSstableWriterFactory {
380    type Writer = UnifiedSstableWriter;
381
382    async fn create_sst_writer(
383        &mut self,
384        object_id: HummockSstableObjectId,
385        options: SstableWriterOptions,
386    ) -> HummockResult<Self::Writer> {
387        if self.sstable_store.store().support_streaming_upload() {
388            let path = self.sstable_store.get_sst_data_path(object_id);
389            let uploader = self.sstable_store.create_streaming_uploader(&path).await?;
390            let streaming_uploader_writer = StreamingUploadWriter::new(
391                object_id,
392                self.sstable_store.clone(),
393                uploader,
394                options,
395            );
396
397            Ok(UnifiedSstableWriter::StreamingSstableWriter(
398                streaming_uploader_writer,
399            ))
400        } else {
401            let batch_uploader_writer =
402                BatchUploadWriter::new(object_id, self.sstable_store.clone(), options);
403            Ok(UnifiedSstableWriter::BatchSstableWriter(
404                batch_uploader_writer,
405            ))
406        }
407    }
408}
409
410#[async_trait::async_trait]
411impl SstableWriterFactory for StreamingSstableWriterFactory {
412    type Writer = StreamingUploadWriter;
413
414    async fn create_sst_writer(
415        &mut self,
416        object_id: HummockSstableObjectId,
417        options: SstableWriterOptions,
418    ) -> HummockResult<Self::Writer> {
419        let path = self.sstable_store.get_sst_data_path(object_id);
420        let uploader = self.sstable_store.create_streaming_uploader(&path).await?;
421        Ok(StreamingUploadWriter::new(
422            object_id,
423            self.sstable_store.clone(),
424            uploader,
425            options,
426        ))
427    }
428}
429
430#[async_trait::async_trait]
431impl SstableWriter for UnifiedSstableWriter {
432    type Output = JoinHandle<HummockResult<()>>;
433
434    async fn write_block(&mut self, block_data: &[u8], meta: &BlockMeta) -> HummockResult<()> {
435        match self {
436            UnifiedSstableWriter::StreamingSstableWriter(stream) => {
437                stream.write_block(block_data, meta).await
438            }
439            UnifiedSstableWriter::BatchSstableWriter(batch) => {
440                batch.write_block(block_data, meta).await
441            }
442        }
443    }
444
445    async fn write_block_bytes(&mut self, block: Bytes, meta: &BlockMeta) -> HummockResult<()> {
446        match self {
447            UnifiedSstableWriter::StreamingSstableWriter(stream) => {
448                stream.write_block_bytes(block, meta).await
449            }
450            UnifiedSstableWriter::BatchSstableWriter(batch) => {
451                batch.write_block_bytes(block, meta).await
452            }
453        }
454    }
455
456    async fn finish(self, meta: SstableMeta) -> HummockResult<UploadJoinHandle> {
457        match self {
458            UnifiedSstableWriter::StreamingSstableWriter(stream) => stream.finish(meta).await,
459            UnifiedSstableWriter::BatchSstableWriter(batch) => batch.finish(meta).await,
460        }
461    }
462
463    fn data_len(&self) -> usize {
464        match self {
465            UnifiedSstableWriter::StreamingSstableWriter(stream) => stream.data_len(),
466            UnifiedSstableWriter::BatchSstableWriter(batch) => batch.data_len(),
467        }
468    }
469}
470
#[cfg(test)]
mod tests {

    use bytes::Bytes;
    use rand::{Rng, SeedableRng};
    use risingwave_common::util::iter_util::ZipEqFast;

    use crate::hummock::sstable::VERSION;
    use crate::hummock::{BlockMeta, InMemWriter, SstableMeta, SstableWriter};

    /// Number of blocks in the synthetic SST.
    const BLOCK_COUNT: usize = 5;
    /// Size in bytes of every synthetic block.
    const BLOCK_LEN: usize = 1000;

    /// Builds a synthetic SST: the raw data (seeded random block bytes plus a 4-byte
    /// little-endian `5`), the per-block slices of that data, and a matching meta.
    fn get_sst() -> (Bytes, Vec<Bytes>, SstableMeta) {
        let mut rng = rand::rngs::StdRng::seed_from_u64(0);
        let mut raw: Vec<u8> = vec![0; BLOCK_COUNT * BLOCK_LEN];
        rng.fill(&mut raw[..]);
        raw.extend((5_u32).to_le_bytes());
        let data = Bytes::from(raw);

        // One meta and one slice per block, both derived from the same offset.
        let (block_metas, blocks): (Vec<BlockMeta>, Vec<Bytes>) = (0..BLOCK_COUNT)
            .map(|idx| {
                let start = idx * BLOCK_LEN;
                let block_meta = BlockMeta {
                    smallest_key: Vec::new(),
                    len: BLOCK_LEN as _,
                    offset: start as _,
                    ..Default::default()
                };
                (block_meta, data.slice(start..start + BLOCK_LEN))
            })
            .unzip();

        #[expect(deprecated)]
        let meta = SstableMeta {
            block_metas,
            bloom_filter: vec![],
            estimated_size: 0,
            key_count: 0,
            smallest_key: Vec::new(),
            largest_key: Vec::new(),
            meta_offset: data.len() as u64,
            monotonic_tombstone_events: vec![],
            version: VERSION,
        };

        (data, blocks, meta)
    }

    /// Writing every block and finishing must reproduce the original data in front of the
    /// encoded meta.
    #[tokio::test]
    async fn test_in_mem_writer() {
        let (data, blocks, meta) = get_sst();
        let mut writer = Box::new(InMemWriter::new(0));
        for (block, block_meta) in blocks.iter().zip_eq_fast(meta.block_metas.iter()) {
            writer.write_block(&block[..], block_meta).await.unwrap();
        }

        let meta_offset = meta.meta_offset as usize;
        let (output_data, _) = writer.finish(meta).await.unwrap();
        let written_prefix = output_data.slice(0..meta_offset);
        assert_eq!(written_prefix, data);
    }
}
527}