// risingwave_storage/hummock/sstable/builder.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeSet, HashMap};
16use std::sync::Arc;
17use std::time::SystemTime;
18
19use bytes::{Bytes, BytesMut};
20use risingwave_common::util::row_serde::OrderedRowSerde;
21use risingwave_hummock_sdk::compaction_group::StateTableId;
22use risingwave_hummock_sdk::key::{FullKey, MAX_KEY_LEN, user_key};
23use risingwave_hummock_sdk::key_range::KeyRange;
24use risingwave_hummock_sdk::sstable_info::{SstableInfo, SstableInfoInner};
25use risingwave_hummock_sdk::table_stats::{TableStats, TableStatsMap};
26use risingwave_hummock_sdk::{HummockEpoch, LocalSstableInfo};
27use risingwave_pb::hummock::BloomFilterType;
28
29use super::utils::CompressionAlgorithm;
30use super::{
31    BlockBuilder, BlockBuilderOptions, BlockMeta, DEFAULT_BLOCK_SIZE, DEFAULT_ENTRY_SIZE,
32    DEFAULT_RESTART_INTERVAL, SstableMeta, SstableWriter, VERSION,
33};
34use crate::compaction_catalog_manager::{
35    CompactionCatalogAgent, CompactionCatalogAgentRef, FilterKeyExtractorImpl,
36    FullKeyFilterKeyExtractor,
37};
38use crate::hummock::sstable::{FilterBuilder, utils};
39use crate::hummock::value::HummockValue;
40use crate::hummock::{
41    Block, BlockHolder, BlockIterator, HummockResult, MemoryLimiter, Xor16FilterBuilder,
42};
43use crate::monitor::CompactorMetrics;
44use crate::opts::StorageOpts;
45
/// Default target size of one sstable file: 4 MiB.
pub const DEFAULT_SSTABLE_SIZE: usize = 4 * 1024 * 1024;
/// Default false-positive probability for the sstable bloom/xor filter.
pub const DEFAULT_BLOOM_FALSE_POSITIVE: f64 = 0.001;
/// Default hard upper bound on the size of a single compaction output sst: 512 MiB.
pub const DEFAULT_MAX_SST_SIZE: u64 = 512 * 1024 * 1024;
/// Threshold (8 KiB) below which a raw block is considered too small to stand
/// alone and is merged entry-by-entry instead (see `add_raw_block`).
pub const MIN_BLOCK_SIZE: usize = 8 * 1024;
50
/// Configuration for [`SstableBuilder`].
#[derive(Clone, Debug)]
pub struct SstableBuilderOptions {
    /// Approximate sstable capacity.
    pub capacity: usize,
    /// Approximate block capacity.
    pub block_capacity: usize,
    /// Restart point interval.
    pub restart_interval: usize,
    /// False positive probability of bloom filter.
    pub bloom_false_positive: f64,
    /// Compression algorithm.
    pub compression_algorithm: CompressionAlgorithm,
    /// Upper bound on the size of a single sst; also used to derive the
    /// "large key/value" warning thresholds in `add_impl`.
    pub max_sst_size: u64,
}
65
66impl From<&StorageOpts> for SstableBuilderOptions {
67    fn from(options: &StorageOpts) -> SstableBuilderOptions {
68        let capacity: usize = (options.sstable_size_mb as usize) * (1 << 20);
69        SstableBuilderOptions {
70            capacity,
71            block_capacity: (options.block_size_kb as usize) * (1 << 10),
72            restart_interval: DEFAULT_RESTART_INTERVAL,
73            bloom_false_positive: options.bloom_false_positive,
74            compression_algorithm: CompressionAlgorithm::None,
75            max_sst_size: options.compactor_max_sst_size,
76        }
77    }
78}
79
80impl Default for SstableBuilderOptions {
81    fn default() -> Self {
82        Self {
83            capacity: DEFAULT_SSTABLE_SIZE,
84            block_capacity: DEFAULT_BLOCK_SIZE,
85            restart_interval: DEFAULT_RESTART_INTERVAL,
86            bloom_false_positive: DEFAULT_BLOOM_FALSE_POSITIVE,
87            compression_algorithm: CompressionAlgorithm::None,
88            max_sst_size: DEFAULT_MAX_SST_SIZE,
89        }
90    }
91}
92
/// Everything produced by a finished [`SstableBuilder`].
pub struct SstableBuilderOutput<WO> {
    /// Sstable metadata (key range, sizes, epochs) plus per-table stats and a
    /// creation timestamp.
    pub sst_info: LocalSstableInfo,
    /// Whatever the underlying [`SstableWriter`] yields on `finish`
    /// (e.g. in-memory bytes in tests — depends on the writer implementation).
    pub writer_output: WO,
    /// Statistics collected during the build, for metrics reporting.
    pub stats: SstableBuilderOutputStats,
}
98
/// Incrementally builds one sstable: accumulates key/value pairs into blocks,
/// tracks per-block metadata, a key filter, and per-table statistics, and
/// streams finished blocks through the underlying writer `W`.
pub struct SstableBuilder<W: SstableWriter, F: FilterBuilder> {
    /// Options.
    options: SstableBuilderOptions,
    /// Data writer.
    writer: W,
    /// Current block builder.
    block_builder: BlockBuilder,

    /// Supplies the filter-key extractor applied before keys are inserted into
    /// `filter_builder`.
    compaction_catalog_agent_ref: CompactionCatalogAgentRef,
    /// Block metadata vec.
    block_metas: Vec<BlockMeta>,

    /// `table_id` of added keys.
    table_ids: BTreeSet<u32>,
    /// Encoded full key of the most recently added entry; used to detect new
    /// user keys and becomes the sstable's largest key on `finish`.
    last_full_key: Vec<u8>,
    /// Buffer for encoded key and value to avoid allocation.
    raw_key: BytesMut,
    raw_value: BytesMut,
    /// `table_id` of the most recently added key; `None` before the first add.
    last_table_id: Option<u32>,
    sstable_id: u64,

    /// Per table stats.
    table_stats: TableStatsMap,
    /// `last_table_stats` accumulates stats for `last_table_id` and finalizes it in `table_stats`
    /// by `finalize_last_table_stats`
    last_table_stats: TableStats,

    /// Builds the bloom/xor filter over all added keys.
    filter_builder: F,

    /// Distinct pure epochs observed; min/max are reported in the sst info.
    epoch_set: BTreeSet<u64>,
    /// Optional limiter passed to the filter builder when finishing/switching blocks.
    memory_limiter: Option<Arc<MemoryLimiter>>,

    block_size_vec: Vec<usize>, // for statistics
}
133
134impl<W: SstableWriter> SstableBuilder<W, Xor16FilterBuilder> {
135    pub fn for_test(
136        sstable_id: u64,
137        writer: W,
138        options: SstableBuilderOptions,
139        table_id_to_vnode: HashMap<StateTableId, usize>,
140        table_id_to_watermark_serde: HashMap<
141            u32,
142            Option<(OrderedRowSerde, OrderedRowSerde, usize)>,
143        >,
144    ) -> Self {
145        let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
146            FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor),
147            table_id_to_vnode,
148            table_id_to_watermark_serde,
149        ));
150
151        Self::new(
152            sstable_id,
153            writer,
154            Xor16FilterBuilder::new(options.capacity / DEFAULT_ENTRY_SIZE + 1),
155            options,
156            compaction_catalog_agent_ref,
157            None,
158        )
159    }
160}
161
162impl<W: SstableWriter, F: FilterBuilder> SstableBuilder<W, F> {
    /// Creates a builder that streams blocks through `writer` and inserts
    /// extracted filter keys into `filter_builder`.
    ///
    /// `memory_limiter`, when present, is handed to the filter builder when a
    /// block is switched and when the filter is finalized.
    pub fn new(
        sstable_id: u64,
        writer: W,
        filter_builder: F,
        options: SstableBuilderOptions,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
        memory_limiter: Option<Arc<MemoryLimiter>>,
    ) -> Self {
        Self {
            // Clone first: fields of `options` are moved/read below.
            options: options.clone(),
            writer,
            block_builder: BlockBuilder::new(BlockBuilderOptions {
                capacity: options.block_capacity,
                restart_interval: options.restart_interval,
                compression_algorithm: options.compression_algorithm,
            }),
            filter_builder,
            // Reserve one meta slot per expected block up front.
            block_metas: Vec::with_capacity(options.capacity / options.block_capacity + 1),
            table_ids: BTreeSet::new(),
            last_table_id: None,
            raw_key: BytesMut::new(),
            raw_value: BytesMut::new(),
            last_full_key: vec![],
            sstable_id,
            compaction_catalog_agent_ref,
            table_stats: Default::default(),
            last_table_stats: Default::default(),
            epoch_set: BTreeSet::default(),
            memory_limiter,
            block_size_vec: Vec::new(),
        }
    }
195
    /// Add kv pair to sstable.
    ///
    /// Test-only convenience wrapper that forwards directly to [`Self::add`].
    pub async fn add_for_test(
        &mut self,
        full_key: FullKey<&[u8]>,
        value: HummockValue<&[u8]>,
    ) -> HummockResult<()> {
        self.add(full_key, value).await
    }
204
    /// Approximate encoded size of the block currently under construction.
    pub fn current_block_size(&self) -> usize {
        self.block_builder.approximate_len()
    }
208
    /// Add raw data of block to sstable. return false means fallback
    ///
    /// Fast-compact path: appends an already-encoded block (`buf` with its
    /// `meta` and pre-built `filter_data`) verbatim. Returns `Ok(false)` when
    /// the pending hand-built block is too small to stand alone — in that case
    /// the raw block is decoded and its entries re-added one by one instead.
    pub async fn add_raw_block(
        &mut self,
        buf: Bytes,
        filter_data: Vec<u8>,
        smallest_key: FullKey<Vec<u8>>,
        largest_key: Vec<u8>,
        mut meta: BlockMeta,
    ) -> HummockResult<bool> {
        let table_id = smallest_key.user_key.table_id.table_id;
        // Table switch: flush the pending block and roll per-table stats over.
        if self.last_table_id.is_none() || self.last_table_id.unwrap() != table_id {
            if !self.block_builder.is_empty() {
                // Try to finish the previous `Block`` when the `table_id` is switched, making sure that the data in the `Block` doesn't span two `table_ids`.
                self.build_block().await?;
            }

            self.table_ids.insert(table_id);
            self.finalize_last_table_stats();
            self.last_table_id = Some(table_id);
        }

        if !self.block_builder.is_empty() {
            let min_block_size = std::cmp::min(MIN_BLOCK_SIZE, self.options.block_capacity / 4);

            // If the previous block is too small, we should merge it into the previous block.
            if self.block_builder.approximate_len() < min_block_size {
                // Fallback: decode the raw block and replay its entries through
                // `add_impl` so they land in the small pending block.
                let block = Block::decode(buf, meta.uncompressed_size as usize)?;
                let mut iter = BlockIterator::new(BlockHolder::from_owned_block(Box::new(block)));
                iter.seek_to_first();
                while iter.is_valid() {
                    let value = HummockValue::from_slice(iter.value()).unwrap_or_else(|_| {
                        panic!(
                            "decode failed for fast compact sst_id {} block_idx {} last_table_id {:?}",
                            self.sstable_id, self.block_metas.len(), self.last_table_id
                        )
                    });
                    // `could_switch_block = false`: entries of one raw block must
                    // stay within a single table/block.
                    self.add_impl(iter.key(), value, false).await?;
                    iter.next();
                }
                return Ok(false);
            }

            self.build_block().await?;
        }
        self.last_full_key = largest_key;
        // The caller-provided meta must describe exactly the bytes in `buf`.
        assert_eq!(
            meta.len as usize,
            buf.len(),
            "meta {} buf {} last_table_id {:?}",
            meta.len,
            buf.len(),
            self.last_table_id
        );
        // Re-anchor the block at our current write position before recording it.
        meta.offset = self.writer.data_len() as u32;
        self.block_metas.push(meta);
        self.filter_builder.add_raw_data(filter_data);
        let block_meta = self.block_metas.last_mut().unwrap();
        self.writer.write_block_bytes(buf, block_meta).await?;

        Ok(true)
    }
270
    /// Add kv pair to sstable.
    ///
    /// Public entry point; always allows the builder to switch to a new block
    /// when the current one is full or the table id changes.
    pub async fn add(
        &mut self,
        full_key: FullKey<&[u8]>,
        value: HummockValue<&[u8]>,
    ) -> HummockResult<()> {
        self.add_impl(full_key, value, true).await
    }
279
    /// Add kv pair to sstable.
    ///
    /// Core insertion path shared by [`Self::add`] and the raw-block fallback.
    /// When `could_switch_block` is false the entry must land in the current
    /// block — a table switch in that mode is a bug (asserted below).
    async fn add_impl(
        &mut self,
        full_key: FullKey<&[u8]>,
        value: HummockValue<&[u8]>,
        could_switch_block: bool,
    ) -> HummockResult<()> {
        const LARGE_KEY_LEN: usize = MAX_KEY_LEN >> 1;

        let table_key_len = full_key.user_key.table_key.as_ref().len();
        let table_value_len = match &value {
            HummockValue::Put(t) => t.len(),
            HummockValue::Delete => 0,
        };
        // Warn (but still accept) unusually large keys/values relative to the
        // configured max sst size.
        let large_value_len = self.options.max_sst_size as usize / 10;
        let large_key_value_len = self.options.max_sst_size as usize / 2;
        if table_key_len >= LARGE_KEY_LEN
            || table_value_len > large_value_len
            || table_key_len + table_value_len > large_key_value_len
        {
            let table_id = full_key.user_key.table_id.table_id();
            tracing::warn!(
                "A large key/value (table_id={}, key len={}, value len={}, epoch={}, spill offset={}) is added to block",
                table_id,
                table_key_len,
                table_value_len,
                full_key.epoch_with_gap.pure_epoch(),
                full_key.epoch_with_gap.offset(),
            );
        }

        // TODO: refine me
        full_key.encode_into(&mut self.raw_key);
        value.encode(&mut self.raw_value);
        let is_new_user_key = self.last_full_key.is_empty()
            || !user_key(&self.raw_key).eq(user_key(&self.last_full_key));
        let table_id = full_key.user_key.table_id.table_id();
        let is_new_table = self.last_table_id.is_none() || self.last_table_id.unwrap() != table_id;
        let current_block_size = self.current_block_size();
        // A block is "full" when it reached capacity, or when it is >3/4 full
        // and this entry would push it past capacity.
        let is_block_full = current_block_size >= self.options.block_capacity
            || (current_block_size > self.options.block_capacity / 4 * 3
                && current_block_size + self.raw_value.len() + self.raw_key.len()
                    > self.options.block_capacity);

        if is_new_table {
            assert!(
                could_switch_block,
                "is_new_user_key {} sst_id {} block_idx {} table_id {} last_table_id {:?} full_key {:?}",
                is_new_user_key,
                self.sstable_id,
                self.block_metas.len(),
                table_id,
                self.last_table_id,
                full_key
            );
            self.table_ids.insert(table_id);
            self.finalize_last_table_stats();
            self.last_table_id = Some(table_id);
            // A block never spans two table ids: flush before the new table.
            if !self.block_builder.is_empty() {
                self.build_block().await?;
            }
        } else if is_block_full && could_switch_block {
            self.build_block().await?;
        }
        self.last_table_stats.total_key_count += 1;
        self.epoch_set.insert(full_key.epoch_with_gap.pure_epoch());

        // Rotate block builder if the previous one has been built.
        if self.block_builder.is_empty() {
            self.block_metas.push(BlockMeta {
                offset: utils::checked_into_u32(self.writer.data_len()).unwrap_or_else(|_| {
                    panic!(
                        "WARN overflow can't convert writer_data_len {} into u32 sst_id {} block_idx {} tables {:?}",
                        self.writer.data_len(),
                        self.sstable_id,
                        self.block_metas.len(),
                        self.table_ids,
                    )
                }),
                // `len` and `uncompressed_size` are filled in by `build_block`.
                len: 0,
                smallest_key: full_key.encode(),
                uncompressed_size: 0,
                total_key_count: 0,
                stale_key_count: 0,
            });
        }

        let table_id = full_key.user_key.table_id.table_id();
        let mut extract_key = user_key(&self.raw_key);
        extract_key = self.compaction_catalog_agent_ref.extract(extract_key);
        // add bloom_filter check
        if !extract_key.is_empty() {
            self.filter_builder.add_key(extract_key, table_id);
        }
        self.block_builder.add(full_key, self.raw_value.as_ref());
        self.block_metas.last_mut().unwrap().total_key_count += 1;
        // An older version of an existing user key, or a tombstone, is "stale".
        if !is_new_user_key || value.is_delete() {
            self.block_metas.last_mut().unwrap().stale_key_count += 1;
        }
        self.last_table_stats.total_key_size += full_key.encoded_len() as i64;
        self.last_table_stats.total_value_size += value.encoded_len() as i64;

        self.last_full_key.clear();
        self.last_full_key.extend_from_slice(&self.raw_key);

        // Buffers are reused across calls to avoid reallocation.
        self.raw_key.clear();
        self.raw_value.clear();
        Ok(())
    }
389
    /// Finish building sst.
    ///
    /// Flushes the pending block, finalizes the filter and metadata, writes the
    /// meta section through the writer, and returns the sst info plus stats.
    /// Consumes the builder.
    ///
    /// # Format
    ///
    /// data:
    ///
    /// ```plain
    /// | Block 0 | ... | Block N-1 | N (4B) |
    /// ```
    pub async fn finish(mut self) -> HummockResult<SstableBuilderOutput<W::Output>> {
        let smallest_key = if self.block_metas.is_empty() {
            vec![]
        } else {
            self.block_metas[0].smallest_key.clone()
        };
        let largest_key = self.last_full_key.clone();
        self.finalize_last_table_stats();

        self.build_block().await?;
        let right_exclusive = false;
        // Meta section starts where the data section ends.
        let meta_offset = self.writer.data_len() as u64;

        let bloom_filter_kind = if self.filter_builder.support_blocked_raw_data() {
            BloomFilterType::Blocked
        } else {
            BloomFilterType::Sstable
        };
        // A non-positive false-positive rate disables the filter entirely.
        let bloom_filter = if self.options.bloom_false_positive > 0.0 {
            self.filter_builder.finish(self.memory_limiter.clone())
        } else {
            vec![]
        };

        // Aggregate per-block counters into sst-level totals.
        let total_key_count = self
            .block_metas
            .iter()
            .map(|block_meta| block_meta.total_key_count as u64)
            .sum::<u64>();
        let stale_key_count = self
            .block_metas
            .iter()
            .map(|block_meta| block_meta.stale_key_count as u64)
            .sum::<u64>();
        let uncompressed_file_size = self
            .block_metas
            .iter()
            .map(|block_meta| block_meta.uncompressed_size as u64)
            .sum::<u64>();

        #[expect(deprecated)]
        let mut meta = SstableMeta {
            block_metas: self.block_metas,
            bloom_filter,
            // Filled in below once the encoded size is known.
            estimated_size: 0,
            key_count: utils::checked_into_u32(total_key_count).unwrap_or_else(|_| {
                panic!(
                    "WARN overflow can't convert total_key_count {} into u32 tables {:?}",
                    total_key_count, self.table_ids,
                )
            }),
            smallest_key,
            largest_key,
            version: VERSION,
            meta_offset,
            monotonic_tombstone_events: vec![],
        };

        // estimated_size = data section + encoded meta, with overflow checks.
        let meta_encode_size = meta.encoded_size();
        let encoded_size_u32 = utils::checked_into_u32(meta_encode_size).unwrap_or_else(|_| {
            panic!(
                "WARN overflow can't convert meta_encoded_size {} into u32 tables {:?}",
                meta_encode_size, self.table_ids,
            )
        });
        let meta_offset_u32 = utils::checked_into_u32(meta_offset).unwrap_or_else(|_| {
            panic!(
                "WARN overflow can't convert meta_offset {} into u32 tables {:?}",
                meta_offset, self.table_ids,
            )
        });
        meta.estimated_size = encoded_size_u32
            .checked_add(meta_offset_u32)
            .unwrap_or_else(|| {
                panic!(
                    "WARN overflow encoded_size_u32 {} meta_offset_u32 {} table_id {:?} table_ids {:?}",
                    encoded_size_u32, meta_offset_u32, self.last_table_id, self.table_ids
                )
            });

        // Average key/value sizes over all tables, guarding both "no tables"
        // and "tables with zero keys" against division by zero.
        let (avg_key_size, avg_value_size) = if self.table_stats.is_empty() {
            (0, 0)
        } else {
            let total_key_count: usize = self
                .table_stats
                .values()
                .map(|s| s.total_key_count as usize)
                .sum();

            if total_key_count == 0 {
                (0, 0)
            } else {
                let total_key_size: usize = self
                    .table_stats
                    .values()
                    .map(|s| s.total_key_size as usize)
                    .sum();

                let total_value_size: usize = self
                    .table_stats
                    .values()
                    .map(|s| s.total_value_size as usize)
                    .sum();

                (
                    total_key_size / total_key_count,
                    total_value_size / total_key_count,
                )
            }
        };

        // Empty sst: (MAX, MIN) sentinel pair, i.e. min > max.
        let (min_epoch, max_epoch) = {
            if self.epoch_set.is_empty() {
                (HummockEpoch::MAX, u64::MIN)
            } else {
                (
                    *self.epoch_set.first().unwrap(),
                    *self.epoch_set.last().unwrap(),
                )
            }
        };

        let sst_info: SstableInfo = SstableInfoInner {
            object_id: self.sstable_id,
            sst_id: self.sstable_id,
            bloom_filter_kind,
            key_range: KeyRange {
                left: Bytes::from(meta.smallest_key.clone()),
                right: Bytes::from(meta.largest_key.clone()),
                right_exclusive,
            },
            file_size: meta.estimated_size as u64,
            table_ids: self.table_ids.into_iter().collect(),
            meta_offset: meta.meta_offset,
            stale_key_count,
            total_key_count,
            uncompressed_file_size: uncompressed_file_size + meta.encoded_size() as u64,
            min_epoch,
            max_epoch,
            range_tombstone_count: 0,
            sst_size: meta.estimated_size as u64,
        }
        .into();
        tracing::trace!(
            "meta_size {} bloom_filter_size {}  add_key_counts {} stale_key_count {} min_epoch {} max_epoch {} epoch_count {}",
            meta.encoded_size(),
            meta.bloom_filter.len(),
            total_key_count,
            stale_key_count,
            min_epoch,
            max_epoch,
            self.epoch_set.len()
        );
        let bloom_filter_size = meta.bloom_filter.len();
        let sstable_file_size = sst_info.file_size as usize;

        if !meta.block_metas.is_empty() {
            // fill total_compressed_size
            // Blocks are grouped by table id, so a single cursor over
            // `table_stats` suffices. NOTE(review): `unwrap` assumes every
            // block's table id has an entry in `table_stats` — presumably
            // guaranteed by `finalize_last_table_stats`; confirm.
            let mut last_table_id = meta.block_metas[0].table_id().table_id();
            let mut last_table_stats = self.table_stats.get_mut(&last_table_id).unwrap();
            for block_meta in &meta.block_metas {
                let block_table_id = block_meta.table_id();
                if last_table_id != block_table_id.table_id() {
                    last_table_id = block_table_id.table_id();
                    last_table_stats = self.table_stats.get_mut(&last_table_id).unwrap();
                }

                last_table_stats.total_compressed_size += block_meta.len as u64;
            }
        }

        let writer_output = self.writer.finish(meta).await?;
        // The timestamp is only used during full GC.
        //
        // Ideally object store object's last_modified should be used.
        // However, it'll incur additional IO overhead since S3 lacks an interface to retrieve the last_modified timestamp after the PUT operation on an object.
        //
        // The local timestamp below is expected to precede the last_modified of object store object, given that the object store object is created afterward.
        // It should help alleviate the clock drift issue.

        let now = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .expect("Clock may have gone backwards")
            .as_secs();
        Ok(SstableBuilderOutput::<W::Output> {
            sst_info: LocalSstableInfo::new(sst_info, self.table_stats, now),
            writer_output,
            stats: SstableBuilderOutputStats {
                bloom_filter_size,
                avg_key_size,
                avg_value_size,
                epoch_count: self.epoch_set.len(),
                block_size_vec: self.block_size_vec,
                sstable_file_size,
            },
        })
    }
596
597    pub fn approximate_len(&self) -> usize {
598        self.writer.data_len()
599            + self.block_builder.approximate_len()
600            + self.filter_builder.approximate_len()
601    }
602
    /// Seals the current block: records its uncompressed size, writes the
    /// (possibly compressed) bytes through the writer, fixes up the block
    /// meta's `len`, and resets the block builder. No-op on an empty block.
    pub async fn build_block(&mut self) -> HummockResult<()> {
        // Skip empty block.
        if self.block_builder.is_empty() {
            return Ok(());
        }

        let block_meta = self.block_metas.last_mut().unwrap();
        let uncompressed_block_size = self.block_builder.uncompressed_block_size();
        block_meta.uncompressed_size = utils::checked_into_u32(uncompressed_block_size)
            .unwrap_or_else(|_| {
                panic!(
                    "WARN overflow can't convert uncompressed_block_size {} into u32 table {:?}",
                    uncompressed_block_size,
                    self.block_builder.table_id(),
                )
            });
        let block = self.block_builder.build();
        self.writer.write_block(block, block_meta).await?;
        self.block_size_vec.push(block.len());
        // Give the filter builder a chance to start a new filter block.
        self.filter_builder
            .switch_block(self.memory_limiter.clone());
        let data_len = utils::checked_into_u32(self.writer.data_len()).unwrap_or_else(|_| {
            panic!(
                "WARN overflow can't convert writer_data_len {} into u32 table {:?}",
                self.writer.data_len(),
                self.block_builder.table_id(),
            )
        });
        // On-disk block length = bytes written since this block's offset.
        block_meta.len = data_len.checked_sub(block_meta.offset).unwrap_or_else(|| {
            panic!(
                "data_len should >= meta_offset, found data_len={}, meta_offset={}",
                data_len, block_meta.offset
            )
        });

        // A single block larger than twice the whole sst capacity is suspicious.
        if data_len as usize > self.options.capacity * 2 {
            tracing::warn!(
                "WARN unexpected block size {} table {:?}",
                data_len,
                self.block_builder.table_id()
            );
        }

        self.block_builder.clear();
        Ok(())
    }
649
650    pub fn is_empty(&self) -> bool {
651        self.writer.data_len() > 0
652    }
653
654    /// Returns true if we roughly reached capacity
655    pub fn reach_capacity(&self) -> bool {
656        self.approximate_len() >= self.options.capacity
657    }
658
659    fn finalize_last_table_stats(&mut self) {
660        if self.table_ids.is_empty() || self.last_table_id.is_none() {
661            return;
662        }
663        self.table_stats.insert(
664            self.last_table_id.unwrap(),
665            std::mem::take(&mut self.last_table_stats),
666        );
667    }
668}
669
/// Statistics gathered while building one sstable, reported to
/// [`CompactorMetrics`] via [`SstableBuilderOutputStats::report_stats`].
pub struct SstableBuilderOutputStats {
    // Encoded size of the bloom/xor filter in bytes.
    bloom_filter_size: usize,
    // Average encoded key size across all tables (0 when no keys).
    avg_key_size: usize,
    // Average encoded value size across all tables (0 when no keys).
    avg_value_size: usize,
    // Number of distinct pure epochs seen by the builder.
    epoch_count: usize,
    block_size_vec: Vec<usize>, // for statistics
    // Estimated total file size of the built sstable.
    sstable_file_size: usize,
}
678
679impl SstableBuilderOutputStats {
680    pub fn report_stats(&self, metrics: &Arc<CompactorMetrics>) {
681        if self.bloom_filter_size != 0 {
682            metrics
683                .sstable_bloom_filter_size
684                .observe(self.bloom_filter_size as _);
685        }
686
687        if self.sstable_file_size != 0 {
688            metrics
689                .sstable_file_size
690                .observe(self.sstable_file_size as _);
691        }
692
693        if self.avg_key_size != 0 {
694            metrics.sstable_avg_key_size.observe(self.avg_key_size as _);
695        }
696
697        if self.avg_value_size != 0 {
698            metrics
699                .sstable_avg_value_size
700                .observe(self.avg_value_size as _);
701        }
702
703        if self.epoch_count != 0 {
704            metrics
705                .sstable_distinct_epoch_count
706                .observe(self.epoch_count as _);
707        }
708
709        if !self.block_size_vec.is_empty() {
710            for block_size in &self.block_size_vec {
711                metrics.sstable_block_size.observe(*block_size as _);
712            }
713        }
714    }
715}
716
717#[cfg(test)]
718pub(super) mod tests {
719    use std::collections::{Bound, HashMap};
720
721    use risingwave_common::catalog::TableId;
722    use risingwave_common::hash::VirtualNode;
723    use risingwave_common::util::epoch::test_epoch;
724    use risingwave_hummock_sdk::key::UserKey;
725
726    use super::*;
727    use crate::assert_bytes_eq;
728    use crate::compaction_catalog_manager::{
729        CompactionCatalogAgent, DummyFilterKeyExtractor, MultiFilterKeyExtractor,
730    };
731    use crate::hummock::iterator::test_utils::mock_sstable_store;
732    use crate::hummock::sstable::xor_filter::BlockedXor16FilterBuilder;
733    use crate::hummock::test_utils::{
734        TEST_KEYS_COUNT, default_builder_opt_for_test, gen_test_sstable_impl, mock_sst_writer,
735        test_key_of, test_value_of,
736    };
737    use crate::hummock::{CachePolicy, Sstable, SstableWriterOptions, Xor8FilterBuilder};
738    use crate::monitor::StoreLocalStatistic;
739
    /// A builder that never receives a key must still `finish` successfully.
    #[tokio::test]
    async fn test_empty() {
        let opt = SstableBuilderOptions {
            capacity: 0,
            block_capacity: 4096,
            restart_interval: 16,
            bloom_false_positive: 0.001,
            ..Default::default()
        };

        let table_id_to_vnode = HashMap::from_iter(vec![(0, VirtualNode::COUNT_FOR_TEST)]);
        let table_id_to_watermark_serde = HashMap::from_iter(vec![(0, None)]);
        let b = SstableBuilder::for_test(
            0,
            mock_sst_writer(&opt),
            opt,
            table_id_to_vnode,
            table_id_to_watermark_serde,
        );

        b.finish().await.unwrap();
    }
762
    /// Builds an sstable from `TEST_KEYS_COUNT` ordered keys and checks the
    /// reported key range, file size, and that the encoded meta round-trips
    /// through `SstableMeta::decode`.
    #[tokio::test]
    async fn test_basic() {
        let opt = default_builder_opt_for_test();

        let table_id_to_vnode = HashMap::from_iter(vec![(0, VirtualNode::COUNT_FOR_TEST)]);
        let table_id_to_watermark_serde = HashMap::from_iter(vec![(0, None)]);
        let mut b = SstableBuilder::for_test(
            0,
            mock_sst_writer(&opt),
            opt,
            table_id_to_vnode,
            table_id_to_watermark_serde,
        );

        for i in 0..TEST_KEYS_COUNT {
            b.add_for_test(
                test_key_of(i).to_ref(),
                HummockValue::put(&test_value_of(i)),
            )
            .await
            .unwrap();
        }

        let output = b.finish().await.unwrap();
        let info = output.sst_info.sst_info;

        // Key range must span exactly the first and last inserted keys.
        assert_bytes_eq!(test_key_of(0).encode(), info.key_range.left);
        assert_bytes_eq!(
            test_key_of(TEST_KEYS_COUNT - 1).encode(),
            info.key_range.right
        );
        let (data, meta) = output.writer_output;
        assert_eq!(info.file_size, meta.estimated_size as u64);
        // Meta written at `meta_offset` must decode back to the same value.
        let offset = info.meta_offset as usize;
        let meta2 = SstableMeta::decode(&data[offset..]).unwrap();
        assert_eq!(meta2, meta);
    }
800
    /// Builds an sstable with (or without) a bloom filter and, when present,
    /// verifies the filter reports a possible match for every inserted key
    /// (a bloom filter must have no false negatives).
    async fn test_with_bloom_filter<F: FilterBuilder>(with_blooms: bool) {
        // Number of keys probed against the filter.
        // NOTE(review): assumes key_count <= TEST_KEYS_COUNT so every probed
        // key was actually inserted — confirm against test_utils.
        let key_count = 1000;

        let opts = SstableBuilderOptions {
            capacity: 0,
            block_capacity: 4096,
            restart_interval: 16,
            // 0.0 disables the filter entirely (see `finish`).
            bloom_false_positive: if with_blooms { 0.01 } else { 0.0 },
            ..Default::default()
        };

        // build remote table
        let sstable_store = mock_sstable_store().await;
        let table_id_to_vnode = HashMap::from_iter(vec![(0, VirtualNode::COUNT_FOR_TEST)]);
        let table_id_to_watermark_serde = HashMap::from_iter(vec![(0, None)]);
        let sst_info = gen_test_sstable_impl::<Vec<u8>, F>(
            opts,
            0,
            (0..TEST_KEYS_COUNT).map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
            sstable_store.clone(),
            CachePolicy::NotFill,
            table_id_to_vnode,
            table_id_to_watermark_serde,
        )
        .await;
        let table = sstable_store
            .sstable(&sst_info, &mut StoreLocalStatistic::default())
            .await
            .unwrap();

        assert_eq!(table.has_bloom_filter(), with_blooms);
        for i in 0..key_count {
            let full_key = test_key_of(i);
            if table.has_bloom_filter() {
                let hash = Sstable::hash_for_bloom_filter(full_key.user_key.encode().as_slice(), 0);
                let key_ref = full_key.user_key.as_ref();
                assert!(
                    table.may_match_hash(
                        &(Bound::Included(key_ref), Bound::Included(key_ref)),
                        hash
                    ),
                    "failed at {}",
                    i
                );
            }
        }
    }
848
849    #[tokio::test]
850    async fn test_bloom_filter() {
851        test_with_bloom_filter::<Xor16FilterBuilder>(false).await;
852        test_with_bloom_filter::<Xor16FilterBuilder>(true).await;
853        test_with_bloom_filter::<Xor8FilterBuilder>(true).await;
854        test_with_bloom_filter::<BlockedXor16FilterBuilder>(true).await;
855    }
856
    /// Builds one sstable spanning three state tables whose filter-key
    /// extractors differ, then verifies that keys of the table using the
    /// full-key extractor (table 2) all pass the bloom filter.
    ///
    /// Tables 1 and 3 are registered with `DummyFilterKeyExtractor`
    /// (presumably contributing nothing to the filter — not visible here,
    /// TODO confirm against its definition); table 2 uses
    /// `FullKeyFilterKeyExtractor`.
    #[tokio::test]
    async fn test_no_bloom_filter_block() {
        let opts = SstableBuilderOptions::default();
        // build remote table
        let sstable_store = mock_sstable_store().await;
        let writer_opts = SstableWriterOptions::default();
        let object_id = 1;
        let writer = sstable_store
            .clone()
            .create_sst_writer(object_id, writer_opts);
        // One extractor per table id; the builder routes each key through the
        // extractor registered for its table.
        let mut filter = MultiFilterKeyExtractor::default();
        filter.register(1, FilterKeyExtractorImpl::Dummy(DummyFilterKeyExtractor));
        filter.register(
            2,
            FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor),
        );
        filter.register(3, FilterKeyExtractorImpl::Dummy(DummyFilterKeyExtractor));

        let table_id_to_vnode = HashMap::from_iter(vec![
            (1, VirtualNode::COUNT_FOR_TEST),
            (2, VirtualNode::COUNT_FOR_TEST),
            (3, VirtualNode::COUNT_FOR_TEST),
        ]);
        let table_id_to_watermark_serde = HashMap::from_iter(vec![(1, None), (2, None), (3, None)]);

        let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
            FilterKeyExtractorImpl::Multi(filter),
            table_id_to_vnode,
            table_id_to_watermark_serde,
        ));

        let mut builder = SstableBuilder::new(
            object_id,
            writer,
            BlockedXor16FilterBuilder::new(1024),
            opts,
            compaction_catalog_agent_ref,
            None,
        );

        // Insert keys table-id-major so full keys stay in ascending order, as
        // the builder requires. Each table key is a vnode prefix followed by
        // an even-numbered "key_test_NNNNN" suffix; the buffer is reused by
        // truncating back to the vnode prefix each iteration.
        let key_count: usize = 10000;
        for table_id in 1..4 {
            let mut table_key = VirtualNode::ZERO.to_be_bytes().to_vec();
            for idx in 0..key_count {
                // Keep only the vnode prefix, then append the fresh suffix.
                table_key.resize(VirtualNode::SIZE, 0);
                table_key.extend_from_slice(format!("key_test_{:05}", idx * 2).as_bytes());
                let k = UserKey::for_test(TableId::new(table_id), table_key.as_ref());
                let v = test_value_of(idx);
                builder
                    .add(
                        FullKey::from_user_key(k, test_epoch(1)),
                        HummockValue::put(v.as_ref()),
                    )
                    .await
                    .unwrap();
            }
        }
        let ret = builder.finish().await.unwrap();
        let sst_info = ret.sst_info.sst_info.clone();
        // Wait for the upload task to complete before reading the table back.
        ret.writer_output.await.unwrap().unwrap();
        let table = sstable_store
            .sstable(&sst_info, &mut StoreLocalStatistic::default())
            .await
            .unwrap();
        // Re-derive the exact keys written for table 2 (full-key extractor)
        // and assert the filter never reports a false negative for them.
        let mut table_key = VirtualNode::ZERO.to_be_bytes().to_vec();
        for idx in 0..key_count {
            table_key.resize(VirtualNode::SIZE, 0);
            table_key.extend_from_slice(format!("key_test_{:05}", idx * 2).as_bytes());
            let k = UserKey::for_test(TableId::new(2), table_key.as_slice());
            let hash = Sstable::hash_for_bloom_filter(&k.encode(), 2);
            let key_ref = k.as_ref();
            assert!(
                table.may_match_hash(&(Bound::Included(key_ref), Bound::Included(key_ref)), hash)
            );
        }
    }
933}