risingwave_storage/hummock/sstable/
builder.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeSet, HashMap};
16use std::sync::Arc;
17use std::time::SystemTime;
18
19use bytes::{Bytes, BytesMut};
20use risingwave_common::catalog::TableId;
21use risingwave_common::util::row_serde::OrderedRowSerde;
22use risingwave_hummock_sdk::key::{FullKey, MAX_KEY_LEN, user_key};
23use risingwave_hummock_sdk::key_range::KeyRange;
24use risingwave_hummock_sdk::sstable_info::{SstableInfo, SstableInfoInner};
25use risingwave_hummock_sdk::table_stats::{TableStats, TableStatsMap};
26use risingwave_hummock_sdk::{HummockEpoch, HummockSstableObjectId, LocalSstableInfo};
27use risingwave_pb::hummock::BloomFilterType;
28
29use super::utils::CompressionAlgorithm;
30use super::{
31    BlockBuilder, BlockBuilderOptions, BlockMeta, DEFAULT_BLOCK_SIZE, DEFAULT_ENTRY_SIZE,
32    DEFAULT_RESTART_INTERVAL, SstableMeta, SstableWriter, VERSION,
33};
34use crate::compaction_catalog_manager::{
35    CompactionCatalogAgent, CompactionCatalogAgentRef, FilterKeyExtractorImpl,
36    FullKeyFilterKeyExtractor,
37};
38use crate::hummock::sstable::{FilterBuilder, utils};
39use crate::hummock::value::HummockValue;
40use crate::hummock::{
41    Block, BlockHolder, BlockIterator, HummockResult, MemoryLimiter, Xor16FilterBuilder,
42};
43use crate::monitor::CompactorMetrics;
44use crate::opts::StorageOpts;
45
/// Default target size of a complete SSTable (4 MiB).
pub const DEFAULT_SSTABLE_SIZE: usize = 4 * 1024 * 1024;
/// Default false-positive probability used when sizing the bloom filter.
pub const DEFAULT_BLOOM_FALSE_POSITIVE: f64 = 0.001;
/// Default upper bound for a single SST produced by compaction (512 MiB).
pub const DEFAULT_MAX_SST_SIZE: u64 = 512 * 1024 * 1024;
/// Lower bound used by `add_raw_block`: an in-progress block smaller than
/// `min(MIN_BLOCK_SIZE, block_capacity / 4)` is merged rather than flushed.
pub const MIN_BLOCK_SIZE: usize = 8 * 1024;
50
#[derive(Clone, Debug)]
pub struct SstableBuilderOptions {
    /// Approximate sstable capacity.
    pub capacity: usize,
    /// Approximate block capacity.
    pub block_capacity: usize,
    /// Restart point interval.
    pub restart_interval: usize,
    /// False positive probability of bloom filter.
    pub bloom_false_positive: f64,
    /// Compression algorithm.
    pub compression_algorithm: CompressionAlgorithm,
    /// Upper bound for a single SST; also used to derive the large
    /// key/value warning thresholds in `add_impl`.
    pub max_sst_size: u64,
}
65
66impl From<&StorageOpts> for SstableBuilderOptions {
67    fn from(options: &StorageOpts) -> SstableBuilderOptions {
68        let capacity: usize = (options.sstable_size_mb as usize) * (1 << 20);
69        SstableBuilderOptions {
70            capacity,
71            block_capacity: (options.block_size_kb as usize) * (1 << 10),
72            restart_interval: DEFAULT_RESTART_INTERVAL,
73            bloom_false_positive: options.bloom_false_positive,
74            compression_algorithm: CompressionAlgorithm::None,
75            max_sst_size: options.compactor_max_sst_size,
76        }
77    }
78}
79
80impl Default for SstableBuilderOptions {
81    fn default() -> Self {
82        Self {
83            capacity: DEFAULT_SSTABLE_SIZE,
84            block_capacity: DEFAULT_BLOCK_SIZE,
85            restart_interval: DEFAULT_RESTART_INTERVAL,
86            bloom_false_positive: DEFAULT_BLOOM_FALSE_POSITIVE,
87            compression_algorithm: CompressionAlgorithm::None,
88            max_sst_size: DEFAULT_MAX_SST_SIZE,
89        }
90    }
91}
92
/// Everything produced by a finished [`SstableBuilder`]: the local SST info,
/// whatever the underlying `SstableWriter` yields on `finish`, and per-build
/// statistics for metrics reporting.
pub struct SstableBuilderOutput<WO> {
    pub sst_info: LocalSstableInfo,
    pub writer_output: WO,
    pub stats: SstableBuilderOutputStats,
}
98
pub struct SstableBuilder<W: SstableWriter, F: FilterBuilder> {
    /// Options.
    options: SstableBuilderOptions,
    /// Data writer.
    writer: W,
    /// Current block builder.
    block_builder: BlockBuilder,

    /// Catalog agent used to extract the filter key from each user key.
    compaction_catalog_agent_ref: CompactionCatalogAgentRef,
    /// Block metadata vec.
    block_metas: Vec<BlockMeta>,

    /// `table_id` of added keys.
    table_ids: BTreeSet<TableId>,
    /// The last full key added; used to detect new user keys and to derive
    /// the SST's largest key in `finish`.
    last_full_key: Vec<u8>,
    /// Buffer for encoded key and value to avoid allocation.
    raw_key: BytesMut,
    raw_value: BytesMut,
    /// Table id of the most recently added key; `None` until the first add.
    last_table_id: Option<TableId>,
    /// Object id the resulting SST will be stored under.
    sst_object_id: HummockSstableObjectId,

    /// Per table stats.
    table_stats: TableStatsMap,
    /// `last_table_stats` accumulates stats for `last_table_id` and finalizes it in `table_stats`
    /// by `finalize_last_table_stats`
    last_table_stats: TableStats,

    /// Builds the bloom/xor filter from the added filter keys.
    filter_builder: F,

    /// Distinct pure epochs seen so far; min/max feed into the SST info.
    epoch_set: BTreeSet<u64>,
    /// Optional limiter passed to the filter builder on block switch/finish.
    memory_limiter: Option<Arc<MemoryLimiter>>,

    block_size_vec: Vec<usize>, // for statistics
}
133
impl<W: SstableWriter> SstableBuilder<W, Xor16FilterBuilder> {
    /// Test-only constructor: wires up a full-key filter extractor and an
    /// `Xor16FilterBuilder` sized from the configured capacity, with no
    /// memory limiter.
    pub fn for_test(
        sstable_id: u64,
        writer: W,
        options: SstableBuilderOptions,
        table_id_to_vnode: HashMap<impl Into<TableId>, usize>,
        table_id_to_watermark_serde: HashMap<
            impl Into<TableId>,
            Option<(OrderedRowSerde, OrderedRowSerde, usize)>,
        >,
    ) -> Self {
        let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
            FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor),
            table_id_to_vnode
                .into_iter()
                .map(|(table_id, v)| (table_id.into(), v))
                .collect(),
            table_id_to_watermark_serde
                .into_iter()
                .map(|(table_id, v)| (table_id.into(), v))
                .collect(),
        ));

        Self::new(
            sstable_id,
            writer,
            Xor16FilterBuilder::new(options.capacity / DEFAULT_ENTRY_SIZE + 1),
            options,
            compaction_catalog_agent_ref,
            None,
        )
    }
}
167
impl<W: SstableWriter, F: FilterBuilder> SstableBuilder<W, F> {
    /// Creates a builder that streams blocks through `writer` as key-value
    /// pairs are added.
    ///
    /// `options` is cloned: one copy configures the inner `BlockBuilder`, the
    /// other is kept for capacity/threshold checks. `memory_limiter`, if any,
    /// is handed to the filter builder on block switches and on finish.
    pub fn new(
        sst_object_id: impl Into<HummockSstableObjectId>,
        writer: W,
        filter_builder: F,
        options: SstableBuilderOptions,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
        memory_limiter: Option<Arc<MemoryLimiter>>,
    ) -> Self {
        let sst_object_id = sst_object_id.into();
        Self {
            options: options.clone(),
            writer,
            block_builder: BlockBuilder::new(BlockBuilderOptions {
                capacity: options.block_capacity,
                restart_interval: options.restart_interval,
                compression_algorithm: options.compression_algorithm,
            }),
            filter_builder,
            // Pre-size block meta storage for the expected number of blocks.
            block_metas: Vec::with_capacity(options.capacity / options.block_capacity + 1),
            table_ids: BTreeSet::new(),
            last_table_id: None,
            raw_key: BytesMut::new(),
            raw_value: BytesMut::new(),
            last_full_key: vec![],
            sst_object_id,
            compaction_catalog_agent_ref,
            table_stats: Default::default(),
            last_table_stats: Default::default(),
            epoch_set: BTreeSet::default(),
            memory_limiter,
            block_size_vec: Vec::new(),
        }
    }
202
    /// Add kv pair to sstable.
    ///
    /// Test-only alias for [`Self::add`].
    pub async fn add_for_test(
        &mut self,
        full_key: FullKey<&[u8]>,
        value: HummockValue<&[u8]>,
    ) -> HummockResult<()> {
        self.add(full_key, value).await
    }
211
    /// Approximate size in bytes of the in-progress (not yet flushed) block.
    pub fn current_block_size(&self) -> usize {
        self.block_builder.approximate_len()
    }
215
    /// Add raw data of block to sstable. return false means fallback
    ///
    /// Attempts to append an already-built block (fast-compaction path)
    /// together with its filter data and metadata. Returns `Ok(false)` when
    /// the raw block was instead decoded and re-added key by key, because the
    /// in-progress block was too small to be flushed on its own.
    pub async fn add_raw_block(
        &mut self,
        buf: Bytes,
        filter_data: Vec<u8>,
        smallest_key: FullKey<Vec<u8>>,
        largest_key: Vec<u8>,
        mut meta: BlockMeta,
    ) -> HummockResult<bool> {
        let table_id = smallest_key.user_key.table_id;
        if self.last_table_id.is_none() || self.last_table_id.unwrap() != table_id {
            if !self.block_builder.is_empty() {
                // Try to finish the previous `Block` when the `table_id` is switched, making sure that the data in the `Block` doesn't span two `table_ids`.
                self.build_block().await?;
            }

            self.table_ids.insert(table_id);
            self.finalize_last_table_stats();
            self.last_table_id = Some(table_id);
        }

        if !self.block_builder.is_empty() {
            let min_block_size = std::cmp::min(MIN_BLOCK_SIZE, self.options.block_capacity / 4);

            // If the in-progress block is too small to flush on its own,
            // decode the incoming raw block and merge its entries into the
            // current block builder instead.
            if self.block_builder.approximate_len() < min_block_size {
                let block = Block::decode(buf, meta.uncompressed_size as usize)?;
                let mut iter = BlockIterator::new(BlockHolder::from_owned_block(Box::new(block)));
                iter.seek_to_first();
                while iter.is_valid() {
                    let value = HummockValue::from_slice(iter.value()).unwrap_or_else(|_| {
                        panic!(
                            "decode failed for fast compact sst_id {} block_idx {} last_table_id {:?}",
                            self.sst_object_id, self.block_metas.len(), self.last_table_id
                        )
                    });
                    // `could_switch_block = false`: the whole raw block belongs
                    // to a single table, so a table switch here would be a bug.
                    self.add_impl(iter.key(), value, false).await?;
                    iter.next();
                }
                return Ok(false);
            }

            self.build_block().await?;
        }
        self.last_full_key = largest_key;
        assert_eq!(
            meta.len as usize,
            buf.len(),
            "meta {} buf {} last_table_id {:?}",
            meta.len,
            buf.len(),
            self.last_table_id
        );
        meta.offset = self.writer.data_len() as u32;
        self.block_metas.push(meta);
        self.filter_builder.add_raw_data(filter_data);
        let block_meta = self.block_metas.last_mut().unwrap();
        self.writer.write_block_bytes(buf, block_meta).await?;

        Ok(true)
    }
277
    /// Add kv pair to sstable.
    ///
    /// Keys are expected to arrive in ascending full-key order: `finish`
    /// derives the SST key range from the first block's smallest key and the
    /// last key added here.
    pub async fn add(
        &mut self,
        full_key: FullKey<&[u8]>,
        value: HummockValue<&[u8]>,
    ) -> HummockResult<()> {
        self.add_impl(full_key, value, true).await
    }
286
    /// Add kv pair to sstable.
    ///
    /// Core insertion path shared by `add` and `add_raw_block`'s fallback.
    /// `could_switch_block` controls whether this call may finish the current
    /// block (or switch tables); the raw-block fallback passes `false`
    /// because all its entries belong to a single block/table.
    async fn add_impl(
        &mut self,
        full_key: FullKey<&[u8]>,
        value: HummockValue<&[u8]>,
        could_switch_block: bool,
    ) -> HummockResult<()> {
        const LARGE_KEY_LEN: usize = MAX_KEY_LEN >> 1;

        let table_key_len = full_key.user_key.table_key.as_ref().len();
        let table_value_len = match &value {
            HummockValue::Put(t) => t.len(),
            HummockValue::Delete => 0,
        };
        // Warning thresholds for unusually large entries, relative to the
        // configured max SST size.
        let large_value_len = self.options.max_sst_size as usize / 10;
        let large_key_value_len = self.options.max_sst_size as usize / 2;
        if table_key_len >= LARGE_KEY_LEN
            || table_value_len > large_value_len
            || table_key_len + table_value_len > large_key_value_len
        {
            let table_id = full_key.user_key.table_id;
            tracing::warn!(
                "A large key/value (table_id={}, key len={}, value len={}, epoch={}, spill offset={}) is added to block",
                table_id,
                table_key_len,
                table_value_len,
                full_key.epoch_with_gap.pure_epoch(),
                full_key.epoch_with_gap.offset(),
            );
        }

        // TODO: refine me
        full_key.encode_into(&mut self.raw_key);
        value.encode(&mut self.raw_value);
        let is_new_user_key = self.last_full_key.is_empty()
            || !user_key(&self.raw_key).eq(user_key(&self.last_full_key));
        let table_id = full_key.user_key.table_id;
        let is_new_table = self.last_table_id.is_none() || self.last_table_id.unwrap() != table_id;
        // A block is "full" when it has reached capacity, or when it is above
        // 3/4 of capacity and this entry would push it past capacity.
        let current_block_size = self.current_block_size();
        let is_block_full = current_block_size >= self.options.block_capacity
            || (current_block_size > self.options.block_capacity / 4 * 3
                && current_block_size + self.raw_value.len() + self.raw_key.len()
                    > self.options.block_capacity);

        if is_new_table {
            assert!(
                could_switch_block,
                "is_new_user_key {} sst_id {} block_idx {} table_id {} last_table_id {:?} full_key {:?}",
                is_new_user_key,
                self.sst_object_id,
                self.block_metas.len(),
                table_id,
                self.last_table_id,
                full_key
            );
            self.table_ids.insert(table_id);
            self.finalize_last_table_stats();
            self.last_table_id = Some(table_id);
            // Flush the current block so that no block spans two table ids.
            if !self.block_builder.is_empty() {
                self.build_block().await?;
            }
        } else if is_block_full && could_switch_block {
            self.build_block().await?;
        }
        self.last_table_stats.total_key_count += 1;
        self.epoch_set.insert(full_key.epoch_with_gap.pure_epoch());

        // Rotate block builder if the previous one has been built.
        if self.block_builder.is_empty() {
            self.block_metas.push(BlockMeta {
                offset: utils::checked_into_u32(self.writer.data_len()).unwrap_or_else(|_| {
                    panic!(
                        "WARN overflow can't convert writer_data_len {} into u32 sst_id {} block_idx {} tables {:?}",
                        self.writer.data_len(),
                        self.sst_object_id,
                        self.block_metas.len(),
                        self.table_ids,
                    )
                }),
                len: 0,
                smallest_key: full_key.encode(),
                uncompressed_size: 0,
                total_key_count: 0,
                stale_key_count: 0,
            });
        }

        let table_id = full_key.user_key.table_id;
        // Feed the filter key derived by the catalog agent into the filter
        // builder; an empty extraction result skips the filter.
        let mut extract_key = user_key(&self.raw_key);
        extract_key = self.compaction_catalog_agent_ref.extract(extract_key);
        // add bloom_filter check
        if !extract_key.is_empty() {
            self.filter_builder
                .add_key(extract_key, table_id.as_raw_id());
        }
        self.block_builder.add(full_key, self.raw_value.as_ref());
        self.block_metas.last_mut().unwrap().total_key_count += 1;
        // Older versions of an already-seen user key, and deletes, count as
        // stale keys in the block meta.
        if !is_new_user_key || value.is_delete() {
            self.block_metas.last_mut().unwrap().stale_key_count += 1;
        }
        self.last_table_stats.total_key_size += full_key.encoded_len() as i64;
        self.last_table_stats.total_value_size += value.encoded_len() as i64;

        self.last_full_key.clear();
        self.last_full_key.extend_from_slice(&self.raw_key);

        self.raw_key.clear();
        self.raw_value.clear();
        Ok(())
    }
397
    /// Finish building sst.
    ///
    /// Seals the last block, materializes the filter, assembles the
    /// `SstableMeta` and `SstableInfo`, and hands the meta to the writer.
    ///
    /// # Format
    ///
    /// data:
    ///
    /// ```plain
    /// | Block 0 | ... | Block N-1 | N (4B) |
    /// ```
    pub async fn finish(mut self) -> HummockResult<SstableBuilderOutput<W::Output>> {
        let smallest_key = if self.block_metas.is_empty() {
            vec![]
        } else {
            self.block_metas[0].smallest_key.clone()
        };
        let largest_key = self.last_full_key.clone();
        self.finalize_last_table_stats();

        self.build_block().await?;
        let right_exclusive = false;
        let meta_offset = self.writer.data_len() as u64;

        let bloom_filter_kind = if self.filter_builder.support_blocked_raw_data() {
            BloomFilterType::Blocked
        } else {
            BloomFilterType::Sstable
        };
        // A non-positive false-positive rate disables the filter entirely.
        let bloom_filter = if self.options.bloom_false_positive > 0.0 {
            self.filter_builder.finish(self.memory_limiter.clone())
        } else {
            vec![]
        };

        // Aggregate counters over all sealed blocks.
        let total_key_count = self
            .block_metas
            .iter()
            .map(|block_meta| block_meta.total_key_count as u64)
            .sum::<u64>();
        let stale_key_count = self
            .block_metas
            .iter()
            .map(|block_meta| block_meta.stale_key_count as u64)
            .sum::<u64>();
        let uncompressed_file_size = self
            .block_metas
            .iter()
            .map(|block_meta| block_meta.uncompressed_size as u64)
            .sum::<u64>();

        #[expect(deprecated)]
        let mut meta = SstableMeta {
            block_metas: self.block_metas,
            bloom_filter,
            estimated_size: 0,
            key_count: utils::checked_into_u32(total_key_count).unwrap_or_else(|_| {
                panic!(
                    "WARN overflow can't convert total_key_count {} into u32 tables {:?}",
                    total_key_count, self.table_ids,
                )
            }),
            smallest_key,
            largest_key,
            version: VERSION,
            meta_offset,
            monotonic_tombstone_events: vec![],
        };

        // estimated_size = encoded meta size + data size, with overflow checks.
        let meta_encode_size = meta.encoded_size();
        let encoded_size_u32 = utils::checked_into_u32(meta_encode_size).unwrap_or_else(|_| {
            panic!(
                "WARN overflow can't convert meta_encoded_size {} into u32 tables {:?}",
                meta_encode_size, self.table_ids,
            )
        });
        let meta_offset_u32 = utils::checked_into_u32(meta_offset).unwrap_or_else(|_| {
            panic!(
                "WARN overflow can't convert meta_offset {} into u32 tables {:?}",
                meta_offset, self.table_ids,
            )
        });
        meta.estimated_size = encoded_size_u32
            .checked_add(meta_offset_u32)
            .unwrap_or_else(|| {
                panic!(
                    "WARN overflow encoded_size_u32 {} meta_offset_u32 {} table_id {:?} table_ids {:?}",
                    encoded_size_u32, meta_offset_u32, self.last_table_id, self.table_ids
                )
            });

        // Average key/value sizes over all tables, reported via stats only.
        let (avg_key_size, avg_value_size) = if self.table_stats.is_empty() {
            (0, 0)
        } else {
            let total_key_count: usize = self
                .table_stats
                .values()
                .map(|s| s.total_key_count as usize)
                .sum();

            if total_key_count == 0 {
                (0, 0)
            } else {
                let total_key_size: usize = self
                    .table_stats
                    .values()
                    .map(|s| s.total_key_size as usize)
                    .sum();

                let total_value_size: usize = self
                    .table_stats
                    .values()
                    .map(|s| s.total_value_size as usize)
                    .sum();

                (
                    total_key_size / total_key_count,
                    total_value_size / total_key_count,
                )
            }
        };

        // `epoch_set` is a BTreeSet, so first/last are min/max.
        let (min_epoch, max_epoch) = {
            if self.epoch_set.is_empty() {
                (HummockEpoch::MAX, u64::MIN)
            } else {
                (
                    *self.epoch_set.first().unwrap(),
                    *self.epoch_set.last().unwrap(),
                )
            }
        };

        let sst_info: SstableInfo = SstableInfoInner {
            object_id: self.sst_object_id,
            // use the same sst_id as object_id for initial sst
            sst_id: self.sst_object_id.inner().into(),
            bloom_filter_kind,
            key_range: KeyRange {
                left: Bytes::from(meta.smallest_key.clone()),
                right: Bytes::from(meta.largest_key.clone()),
                right_exclusive,
            },
            file_size: meta.estimated_size as u64,
            table_ids: self.table_ids.into_iter().collect(),
            meta_offset: meta.meta_offset,
            stale_key_count,
            total_key_count,
            uncompressed_file_size: uncompressed_file_size + meta.encoded_size() as u64,
            min_epoch,
            max_epoch,
            range_tombstone_count: 0,
            sst_size: meta.estimated_size as u64,
        }
        .into();
        tracing::trace!(
            "meta_size {} bloom_filter_size {}  add_key_counts {} stale_key_count {} min_epoch {} max_epoch {} epoch_count {}",
            meta.encoded_size(),
            meta.bloom_filter.len(),
            total_key_count,
            stale_key_count,
            min_epoch,
            max_epoch,
            self.epoch_set.len()
        );
        let bloom_filter_size = meta.bloom_filter.len();
        let sstable_file_size = sst_info.file_size as usize;

        if !meta.block_metas.is_empty() {
            // fill total_compressed_size
            // Walk blocks in order, switching the stats entry whenever the
            // block's table id changes (a block never spans two tables).
            let mut last_table_id = meta.block_metas[0].table_id();
            let mut last_table_stats = self.table_stats.get_mut(&last_table_id).unwrap();
            for block_meta in &meta.block_metas {
                let block_table_id = block_meta.table_id();
                if last_table_id != block_table_id {
                    last_table_id = block_table_id;
                    last_table_stats = self.table_stats.get_mut(&last_table_id).unwrap();
                }

                last_table_stats.total_compressed_size += block_meta.len as u64;
            }
        }

        let writer_output = self.writer.finish(meta).await?;
        // The timestamp is only used during full GC.
        //
        // Ideally object store object's last_modified should be used.
        // However, it'll incur additional IO overhead since S3 lacks an interface to retrieve the last_modified timestamp after the PUT operation on an object.
        //
        // The local timestamp below is expected to precede the last_modified of object store object, given that the object store object is created afterward.
        // It should help alleviate the clock drift issue.

        let now = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .expect("Clock may have gone backwards")
            .as_secs();
        Ok(SstableBuilderOutput::<W::Output> {
            sst_info: LocalSstableInfo::new(sst_info, self.table_stats, now),
            writer_output,
            stats: SstableBuilderOutputStats {
                bloom_filter_size,
                avg_key_size,
                avg_value_size,
                epoch_count: self.epoch_set.len(),
                block_size_vec: self.block_size_vec,
                sstable_file_size,
            },
        })
    }
605
    /// Approximate total size of the SST so far: data already written out,
    /// plus the in-progress block and the filter under construction.
    pub fn approximate_len(&self) -> usize {
        self.writer.data_len()
            + self.block_builder.approximate_len()
            + self.filter_builder.approximate_len()
    }
611
    /// Seals the in-progress block: writes it through the writer, fills in
    /// the last `BlockMeta` (uncompressed size and on-disk length), records
    /// the block size for statistics, and rotates the filter builder.
    ///
    /// No-op when the current block builder is empty.
    pub async fn build_block(&mut self) -> HummockResult<()> {
        // Skip empty block.
        if self.block_builder.is_empty() {
            return Ok(());
        }

        let block_meta = self.block_metas.last_mut().unwrap();
        let uncompressed_block_size = self.block_builder.uncompressed_block_size();
        block_meta.uncompressed_size = utils::checked_into_u32(uncompressed_block_size)
            .unwrap_or_else(|_| {
                panic!(
                    "WARN overflow can't convert uncompressed_block_size {} into u32 table {:?}",
                    uncompressed_block_size,
                    self.block_builder.table_id(),
                )
            });
        let block = self.block_builder.build();
        self.writer.write_block(block, block_meta).await?;
        self.block_size_vec.push(block.len());
        self.filter_builder
            .switch_block(self.memory_limiter.clone());
        // Derive the block's on-disk length from the writer position delta.
        let data_len = utils::checked_into_u32(self.writer.data_len()).unwrap_or_else(|_| {
            panic!(
                "WARN overflow can't convert writer_data_len {} into u32 table {:?}",
                self.writer.data_len(),
                self.block_builder.table_id(),
            )
        });
        block_meta.len = data_len.checked_sub(block_meta.offset).unwrap_or_else(|| {
            panic!(
                "data_len should >= meta_offset, found data_len={}, meta_offset={}",
                data_len, block_meta.offset
            )
        });

        if data_len as usize > self.options.capacity * 2 {
            tracing::warn!(
                "WARN unexpected block size {} table {:?}",
                data_len,
                self.block_builder.table_id()
            );
        }

        self.block_builder.clear();
        Ok(())
    }
658
    /// NOTE(review): despite the name, this returns `true` once any data has
    /// been written (`data_len() > 0`) — i.e. the *opposite* of "is empty".
    /// Verify call sites before renaming or inverting the condition.
    pub fn is_empty(&self) -> bool {
        self.writer.data_len() > 0
    }
662
    /// Returns true if we roughly reached capacity
    /// (based on [`Self::approximate_len`], so this is only an estimate).
    pub fn reach_capacity(&self) -> bool {
        self.approximate_len() >= self.options.capacity
    }
667
668    fn finalize_last_table_stats(&mut self) {
669        if self.table_ids.is_empty() || self.last_table_id.is_none() {
670            return;
671        }
672        self.table_stats.insert(
673            self.last_table_id.unwrap(),
674            std::mem::take(&mut self.last_table_stats),
675        );
676    }
677}
678
/// Per-build statistics captured by `SstableBuilder::finish` and reported to
/// `CompactorMetrics` via `report_stats`.
pub struct SstableBuilderOutputStats {
    /// Encoded filter size in bytes (0 when the filter is disabled).
    bloom_filter_size: usize,
    /// Average encoded key size over all tables (0 when no keys were added).
    avg_key_size: usize,
    /// Average encoded value size over all tables (0 when no keys were added).
    avg_value_size: usize,
    /// Number of distinct pure epochs observed during the build.
    epoch_count: usize,
    block_size_vec: Vec<usize>, // for statistics
    /// Estimated total file size of the finished SST.
    sstable_file_size: usize,
}
687
688impl SstableBuilderOutputStats {
689    pub fn report_stats(&self, metrics: &Arc<CompactorMetrics>) {
690        if self.bloom_filter_size != 0 {
691            metrics
692                .sstable_bloom_filter_size
693                .observe(self.bloom_filter_size as _);
694        }
695
696        if self.sstable_file_size != 0 {
697            metrics
698                .sstable_file_size
699                .observe(self.sstable_file_size as _);
700        }
701
702        if self.avg_key_size != 0 {
703            metrics.sstable_avg_key_size.observe(self.avg_key_size as _);
704        }
705
706        if self.avg_value_size != 0 {
707            metrics
708                .sstable_avg_value_size
709                .observe(self.avg_value_size as _);
710        }
711
712        if self.epoch_count != 0 {
713            metrics
714                .sstable_distinct_epoch_count
715                .observe(self.epoch_count as _);
716        }
717
718        if !self.block_size_vec.is_empty() {
719            for block_size in &self.block_size_vec {
720                metrics.sstable_block_size.observe(*block_size as _);
721            }
722        }
723    }
724}
725
726#[cfg(test)]
727pub(super) mod tests {
728    use std::collections::{Bound, HashMap};
729
730    use risingwave_common::catalog::TableId;
731    use risingwave_common::hash::VirtualNode;
732    use risingwave_common::util::epoch::test_epoch;
733    use risingwave_hummock_sdk::key::UserKey;
734
735    use super::*;
736    use crate::assert_bytes_eq;
737    use crate::compaction_catalog_manager::{
738        CompactionCatalogAgent, DummyFilterKeyExtractor, MultiFilterKeyExtractor,
739    };
740    use crate::hummock::iterator::test_utils::mock_sstable_store;
741    use crate::hummock::sstable::xor_filter::BlockedXor16FilterBuilder;
742    use crate::hummock::test_utils::{
743        TEST_KEYS_COUNT, default_builder_opt_for_test, gen_test_sstable_impl, mock_sst_writer,
744        test_key_of, test_value_of,
745    };
746    use crate::hummock::{CachePolicy, Sstable, SstableWriterOptions, Xor8FilterBuilder};
747    use crate::monitor::StoreLocalStatistic;
748
    /// Finishing a builder to which no key was ever added must succeed.
    #[tokio::test]
    async fn test_empty() {
        let opt = SstableBuilderOptions {
            capacity: 0,
            block_capacity: 4096,
            restart_interval: 16,
            bloom_false_positive: 0.001,
            ..Default::default()
        };

        let table_id_to_vnode = HashMap::from_iter(vec![(0, VirtualNode::COUNT_FOR_TEST)]);
        let table_id_to_watermark_serde = HashMap::from_iter(vec![(0, None)]);
        let b = SstableBuilder::for_test(
            0,
            mock_sst_writer(&opt),
            opt,
            table_id_to_vnode,
            table_id_to_watermark_serde,
        );

        b.finish().await.unwrap();
    }
771
    /// Builds an SST with `TEST_KEYS_COUNT` sequential keys and verifies the
    /// reported key range, the file size, and that the encoded `SstableMeta`
    /// round-trips through `decode`.
    #[tokio::test]
    async fn test_basic() {
        let opt = default_builder_opt_for_test();

        let table_id_to_vnode = HashMap::from_iter(vec![(0, VirtualNode::COUNT_FOR_TEST)]);
        let table_id_to_watermark_serde = HashMap::from_iter(vec![(0, None)]);
        let mut b = SstableBuilder::for_test(
            0,
            mock_sst_writer(&opt),
            opt,
            table_id_to_vnode,
            table_id_to_watermark_serde,
        );

        for i in 0..TEST_KEYS_COUNT {
            b.add_for_test(
                test_key_of(i).to_ref(),
                HummockValue::put(&test_value_of(i)),
            )
            .await
            .unwrap();
        }

        let output = b.finish().await.unwrap();
        let info = output.sst_info.sst_info;

        assert_bytes_eq!(test_key_of(0).encode(), info.key_range.left);
        assert_bytes_eq!(
            test_key_of(TEST_KEYS_COUNT - 1).encode(),
            info.key_range.right
        );
        let (data, meta) = output.writer_output;
        assert_eq!(info.file_size, meta.estimated_size as u64);
        let offset = info.meta_offset as usize;
        let meta2 = SstableMeta::decode(&data[offset..]).unwrap();
        assert_eq!(meta2, meta);
    }
809
810    async fn test_with_bloom_filter<F: FilterBuilder>(with_blooms: bool) {
811        let key_count = 1000;
812
813        let opts = SstableBuilderOptions {
814            capacity: 0,
815            block_capacity: 4096,
816            restart_interval: 16,
817            bloom_false_positive: if with_blooms { 0.01 } else { 0.0 },
818            ..Default::default()
819        };
820
821        // build remote table
822        let sstable_store = mock_sstable_store().await;
823        let table_id_to_vnode = HashMap::from_iter(vec![(0, VirtualNode::COUNT_FOR_TEST)]);
824        let table_id_to_watermark_serde = HashMap::from_iter(vec![(0, None)]);
825        let sst_info = gen_test_sstable_impl::<Vec<u8>, F>(
826            opts,
827            0,
828            (0..TEST_KEYS_COUNT).map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
829            sstable_store.clone(),
830            CachePolicy::NotFill,
831            table_id_to_vnode,
832            table_id_to_watermark_serde,
833        )
834        .await;
835        let table = sstable_store
836            .sstable(&sst_info, &mut StoreLocalStatistic::default())
837            .await
838            .unwrap();
839
840        assert_eq!(table.has_bloom_filter(), with_blooms);
841        for i in 0..key_count {
842            let full_key = test_key_of(i);
843            if table.has_bloom_filter() {
844                let hash = Sstable::hash_for_bloom_filter(full_key.user_key.encode().as_slice(), 0);
845                let key_ref = full_key.user_key.as_ref();
846                assert!(
847                    table.may_match_hash(
848                        &(Bound::Included(key_ref), Bound::Included(key_ref)),
849                        hash
850                    ),
851                    "failed at {}",
852                    i
853                );
854            }
855        }
856    }
857
858    #[tokio::test]
859    async fn test_bloom_filter() {
860        test_with_bloom_filter::<Xor16FilterBuilder>(false).await;
861        test_with_bloom_filter::<Xor16FilterBuilder>(true).await;
862        test_with_bloom_filter::<Xor8FilterBuilder>(true).await;
863        test_with_bloom_filter::<BlockedXor16FilterBuilder>(true).await;
864    }
865
866    #[tokio::test]
867    async fn test_no_bloom_filter_block() {
868        let opts = SstableBuilderOptions::default();
869        // build remote table
870        let sstable_store = mock_sstable_store().await;
871        let writer_opts = SstableWriterOptions::default();
872        let object_id = 1;
873        let writer = sstable_store
874            .clone()
875            .create_sst_writer(object_id, writer_opts);
876        let mut filter = MultiFilterKeyExtractor::default();
877        filter.register(
878            1.into(),
879            FilterKeyExtractorImpl::Dummy(DummyFilterKeyExtractor),
880        );
881        filter.register(
882            2.into(),
883            FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor),
884        );
885        filter.register(
886            3.into(),
887            FilterKeyExtractorImpl::Dummy(DummyFilterKeyExtractor),
888        );
889
890        let table_id_to_vnode = HashMap::from_iter(vec![
891            (1.into(), VirtualNode::COUNT_FOR_TEST),
892            (2.into(), VirtualNode::COUNT_FOR_TEST),
893            (3.into(), VirtualNode::COUNT_FOR_TEST),
894        ]);
895        let table_id_to_watermark_serde =
896            HashMap::from_iter(vec![(1.into(), None), (2.into(), None), (3.into(), None)]);
897
898        let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
899            FilterKeyExtractorImpl::Multi(filter),
900            table_id_to_vnode,
901            table_id_to_watermark_serde,
902        ));
903
904        let mut builder = SstableBuilder::new(
905            object_id,
906            writer,
907            BlockedXor16FilterBuilder::new(1024),
908            opts,
909            compaction_catalog_agent_ref,
910            None,
911        );
912
913        let key_count: usize = 10000;
914        for table_id in 1..4 {
915            let mut table_key = VirtualNode::ZERO.to_be_bytes().to_vec();
916            for idx in 0..key_count {
917                table_key.resize(VirtualNode::SIZE, 0);
918                table_key.extend_from_slice(format!("key_test_{:05}", idx * 2).as_bytes());
919                let k = UserKey::for_test(TableId::new(table_id), table_key.as_ref());
920                let v = test_value_of(idx);
921                builder
922                    .add(
923                        FullKey::from_user_key(k, test_epoch(1)),
924                        HummockValue::put(v.as_ref()),
925                    )
926                    .await
927                    .unwrap();
928            }
929        }
930        let ret = builder.finish().await.unwrap();
931        let sst_info = ret.sst_info.sst_info.clone();
932        ret.writer_output.await.unwrap().unwrap();
933        let table = sstable_store
934            .sstable(&sst_info, &mut StoreLocalStatistic::default())
935            .await
936            .unwrap();
937        let mut table_key = VirtualNode::ZERO.to_be_bytes().to_vec();
938        for idx in 0..key_count {
939            table_key.resize(VirtualNode::SIZE, 0);
940            table_key.extend_from_slice(format!("key_test_{:05}", idx * 2).as_bytes());
941            let k = UserKey::for_test(TableId::new(2), table_key.as_slice());
942            let hash = Sstable::hash_for_bloom_filter(&k.encode(), 2);
943            let key_ref = k.as_ref();
944            assert!(
945                table.may_match_hash(&(Bound::Included(key_ref), Bound::Included(key_ref)), hash)
946            );
947        }
948    }
949}