risingwave_storage/hummock/sstable/
builder.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, BTreeSet, HashMap};
16use std::mem;
17use std::sync::Arc;
18use std::time::SystemTime;
19
20use bytes::{Bytes, BytesMut};
21use risingwave_common::catalog::TableId;
22use risingwave_common::hash::VirtualNode;
23use risingwave_common::util::row_serde::OrderedRowSerde;
24use risingwave_hummock_sdk::key::{FullKey, MAX_KEY_LEN, TABLE_PREFIX_LEN, UserKey, user_key};
25use risingwave_hummock_sdk::key_range::KeyRange;
26use risingwave_hummock_sdk::sstable_info::{SstableInfo, SstableInfoInner, VnodeStatistics};
27use risingwave_hummock_sdk::table_stats::{TableStats, TableStatsMap};
28use risingwave_hummock_sdk::{HummockEpoch, HummockSstableObjectId, LocalSstableInfo};
29use risingwave_pb::hummock::{BloomFilterType, PbSstableFilterType};
30
31use super::utils::CompressionAlgorithm;
32use super::{
33    BlockBuilder, BlockBuilderOptions, BlockMeta, DEFAULT_BLOCK_SIZE, DEFAULT_ENTRY_SIZE,
34    DEFAULT_RESTART_INTERVAL, SstableMeta, SstableWriter, VERSION,
35};
36use crate::compaction_catalog_manager::{
37    CompactionCatalogAgent, CompactionCatalogAgentRef, FilterKeyExtractorImpl,
38    FullKeyFilterKeyExtractor,
39};
40use crate::hummock::sstable::{
41    DEFAULT_FILTER_HASH_PREALLOC_KEY_COUNT_CAP, FilterBuilder, FilterBuilderOptions, utils,
42};
43use crate::hummock::value::HummockValue;
44use crate::hummock::{
45    Block, BlockHolder, BlockIterator, HummockResult, MemoryLimiter, Xor16FilterBuilder,
46    try_shorten_block_smallest_key,
47};
48use crate::monitor::CompactorMetrics;
49use crate::opts::StorageOpts;
50
/// Default approximate SSTable capacity in bytes (4 MiB); used as the default `capacity`.
pub const DEFAULT_SSTABLE_SIZE: usize = 4 << 20;
/// Default value of the (deprecated) `bloom_false_positive` builder option.
pub const DEFAULT_BLOOM_FALSE_POSITIVE: f64 = 1e-3;
/// Default value of the `max_sst_size` builder option in bytes (512 MiB).
pub const DEFAULT_MAX_SST_SIZE: u64 = 512 << 20;
/// Upper bound, in bytes (8 KiB), of the "pending block is too small" threshold that
/// `add_raw_block` uses to decide whether to merge an incoming raw block.
pub const MIN_BLOCK_SIZE: usize = 8 << 10;
55
/// Tunables that control how an [`SstableBuilder`] lays out blocks, sizes its filter and
/// collects optional metadata.
#[derive(Clone, Debug)]
pub struct SstableBuilderOptions {
    /// Approximate sstable capacity.
    pub capacity: usize,
    /// Approximate block capacity.
    pub block_capacity: usize,
    /// Restart point interval.
    pub restart_interval: usize,
    /// Deprecated and ignored by SST filter builders; kept for backward compatibility.
    pub bloom_false_positive: f64,
    /// Compression algorithm.
    pub compression_algorithm: CompressionAlgorithm,
    /// Maximum SST size in bytes. The builder derives from it the thresholds above which
    /// an added key/value pair is logged as unusually large (1/10 for the value alone,
    /// 1/2 for key plus value).
    pub max_sst_size: u64,
    /// If set, block metadata keys will be shortened when their length exceeds this threshold.
    pub shorten_block_meta_key_threshold: Option<usize>,
    /// Max bytes for vnode key-range hints in SST metadata. None disables collection.
    pub max_vnode_key_range_bytes: Option<usize>,
    /// Estimated key count for one output SST. Used only as a filter-builder capacity hint.
    pub estimated_output_key_count: Option<usize>,
    /// Upper bound for the initial key-hash buffer allocation in plain filter builders.
    pub filter_hash_prealloc_key_count_cap: usize,
}
78
79impl From<&StorageOpts> for SstableBuilderOptions {
80    fn from(options: &StorageOpts) -> SstableBuilderOptions {
81        let capacity: usize = (options.sstable_size_mb as usize) * (1 << 20);
82        SstableBuilderOptions {
83            capacity,
84            block_capacity: (options.block_size_kb as usize) * (1 << 10),
85            restart_interval: DEFAULT_RESTART_INTERVAL,
86            bloom_false_positive: options.bloom_false_positive,
87            compression_algorithm: CompressionAlgorithm::None,
88            max_sst_size: options.compactor_max_sst_size,
89            shorten_block_meta_key_threshold: options.shorten_block_meta_key_threshold,
90            max_vnode_key_range_bytes: None,
91            estimated_output_key_count: None,
92            filter_hash_prealloc_key_count_cap: DEFAULT_FILTER_HASH_PREALLOC_KEY_COUNT_CAP,
93        }
94    }
95}
96
97impl Default for SstableBuilderOptions {
98    fn default() -> Self {
99        Self {
100            capacity: DEFAULT_SSTABLE_SIZE,
101            block_capacity: DEFAULT_BLOCK_SIZE,
102            restart_interval: DEFAULT_RESTART_INTERVAL,
103            bloom_false_positive: DEFAULT_BLOOM_FALSE_POSITIVE,
104            compression_algorithm: CompressionAlgorithm::None,
105            max_sst_size: DEFAULT_MAX_SST_SIZE,
106            shorten_block_meta_key_threshold: None,
107            max_vnode_key_range_bytes: None,
108            estimated_output_key_count: None,
109            filter_hash_prealloc_key_count_cap: DEFAULT_FILTER_HASH_PREALLOC_KEY_COUNT_CAP,
110        }
111    }
112}
113
114impl SstableBuilderOptions {
115    pub fn estimated_output_key_count(&self) -> usize {
116        self.estimated_output_key_count
117            .unwrap_or(self.capacity / DEFAULT_ENTRY_SIZE + 1)
118    }
119
120    pub fn filter_builder_options(&self) -> FilterBuilderOptions {
121        FilterBuilderOptions {
122            estimated_key_count: self.estimated_output_key_count(),
123            estimated_block_count: self.capacity / self.block_capacity + 1,
124            hash_prealloc_key_count_cap: self.filter_hash_prealloc_key_count_cap,
125        }
126    }
127}
128
/// Everything produced by [`SstableBuilder::finish`].
pub struct SstableBuilderOutput<WO> {
    /// SST info bundled with the per-table stats and the local creation timestamp.
    pub sst_info: LocalSstableInfo,
    /// Value returned by the underlying writer's `finish` call.
    pub writer_output: WO,
    /// Build statistics (filter size, average key/value size, epoch count, block sizes,
    /// file size).
    pub stats: SstableBuilderOutputStats,
}
134
/// Incrementally builds one SSTable: entries are grouped into blocks, blocks are handed to
/// the writer, and block/table metadata is accumulated until `finish` is called.
pub struct SstableBuilder<W: SstableWriter, F: FilterBuilder> {
    /// Options.
    options: SstableBuilderOptions,
    /// Data writer.
    writer: W,
    /// Current block builder.
    block_builder: BlockBuilder,

    /// Used to extract the filter key from each added user key.
    compaction_catalog_agent_ref: CompactionCatalogAgentRef,
    /// Block metadata vec.
    block_metas: Vec<BlockMeta>,

    /// `table_id` of added keys.
    table_ids: BTreeSet<TableId>,
    /// Encoded full key of the most recently added entry, or the largest key of the most
    /// recently added raw block. Empty until something has been added.
    last_full_key: Vec<u8>,
    /// Buffer for encoded key and value to avoid allocation.
    raw_key: BytesMut,
    raw_value: BytesMut,
    /// Table id of the most recently added entry or raw block.
    last_table_id: Option<TableId>,
    /// Object id of the SST being built; `finish` also uses it as the initial `sst_id`.
    sst_object_id: HummockSstableObjectId,

    /// Per table stats.
    table_stats: TableStatsMap,
    /// `last_table_stats` accumulates stats for `last_table_id` and finalizes it in `table_stats`
    /// by `finalize_last_table_stats`
    last_table_stats: TableStats,

    /// Builder for the SST filter; fed with extracted filter keys or raw filter data.
    filter_builder: F,

    /// Pure epochs of all entries added through `add`/`add_impl`.
    epoch_set: BTreeSet<u64>,
    /// Optional limiter passed to the filter builder when switching blocks and finishing.
    memory_limiter: Option<Arc<MemoryLimiter>>,

    block_size_vec: Vec<usize>, // for statistics
    /// Present only when `max_vnode_key_range_bytes` is set to a non-zero value.
    vnode_range_collector: Option<VnodeUserKeyRangeCollector>,
}
170
171impl<W: SstableWriter> SstableBuilder<W, Xor16FilterBuilder> {
172    pub fn for_test(
173        sstable_id: u64,
174        writer: W,
175        options: SstableBuilderOptions,
176        table_id_to_vnode: HashMap<impl Into<TableId>, usize>,
177        table_id_to_watermark_serde: HashMap<
178            impl Into<TableId>,
179            Option<(OrderedRowSerde, OrderedRowSerde, usize)>,
180        >,
181    ) -> Self {
182        let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
183            FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor),
184            table_id_to_vnode
185                .into_iter()
186                .map(|(table_id, v)| (table_id.into(), v))
187                .collect(),
188            table_id_to_watermark_serde
189                .into_iter()
190                .map(|(table_id, v)| (table_id.into(), v))
191                .collect(),
192            HashMap::default(),
193        ));
194
195        Self::new(
196            sstable_id,
197            writer,
198            Xor16FilterBuilder::create(options.filter_builder_options()),
199            options,
200            compaction_catalog_agent_ref,
201            None,
202        )
203    }
204}
205
206impl<W: SstableWriter, F: FilterBuilder> SstableBuilder<W, F> {
207    pub fn new(
208        sst_object_id: impl Into<HummockSstableObjectId>,
209        writer: W,
210        filter_builder: F,
211        options: SstableBuilderOptions,
212        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
213        memory_limiter: Option<Arc<MemoryLimiter>>,
214    ) -> Self {
215        let sst_object_id = sst_object_id.into();
216        Self {
217            vnode_range_collector: VnodeUserKeyRangeCollector::with_limit(
218                options.max_vnode_key_range_bytes,
219            ),
220            options: options.clone(),
221            writer,
222            block_builder: BlockBuilder::new(BlockBuilderOptions {
223                capacity: options.block_capacity,
224                restart_interval: options.restart_interval,
225                compression_algorithm: options.compression_algorithm,
226            }),
227            filter_builder,
228            block_metas: Vec::with_capacity(options.capacity / options.block_capacity + 1),
229            table_ids: BTreeSet::new(),
230            last_table_id: None,
231            raw_key: BytesMut::new(),
232            raw_value: BytesMut::new(),
233            last_full_key: vec![],
234            sst_object_id,
235            compaction_catalog_agent_ref,
236            table_stats: Default::default(),
237            last_table_stats: Default::default(),
238            epoch_set: BTreeSet::default(),
239            memory_limiter,
240            block_size_vec: Vec::new(),
241        }
242    }
243
244    /// Add kv pair to sstable.
245    pub async fn add_for_test(
246        &mut self,
247        full_key: FullKey<&[u8]>,
248        value: HummockValue<&[u8]>,
249    ) -> HummockResult<()> {
250        self.add(full_key, value).await
251    }
252
253    pub fn current_block_size(&self) -> usize {
254        self.block_builder.approximate_len()
255    }
256
257    /// Add raw data of block to sstable. return false means fallback
258    pub async fn add_raw_block(
259        &mut self,
260        buf: Bytes,
261        filter_data: Vec<u8>,
262        smallest_key: FullKey<Vec<u8>>,
263        largest_key: Vec<u8>,
264        mut meta: BlockMeta,
265    ) -> HummockResult<bool> {
266        let table_id = smallest_key.user_key.table_id;
267        if self.last_table_id.is_none() || self.last_table_id.unwrap() != table_id {
268            if !self.block_builder.is_empty() {
269                // Try to finish the previous `Block`` when the `table_id` is switched, making sure that the data in the `Block` doesn't span two `table_ids`.
270                self.build_block().await?;
271            }
272
273            self.table_ids.insert(table_id);
274            self.finalize_last_table_stats();
275            self.last_table_id = Some(table_id);
276        }
277
278        if !self.block_builder.is_empty() {
279            let min_block_size = std::cmp::min(MIN_BLOCK_SIZE, self.options.block_capacity / 4);
280
281            // If the previous block is too small, we should merge it into the previous block.
282            if self.block_builder.approximate_len() < min_block_size {
283                let block = Block::decode(buf, meta.uncompressed_size as usize)?;
284                let mut iter = BlockIterator::new(BlockHolder::from_owned_block(Box::new(block)));
285                iter.seek_to_first();
286                while iter.is_valid() {
287                    let value = HummockValue::from_slice(iter.value()).unwrap_or_else(|_| {
288                        panic!(
289                            "decode failed for fast compact sst_id {} block_idx {} last_table_id {:?}",
290                            self.sst_object_id, self.block_metas.len(), self.last_table_id
291                        )
292                    });
293                    self.add_impl(iter.key(), value, false).await?;
294                    iter.next();
295                }
296                return Ok(false);
297            }
298
299            self.build_block().await?;
300        }
301        self.last_full_key = largest_key;
302        assert_eq!(
303            meta.len as usize,
304            buf.len(),
305            "meta {} buf {} last_table_id {:?}",
306            meta.len,
307            buf.len(),
308            self.last_table_id
309        );
310        meta.offset = self.writer.data_len() as u32;
311        self.block_metas.push(meta);
312        self.filter_builder.add_raw_data(filter_data);
313        let block_meta = self.block_metas.last_mut().unwrap();
314        self.writer.write_block_bytes(buf, block_meta).await?;
315
316        Ok(true)
317    }
318
319    /// Add kv pair to sstable.
320    pub async fn add(
321        &mut self,
322        full_key: FullKey<&[u8]>,
323        value: HummockValue<&[u8]>,
324    ) -> HummockResult<()> {
325        self.add_impl(full_key, value, true).await
326    }
327
328    /// Add kv pair to sstable.
329    async fn add_impl(
330        &mut self,
331        full_key: FullKey<&[u8]>,
332        value: HummockValue<&[u8]>,
333        could_switch_block: bool,
334    ) -> HummockResult<()> {
335        const LARGE_KEY_LEN: usize = MAX_KEY_LEN >> 1;
336
337        let table_key_len = full_key.user_key.table_key.as_ref().len();
338        let table_value_len = match &value {
339            HummockValue::Put(t) => t.len(),
340            HummockValue::Delete => 0,
341        };
342        let large_value_len = self.options.max_sst_size as usize / 10;
343        let large_key_value_len = self.options.max_sst_size as usize / 2;
344        if table_key_len >= LARGE_KEY_LEN
345            || table_value_len > large_value_len
346            || table_key_len + table_value_len > large_key_value_len
347        {
348            let table_id = full_key.user_key.table_id;
349            tracing::warn!(
350                "A large key/value (table_id={}, key len={}, value len={}, epoch={}, spill offset={}) is added to block",
351                table_id,
352                table_key_len,
353                table_value_len,
354                full_key.epoch_with_gap.pure_epoch(),
355                full_key.epoch_with_gap.offset(),
356            );
357        }
358
359        // TODO: refine me
360        full_key.encode_into(&mut self.raw_key);
361        value.encode(&mut self.raw_value);
362        let is_new_user_key = self.last_full_key.is_empty()
363            || !user_key(&self.raw_key).eq(user_key(self.last_full_key.as_slice()));
364        let table_id = full_key.user_key.table_id;
365        let is_new_table = self.last_table_id != Some(table_id);
366        let current_block_size = self.current_block_size();
367        let is_block_full = current_block_size >= self.options.block_capacity
368            || (current_block_size > self.options.block_capacity / 4 * 3
369                && current_block_size + self.raw_value.len() + self.raw_key.len()
370                    > self.options.block_capacity);
371
372        if is_new_table {
373            assert!(
374                could_switch_block,
375                "is_new_user_key {} sst_id {} block_idx {} table_id {} last_table_id {:?} full_key {:?}",
376                is_new_user_key,
377                self.sst_object_id,
378                self.block_metas.len(),
379                table_id,
380                self.last_table_id,
381                full_key
382            );
383            self.table_ids.insert(table_id);
384            self.finalize_last_table_stats();
385            self.last_table_id = Some(table_id);
386            if !self.block_builder.is_empty() {
387                self.build_block().await?;
388            }
389        } else if is_block_full && could_switch_block {
390            self.build_block().await?;
391        }
392        self.last_table_stats.total_key_count += 1;
393        self.epoch_set.insert(full_key.epoch_with_gap.pure_epoch());
394
395        // Rotate block builder if the previous one has been built.
396        if self.block_builder.is_empty() {
397            let smallest_key = if let Some(threshold) =
398                self.options.shorten_block_meta_key_threshold
399                && !self.last_full_key.is_empty()
400                && full_key.encoded_len() >= threshold
401            {
402                let prev = FullKey::decode(&self.last_full_key);
403                if let Some(shortened) = try_shorten_block_smallest_key(&prev, &full_key) {
404                    shortened.encode()
405                } else {
406                    full_key.encode()
407                }
408            } else {
409                full_key.encode()
410            };
411
412            self.block_metas.push(BlockMeta {
413                offset: utils::checked_into_u32(self.writer.data_len()).unwrap_or_else(|_| {
414                    panic!(
415                        "WARN overflow can't convert writer_data_len {} into u32 sst_id {} block_idx {} tables {:?}",
416                        self.writer.data_len(),
417                        self.sst_object_id,
418                        self.block_metas.len(),
419                        self.table_ids,
420                    )
421                }),
422                len: 0,
423                smallest_key,
424                uncompressed_size: 0,
425                total_key_count: 0,
426                stale_key_count: 0,
427            });
428        }
429
430        let filter_key = self
431            .compaction_catalog_agent_ref
432            .extract(user_key(&self.raw_key));
433        // Add SST filter check.
434        if !filter_key.is_empty() {
435            self.filter_builder
436                .add_key(filter_key, table_id.as_raw_id());
437        }
438        // Use pre-encoded key to avoid redundant encoding
439        self.block_builder.add(
440            table_id,
441            &self.raw_key[TABLE_PREFIX_LEN..],
442            self.raw_value.as_ref(),
443        );
444        self.block_metas.last_mut().unwrap().total_key_count += 1;
445        if !is_new_user_key || value.is_delete() {
446            self.block_metas.last_mut().unwrap().stale_key_count += 1;
447        }
448        self.last_table_stats.total_key_size += full_key.encoded_len() as i64;
449        self.last_table_stats.total_value_size += value.encoded_len() as i64;
450
451        if let Some(collector) = self.vnode_range_collector.as_mut() {
452            collector.observe_key(
453                VirtualNode::from_index(full_key.user_key.get_vnode_id()),
454                &self.raw_key,
455                &self.last_full_key,
456            );
457        }
458
459        self.last_full_key.clear();
460        self.last_full_key.extend_from_slice(&self.raw_key);
461
462        self.raw_key.clear();
463        self.raw_value.clear();
464        Ok(())
465    }
466
467    /// Finish building sst.
468    ///
469    /// # Format
470    ///
471    /// data:
472    ///
473    /// ```plain
474    /// | Block 0 | ... | Block N-1 | N (4B) |
475    /// ```
476    pub async fn finish(mut self) -> HummockResult<SstableBuilderOutput<W::Output>> {
477        let smallest_key = if self.block_metas.is_empty() {
478            vec![]
479        } else {
480            self.block_metas[0].smallest_key.clone()
481        };
482        let largest_key = self.last_full_key.clone();
483        self.finalize_last_table_stats();
484
485        // Vnode key-range hints are only supported for single-table SSTs.
486        // Multi-table SST scenarios should not enable max_vnode_key_range_bytes in config.
487        assert!(
488            self.table_ids.len() <= 1 || self.vnode_range_collector.is_none(),
489            "vnode key-range hints are only supported for single-table SSTs, found {} tables",
490            self.table_ids.len()
491        );
492
493        self.build_block().await?;
494        let right_exclusive = false;
495        let meta_offset = self.writer.data_len() as u64;
496
497        let filter_data = self.filter_builder.finish(self.memory_limiter.clone());
498        let (bloom_filter_kind, filter_type) = if filter_data.is_empty() {
499            (
500                BloomFilterType::BloomFilterUnspecified,
501                PbSstableFilterType::SstableFilterNone,
502            )
503        } else if self.filter_builder.support_blocked_raw_data() {
504            (BloomFilterType::Blocked, self.filter_builder.filter_type())
505        } else {
506            (BloomFilterType::Sstable, self.filter_builder.filter_type())
507        };
508
509        let total_key_count = self
510            .block_metas
511            .iter()
512            .map(|block_meta| block_meta.total_key_count as u64)
513            .sum::<u64>();
514        let stale_key_count = self
515            .block_metas
516            .iter()
517            .map(|block_meta| block_meta.stale_key_count as u64)
518            .sum::<u64>();
519        let uncompressed_file_size = self
520            .block_metas
521            .iter()
522            .map(|block_meta| block_meta.uncompressed_size as u64)
523            .sum::<u64>();
524
525        #[expect(deprecated)]
526        let mut meta = SstableMeta {
527            block_metas: self.block_metas,
528            bloom_filter: filter_data,
529            estimated_size: 0,
530            key_count: utils::checked_into_u32(total_key_count).unwrap_or_else(|_| {
531                panic!(
532                    "WARN overflow can't convert total_key_count {} into u32 tables {:?}",
533                    total_key_count, self.table_ids,
534                )
535            }),
536            smallest_key,
537            largest_key,
538            version: VERSION,
539            meta_offset,
540            monotonic_tombstone_events: vec![],
541        };
542
543        let meta_encode_size = meta.encoded_size();
544        let encoded_size_u32 = utils::checked_into_u32(meta_encode_size).unwrap_or_else(|_| {
545            panic!(
546                "WARN overflow can't convert meta_encoded_size {} into u32 tables {:?}",
547                meta_encode_size, self.table_ids,
548            )
549        });
550        let meta_offset_u32 = utils::checked_into_u32(meta_offset).unwrap_or_else(|_| {
551            panic!(
552                "WARN overflow can't convert meta_offset {} into u32 tables {:?}",
553                meta_offset, self.table_ids,
554            )
555        });
556        meta.estimated_size = encoded_size_u32
557            .checked_add(meta_offset_u32)
558            .unwrap_or_else(|| {
559                panic!(
560                    "WARN overflow encoded_size_u32 {} meta_offset_u32 {} table_id {:?} table_ids {:?}",
561                    encoded_size_u32, meta_offset_u32, self.last_table_id, self.table_ids
562                )
563            });
564
565        let (avg_key_size, avg_value_size) = if self.table_stats.is_empty() {
566            (0, 0)
567        } else {
568            let total_key_count: usize = self
569                .table_stats
570                .values()
571                .map(|s| s.total_key_count as usize)
572                .sum();
573
574            if total_key_count == 0 {
575                (0, 0)
576            } else {
577                let total_key_size: usize = self
578                    .table_stats
579                    .values()
580                    .map(|s| s.total_key_size as usize)
581                    .sum();
582
583                let total_value_size: usize = self
584                    .table_stats
585                    .values()
586                    .map(|s| s.total_value_size as usize)
587                    .sum();
588
589                (
590                    total_key_size / total_key_count,
591                    total_value_size / total_key_count,
592                )
593            }
594        };
595
596        let (min_epoch, max_epoch) = {
597            if self.epoch_set.is_empty() {
598                (HummockEpoch::MAX, u64::MIN)
599            } else {
600                (
601                    *self.epoch_set.first().unwrap(),
602                    *self.epoch_set.last().unwrap(),
603                )
604            }
605        };
606
607        let vnode_user_key_ranges = self
608            .vnode_range_collector
609            .take()
610            .and_then(|collector| collector.finish(&self.last_full_key));
611
612        let sst_info: SstableInfo = SstableInfoInner {
613            object_id: self.sst_object_id,
614            // use the same sst_id as object_id for initial sst
615            sst_id: self.sst_object_id.as_raw_id().into(),
616            bloom_filter_kind,
617            key_range: KeyRange {
618                left: Bytes::from(meta.smallest_key.clone()),
619                right: Bytes::from(meta.largest_key.clone()),
620                right_exclusive,
621            },
622            file_size: meta.estimated_size as u64,
623            table_ids: self.table_ids.into_iter().collect(),
624            meta_offset: meta.meta_offset,
625            stale_key_count,
626            total_key_count,
627            uncompressed_file_size: uncompressed_file_size + meta.encoded_size() as u64,
628            min_epoch,
629            max_epoch,
630            range_tombstone_count: 0,
631            sst_size: meta.estimated_size as u64,
632            filter_type,
633            vnode_statistics: vnode_user_key_ranges,
634        }
635        .into();
636
637        tracing::trace!(
638            "meta_size {} filter_size {} add_key_counts {} stale_key_count {} min_epoch {} max_epoch {} epoch_count {}",
639            meta.encoded_size(),
640            meta.bloom_filter.len(),
641            total_key_count,
642            stale_key_count,
643            min_epoch,
644            max_epoch,
645            self.epoch_set.len()
646        );
647        let filter_size = meta.bloom_filter.len();
648        let sstable_file_size = sst_info.file_size as usize;
649
650        if !meta.block_metas.is_empty() {
651            // fill total_compressed_size
652            let mut last_table_id = meta.block_metas[0].table_id();
653            let mut last_table_stats = self.table_stats.get_mut(&last_table_id).unwrap();
654            for block_meta in &meta.block_metas {
655                let block_table_id = block_meta.table_id();
656                if last_table_id != block_table_id {
657                    last_table_id = block_table_id;
658                    last_table_stats = self.table_stats.get_mut(&last_table_id).unwrap();
659                }
660
661                last_table_stats.total_compressed_size += block_meta.len as u64;
662            }
663        }
664
665        let writer_output = self.writer.finish(meta).await?;
666        // The timestamp is only used during full GC.
667        //
668        // Ideally object store object's last_modified should be used.
669        // However, it'll incur additional IO overhead since S3 lacks an interface to retrieve the last_modified timestamp after the PUT operation on an object.
670        //
671        // The local timestamp below is expected to precede the last_modified of object store object, given that the object store object is created afterward.
672        // It should help alleviate the clock drift issue.
673
674        let now = SystemTime::now()
675            .duration_since(SystemTime::UNIX_EPOCH)
676            .expect("Clock may have gone backwards")
677            .as_secs();
678        Ok(SstableBuilderOutput::<W::Output> {
679            sst_info: LocalSstableInfo::new(sst_info, self.table_stats, now),
680            writer_output,
681            stats: SstableBuilderOutputStats {
682                filter_size,
683                avg_key_size,
684                avg_value_size,
685                epoch_count: self.epoch_set.len(),
686                block_size_vec: self.block_size_vec,
687                sstable_file_size,
688            },
689        })
690    }
691
692    pub fn approximate_len(&self) -> usize {
693        self.writer.data_len()
694            + self.block_builder.approximate_len()
695            + self.filter_builder.approximate_len()
696    }
697
698    pub async fn build_block(&mut self) -> HummockResult<()> {
699        // Skip empty block.
700        if self.block_builder.is_empty() {
701            return Ok(());
702        }
703
704        let block_meta = self.block_metas.last_mut().unwrap();
705        let uncompressed_block_size = self.block_builder.uncompressed_block_size();
706        block_meta.uncompressed_size = utils::checked_into_u32(uncompressed_block_size)
707            .unwrap_or_else(|_| {
708                panic!(
709                    "WARN overflow can't convert uncompressed_block_size {} into u32 table {:?}",
710                    uncompressed_block_size,
711                    self.block_builder.table_id(),
712                )
713            });
714        let block = self.block_builder.build();
715        self.writer.write_block(block, block_meta).await?;
716        self.block_size_vec.push(block.len());
717        let data_len = utils::checked_into_u32(self.writer.data_len()).unwrap_or_else(|_| {
718            panic!(
719                "WARN overflow can't convert writer_data_len {} into u32 table {:?}",
720                self.writer.data_len(),
721                self.block_builder.table_id(),
722            )
723        });
724        block_meta.len = data_len.checked_sub(block_meta.offset).unwrap_or_else(|| {
725            panic!(
726                "data_len should >= meta_offset, found data_len={}, meta_offset={}",
727                data_len, block_meta.offset
728            )
729        });
730
731        self.filter_builder
732            .switch_block(self.memory_limiter.clone());
733
734        if data_len as usize > self.options.capacity * 2 {
735            tracing::warn!(
736                "WARN unexpected block size {} table {:?}",
737                data_len,
738                self.block_builder.table_id()
739            );
740        }
741
742        self.block_builder.clear();
743        Ok(())
744    }
745
746    pub fn is_empty(&self) -> bool {
747        self.writer.data_len() > 0
748    }
749
750    /// Returns true if we roughly reached capacity
751    pub fn reach_capacity(&self) -> bool {
752        self.approximate_len() >= self.options.capacity
753    }
754
755    fn finalize_last_table_stats(&mut self) {
756        if self.table_ids.is_empty() || self.last_table_id.is_none() {
757            return;
758        }
759        self.table_stats.insert(
760            self.last_table_id.unwrap(),
761            std::mem::take(&mut self.last_table_stats),
762        );
763    }
764}
765
/// Collects vnode key-range hints during SST building.
struct VnodeUserKeyRangeCollector {
    /// Byte budget; once `current_size` reaches it, further keys are ignored.
    max_bytes: usize,
    /// Bytes accounted so far. Each sealed range adds `size_of::<VirtualNode>()` plus the
    /// lengths of its left and right encoded keys.
    current_size: usize,
    /// Per-vnode `(left, right)` user-key bounds.
    /// NOTE(review): presumably filled when a range is sealed — the insertion is not
    /// visible in this part of the file; confirm.
    ranges: BTreeMap<VirtualNode, (UserKey<Bytes>, UserKey<Bytes>)>,
    /// Vnode of the range that is currently open.
    current_vnode: VirtualNode,
    /// Encoded key that opened the current range; empty while no range is open.
    range_start_key: Vec<u8>,
}
774
775impl VnodeUserKeyRangeCollector {
776    fn new(max_bytes: usize) -> Self {
777        Self {
778            max_bytes,
779            current_size: 0,
780            ranges: BTreeMap::new(),
781            current_vnode: VirtualNode::ZERO,
782            range_start_key: Vec::new(),
783        }
784    }
785
786    fn with_limit(max_bytes: Option<usize>) -> Option<Self> {
787        max_bytes.filter(|&n| n > 0).map(Self::new)
788    }
789
790    /// Track vnode boundaries. On vnode switch, seals previous range with `prev_key` as right bound.
791    /// Range: `[first_key_of_vnode, last_key_of_vnode]` (inclusive).
792    fn observe_key(&mut self, vnode: VirtualNode, key: &[u8], prev_key: &[u8]) {
793        if self.current_size >= self.max_bytes {
794            return;
795        }
796
797        // First key
798        if self.range_start_key.is_empty() {
799            self.current_vnode = vnode;
800            self.range_start_key = key.to_vec();
801            return;
802        }
803
804        // Same vnode, nothing to do
805        if vnode == self.current_vnode {
806            return;
807        }
808
809        // Vnode changed: seal previous range
810        self.seal_range(prev_key);
811
812        // Check if budget exhausted after sealing
813        if self.current_size >= self.max_bytes {
814            return;
815        }
816
817        // Start new vnode
818        self.current_vnode = vnode;
819        self.range_start_key = key.to_vec();
820    }
821
822    /// Seal current range. Asserts `vnode/table_id` consistency between left and right keys.
823    fn seal_range(&mut self, right_key: &[u8]) {
824        let left_key = mem::take(&mut self.range_start_key);
825        self.current_size += mem::size_of::<VirtualNode>() + left_key.len() + right_key.len();
826
827        let left_full_key = FullKey::decode(&left_key);
828        let right_full_key = FullKey::decode(right_key);
829        let left_user_key = left_full_key.user_key.copy_into();
830        let right_user_key = right_full_key.user_key.copy_into();
831
832        // Sanity checks for data correctness:
833        // 1. left and right keys have same `vnode`
834        // 2. vnode matches `current_vnode` being tracked
835        // 3. left and right keys have same `table_id`
836        assert_eq!(
837            left_user_key.get_vnode_id(),
838            right_user_key.get_vnode_id(),
839            "vnode changed within range: left_user {:?}, right_user {:?}",
840            left_user_key,
841            right_user_key
842        );
843        assert_eq!(
844            left_user_key.get_vnode_id(),
845            self.current_vnode.to_index(),
846            "vnode mismatch: left {:?}, right {:?}, expected vnode {}",
847            left_user_key,
848            right_user_key,
849            self.current_vnode.to_index()
850        );
851        assert_eq!(
852            left_user_key.table_id, right_user_key.table_id,
853            "table_id changed within range: left {:?}, right {:?}",
854            left_user_key, right_user_key
855        );
856
857        self.ranges
858            .insert(self.current_vnode, (left_user_key, right_user_key));
859    }
860
861    /// Returns `Some` if >1 vnodes collected, `None` otherwise (single-vnode needs no hints).
862    fn finish(mut self, last_key: &[u8]) -> Option<VnodeStatistics> {
863        if !self.range_start_key.is_empty() {
864            self.seal_range(last_key);
865        }
866
867        if self.ranges.len() > 1 {
868            // Validate all ranges belong to the same table_id before building VnodeStatistics
869            let mut table_ids = self
870                .ranges
871                .values()
872                .flat_map(|(left, right)| [left.table_id, right.table_id]);
873            if let Some(first_table_id) = table_ids.next() {
874                for table_id in table_ids {
875                    assert_eq!(
876                        table_id, first_table_id,
877                        "all vnode ranges must belong to the same table_id, found {:?} and {:?}",
878                        table_id, first_table_id
879                    );
880                }
881            }
882
883            Some(VnodeStatistics::from_map(self.ranges))
884        } else {
885            None
886        }
887    }
888}
889
890pub struct SstableBuilderOutputStats {
891    filter_size: usize,
892    avg_key_size: usize,
893    avg_value_size: usize,
894    epoch_count: usize,
895    block_size_vec: Vec<usize>, // for statistics
896    sstable_file_size: usize,
897}
898
899impl SstableBuilderOutputStats {
900    pub fn report_stats(&self, metrics: &Arc<CompactorMetrics>) {
901        if self.filter_size != 0 {
902            metrics
903                .sstable_bloom_filter_size
904                .observe(self.filter_size as _);
905        }
906
907        if self.sstable_file_size != 0 {
908            metrics
909                .sstable_file_size
910                .observe(self.sstable_file_size as _);
911        }
912
913        if self.avg_key_size != 0 {
914            metrics.sstable_avg_key_size.observe(self.avg_key_size as _);
915        }
916
917        if self.avg_value_size != 0 {
918            metrics
919                .sstable_avg_value_size
920                .observe(self.avg_value_size as _);
921        }
922
923        if self.epoch_count != 0 {
924            metrics
925                .sstable_distinct_epoch_count
926                .observe(self.epoch_count as _);
927        }
928
929        if !self.block_size_vec.is_empty() {
930            for block_size in &self.block_size_vec {
931                metrics.sstable_block_size.observe(*block_size as _);
932            }
933        }
934    }
935}
936
937#[cfg(test)]
938pub(super) mod tests {
939    use std::collections::{Bound, HashMap};
940
941    use risingwave_common::catalog::TableId;
942    use risingwave_common::hash::VirtualNode;
943    use risingwave_common::util::epoch::test_epoch;
944    use risingwave_hummock_sdk::key::UserKey;
945
946    use super::*;
947    use crate::assert_bytes_eq;
948    use crate::compaction_catalog_manager::{
949        CompactionCatalogAgent, DummyFilterKeyExtractor, MultiFilterKeyExtractor,
950    };
951    use crate::hummock::iterator::test_utils::mock_sstable_store;
952    use crate::hummock::sstable::xor_filter::BlockedXor16FilterBuilder;
953    use crate::hummock::test_utils::{
954        TEST_KEYS_COUNT, default_builder_opt_for_test, gen_test_sstable_impl, mock_sst_writer,
955        test_key_of, test_value_of,
956    };
957    use crate::hummock::{CachePolicy, Sstable, SstableWriterOptions, Xor8FilterBuilder};
958    use crate::monitor::StoreLocalStatistic;
959
960    #[tokio::test]
961    async fn test_empty() {
962        let opt = SstableBuilderOptions {
963            capacity: 0,
964            block_capacity: 4096,
965            restart_interval: 16,
966            bloom_false_positive: 0.001,
967            ..Default::default()
968        };
969
970        let table_id_to_vnode = HashMap::from_iter(vec![(0, VirtualNode::COUNT_FOR_TEST)]);
971        let table_id_to_watermark_serde = HashMap::from_iter(vec![(0, None)]);
972        let b = SstableBuilder::for_test(
973            0,
974            mock_sst_writer(&opt),
975            opt,
976            table_id_to_vnode,
977            table_id_to_watermark_serde,
978        );
979
980        b.finish().await.unwrap();
981    }
982
983    fn encode_full_key(vnode: VirtualNode, table_key_suffix: &[u8]) -> Vec<u8> {
984        let mut table_key = vnode.to_be_bytes().to_vec();
985        table_key.extend_from_slice(table_key_suffix);
986        FullKey::for_test(TableId::default(), table_key, 0).encode()
987    }
988
989    fn table_key_of(vnode: VirtualNode, suffix: &[u8]) -> Vec<u8> {
990        let mut key = vnode.to_be_bytes().to_vec();
991        key.extend_from_slice(suffix);
992        key
993    }
994
995    #[test]
996    fn test_vnode_user_key_range_basic_collection() {
997        // Test basic multi-vnode collection with boundary semantics verification.
998        // Validates: vnode switching triggers range sealing, boundaries are inclusive (right_exclusive=false).
999        let mut collector = VnodeUserKeyRangeCollector::with_limit(Some(1024)).unwrap();
1000        let vnode_1 = VirtualNode::from_index(1);
1001        let vnode_2 = VirtualNode::from_index(2);
1002
1003        let k1 = encode_full_key(vnode_1, b"k1");
1004        let k2 = encode_full_key(vnode_1, b"k2");
1005        let k3 = encode_full_key(vnode_2, b"k3");
1006        let k4 = encode_full_key(vnode_2, b"k4");
1007
1008        collector.observe_key(vnode_1, &k1, &[]);
1009        collector.observe_key(vnode_1, &k2, &k1);
1010        collector.observe_key(vnode_2, &k3, &k2);
1011        collector.observe_key(vnode_2, &k4, &k3);
1012
1013        let info = collector.finish(&k4).unwrap();
1014        assert_eq!(info.vnode_user_key_ranges().len(), 2);
1015
1016        // Verify vnode_1: left = first key, right = last key before switch
1017        let (range1_left, range1_right) = info.get_vnode_user_key_range(vnode_1).unwrap();
1018        assert_eq!(range1_left.table_key.as_ref(), table_key_of(vnode_1, b"k1"));
1019        assert_eq!(
1020            range1_right.table_key.as_ref(),
1021            table_key_of(vnode_1, b"k2")
1022        );
1023
1024        // Verify vnode_2: left = first key, right = SST's last key
1025        let (range2_left, range2_right) = info.get_vnode_user_key_range(vnode_2).unwrap();
1026        assert_eq!(range2_left.table_key.as_ref(), table_key_of(vnode_2, b"k3"));
1027        assert_eq!(
1028            range2_right.table_key.as_ref(),
1029            table_key_of(vnode_2, b"k4")
1030        );
1031    }
1032
1033    #[test]
1034    fn test_vnode_user_key_range_capacity_limit() {
1035        // Test "allow over-limit write" semantics: inserting a range may exceed capacity,
1036        // but stops before starting the next range if it would exceed the limit.
1037        //
1038        // Calculation based on encoded key sizes:
1039        // Each range = sizeof(VirtualNode) + left.len() + right.len().
1040        // Use a limit that:
1041        //   - allows writing vnode_1 (range_size),
1042        //   - allows over-limit write of vnode_2 (range_size + estimated_next),
1043        //   - stops before vnode_3 (2 * range_size + estimated_next > limit).
1044        let vnode_1 = VirtualNode::from_index(1);
1045        let vnode_2 = VirtualNode::from_index(2);
1046        let vnode_3 = VirtualNode::from_index(3);
1047        let vnode_4 = VirtualNode::from_index(4);
1048
1049        let k1 = encode_full_key(vnode_1, b"k1");
1050        let k2 = encode_full_key(vnode_1, b"k2");
1051        let k3 = encode_full_key(vnode_2, b"k3");
1052        let k4 = encode_full_key(vnode_2, b"k4");
1053        let k5 = encode_full_key(vnode_3, b"k5");
1054        let k6 = encode_full_key(vnode_3, b"k6");
1055        let k7 = encode_full_key(vnode_4, b"k7");
1056
1057        let range_size = mem::size_of::<VirtualNode>() + k1.len() + k2.len();
1058        let estimated_next_size = mem::size_of::<VirtualNode>() + k3.len();
1059        let limit = range_size + estimated_next_size; // allow second range, stop before third
1060        let mut collector = VnodeUserKeyRangeCollector::with_limit(Some(limit)).unwrap();
1061
1062        collector.observe_key(vnode_1, &k1, &[]);
1063        collector.observe_key(vnode_1, &k2, &k1);
1064        collector.observe_key(vnode_2, &k3, &k2); // Seals vnode_1 (6 bytes)
1065        collector.observe_key(vnode_2, &k4, &k3);
1066        collector.observe_key(vnode_3, &k5, &k4); // Seals vnode_2 (12 bytes total), stops before vnode_3
1067        collector.observe_key(vnode_3, &k6, &k5); // Ignored
1068        collector.observe_key(vnode_4, &k7, &k6); // Ignored
1069
1070        let info = collector.finish(&k7).unwrap();
1071        assert_eq!(
1072            info.vnode_user_key_ranges().len(),
1073            2,
1074            "Should collect exactly 2 vnodes"
1075        );
1076
1077        // Verify collected vnodes have correct boundaries
1078        let (range1_left, range1_right) = info.get_vnode_user_key_range(vnode_1).unwrap();
1079        assert_eq!(range1_left.table_key.as_ref(), table_key_of(vnode_1, b"k1"));
1080        assert_eq!(
1081            range1_right.table_key.as_ref(),
1082            table_key_of(vnode_1, b"k2")
1083        );
1084
1085        let (range2_left, range2_right) = info.get_vnode_user_key_range(vnode_2).unwrap();
1086        assert_eq!(range2_left.table_key.as_ref(), table_key_of(vnode_2, b"k3"));
1087        assert_eq!(
1088            range2_right.table_key.as_ref(),
1089            table_key_of(vnode_2, b"k4")
1090        );
1091
1092        // Verify stopped vnodes are not collected
1093        assert!(info.get_vnode_user_key_range(vnode_3).is_none());
1094        assert!(info.get_vnode_user_key_range(vnode_4).is_none());
1095    }
1096
1097    #[test]
1098    fn test_vnode_user_key_range_sparse_distribution() {
1099        // Test non-consecutive vnodes (production scenario: hash-based data distribution).
1100        // Validates: collector handles sparse vnode indices correctly, gaps are not filled.
1101        let mut collector = VnodeUserKeyRangeCollector::with_limit(Some(1024)).unwrap();
1102        let vnode_5 = VirtualNode::from_index(5);
1103        let vnode_10 = VirtualNode::from_index(10);
1104        let vnode_100 = VirtualNode::from_index(100);
1105
1106        let keys = vec![
1107            (vnode_5, encode_full_key(vnode_5, b"key_005_001")),
1108            (vnode_5, encode_full_key(vnode_5, b"key_005_002")),
1109            (vnode_5, encode_full_key(vnode_5, b"key_005_999")),
1110            (vnode_10, encode_full_key(vnode_10, b"key_010_001")),
1111            (vnode_10, encode_full_key(vnode_10, b"key_010_100")),
1112            (vnode_100, encode_full_key(vnode_100, b"key_100_001")),
1113            (vnode_100, encode_full_key(vnode_100, b"key_100_999")),
1114        ];
1115
1116        let mut prev_key = Vec::new();
1117        for (vnode, key) in &keys {
1118            collector.observe_key(*vnode, key, &prev_key);
1119            prev_key = key.clone();
1120        }
1121
1122        let info = collector.finish(&prev_key).unwrap();
1123        assert_eq!(info.vnode_user_key_ranges().len(), 3);
1124
1125        // Verify each collected vnode has correct boundaries
1126        let (range5_left, range5_right) = info.get_vnode_user_key_range(vnode_5).unwrap();
1127        assert_eq!(
1128            range5_left.table_key.as_ref(),
1129            table_key_of(vnode_5, b"key_005_001")
1130        );
1131        assert_eq!(
1132            range5_right.table_key.as_ref(),
1133            table_key_of(vnode_5, b"key_005_999")
1134        );
1135
1136        let (range10_left, range10_right) = info.get_vnode_user_key_range(vnode_10).unwrap();
1137        assert_eq!(
1138            range10_left.table_key.as_ref(),
1139            table_key_of(vnode_10, b"key_010_001")
1140        );
1141        assert_eq!(
1142            range10_right.table_key.as_ref(),
1143            table_key_of(vnode_10, b"key_010_100")
1144        );
1145
1146        let (range100_left, range100_right) = info.get_vnode_user_key_range(vnode_100).unwrap();
1147        assert_eq!(
1148            range100_left.table_key.as_ref(),
1149            table_key_of(vnode_100, b"key_100_001")
1150        );
1151        assert_eq!(
1152            range100_right.table_key.as_ref(),
1153            table_key_of(vnode_100, b"key_100_999")
1154        );
1155
1156        // Verify gaps are not filled (no data for vnodes 0, 7, 50)
1157        assert!(
1158            info.get_vnode_user_key_range(VirtualNode::from_index(0))
1159                .is_none()
1160        );
1161        assert!(
1162            info.get_vnode_user_key_range(VirtualNode::from_index(7))
1163                .is_none()
1164        );
1165        assert!(
1166            info.get_vnode_user_key_range(VirtualNode::from_index(50))
1167                .is_none()
1168        );
1169    }
1170
1171    #[test]
1172    fn test_vnode_user_key_range_edge_cases() {
1173        // Test 1: Configuration disabled (None or 0) should return None
1174        assert!(VnodeUserKeyRangeCollector::with_limit(None).is_none());
1175        assert!(VnodeUserKeyRangeCollector::with_limit(Some(0)).is_none());
1176
1177        // Test 2: Empty collector (no keys) should return None
1178        let collector = VnodeUserKeyRangeCollector::with_limit(Some(1024)).unwrap();
1179        assert!(
1180            collector
1181                .finish(&encode_full_key(VirtualNode::ZERO, b"any"))
1182                .is_none()
1183        );
1184
1185        // Test 3: Single vnode (optimization: don't emit hints for single-vnode SSTs)
1186        let vnode = VirtualNode::from_index(5);
1187        let mut collector = VnodeUserKeyRangeCollector::with_limit(Some(1024)).unwrap();
1188        let a = encode_full_key(vnode, b"a");
1189        let b = encode_full_key(vnode, b"b");
1190        let c = encode_full_key(vnode, b"c");
1191        collector.observe_key(vnode, &a, &[]);
1192        collector.observe_key(vnode, &b, &a);
1193        collector.observe_key(vnode, &c, &b);
1194        assert!(
1195            collector.finish(&c).is_none(),
1196            "Single-vnode SST should not emit hints"
1197        );
1198
1199        // Test 4: Extreme limit (1 byte) - first vnode exceeds, becomes single-vnode
1200        // Range size = sizeof(VirtualNode=2) + encoded_key_len * 2
1201        // With 1-byte limit: vnode_1 range exceeds but allowed (over-limit write)
1202        // vnode_2 check: current_size >= max_bytes, stop. Result: only vnode_1, returns None (single-vnode)
1203        let mut collector = VnodeUserKeyRangeCollector::with_limit(Some(1)).unwrap();
1204        let vnode_1 = VirtualNode::from_index(1);
1205        let vnode_2 = VirtualNode::from_index(2);
1206        let key1 = encode_full_key(vnode_1, b"key1");
1207        let key2 = encode_full_key(vnode_1, b"key2");
1208        let key3 = encode_full_key(vnode_2, b"key3");
1209        collector.observe_key(vnode_1, &key1, &[]);
1210        collector.observe_key(vnode_1, &key2, &key1);
1211        collector.observe_key(vnode_2, &key3, &key2); // Seals vnode_1, stops before vnode_2
1212        assert!(
1213            collector.finish(&key3).is_none(),
1214            "Only 1 vnode collected, should return None"
1215        );
1216    }
1217
1218    #[tokio::test]
1219    async fn test_basic() {
1220        let opt = default_builder_opt_for_test();
1221
1222        let table_id_to_vnode = HashMap::from_iter(vec![(0, VirtualNode::COUNT_FOR_TEST)]);
1223        let table_id_to_watermark_serde = HashMap::from_iter(vec![(0, None)]);
1224        let mut b = SstableBuilder::for_test(
1225            0,
1226            mock_sst_writer(&opt),
1227            opt,
1228            table_id_to_vnode,
1229            table_id_to_watermark_serde,
1230        );
1231
1232        for i in 0..TEST_KEYS_COUNT {
1233            b.add_for_test(
1234                test_key_of(i).to_ref(),
1235                HummockValue::put(&test_value_of(i)),
1236            )
1237            .await
1238            .unwrap();
1239        }
1240
1241        let output = b.finish().await.unwrap();
1242        let info = output.sst_info.sst_info;
1243
1244        assert_bytes_eq!(test_key_of(0).encode(), info.key_range.left);
1245        assert_bytes_eq!(
1246            test_key_of(TEST_KEYS_COUNT - 1).encode(),
1247            info.key_range.right
1248        );
1249        let (data, meta) = output.writer_output;
1250        assert_eq!(info.file_size, meta.estimated_size as u64);
1251        let offset = info.meta_offset as usize;
1252        let meta2 = SstableMeta::decode(&data[offset..]).unwrap();
1253        assert_eq!(meta2, meta);
1254    }
1255
1256    async fn test_with_xor_filter_builder<F: FilterBuilder>(
1257        bloom_false_positive: f64,
1258        expected_filter_type: PbSstableFilterType,
1259    ) {
1260        let key_count = 1000;
1261
1262        let opts = SstableBuilderOptions {
1263            capacity: 0,
1264            block_capacity: 4096,
1265            restart_interval: 16,
1266            bloom_false_positive,
1267            ..Default::default()
1268        };
1269
1270        // build remote table
1271        let sstable_store = mock_sstable_store().await;
1272        let table_id_to_vnode = HashMap::from_iter(vec![(0, VirtualNode::COUNT_FOR_TEST)]);
1273        let table_id_to_watermark_serde = HashMap::from_iter(vec![(0, None)]);
1274        let sst_info = gen_test_sstable_impl::<Vec<u8>, F>(
1275            opts,
1276            0,
1277            (0..TEST_KEYS_COUNT).map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
1278            sstable_store.clone(),
1279            CachePolicy::NotFill,
1280            table_id_to_vnode,
1281            table_id_to_watermark_serde,
1282        )
1283        .await;
1284        assert_eq!(sst_info.filter_type, expected_filter_type);
1285        let table = sstable_store
1286            .sstable(&sst_info, &mut StoreLocalStatistic::default())
1287            .await
1288            .unwrap();
1289
1290        assert!(table.has_filter());
1291        for i in 0..key_count {
1292            let full_key = test_key_of(i);
1293            let hash = Sstable::hash_for_filter(full_key.user_key.encode().as_slice(), 0);
1294            let key_ref = full_key.user_key.as_ref();
1295            assert!(
1296                table.may_match_hash(&(Bound::Included(key_ref), Bound::Included(key_ref)), hash),
1297                "failed at {}",
1298                i
1299            );
1300        }
1301    }
1302
1303    #[tokio::test]
1304    async fn test_xor_filter_builder_output() {
1305        test_with_xor_filter_builder::<Xor16FilterBuilder>(
1306            0.0,
1307            PbSstableFilterType::SstableFilterXor16,
1308        )
1309        .await;
1310        test_with_xor_filter_builder::<Xor16FilterBuilder>(
1311            0.01,
1312            PbSstableFilterType::SstableFilterXor16,
1313        )
1314        .await;
1315        test_with_xor_filter_builder::<Xor8FilterBuilder>(
1316            0.01,
1317            PbSstableFilterType::SstableFilterXor8,
1318        )
1319        .await;
1320        test_with_xor_filter_builder::<BlockedXor16FilterBuilder>(
1321            0.01,
1322            PbSstableFilterType::SstableFilterXor16,
1323        )
1324        .await;
1325    }
1326
1327    #[tokio::test]
1328    async fn test_no_xor_filter_block() {
1329        let opts = SstableBuilderOptions::default();
1330        // build remote table
1331        let sstable_store = mock_sstable_store().await;
1332        let writer_opts = SstableWriterOptions::default();
1333        let object_id = 1;
1334        let writer = sstable_store
1335            .clone()
1336            .create_sst_writer(object_id, writer_opts);
1337        let mut filter = MultiFilterKeyExtractor::default();
1338        filter.register(
1339            1.into(),
1340            FilterKeyExtractorImpl::Dummy(DummyFilterKeyExtractor),
1341        );
1342        filter.register(
1343            2.into(),
1344            FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor),
1345        );
1346        filter.register(
1347            3.into(),
1348            FilterKeyExtractorImpl::Dummy(DummyFilterKeyExtractor),
1349        );
1350
1351        let table_id_to_vnode = HashMap::from_iter(vec![
1352            (1.into(), VirtualNode::COUNT_FOR_TEST),
1353            (2.into(), VirtualNode::COUNT_FOR_TEST),
1354            (3.into(), VirtualNode::COUNT_FOR_TEST),
1355        ]);
1356        let table_id_to_watermark_serde =
1357            HashMap::from_iter(vec![(1.into(), None), (2.into(), None), (3.into(), None)]);
1358
1359        let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
1360            FilterKeyExtractorImpl::Multi(filter),
1361            table_id_to_vnode,
1362            table_id_to_watermark_serde,
1363            HashMap::default(),
1364        ));
1365
1366        let mut builder = SstableBuilder::new(
1367            object_id,
1368            writer,
1369            BlockedXor16FilterBuilder::create(opts.filter_builder_options()),
1370            opts,
1371            compaction_catalog_agent_ref,
1372            None,
1373        );
1374
1375        let key_count: usize = 10000;
1376        for table_id in 1..4 {
1377            let mut table_key = VirtualNode::ZERO.to_be_bytes().to_vec();
1378            for idx in 0..key_count {
1379                table_key.resize(VirtualNode::SIZE, 0);
1380                table_key.extend_from_slice(format!("key_test_{:05}", idx * 2).as_bytes());
1381                let k = UserKey::for_test(TableId::new(table_id), table_key.as_ref());
1382                let v = test_value_of(idx);
1383                builder
1384                    .add(
1385                        FullKey::from_user_key(k, test_epoch(1)),
1386                        HummockValue::put(v.as_ref()),
1387                    )
1388                    .await
1389                    .unwrap();
1390            }
1391        }
1392        let ret = builder.finish().await.unwrap();
1393        let sst_info = ret.sst_info.sst_info.clone();
1394        ret.writer_output.await.unwrap().unwrap();
1395        let table = sstable_store
1396            .sstable(&sst_info, &mut StoreLocalStatistic::default())
1397            .await
1398            .unwrap();
1399        let mut table_key = VirtualNode::ZERO.to_be_bytes().to_vec();
1400        for idx in 0..key_count {
1401            table_key.resize(VirtualNode::SIZE, 0);
1402            table_key.extend_from_slice(format!("key_test_{:05}", idx * 2).as_bytes());
1403            let k = UserKey::for_test(TableId::new(2), table_key.as_slice());
1404            let hash = Sstable::hash_for_filter(&k.encode(), 2);
1405            let key_ref = k.as_ref();
1406            assert!(
1407                table.may_match_hash(&(Bound::Included(key_ref), Bound::Included(key_ref)), hash)
1408            );
1409        }
1410    }
1411}