risingwave_storage/hummock/sstable/builder.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, BTreeSet, HashMap};
16use std::mem;
17use std::sync::Arc;
18use std::time::SystemTime;
19
20use bytes::{Bytes, BytesMut};
21use risingwave_common::catalog::TableId;
22use risingwave_common::hash::VirtualNode;
23use risingwave_common::util::row_serde::OrderedRowSerde;
24use risingwave_hummock_sdk::key::{FullKey, MAX_KEY_LEN, TABLE_PREFIX_LEN, UserKey, user_key};
25use risingwave_hummock_sdk::key_range::KeyRange;
26use risingwave_hummock_sdk::sstable_info::{SstableInfo, SstableInfoInner, VnodeStatistics};
27use risingwave_hummock_sdk::table_stats::{TableStats, TableStatsMap};
28use risingwave_hummock_sdk::{HummockEpoch, HummockSstableObjectId, LocalSstableInfo};
29use risingwave_pb::hummock::BloomFilterType;
30
31use super::utils::CompressionAlgorithm;
32use super::{
33    BlockBuilder, BlockBuilderOptions, BlockMeta, DEFAULT_BLOCK_SIZE, DEFAULT_ENTRY_SIZE,
34    DEFAULT_RESTART_INTERVAL, SstableMeta, SstableWriter, VERSION,
35};
36use crate::compaction_catalog_manager::{
37    CompactionCatalogAgent, CompactionCatalogAgentRef, FilterKeyExtractorImpl,
38    FullKeyFilterKeyExtractor,
39};
40use crate::hummock::sstable::{FilterBuilder, utils};
41use crate::hummock::value::HummockValue;
42use crate::hummock::{
43    Block, BlockHolder, BlockIterator, HummockResult, MemoryLimiter, Xor16FilterBuilder,
44    try_shorten_block_smallest_key,
45};
46use crate::monitor::CompactorMetrics;
47use crate::opts::StorageOpts;
48
/// Default approximate capacity of a whole SST file: 4 MiB.
pub const DEFAULT_SSTABLE_SIZE: usize = 4 * 1024 * 1024;
/// Default bloom-filter false-positive probability.
pub const DEFAULT_BLOOM_FALSE_POSITIVE: f64 = 0.001;
/// Default upper bound on a compaction output SST: 512 MiB.
pub const DEFAULT_MAX_SST_SIZE: u64 = 512 * 1024 * 1024;
/// Threshold (8 KiB, further capped at a quarter of the block capacity in
/// `add_raw_block`) below which an in-progress block is considered too small
/// to stand alone.
pub const MIN_BLOCK_SIZE: usize = 8 * 1024;
53
/// Tuning knobs for building a single SST file.
#[derive(Clone, Debug)]
pub struct SstableBuilderOptions {
    /// Approximate sstable capacity.
    pub capacity: usize,
    /// Approximate block capacity.
    pub block_capacity: usize,
    /// Restart point interval.
    pub restart_interval: usize,
    /// False positive probability of bloom filter.
    pub bloom_false_positive: f64,
    /// Compression algorithm.
    pub compression_algorithm: CompressionAlgorithm,
    /// Hard cap on the SST size; also used to derive the "large key/value"
    /// warning thresholds in `add_impl`.
    pub max_sst_size: u64,
    /// If set, block metadata keys will be shortened when their length exceeds this threshold.
    pub shorten_block_meta_key_threshold: Option<usize>,
    /// Max bytes for vnode key-range hints in SST metadata. None disables collection.
    pub max_vnode_key_range_bytes: Option<usize>,
}
72
73impl From<&StorageOpts> for SstableBuilderOptions {
74    fn from(options: &StorageOpts) -> SstableBuilderOptions {
75        let capacity: usize = (options.sstable_size_mb as usize) * (1 << 20);
76        SstableBuilderOptions {
77            capacity,
78            block_capacity: (options.block_size_kb as usize) * (1 << 10),
79            restart_interval: DEFAULT_RESTART_INTERVAL,
80            bloom_false_positive: options.bloom_false_positive,
81            compression_algorithm: CompressionAlgorithm::None,
82            max_sst_size: options.compactor_max_sst_size,
83            shorten_block_meta_key_threshold: options.shorten_block_meta_key_threshold,
84            max_vnode_key_range_bytes: None,
85        }
86    }
87}
88
89impl Default for SstableBuilderOptions {
90    fn default() -> Self {
91        Self {
92            capacity: DEFAULT_SSTABLE_SIZE,
93            block_capacity: DEFAULT_BLOCK_SIZE,
94            restart_interval: DEFAULT_RESTART_INTERVAL,
95            bloom_false_positive: DEFAULT_BLOOM_FALSE_POSITIVE,
96            compression_algorithm: CompressionAlgorithm::None,
97            max_sst_size: DEFAULT_MAX_SST_SIZE,
98            shorten_block_meta_key_threshold: None,
99            max_vnode_key_range_bytes: None,
100        }
101    }
102}
103
/// Result of `SstableBuilder::finish`: the sealed SST's info, the writer's
/// output value, and build-time statistics.
pub struct SstableBuilderOutput<WO> {
    pub sst_info: LocalSstableInfo,
    pub writer_output: WO,
    pub stats: SstableBuilderOutputStats,
}
109
/// Incrementally builds one SST file: buffers key-value pairs into blocks,
/// feeds keys to a filter builder, and tracks per-table statistics and block
/// metadata until `finish` seals the file through the `SstableWriter`.
pub struct SstableBuilder<W: SstableWriter, F: FilterBuilder> {
    /// Options.
    options: SstableBuilderOptions,
    /// Data writer.
    writer: W,
    /// Current block builder.
    block_builder: BlockBuilder,

    /// Provides filter-key extraction for added keys.
    compaction_catalog_agent_ref: CompactionCatalogAgentRef,
    /// Block metadata vec.
    block_metas: Vec<BlockMeta>,

    /// `table_id` of added keys.
    table_ids: BTreeSet<TableId>,
    /// Encoded full key of the most recently added entry; empty before the
    /// first add.
    last_full_key: Vec<u8>,
    /// Buffer for encoded key and value to avoid allocation.
    raw_key: BytesMut,
    raw_value: BytesMut,
    /// Table id of the most recently added key; `None` before the first add.
    last_table_id: Option<TableId>,
    sst_object_id: HummockSstableObjectId,

    /// Per table stats.
    table_stats: TableStatsMap,
    /// `last_table_stats` accumulates stats for `last_table_id` and finalizes it in `table_stats`
    /// by `finalize_last_table_stats`
    last_table_stats: TableStats,

    /// Accumulates filter keys for the SST's bloom/xor filter.
    filter_builder: F,

    /// All distinct pure epochs seen so far; used for min/max epoch in the
    /// resulting `SstableInfo`.
    epoch_set: BTreeSet<u64>,
    memory_limiter: Option<Arc<MemoryLimiter>>,

    block_size_vec: Vec<usize>, // for statistics
    /// Collects per-vnode user-key ranges when enabled via
    /// `max_vnode_key_range_bytes`; `None` when collection is disabled.
    vnode_range_collector: Option<VnodeUserKeyRangeCollector>,
}
145
146impl<W: SstableWriter> SstableBuilder<W, Xor16FilterBuilder> {
147    pub fn for_test(
148        sstable_id: u64,
149        writer: W,
150        options: SstableBuilderOptions,
151        table_id_to_vnode: HashMap<impl Into<TableId>, usize>,
152        table_id_to_watermark_serde: HashMap<
153            impl Into<TableId>,
154            Option<(OrderedRowSerde, OrderedRowSerde, usize)>,
155        >,
156    ) -> Self {
157        let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
158            FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor),
159            table_id_to_vnode
160                .into_iter()
161                .map(|(table_id, v)| (table_id.into(), v))
162                .collect(),
163            table_id_to_watermark_serde
164                .into_iter()
165                .map(|(table_id, v)| (table_id.into(), v))
166                .collect(),
167            HashMap::default(),
168        ));
169
170        Self::new(
171            sstable_id,
172            writer,
173            Xor16FilterBuilder::new(options.capacity / DEFAULT_ENTRY_SIZE + 1),
174            options,
175            compaction_catalog_agent_ref,
176            None,
177        )
178    }
179}
180
181impl<W: SstableWriter, F: FilterBuilder> SstableBuilder<W, F> {
182    pub fn new(
183        sst_object_id: impl Into<HummockSstableObjectId>,
184        writer: W,
185        filter_builder: F,
186        options: SstableBuilderOptions,
187        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
188        memory_limiter: Option<Arc<MemoryLimiter>>,
189    ) -> Self {
190        let sst_object_id = sst_object_id.into();
191        Self {
192            vnode_range_collector: VnodeUserKeyRangeCollector::with_limit(
193                options.max_vnode_key_range_bytes,
194            ),
195            options: options.clone(),
196            writer,
197            block_builder: BlockBuilder::new(BlockBuilderOptions {
198                capacity: options.block_capacity,
199                restart_interval: options.restart_interval,
200                compression_algorithm: options.compression_algorithm,
201            }),
202            filter_builder,
203            block_metas: Vec::with_capacity(options.capacity / options.block_capacity + 1),
204            table_ids: BTreeSet::new(),
205            last_table_id: None,
206            raw_key: BytesMut::new(),
207            raw_value: BytesMut::new(),
208            last_full_key: vec![],
209            sst_object_id,
210            compaction_catalog_agent_ref,
211            table_stats: Default::default(),
212            last_table_stats: Default::default(),
213            epoch_set: BTreeSet::default(),
214            memory_limiter,
215            block_size_vec: Vec::new(),
216        }
217    }
218
    /// Add kv pair to sstable.
    ///
    /// Test-only alias of [`Self::add`].
    pub async fn add_for_test(
        &mut self,
        full_key: FullKey<&[u8]>,
        value: HummockValue<&[u8]>,
    ) -> HummockResult<()> {
        self.add(full_key, value).await
    }
227
    /// Approximate encoded size of the block currently being built.
    pub fn current_block_size(&self) -> usize {
        self.block_builder.approximate_len()
    }
231
    /// Add raw data of block to sstable. return false means fallback
    ///
    /// Appends an already-built block (compressed bytes, filter data, and its
    /// `BlockMeta`) without re-encoding entries — used by fast compaction.
    /// Returns `Ok(false)` when the in-progress block is too small to stand
    /// alone, in which case the raw block is decoded and its entries are
    /// replayed through the normal add path instead.
    pub async fn add_raw_block(
        &mut self,
        buf: Bytes,
        filter_data: Vec<u8>,
        smallest_key: FullKey<Vec<u8>>,
        largest_key: Vec<u8>,
        mut meta: BlockMeta,
    ) -> HummockResult<bool> {
        let table_id = smallest_key.user_key.table_id;
        if self.last_table_id.is_none() || self.last_table_id.unwrap() != table_id {
            if !self.block_builder.is_empty() {
                // Try to finish the previous `Block` when the `table_id` is switched, making sure that the data in the `Block` doesn't span two `table_ids`.
                self.build_block().await?;
            }

            self.table_ids.insert(table_id);
            self.finalize_last_table_stats();
            self.last_table_id = Some(table_id);
        }

        if !self.block_builder.is_empty() {
            let min_block_size = std::cmp::min(MIN_BLOCK_SIZE, self.options.block_capacity / 4);

            // If the in-progress block is too small, merge this raw block's
            // entries into it instead of emitting a tiny block.
            if self.block_builder.approximate_len() < min_block_size {
                // Decode the raw block and replay its entries through
                // `add_impl` with block switching disabled, so everything
                // stays in the current block.
                let block = Block::decode(buf, meta.uncompressed_size as usize)?;
                let mut iter = BlockIterator::new(BlockHolder::from_owned_block(Box::new(block)));
                iter.seek_to_first();
                while iter.is_valid() {
                    let value = HummockValue::from_slice(iter.value()).unwrap_or_else(|_| {
                        panic!(
                            "decode failed for fast compact sst_id {} block_idx {} last_table_id {:?}",
                            self.sst_object_id, self.block_metas.len(), self.last_table_id
                        )
                    });
                    self.add_impl(iter.key(), value, false).await?;
                    iter.next();
                }
                return Ok(false);
            }

            self.build_block().await?;
        }
        self.last_full_key = largest_key;
        assert_eq!(
            meta.len as usize,
            buf.len(),
            "meta {} buf {} last_table_id {:?}",
            meta.len,
            buf.len(),
            self.last_table_id
        );
        // Re-anchor the block meta at the current write position before
        // appending its bytes and filter data.
        meta.offset = self.writer.data_len() as u32;
        self.block_metas.push(meta);
        self.filter_builder.add_raw_data(filter_data);
        let block_meta = self.block_metas.last_mut().unwrap();
        self.writer.write_block_bytes(buf, block_meta).await?;

        Ok(true)
    }
293
    /// Add kv pair to sstable.
    ///
    /// This public path always allows switching to a new block when the
    /// current one fills up (contrast with the replay path in
    /// `add_raw_block`, which pins entries to the current block).
    pub async fn add(
        &mut self,
        full_key: FullKey<&[u8]>,
        value: HummockValue<&[u8]>,
    ) -> HummockResult<()> {
        self.add_impl(full_key, value, true).await
    }
302
    /// Add kv pair to sstable.
    ///
    /// Core insertion path: encodes the key/value, switches blocks on table
    /// change or block fullness (when `could_switch_block` permits), registers
    /// the key with the filter builder, and updates per-block and per-table
    /// statistics. Keys must arrive in sorted order — several bookkeeping
    /// steps (stale-key counting, vnode range collection) compare against
    /// `last_full_key`.
    async fn add_impl(
        &mut self,
        full_key: FullKey<&[u8]>,
        value: HummockValue<&[u8]>,
        could_switch_block: bool,
    ) -> HummockResult<()> {
        const LARGE_KEY_LEN: usize = MAX_KEY_LEN >> 1;

        // Warn (but still accept) unusually large keys/values, since a single
        // huge entry can blow past the block and SST size targets.
        let table_key_len = full_key.user_key.table_key.as_ref().len();
        let table_value_len = match &value {
            HummockValue::Put(t) => t.len(),
            HummockValue::Delete => 0,
        };
        let large_value_len = self.options.max_sst_size as usize / 10;
        let large_key_value_len = self.options.max_sst_size as usize / 2;
        if table_key_len >= LARGE_KEY_LEN
            || table_value_len > large_value_len
            || table_key_len + table_value_len > large_key_value_len
        {
            let table_id = full_key.user_key.table_id;
            tracing::warn!(
                "A large key/value (table_id={}, key len={}, value len={}, epoch={}, spill offset={}) is added to block",
                table_id,
                table_key_len,
                table_value_len,
                full_key.epoch_with_gap.pure_epoch(),
                full_key.epoch_with_gap.offset(),
            );
        }

        // TODO: refine me
        full_key.encode_into(&mut self.raw_key);
        value.encode(&mut self.raw_value);
        let is_new_user_key = self.last_full_key.is_empty()
            || !user_key(&self.raw_key).eq(user_key(self.last_full_key.as_slice()));
        let table_id = full_key.user_key.table_id;
        let is_new_table = self.last_table_id != Some(table_id);
        // A block is "full" either at capacity, or at >3/4 capacity when this
        // entry would push it past capacity.
        let current_block_size = self.current_block_size();
        let is_block_full = current_block_size >= self.options.block_capacity
            || (current_block_size > self.options.block_capacity / 4 * 3
                && current_block_size + self.raw_value.len() + self.raw_key.len()
                    > self.options.block_capacity);

        if is_new_table {
            // Blocks must never span two table ids; the replay path in
            // `add_raw_block` (could_switch_block == false) guarantees it
            // never crosses a table boundary, hence the assert.
            assert!(
                could_switch_block,
                "is_new_user_key {} sst_id {} block_idx {} table_id {} last_table_id {:?} full_key {:?}",
                is_new_user_key,
                self.sst_object_id,
                self.block_metas.len(),
                table_id,
                self.last_table_id,
                full_key
            );
            self.table_ids.insert(table_id);
            self.finalize_last_table_stats();
            self.last_table_id = Some(table_id);
            if !self.block_builder.is_empty() {
                self.build_block().await?;
            }
        } else if is_block_full && could_switch_block {
            self.build_block().await?;
        }
        self.last_table_stats.total_key_count += 1;
        self.epoch_set.insert(full_key.epoch_with_gap.pure_epoch());

        // Rotate block builder if the previous one has been built.
        if self.block_builder.is_empty() {
            // Optionally shorten the block-meta smallest key: any separator
            // strictly between the previous key and this one is a valid
            // smallest key for the new block.
            let smallest_key = if let Some(threshold) =
                self.options.shorten_block_meta_key_threshold
                && !self.last_full_key.is_empty()
                && full_key.encoded_len() >= threshold
            {
                let prev = FullKey::decode(&self.last_full_key);
                if let Some(shortened) = try_shorten_block_smallest_key(&prev, &full_key) {
                    shortened.encode()
                } else {
                    full_key.encode()
                }
            } else {
                full_key.encode()
            };

            self.block_metas.push(BlockMeta {
                offset: utils::checked_into_u32(self.writer.data_len()).unwrap_or_else(|_| {
                    panic!(
                        "WARN overflow can't convert writer_data_len {} into u32 sst_id {} block_idx {} tables {:?}",
                        self.writer.data_len(),
                        self.sst_object_id,
                        self.block_metas.len(),
                        self.table_ids,
                    )
                }),
                len: 0,
                smallest_key,
                uncompressed_size: 0,
                total_key_count: 0,
                stale_key_count: 0,
            });
        }

        let filter_key = self
            .compaction_catalog_agent_ref
            .extract(user_key(&self.raw_key));
        // add bloom_filter check
        if !filter_key.is_empty() {
            self.filter_builder
                .add_key(filter_key, table_id.as_raw_id());
        }
        // Use pre-encoded key to avoid redundant encoding
        self.block_builder.add(
            table_id,
            &self.raw_key[TABLE_PREFIX_LEN..],
            self.raw_value.as_ref(),
        );
        self.block_metas.last_mut().unwrap().total_key_count += 1;
        // An older version of a user key, or a tombstone, counts as stale.
        if !is_new_user_key || value.is_delete() {
            self.block_metas.last_mut().unwrap().stale_key_count += 1;
        }
        self.last_table_stats.total_key_size += full_key.encoded_len() as i64;
        self.last_table_stats.total_value_size += value.encoded_len() as i64;

        // Feed the vnode range collector before `last_full_key` is replaced:
        // it needs the previous key to seal a range on vnode switch.
        if let Some(collector) = self.vnode_range_collector.as_mut() {
            collector.observe_key(
                VirtualNode::from_index(full_key.user_key.get_vnode_id()),
                &self.raw_key,
                &self.last_full_key,
            );
        }

        self.last_full_key.clear();
        self.last_full_key.extend_from_slice(&self.raw_key);

        self.raw_key.clear();
        self.raw_value.clear();
        Ok(())
    }
441
    /// Finish building sst.
    ///
    /// Seals the final block, assembles the `SstableMeta` and `SstableInfo`,
    /// computes aggregate statistics, and hands the metadata to the writer.
    ///
    /// # Format
    ///
    /// data:
    ///
    /// ```plain
    /// | Block 0 | ... | Block N-1 | N (4B) |
    /// ```
    pub async fn finish(mut self) -> HummockResult<SstableBuilderOutput<W::Output>> {
        // Key range of the whole SST: first block's smallest key .. last added key.
        let smallest_key = if self.block_metas.is_empty() {
            vec![]
        } else {
            self.block_metas[0].smallest_key.clone()
        };
        let largest_key = self.last_full_key.clone();
        self.finalize_last_table_stats();

        // Vnode key-range hints are only supported for single-table SSTs.
        // Multi-table SST scenarios should not enable max_vnode_key_range_bytes in config.
        assert!(
            self.table_ids.len() <= 1 || self.vnode_range_collector.is_none(),
            "vnode key-range hints are only supported for single-table SSTs, found {} tables",
            self.table_ids.len()
        );

        self.build_block().await?;
        let right_exclusive = false;
        let meta_offset = self.writer.data_len() as u64;

        let bloom_filter_kind = if self.filter_builder.support_blocked_raw_data() {
            BloomFilterType::Blocked
        } else {
            BloomFilterType::Sstable
        };
        // A non-positive false-positive rate disables the bloom filter entirely.
        let bloom_filter = if self.options.bloom_false_positive > 0.0 {
            self.filter_builder.finish(self.memory_limiter.clone())
        } else {
            vec![]
        };

        // Aggregate per-block counters into whole-SST totals.
        let total_key_count = self
            .block_metas
            .iter()
            .map(|block_meta| block_meta.total_key_count as u64)
            .sum::<u64>();
        let stale_key_count = self
            .block_metas
            .iter()
            .map(|block_meta| block_meta.stale_key_count as u64)
            .sum::<u64>();
        let uncompressed_file_size = self
            .block_metas
            .iter()
            .map(|block_meta| block_meta.uncompressed_size as u64)
            .sum::<u64>();

        #[expect(deprecated)]
        let mut meta = SstableMeta {
            block_metas: self.block_metas,
            bloom_filter,
            estimated_size: 0,
            key_count: utils::checked_into_u32(total_key_count).unwrap_or_else(|_| {
                panic!(
                    "WARN overflow can't convert total_key_count {} into u32 tables {:?}",
                    total_key_count, self.table_ids,
                )
            }),
            smallest_key,
            largest_key,
            version: VERSION,
            meta_offset,
            monotonic_tombstone_events: vec![],
        };

        // estimated_size = data bytes + encoded meta bytes, with explicit
        // u32 overflow checks at each step.
        let meta_encode_size = meta.encoded_size();
        let encoded_size_u32 = utils::checked_into_u32(meta_encode_size).unwrap_or_else(|_| {
            panic!(
                "WARN overflow can't convert meta_encoded_size {} into u32 tables {:?}",
                meta_encode_size, self.table_ids,
            )
        });
        let meta_offset_u32 = utils::checked_into_u32(meta_offset).unwrap_or_else(|_| {
            panic!(
                "WARN overflow can't convert meta_offset {} into u32 tables {:?}",
                meta_offset, self.table_ids,
            )
        });
        meta.estimated_size = encoded_size_u32
            .checked_add(meta_offset_u32)
            .unwrap_or_else(|| {
                panic!(
                    "WARN overflow encoded_size_u32 {} meta_offset_u32 {} table_id {:?} table_ids {:?}",
                    encoded_size_u32, meta_offset_u32, self.last_table_id, self.table_ids
                )
            });

        // Average key/value sizes across all tables; 0 when nothing was added.
        let (avg_key_size, avg_value_size) = if self.table_stats.is_empty() {
            (0, 0)
        } else {
            let total_key_count: usize = self
                .table_stats
                .values()
                .map(|s| s.total_key_count as usize)
                .sum();

            if total_key_count == 0 {
                (0, 0)
            } else {
                let total_key_size: usize = self
                    .table_stats
                    .values()
                    .map(|s| s.total_key_size as usize)
                    .sum();

                let total_value_size: usize = self
                    .table_stats
                    .values()
                    .map(|s| s.total_value_size as usize)
                    .sum();

                (
                    total_key_size / total_key_count,
                    total_value_size / total_key_count,
                )
            }
        };

        // `epoch_set` is ordered, so first/last give min/max. The empty-SST
        // sentinel (MAX, MIN) keeps min > max so no epoch range matches.
        let (min_epoch, max_epoch) = {
            if self.epoch_set.is_empty() {
                (HummockEpoch::MAX, u64::MIN)
            } else {
                (
                    *self.epoch_set.first().unwrap(),
                    *self.epoch_set.last().unwrap(),
                )
            }
        };

        // `finish` yields `None` when at most one vnode was seen (no hints needed).
        let vnode_user_key_ranges = self
            .vnode_range_collector
            .take()
            .and_then(|collector| collector.finish(&self.last_full_key));

        let sst_info: SstableInfo = SstableInfoInner {
            object_id: self.sst_object_id,
            // use the same sst_id as object_id for initial sst
            sst_id: self.sst_object_id.as_raw_id().into(),
            bloom_filter_kind,
            key_range: KeyRange {
                left: Bytes::from(meta.smallest_key.clone()),
                right: Bytes::from(meta.largest_key.clone()),
                right_exclusive,
            },
            file_size: meta.estimated_size as u64,
            table_ids: self.table_ids.into_iter().collect(),
            meta_offset: meta.meta_offset,
            stale_key_count,
            total_key_count,
            uncompressed_file_size: uncompressed_file_size + meta.encoded_size() as u64,
            min_epoch,
            max_epoch,
            range_tombstone_count: 0,
            sst_size: meta.estimated_size as u64,
            vnode_statistics: vnode_user_key_ranges,
        }
        .into();

        tracing::trace!(
            "meta_size {} bloom_filter_size {}  add_key_counts {} stale_key_count {} min_epoch {} max_epoch {} epoch_count {}",
            meta.encoded_size(),
            meta.bloom_filter.len(),
            total_key_count,
            stale_key_count,
            min_epoch,
            max_epoch,
            self.epoch_set.len()
        );
        let bloom_filter_size = meta.bloom_filter.len();
        let sstable_file_size = sst_info.file_size as usize;

        if !meta.block_metas.is_empty() {
            // fill total_compressed_size
            // Blocks are grouped by table (a block never spans tables), so a
            // single forward pass can attribute each block's compressed length
            // to its table's stats.
            let mut last_table_id = meta.block_metas[0].table_id();
            let mut last_table_stats = self.table_stats.get_mut(&last_table_id).unwrap();
            for block_meta in &meta.block_metas {
                let block_table_id = block_meta.table_id();
                if last_table_id != block_table_id {
                    last_table_id = block_table_id;
                    last_table_stats = self.table_stats.get_mut(&last_table_id).unwrap();
                }

                last_table_stats.total_compressed_size += block_meta.len as u64;
            }
        }

        let writer_output = self.writer.finish(meta).await?;
        // The timestamp is only used during full GC.
        //
        // Ideally object store object's last_modified should be used.
        // However, it'll incur additional IO overhead since S3 lacks an interface to retrieve the last_modified timestamp after the PUT operation on an object.
        //
        // The local timestamp below is expected to precede the last_modified of object store object, given that the object store object is created afterward.
        // It should help alleviate the clock drift issue.

        let now = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .expect("Clock may have gone backwards")
            .as_secs();
        Ok(SstableBuilderOutput::<W::Output> {
            sst_info: LocalSstableInfo::new(sst_info, self.table_stats, now),
            writer_output,
            stats: SstableBuilderOutputStats {
                bloom_filter_size,
                avg_key_size,
                avg_value_size,
                epoch_count: self.epoch_set.len(),
                block_size_vec: self.block_size_vec,
                sstable_file_size,
            },
        })
    }
664
665    pub fn approximate_len(&self) -> usize {
666        self.writer.data_len()
667            + self.block_builder.approximate_len()
668            + self.filter_builder.approximate_len()
669    }
670
    /// Seals the in-progress block: records its uncompressed size, writes the
    /// (possibly compressed) bytes through the writer, back-fills the last
    /// `BlockMeta`'s length, and resets the block builder. No-op when the
    /// builder is empty.
    pub async fn build_block(&mut self) -> HummockResult<()> {
        // Skip empty block.
        if self.block_builder.is_empty() {
            return Ok(());
        }

        let block_meta = self.block_metas.last_mut().unwrap();
        let uncompressed_block_size = self.block_builder.uncompressed_block_size();
        block_meta.uncompressed_size = utils::checked_into_u32(uncompressed_block_size)
            .unwrap_or_else(|_| {
                panic!(
                    "WARN overflow can't convert uncompressed_block_size {} into u32 table {:?}",
                    uncompressed_block_size,
                    self.block_builder.table_id(),
                )
            });
        let block = self.block_builder.build();
        self.writer.write_block(block, block_meta).await?;
        self.block_size_vec.push(block.len());
        self.filter_builder
            .switch_block(self.memory_limiter.clone());
        // Block length = writer position after the write minus this block's offset.
        let data_len = utils::checked_into_u32(self.writer.data_len()).unwrap_or_else(|_| {
            panic!(
                "WARN overflow can't convert writer_data_len {} into u32 table {:?}",
                self.writer.data_len(),
                self.block_builder.table_id(),
            )
        });
        block_meta.len = data_len.checked_sub(block_meta.offset).unwrap_or_else(|| {
            panic!(
                "data_len should >= meta_offset, found data_len={}, meta_offset={}",
                data_len, block_meta.offset
            )
        });

        // NOTE(review): `data_len` is the *cumulative* bytes written so far,
        // not just this block — this warning fires once total data exceeds
        // twice the SST capacity, despite the "block size" wording. Confirm
        // whether a per-block check was intended.
        if data_len as usize > self.options.capacity * 2 {
            tracing::warn!(
                "WARN unexpected block size {} table {:?}",
                data_len,
                self.block_builder.table_id()
            );
        }

        self.block_builder.clear();
        Ok(())
    }
717
    /// NOTE(review): despite its name, this returns `true` when the writer has
    /// already received data (`data_len() > 0`), i.e. when the builder is
    /// NON-empty. Verify how call sites interpret it before renaming or
    /// flipping the comparison.
    pub fn is_empty(&self) -> bool {
        self.writer.data_len() > 0
    }
721
722    /// Returns true if we roughly reached capacity
723    pub fn reach_capacity(&self) -> bool {
724        self.approximate_len() >= self.options.capacity
725    }
726
727    fn finalize_last_table_stats(&mut self) {
728        if self.table_ids.is_empty() || self.last_table_id.is_none() {
729            return;
730        }
731        self.table_stats.insert(
732            self.last_table_id.unwrap(),
733            std::mem::take(&mut self.last_table_stats),
734        );
735    }
736}
737
/// Collects vnode key-range hints during SST building.
///
/// Keys are observed in sorted order; each contiguous run of keys sharing a
/// vnode is sealed into one inclusive user-key range. Collection stops once
/// the accumulated key bytes exceed `max_bytes`.
struct VnodeUserKeyRangeCollector {
    /// Byte budget for stored range keys; collection stops when exceeded.
    max_bytes: usize,
    /// Bytes accounted for ranges sealed so far.
    current_size: usize,
    /// Sealed inclusive `[left, right]` user-key ranges, keyed by vnode.
    ranges: BTreeMap<VirtualNode, (UserKey<Bytes>, UserKey<Bytes>)>,
    /// Vnode of the range currently being tracked.
    current_vnode: VirtualNode,
    /// Encoded full key that opened the current range; empty when no range is open.
    range_start_key: Vec<u8>,
}
746
747impl VnodeUserKeyRangeCollector {
    /// Creates a collector with a `max_bytes` budget and no open range.
    fn new(max_bytes: usize) -> Self {
        Self {
            max_bytes,
            current_size: 0,
            ranges: BTreeMap::new(),
            current_vnode: VirtualNode::ZERO,
            range_start_key: Vec::new(),
        }
    }
757
758    fn with_limit(max_bytes: Option<usize>) -> Option<Self> {
759        max_bytes.filter(|&n| n > 0).map(Self::new)
760    }
761
    /// Track vnode boundaries. On vnode switch, seals previous range with `prev_key` as right bound.
    /// Range: `[first_key_of_vnode, last_key_of_vnode]` (inclusive).
    ///
    /// `key` and `prev_key` are encoded full keys; `prev_key` is the key added
    /// immediately before `key` (the last key of the vnode being sealed).
    /// Becomes a no-op once the byte budget is exhausted — already-sealed
    /// ranges are kept, later vnodes are simply not tracked.
    fn observe_key(&mut self, vnode: VirtualNode, key: &[u8], prev_key: &[u8]) {
        if self.current_size >= self.max_bytes {
            return;
        }

        // First key
        if self.range_start_key.is_empty() {
            self.current_vnode = vnode;
            self.range_start_key = key.to_vec();
            return;
        }

        // Same vnode, nothing to do
        if vnode == self.current_vnode {
            return;
        }

        // Vnode changed: seal previous range
        self.seal_range(prev_key);

        // Check if budget exhausted after sealing
        if self.current_size >= self.max_bytes {
            return;
        }

        // Start new vnode
        self.current_vnode = vnode;
        self.range_start_key = key.to_vec();
    }
793
    /// Seal current range. Asserts `vnode/table_id` consistency between left and right keys.
    ///
    /// Consumes `range_start_key` as the left bound (leaving no range open),
    /// charges both bounds against the byte budget, and stores the decoded
    /// user-key pair under `current_vnode`.
    fn seal_range(&mut self, right_key: &[u8]) {
        let left_key = mem::take(&mut self.range_start_key);
        // Budget accounting uses the encoded full-key lengths plus the vnode tag.
        self.current_size += mem::size_of::<VirtualNode>() + left_key.len() + right_key.len();

        let left_full_key = FullKey::decode(&left_key);
        let right_full_key = FullKey::decode(right_key);
        let left_user_key = left_full_key.user_key.copy_into();
        let right_user_key = right_full_key.user_key.copy_into();

        // Sanity checks for data correctness:
        // 1. left and right keys have same `vnode`
        // 2. vnode matches `current_vnode` being tracked
        // 3. left and right keys have same `table_id`
        assert_eq!(
            left_user_key.get_vnode_id(),
            right_user_key.get_vnode_id(),
            "vnode changed within range: left_user {:?}, right_user {:?}",
            left_user_key,
            right_user_key
        );
        assert_eq!(
            left_user_key.get_vnode_id(),
            self.current_vnode.to_index(),
            "vnode mismatch: left {:?}, right {:?}, expected vnode {}",
            left_user_key,
            right_user_key,
            self.current_vnode.to_index()
        );
        assert_eq!(
            left_user_key.table_id, right_user_key.table_id,
            "table_id changed within range: left {:?}, right {:?}",
            left_user_key, right_user_key
        );

        self.ranges
            .insert(self.current_vnode, (left_user_key, right_user_key));
    }
832
833    /// Returns `Some` if >1 vnodes collected, `None` otherwise (single-vnode needs no hints).
834    fn finish(mut self, last_key: &[u8]) -> Option<VnodeStatistics> {
835        if !self.range_start_key.is_empty() {
836            self.seal_range(last_key);
837        }
838
839        if self.ranges.len() > 1 {
840            // Validate all ranges belong to the same table_id before building VnodeStatistics
841            let mut table_ids = self
842                .ranges
843                .values()
844                .flat_map(|(left, right)| [left.table_id, right.table_id]);
845            if let Some(first_table_id) = table_ids.next() {
846                for table_id in table_ids {
847                    assert_eq!(
848                        table_id, first_table_id,
849                        "all vnode ranges must belong to the same table_id, found {:?} and {:?}",
850                        table_id, first_table_id
851                    );
852                }
853            }
854
855            Some(VnodeStatistics::from_map(self.ranges))
856        } else {
857            None
858        }
859    }
860}
861
/// Per-SST build statistics, reported to the corresponding `CompactorMetrics`
/// histograms by `report_stats` (zero values are skipped there).
pub struct SstableBuilderOutputStats {
    // Fed into `sstable_bloom_filter_size`; presumably bytes — TODO confirm at the write site.
    bloom_filter_size: usize,
    // Fed into `sstable_avg_key_size`.
    avg_key_size: usize,
    // Fed into `sstable_avg_value_size`.
    avg_value_size: usize,
    // Fed into `sstable_distinct_epoch_count`.
    epoch_count: usize,
    // Each element is observed individually in `sstable_block_size`.
    block_size_vec: Vec<usize>, // for statistics
    // Fed into `sstable_file_size`.
    sstable_file_size: usize,
}
870
871impl SstableBuilderOutputStats {
872    pub fn report_stats(&self, metrics: &Arc<CompactorMetrics>) {
873        if self.bloom_filter_size != 0 {
874            metrics
875                .sstable_bloom_filter_size
876                .observe(self.bloom_filter_size as _);
877        }
878
879        if self.sstable_file_size != 0 {
880            metrics
881                .sstable_file_size
882                .observe(self.sstable_file_size as _);
883        }
884
885        if self.avg_key_size != 0 {
886            metrics.sstable_avg_key_size.observe(self.avg_key_size as _);
887        }
888
889        if self.avg_value_size != 0 {
890            metrics
891                .sstable_avg_value_size
892                .observe(self.avg_value_size as _);
893        }
894
895        if self.epoch_count != 0 {
896            metrics
897                .sstable_distinct_epoch_count
898                .observe(self.epoch_count as _);
899        }
900
901        if !self.block_size_vec.is_empty() {
902            for block_size in &self.block_size_vec {
903                metrics.sstable_block_size.observe(*block_size as _);
904            }
905        }
906    }
907}
908
#[cfg(test)]
pub(super) mod tests {
    use std::collections::{Bound, HashMap};

    use risingwave_common::catalog::TableId;
    use risingwave_common::hash::VirtualNode;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_hummock_sdk::key::UserKey;

    use super::*;
    use crate::assert_bytes_eq;
    use crate::compaction_catalog_manager::{
        CompactionCatalogAgent, DummyFilterKeyExtractor, MultiFilterKeyExtractor,
    };
    use crate::hummock::iterator::test_utils::mock_sstable_store;
    use crate::hummock::sstable::xor_filter::BlockedXor16FilterBuilder;
    use crate::hummock::test_utils::{
        TEST_KEYS_COUNT, default_builder_opt_for_test, gen_test_sstable_impl, mock_sst_writer,
        test_key_of, test_value_of,
    };
    use crate::hummock::{CachePolicy, Sstable, SstableWriterOptions, Xor8FilterBuilder};
    use crate::monitor::StoreLocalStatistic;

    // A builder with zero capacity and no keys added must still `finish()` cleanly.
    #[tokio::test]
    async fn test_empty() {
        let opt = SstableBuilderOptions {
            capacity: 0,
            block_capacity: 4096,
            restart_interval: 16,
            bloom_false_positive: 0.001,
            ..Default::default()
        };

        let table_id_to_vnode = HashMap::from_iter(vec![(0, VirtualNode::COUNT_FOR_TEST)]);
        let table_id_to_watermark_serde = HashMap::from_iter(vec![(0, None)]);
        let b = SstableBuilder::for_test(
            0,
            mock_sst_writer(&opt),
            opt,
            table_id_to_vnode,
            table_id_to_watermark_serde,
        );

        b.finish().await.unwrap();
    }

    /// Encodes a full key (default table id, epoch 0) whose table key is `vnode || suffix`.
    fn encode_full_key(vnode: VirtualNode, table_key_suffix: &[u8]) -> Vec<u8> {
        let mut table_key = vnode.to_be_bytes().to_vec();
        table_key.extend_from_slice(table_key_suffix);
        FullKey::for_test(TableId::default(), table_key, 0).encode()
    }

    /// Builds the raw table key `vnode || suffix` (no table-id/epoch framing),
    /// matching the `table_key` part of the user keys the collector stores.
    fn table_key_of(vnode: VirtualNode, suffix: &[u8]) -> Vec<u8> {
        let mut key = vnode.to_be_bytes().to_vec();
        key.extend_from_slice(suffix);
        key
    }

    #[test]
    fn test_vnode_user_key_range_basic_collection() {
        // Test basic multi-vnode collection with boundary semantics verification.
        // Validates: vnode switching triggers range sealing, boundaries are inclusive (right_exclusive=false).
        let mut collector = VnodeUserKeyRangeCollector::with_limit(Some(1024)).unwrap();
        let vnode_1 = VirtualNode::from_index(1);
        let vnode_2 = VirtualNode::from_index(2);

        let k1 = encode_full_key(vnode_1, b"k1");
        let k2 = encode_full_key(vnode_1, b"k2");
        let k3 = encode_full_key(vnode_2, b"k3");
        let k4 = encode_full_key(vnode_2, b"k4");

        collector.observe_key(vnode_1, &k1, &[]);
        collector.observe_key(vnode_1, &k2, &k1);
        collector.observe_key(vnode_2, &k3, &k2);
        collector.observe_key(vnode_2, &k4, &k3);

        let info = collector.finish(&k4).unwrap();
        assert_eq!(info.vnode_user_key_ranges().len(), 2);

        // Verify vnode_1: left = first key, right = last key before switch
        let (range1_left, range1_right) = info.get_vnode_user_key_range(vnode_1).unwrap();
        assert_eq!(range1_left.table_key.as_ref(), table_key_of(vnode_1, b"k1"));
        assert_eq!(
            range1_right.table_key.as_ref(),
            table_key_of(vnode_1, b"k2")
        );

        // Verify vnode_2: left = first key, right = SST's last key
        let (range2_left, range2_right) = info.get_vnode_user_key_range(vnode_2).unwrap();
        assert_eq!(range2_left.table_key.as_ref(), table_key_of(vnode_2, b"k3"));
        assert_eq!(
            range2_right.table_key.as_ref(),
            table_key_of(vnode_2, b"k4")
        );
    }

    #[test]
    fn test_vnode_user_key_range_capacity_limit() {
        // Test "allow over-limit write" semantics: inserting a range may exceed capacity,
        // but stops before starting the next range if it would exceed the limit.
        //
        // Calculation based on encoded key sizes:
        // Each range = sizeof(VirtualNode) + left.len() + right.len().
        // Use a limit that:
        //   - allows writing vnode_1 (range_size),
        //   - allows over-limit write of vnode_2 (range_size + estimated_next),
        //   - stops before vnode_3 (2 * range_size + estimated_next > limit).
        let vnode_1 = VirtualNode::from_index(1);
        let vnode_2 = VirtualNode::from_index(2);
        let vnode_3 = VirtualNode::from_index(3);
        let vnode_4 = VirtualNode::from_index(4);

        let k1 = encode_full_key(vnode_1, b"k1");
        let k2 = encode_full_key(vnode_1, b"k2");
        let k3 = encode_full_key(vnode_2, b"k3");
        let k4 = encode_full_key(vnode_2, b"k4");
        let k5 = encode_full_key(vnode_3, b"k5");
        let k6 = encode_full_key(vnode_3, b"k6");
        let k7 = encode_full_key(vnode_4, b"k7");

        let range_size = mem::size_of::<VirtualNode>() + k1.len() + k2.len();
        let estimated_next_size = mem::size_of::<VirtualNode>() + k3.len();
        let limit = range_size + estimated_next_size; // allow second range, stop before third
        let mut collector = VnodeUserKeyRangeCollector::with_limit(Some(limit)).unwrap();

        collector.observe_key(vnode_1, &k1, &[]);
        collector.observe_key(vnode_1, &k2, &k1);
        collector.observe_key(vnode_2, &k3, &k2); // Seals vnode_1 (6 bytes)
        collector.observe_key(vnode_2, &k4, &k3);
        collector.observe_key(vnode_3, &k5, &k4); // Seals vnode_2 (12 bytes total), stops before vnode_3
        collector.observe_key(vnode_3, &k6, &k5); // Ignored
        collector.observe_key(vnode_4, &k7, &k6); // Ignored

        let info = collector.finish(&k7).unwrap();
        assert_eq!(
            info.vnode_user_key_ranges().len(),
            2,
            "Should collect exactly 2 vnodes"
        );

        // Verify collected vnodes have correct boundaries
        let (range1_left, range1_right) = info.get_vnode_user_key_range(vnode_1).unwrap();
        assert_eq!(range1_left.table_key.as_ref(), table_key_of(vnode_1, b"k1"));
        assert_eq!(
            range1_right.table_key.as_ref(),
            table_key_of(vnode_1, b"k2")
        );

        let (range2_left, range2_right) = info.get_vnode_user_key_range(vnode_2).unwrap();
        assert_eq!(range2_left.table_key.as_ref(), table_key_of(vnode_2, b"k3"));
        assert_eq!(
            range2_right.table_key.as_ref(),
            table_key_of(vnode_2, b"k4")
        );

        // Verify stopped vnodes are not collected
        assert!(info.get_vnode_user_key_range(vnode_3).is_none());
        assert!(info.get_vnode_user_key_range(vnode_4).is_none());
    }

    #[test]
    fn test_vnode_user_key_range_sparse_distribution() {
        // Test non-consecutive vnodes (production scenario: hash-based data distribution).
        // Validates: collector handles sparse vnode indices correctly, gaps are not filled.
        let mut collector = VnodeUserKeyRangeCollector::with_limit(Some(1024)).unwrap();
        let vnode_5 = VirtualNode::from_index(5);
        let vnode_10 = VirtualNode::from_index(10);
        let vnode_100 = VirtualNode::from_index(100);

        let keys = vec![
            (vnode_5, encode_full_key(vnode_5, b"key_005_001")),
            (vnode_5, encode_full_key(vnode_5, b"key_005_002")),
            (vnode_5, encode_full_key(vnode_5, b"key_005_999")),
            (vnode_10, encode_full_key(vnode_10, b"key_010_001")),
            (vnode_10, encode_full_key(vnode_10, b"key_010_100")),
            (vnode_100, encode_full_key(vnode_100, b"key_100_001")),
            (vnode_100, encode_full_key(vnode_100, b"key_100_999")),
        ];

        // Replay the keys in order, threading the previous key through as `prev_key`.
        let mut prev_key = Vec::new();
        for (vnode, key) in &keys {
            collector.observe_key(*vnode, key, &prev_key);
            prev_key = key.clone();
        }

        let info = collector.finish(&prev_key).unwrap();
        assert_eq!(info.vnode_user_key_ranges().len(), 3);

        // Verify each collected vnode has correct boundaries
        let (range5_left, range5_right) = info.get_vnode_user_key_range(vnode_5).unwrap();
        assert_eq!(
            range5_left.table_key.as_ref(),
            table_key_of(vnode_5, b"key_005_001")
        );
        assert_eq!(
            range5_right.table_key.as_ref(),
            table_key_of(vnode_5, b"key_005_999")
        );

        let (range10_left, range10_right) = info.get_vnode_user_key_range(vnode_10).unwrap();
        assert_eq!(
            range10_left.table_key.as_ref(),
            table_key_of(vnode_10, b"key_010_001")
        );
        assert_eq!(
            range10_right.table_key.as_ref(),
            table_key_of(vnode_10, b"key_010_100")
        );

        let (range100_left, range100_right) = info.get_vnode_user_key_range(vnode_100).unwrap();
        assert_eq!(
            range100_left.table_key.as_ref(),
            table_key_of(vnode_100, b"key_100_001")
        );
        assert_eq!(
            range100_right.table_key.as_ref(),
            table_key_of(vnode_100, b"key_100_999")
        );

        // Verify gaps are not filled (no data for vnodes 0, 7, 50)
        assert!(
            info.get_vnode_user_key_range(VirtualNode::from_index(0))
                .is_none()
        );
        assert!(
            info.get_vnode_user_key_range(VirtualNode::from_index(7))
                .is_none()
        );
        assert!(
            info.get_vnode_user_key_range(VirtualNode::from_index(50))
                .is_none()
        );
    }

    #[test]
    fn test_vnode_user_key_range_edge_cases() {
        // Test 1: Configuration disabled (None or 0) should return None
        assert!(VnodeUserKeyRangeCollector::with_limit(None).is_none());
        assert!(VnodeUserKeyRangeCollector::with_limit(Some(0)).is_none());

        // Test 2: Empty collector (no keys) should return None
        let collector = VnodeUserKeyRangeCollector::with_limit(Some(1024)).unwrap();
        assert!(
            collector
                .finish(&encode_full_key(VirtualNode::ZERO, b"any"))
                .is_none()
        );

        // Test 3: Single vnode (optimization: don't emit hints for single-vnode SSTs)
        let vnode = VirtualNode::from_index(5);
        let mut collector = VnodeUserKeyRangeCollector::with_limit(Some(1024)).unwrap();
        let a = encode_full_key(vnode, b"a");
        let b = encode_full_key(vnode, b"b");
        let c = encode_full_key(vnode, b"c");
        collector.observe_key(vnode, &a, &[]);
        collector.observe_key(vnode, &b, &a);
        collector.observe_key(vnode, &c, &b);
        assert!(
            collector.finish(&c).is_none(),
            "Single-vnode SST should not emit hints"
        );

        // Test 4: Extreme limit (1 byte) - first vnode exceeds, becomes single-vnode
        // Range size = sizeof(VirtualNode=2) + encoded_key_len * 2
        // With 1-byte limit: vnode_1 range exceeds but allowed (over-limit write)
        // vnode_2 check: current_size >= max_bytes, stop. Result: only vnode_1, returns None (single-vnode)
        let mut collector = VnodeUserKeyRangeCollector::with_limit(Some(1)).unwrap();
        let vnode_1 = VirtualNode::from_index(1);
        let vnode_2 = VirtualNode::from_index(2);
        let key1 = encode_full_key(vnode_1, b"key1");
        let key2 = encode_full_key(vnode_1, b"key2");
        let key3 = encode_full_key(vnode_2, b"key3");
        collector.observe_key(vnode_1, &key1, &[]);
        collector.observe_key(vnode_1, &key2, &key1);
        collector.observe_key(vnode_2, &key3, &key2); // Seals vnode_1, stops before vnode_2
        assert!(
            collector.finish(&key3).is_none(),
            "Only 1 vnode collected, should return None"
        );
    }

    // End-to-end build: add TEST_KEYS_COUNT keys, then verify the resulting
    // key range, file size, and that the encoded SstableMeta round-trips.
    #[tokio::test]
    async fn test_basic() {
        let opt = default_builder_opt_for_test();

        let table_id_to_vnode = HashMap::from_iter(vec![(0, VirtualNode::COUNT_FOR_TEST)]);
        let table_id_to_watermark_serde = HashMap::from_iter(vec![(0, None)]);
        let mut b = SstableBuilder::for_test(
            0,
            mock_sst_writer(&opt),
            opt,
            table_id_to_vnode,
            table_id_to_watermark_serde,
        );

        for i in 0..TEST_KEYS_COUNT {
            b.add_for_test(
                test_key_of(i).to_ref(),
                HummockValue::put(&test_value_of(i)),
            )
            .await
            .unwrap();
        }

        let output = b.finish().await.unwrap();
        let info = output.sst_info.sst_info;

        // Key range must span the first and last keys added.
        assert_bytes_eq!(test_key_of(0).encode(), info.key_range.left);
        assert_bytes_eq!(
            test_key_of(TEST_KEYS_COUNT - 1).encode(),
            info.key_range.right
        );
        let (data, meta) = output.writer_output;
        assert_eq!(info.file_size, meta.estimated_size as u64);
        // Decoding the meta from its recorded offset must reproduce it exactly.
        let offset = info.meta_offset as usize;
        let meta2 = SstableMeta::decode(&data[offset..]).unwrap();
        assert_eq!(meta2, meta);
    }

    /// Builds an SST with filter builder `F` (bloom filter enabled iff
    /// `with_blooms`) and checks every queried key may match the filter.
    ///
    /// NOTE(review): the SST is built from `TEST_KEYS_COUNT` keys but the
    /// check loop uses `key_count` (1000) — confirm TEST_KEYS_COUNT >= 1000
    /// so all queried keys actually exist in the table.
    async fn test_with_bloom_filter<F: FilterBuilder>(with_blooms: bool) {
        let key_count = 1000;

        let opts = SstableBuilderOptions {
            capacity: 0,
            block_capacity: 4096,
            restart_interval: 16,
            bloom_false_positive: if with_blooms { 0.01 } else { 0.0 },
            ..Default::default()
        };

        // build remote table
        let sstable_store = mock_sstable_store().await;
        let table_id_to_vnode = HashMap::from_iter(vec![(0, VirtualNode::COUNT_FOR_TEST)]);
        let table_id_to_watermark_serde = HashMap::from_iter(vec![(0, None)]);
        let sst_info = gen_test_sstable_impl::<Vec<u8>, F>(
            opts,
            0,
            (0..TEST_KEYS_COUNT).map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
            sstable_store.clone(),
            CachePolicy::NotFill,
            table_id_to_vnode,
            table_id_to_watermark_serde,
        )
        .await;
        let table = sstable_store
            .sstable(&sst_info, &mut StoreLocalStatistic::default())
            .await
            .unwrap();

        assert_eq!(table.has_bloom_filter(), with_blooms);
        for i in 0..key_count {
            let full_key = test_key_of(i);
            if table.has_bloom_filter() {
                // Every present key must pass the filter (no false negatives).
                let hash = Sstable::hash_for_bloom_filter(full_key.user_key.encode().as_slice(), 0);
                let key_ref = full_key.user_key.as_ref();
                assert!(
                    table.may_match_hash(
                        &(Bound::Included(key_ref), Bound::Included(key_ref)),
                        hash
                    ),
                    "failed at {}",
                    i
                );
            }
        }
    }

    // Exercise all filter-builder variants, with and without bloom filters.
    #[tokio::test]
    async fn test_bloom_filter() {
        test_with_bloom_filter::<Xor16FilterBuilder>(false).await;
        test_with_bloom_filter::<Xor16FilterBuilder>(true).await;
        test_with_bloom_filter::<Xor8FilterBuilder>(true).await;
        test_with_bloom_filter::<BlockedXor16FilterBuilder>(true).await;
    }

    // Mixed extractors: tables 1 and 3 use the Dummy filter key extractor while
    // table 2 uses FullKey; keys of table 2 must still pass the filter check.
    #[tokio::test]
    async fn test_no_bloom_filter_block() {
        let opts = SstableBuilderOptions::default();
        // build remote table
        let sstable_store = mock_sstable_store().await;
        let writer_opts = SstableWriterOptions::default();
        let object_id = 1;
        let writer = sstable_store
            .clone()
            .create_sst_writer(object_id, writer_opts);
        let mut filter = MultiFilterKeyExtractor::default();
        filter.register(
            1.into(),
            FilterKeyExtractorImpl::Dummy(DummyFilterKeyExtractor),
        );
        filter.register(
            2.into(),
            FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor),
        );
        filter.register(
            3.into(),
            FilterKeyExtractorImpl::Dummy(DummyFilterKeyExtractor),
        );

        let table_id_to_vnode = HashMap::from_iter(vec![
            (1.into(), VirtualNode::COUNT_FOR_TEST),
            (2.into(), VirtualNode::COUNT_FOR_TEST),
            (3.into(), VirtualNode::COUNT_FOR_TEST),
        ]);
        let table_id_to_watermark_serde =
            HashMap::from_iter(vec![(1.into(), None), (2.into(), None), (3.into(), None)]);

        let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
            FilterKeyExtractorImpl::Multi(filter),
            table_id_to_vnode,
            table_id_to_watermark_serde,
            HashMap::default(),
        ));

        let mut builder = SstableBuilder::new(
            object_id,
            writer,
            BlockedXor16FilterBuilder::new(1024),
            opts,
            compaction_catalog_agent_ref,
            None,
        );

        // Write `key_count` keys for each of the three tables (all in vnode 0).
        let key_count: usize = 10000;
        for table_id in 1..4 {
            let mut table_key = VirtualNode::ZERO.to_be_bytes().to_vec();
            for idx in 0..key_count {
                table_key.resize(VirtualNode::SIZE, 0);
                table_key.extend_from_slice(format!("key_test_{:05}", idx * 2).as_bytes());
                let k = UserKey::for_test(TableId::new(table_id), table_key.as_ref());
                let v = test_value_of(idx);
                builder
                    .add(
                        FullKey::from_user_key(k, test_epoch(1)),
                        HummockValue::put(v.as_ref()),
                    )
                    .await
                    .unwrap();
            }
        }
        let ret = builder.finish().await.unwrap();
        let sst_info = ret.sst_info.sst_info.clone();
        ret.writer_output.await.unwrap().unwrap();
        let table = sstable_store
            .sstable(&sst_info, &mut StoreLocalStatistic::default())
            .await
            .unwrap();
        // Keys of table 2 (FullKey extractor) must pass the filter check.
        let mut table_key = VirtualNode::ZERO.to_be_bytes().to_vec();
        for idx in 0..key_count {
            table_key.resize(VirtualNode::SIZE, 0);
            table_key.extend_from_slice(format!("key_test_{:05}", idx * 2).as_bytes());
            let k = UserKey::for_test(TableId::new(2), table_key.as_slice());
            let hash = Sstable::hash_for_bloom_filter(&k.encode(), 2);
            let key_ref = k.as_ref();
            assert!(
                table.may_match_hash(&(Bound::Included(key_ref), Bound::Included(key_ref)), hash)
            );
        }
    }
}