Skip to main content

risingwave_storage/hummock/sstable/
builder.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, BTreeSet, HashMap};
16use std::mem;
17use std::sync::Arc;
18use std::time::SystemTime;
19
20use bytes::{Bytes, BytesMut};
21use risingwave_common::catalog::TableId;
22use risingwave_common::hash::VirtualNode;
23use risingwave_common::util::row_serde::OrderedRowSerde;
24use risingwave_hummock_sdk::key::{FullKey, MAX_KEY_LEN, TABLE_PREFIX_LEN, UserKey, user_key};
25use risingwave_hummock_sdk::key_range::KeyRange;
26use risingwave_hummock_sdk::sstable_info::{SstableInfo, SstableInfoInner, VnodeStatistics};
27use risingwave_hummock_sdk::table_stats::{TableStats, TableStatsMap};
28use risingwave_hummock_sdk::{HummockEpoch, HummockSstableObjectId, LocalSstableInfo};
29use risingwave_pb::hummock::{PbSstableFilterLayout, PbSstableFilterType};
30
31use super::utils::CompressionAlgorithm;
32use super::{
33    BlockBuilder, BlockBuilderOptions, BlockMeta, DEFAULT_BLOCK_SIZE, DEFAULT_ENTRY_SIZE,
34    DEFAULT_RESTART_INTERVAL, SstableMeta, SstableWriter, VERSION,
35};
36use crate::compaction_catalog_manager::{
37    CompactionCatalogAgent, CompactionCatalogAgentRef, FilterKeyExtractorImpl,
38    FullKeyFilterKeyExtractor,
39};
40use crate::hummock::sstable::{
41    DEFAULT_FILTER_HASH_PREALLOC_KEY_COUNT_CAP, FilterBuilder, FilterBuilderOptions, utils,
42};
43use crate::hummock::value::HummockValue;
44use crate::hummock::{
45    Block, BlockHolder, BlockIterator, HummockResult, MemoryLimiter, Xor16FilterBuilder,
46    try_shorten_block_smallest_key,
47};
48use crate::monitor::CompactorMetrics;
49use crate::opts::StorageOpts;
50
/// Default target size of one SST, in bytes (4 MiB).
pub const DEFAULT_SSTABLE_SIZE: usize = 4 << 20;
/// Default for the deprecated `bloom_false_positive` builder option.
pub const DEFAULT_BLOOM_FALSE_POSITIVE: f64 = 0.001;
/// Default for `SstableBuilderOptions::max_sst_size`, in bytes (512 MiB).
pub const DEFAULT_MAX_SST_SIZE: u64 = 512 << 20;
/// Upper bound (8 KiB) on the pending-block size below which `add_raw_block`
/// merges an incoming raw block into the pending one instead of copying it verbatim.
pub const MIN_BLOCK_SIZE: usize = 8 << 10;
55
56#[derive(Clone, Debug)]
57pub struct SstableBuilderOptions {
58    /// Approximate sstable capacity.
59    pub capacity: usize,
60    /// Approximate block capacity.
61    pub block_capacity: usize,
62    /// Restart point interval.
63    pub restart_interval: usize,
64    /// Deprecated and ignored by SST filter builders; kept for backward compatibility.
65    pub bloom_false_positive: f64,
66    /// Compression algorithm.
67    pub compression_algorithm: CompressionAlgorithm,
68    pub max_sst_size: u64,
69    /// If set, block metadata keys will be shortened when their length exceeds this threshold.
70    pub shorten_block_meta_key_threshold: Option<usize>,
71    /// Max bytes for vnode key-range hints in SST metadata. None disables collection.
72    pub max_vnode_key_range_bytes: Option<usize>,
73    /// Estimated key count for one output SST. Used only as a filter-builder capacity hint.
74    pub estimated_output_key_count: Option<usize>,
75    /// Upper bound for the initial key-hash buffer allocation in plain filter builders.
76    pub filter_hash_prealloc_key_count_cap: usize,
77}
78
79impl From<&StorageOpts> for SstableBuilderOptions {
80    fn from(options: &StorageOpts) -> SstableBuilderOptions {
81        let capacity: usize = (options.sstable_size_mb as usize) * (1 << 20);
82        SstableBuilderOptions {
83            capacity,
84            block_capacity: (options.block_size_kb as usize) * (1 << 10),
85            restart_interval: DEFAULT_RESTART_INTERVAL,
86            bloom_false_positive: options.bloom_false_positive,
87            compression_algorithm: CompressionAlgorithm::None,
88            max_sst_size: options.compactor_max_sst_size,
89            shorten_block_meta_key_threshold: options.shorten_block_meta_key_threshold,
90            max_vnode_key_range_bytes: None,
91            estimated_output_key_count: None,
92            filter_hash_prealloc_key_count_cap: DEFAULT_FILTER_HASH_PREALLOC_KEY_COUNT_CAP,
93        }
94    }
95}
96
97impl Default for SstableBuilderOptions {
98    fn default() -> Self {
99        Self {
100            capacity: DEFAULT_SSTABLE_SIZE,
101            block_capacity: DEFAULT_BLOCK_SIZE,
102            restart_interval: DEFAULT_RESTART_INTERVAL,
103            bloom_false_positive: DEFAULT_BLOOM_FALSE_POSITIVE,
104            compression_algorithm: CompressionAlgorithm::None,
105            max_sst_size: DEFAULT_MAX_SST_SIZE,
106            shorten_block_meta_key_threshold: None,
107            max_vnode_key_range_bytes: None,
108            estimated_output_key_count: None,
109            filter_hash_prealloc_key_count_cap: DEFAULT_FILTER_HASH_PREALLOC_KEY_COUNT_CAP,
110        }
111    }
112}
113
114impl SstableBuilderOptions {
115    pub fn estimated_output_key_count(&self) -> usize {
116        self.estimated_output_key_count
117            .unwrap_or(self.capacity / DEFAULT_ENTRY_SIZE + 1)
118    }
119
120    pub fn filter_builder_options(&self) -> FilterBuilderOptions {
121        FilterBuilderOptions {
122            estimated_key_count: self.estimated_output_key_count(),
123            estimated_block_count: self.capacity / self.block_capacity + 1,
124            hash_prealloc_key_count_cap: self.filter_hash_prealloc_key_count_cap,
125        }
126    }
127}
128
129pub struct SstableBuilderOutput<WO> {
130    pub sst_info: LocalSstableInfo,
131    pub writer_output: WO,
132    pub stats: SstableBuilderOutputStats,
133}
134
135pub struct SstableBuilder<W: SstableWriter, F: FilterBuilder> {
136    /// Options.
137    options: SstableBuilderOptions,
138    /// Data writer.
139    writer: W,
140    /// Current block builder.
141    block_builder: BlockBuilder,
142
143    compaction_catalog_agent_ref: CompactionCatalogAgentRef,
144    /// Block metadata vec.
145    block_metas: Vec<BlockMeta>,
146
147    /// `table_id` of added keys.
148    table_ids: BTreeSet<TableId>,
149    last_full_key: Vec<u8>,
150    /// Buffer for encoded key and value to avoid allocation.
151    raw_key: BytesMut,
152    raw_value: BytesMut,
153    last_table_id: Option<TableId>,
154    sst_object_id: HummockSstableObjectId,
155
156    /// Per table stats.
157    table_stats: TableStatsMap,
158    /// `last_table_stats` accumulates stats for `last_table_id` and finalizes it in `table_stats`
159    /// by `finalize_last_table_stats`
160    last_table_stats: TableStats,
161
162    filter_builder: F,
163
164    epoch_set: BTreeSet<u64>,
165    memory_limiter: Option<Arc<MemoryLimiter>>,
166
167    block_size_vec: Vec<usize>, // for statistics
168    vnode_range_collector: Option<VnodeUserKeyRangeCollector>,
169}
170
171impl<W: SstableWriter> SstableBuilder<W, Xor16FilterBuilder> {
172    pub fn for_test(
173        sstable_id: u64,
174        writer: W,
175        options: SstableBuilderOptions,
176        table_id_to_vnode: HashMap<impl Into<TableId>, usize>,
177        table_id_to_watermark_serde: HashMap<
178            impl Into<TableId>,
179            Option<(OrderedRowSerde, OrderedRowSerde, usize)>,
180        >,
181    ) -> Self {
182        let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
183            FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor),
184            table_id_to_vnode
185                .into_iter()
186                .map(|(table_id, v)| (table_id.into(), v))
187                .collect(),
188            table_id_to_watermark_serde
189                .into_iter()
190                .map(|(table_id, v)| (table_id.into(), v))
191                .collect(),
192            HashMap::default(),
193        ));
194
195        Self::new(
196            sstable_id,
197            writer,
198            Xor16FilterBuilder::create(options.filter_builder_options()),
199            options,
200            compaction_catalog_agent_ref,
201            None,
202        )
203    }
204}
205
206impl<W: SstableWriter, F: FilterBuilder> SstableBuilder<W, F> {
207    pub fn new(
208        sst_object_id: impl Into<HummockSstableObjectId>,
209        writer: W,
210        filter_builder: F,
211        options: SstableBuilderOptions,
212        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
213        memory_limiter: Option<Arc<MemoryLimiter>>,
214    ) -> Self {
215        let sst_object_id = sst_object_id.into();
216        Self {
217            vnode_range_collector: VnodeUserKeyRangeCollector::with_limit(
218                options.max_vnode_key_range_bytes,
219            ),
220            options: options.clone(),
221            writer,
222            block_builder: BlockBuilder::new(BlockBuilderOptions {
223                capacity: options.block_capacity,
224                restart_interval: options.restart_interval,
225                compression_algorithm: options.compression_algorithm,
226            }),
227            filter_builder,
228            block_metas: Vec::with_capacity(options.capacity / options.block_capacity + 1),
229            table_ids: BTreeSet::new(),
230            last_table_id: None,
231            raw_key: BytesMut::new(),
232            raw_value: BytesMut::new(),
233            last_full_key: vec![],
234            sst_object_id,
235            compaction_catalog_agent_ref,
236            table_stats: Default::default(),
237            last_table_stats: Default::default(),
238            epoch_set: BTreeSet::default(),
239            memory_limiter,
240            block_size_vec: Vec::new(),
241        }
242    }
243
244    /// Add kv pair to sstable.
245    pub async fn add_for_test(
246        &mut self,
247        full_key: FullKey<&[u8]>,
248        value: HummockValue<&[u8]>,
249    ) -> HummockResult<()> {
250        self.add(full_key, value).await
251    }
252
253    pub fn current_block_size(&self) -> usize {
254        self.block_builder.approximate_len()
255    }
256
257    /// Add raw data of block to sstable. return false means fallback
258    pub async fn add_raw_block(
259        &mut self,
260        buf: Bytes,
261        filter_data: Vec<u8>,
262        smallest_key: FullKey<Vec<u8>>,
263        largest_key: Vec<u8>,
264        mut meta: BlockMeta,
265    ) -> HummockResult<bool> {
266        let table_id = smallest_key.user_key.table_id;
267        if self.last_table_id.is_none() || self.last_table_id.unwrap() != table_id {
268            if !self.block_builder.is_empty() {
269                // Try to finish the previous `Block`` when the `table_id` is switched, making sure that the data in the `Block` doesn't span two `table_ids`.
270                self.build_block().await?;
271            }
272
273            self.table_ids.insert(table_id);
274            self.finalize_last_table_stats();
275            self.last_table_id = Some(table_id);
276        }
277
278        if !self.block_builder.is_empty() {
279            let min_block_size = std::cmp::min(MIN_BLOCK_SIZE, self.options.block_capacity / 4);
280
281            // If the previous block is too small, we should merge it into the previous block.
282            if self.block_builder.approximate_len() < min_block_size {
283                let block = Block::decode(buf, meta.uncompressed_size as usize)?;
284                let mut iter = BlockIterator::new(BlockHolder::from_owned_block(Box::new(block)));
285                iter.seek_to_first();
286                while iter.is_valid() {
287                    let value = HummockValue::from_slice(iter.value()).unwrap_or_else(|_| {
288                        panic!(
289                            "decode failed for fast compact sst_id {} block_idx {} last_table_id {:?}",
290                            self.sst_object_id, self.block_metas.len(), self.last_table_id
291                        )
292                    });
293                    self.add_impl(iter.key(), value, false).await?;
294                    iter.next();
295                }
296                return Ok(false);
297            }
298
299            self.build_block().await?;
300        }
301        self.last_full_key = largest_key;
302        assert_eq!(
303            meta.len as usize,
304            buf.len(),
305            "meta {} buf {} last_table_id {:?}",
306            meta.len,
307            buf.len(),
308            self.last_table_id
309        );
310        meta.offset = self.writer.data_len() as u32;
311        self.block_metas.push(meta);
312        self.filter_builder.add_raw_data(filter_data);
313        let block_meta = self.block_metas.last_mut().unwrap();
314        self.writer.write_block_bytes(buf, block_meta).await?;
315
316        Ok(true)
317    }
318
319    /// Add kv pair to sstable.
320    pub async fn add(
321        &mut self,
322        full_key: FullKey<&[u8]>,
323        value: HummockValue<&[u8]>,
324    ) -> HummockResult<()> {
325        self.add_impl(full_key, value, true).await
326    }
327
328    /// Add kv pair to sstable.
329    async fn add_impl(
330        &mut self,
331        full_key: FullKey<&[u8]>,
332        value: HummockValue<&[u8]>,
333        could_switch_block: bool,
334    ) -> HummockResult<()> {
335        const LARGE_KEY_LEN: usize = MAX_KEY_LEN >> 1;
336
337        let table_key_len = full_key.user_key.table_key.as_ref().len();
338        let table_value_len = match &value {
339            HummockValue::Put(t) => t.len(),
340            HummockValue::Delete => 0,
341        };
342        let large_value_len = self.options.max_sst_size as usize / 10;
343        let large_key_value_len = self.options.max_sst_size as usize / 2;
344        if table_key_len >= LARGE_KEY_LEN
345            || table_value_len > large_value_len
346            || table_key_len + table_value_len > large_key_value_len
347        {
348            let table_id = full_key.user_key.table_id;
349            tracing::warn!(
350                "A large key/value (table_id={}, key len={}, value len={}, epoch={}, spill offset={}) is added to block",
351                table_id,
352                table_key_len,
353                table_value_len,
354                full_key.epoch_with_gap.pure_epoch(),
355                full_key.epoch_with_gap.offset(),
356            );
357        }
358
359        // TODO: refine me
360        full_key.encode_into(&mut self.raw_key);
361        value.encode(&mut self.raw_value);
362        let is_new_user_key = self.last_full_key.is_empty()
363            || !user_key(&self.raw_key).eq(user_key(self.last_full_key.as_slice()));
364        let table_id = full_key.user_key.table_id;
365        let is_new_table = self.last_table_id != Some(table_id);
366        let current_block_size = self.current_block_size();
367        let is_block_full = current_block_size >= self.options.block_capacity
368            || (current_block_size > self.options.block_capacity / 4 * 3
369                && current_block_size + self.raw_value.len() + self.raw_key.len()
370                    > self.options.block_capacity);
371
372        if is_new_table {
373            assert!(
374                could_switch_block,
375                "is_new_user_key {} sst_id {} block_idx {} table_id {} last_table_id {:?} full_key {:?}",
376                is_new_user_key,
377                self.sst_object_id,
378                self.block_metas.len(),
379                table_id,
380                self.last_table_id,
381                full_key
382            );
383            self.table_ids.insert(table_id);
384            self.finalize_last_table_stats();
385            self.last_table_id = Some(table_id);
386            if !self.block_builder.is_empty() {
387                self.build_block().await?;
388            }
389        } else if is_block_full && could_switch_block {
390            self.build_block().await?;
391        }
392        self.last_table_stats.total_key_count += 1;
393        self.epoch_set.insert(full_key.epoch_with_gap.pure_epoch());
394
395        // Rotate block builder if the previous one has been built.
396        if self.block_builder.is_empty() {
397            let smallest_key = if let Some(threshold) =
398                self.options.shorten_block_meta_key_threshold
399                && !self.last_full_key.is_empty()
400                && full_key.encoded_len() >= threshold
401            {
402                let prev = FullKey::decode(&self.last_full_key);
403                if let Some(shortened) = try_shorten_block_smallest_key(&prev, &full_key) {
404                    shortened.encode()
405                } else {
406                    full_key.encode()
407                }
408            } else {
409                full_key.encode()
410            };
411
412            self.block_metas.push(BlockMeta {
413                offset: utils::checked_into_u32(self.writer.data_len()).unwrap_or_else(|_| {
414                    panic!(
415                        "WARN overflow can't convert writer_data_len {} into u32 sst_id {} block_idx {} tables {:?}",
416                        self.writer.data_len(),
417                        self.sst_object_id,
418                        self.block_metas.len(),
419                        self.table_ids,
420                    )
421                }),
422                len: 0,
423                smallest_key,
424                uncompressed_size: 0,
425                total_key_count: 0,
426                stale_key_count: 0,
427            });
428        }
429
430        let filter_key = self
431            .compaction_catalog_agent_ref
432            .extract(user_key(&self.raw_key));
433        // Add SST filter check.
434        if !filter_key.is_empty() {
435            self.filter_builder
436                .add_key(filter_key, table_id.as_raw_id());
437        }
438        // Use pre-encoded key to avoid redundant encoding
439        self.block_builder.add(
440            table_id,
441            &self.raw_key[TABLE_PREFIX_LEN..],
442            self.raw_value.as_ref(),
443        );
444        self.block_metas.last_mut().unwrap().total_key_count += 1;
445        if !is_new_user_key || value.is_delete() {
446            self.block_metas.last_mut().unwrap().stale_key_count += 1;
447        }
448        self.last_table_stats.total_key_size += full_key.encoded_len() as i64;
449        self.last_table_stats.total_value_size += value.encoded_len() as i64;
450
451        if let Some(collector) = self.vnode_range_collector.as_mut() {
452            collector.observe_key(
453                VirtualNode::from_index(full_key.user_key.get_vnode_id()),
454                &self.raw_key,
455                &self.last_full_key,
456            );
457        }
458
459        self.last_full_key.clear();
460        self.last_full_key.extend_from_slice(&self.raw_key);
461
462        self.raw_key.clear();
463        self.raw_value.clear();
464        Ok(())
465    }
466
467    /// Finish building sst.
468    ///
469    /// # Format
470    ///
471    /// data:
472    ///
473    /// ```plain
474    /// | Block 0 | ... | Block N-1 | N (4B) |
475    /// ```
476    pub async fn finish(mut self) -> HummockResult<SstableBuilderOutput<W::Output>> {
477        let smallest_key = if self.block_metas.is_empty() {
478            vec![]
479        } else {
480            self.block_metas[0].smallest_key.clone()
481        };
482        let largest_key = self.last_full_key.clone();
483        self.finalize_last_table_stats();
484
485        // Vnode key-range hints are only supported for single-table SSTs.
486        // Multi-table SST scenarios should not enable max_vnode_key_range_bytes in config.
487        assert!(
488            self.table_ids.len() <= 1 || self.vnode_range_collector.is_none(),
489            "vnode key-range hints are only supported for single-table SSTs, found {} tables",
490            self.table_ids.len()
491        );
492
493        self.build_block().await?;
494        let right_exclusive = false;
495        let meta_offset = self.writer.data_len() as u64;
496
497        let filter_data = self.filter_builder.finish(self.memory_limiter.clone());
498        let (filter_type, filter_layout) = if filter_data.is_none() {
499            (
500                PbSstableFilterType::SstableFilterNone,
501                PbSstableFilterLayout::Unspecified,
502            )
503        } else if self.filter_builder.support_blocked_raw_data() {
504            (
505                self.filter_builder.filter_type(),
506                PbSstableFilterLayout::Blocked,
507            )
508        } else {
509            (
510                self.filter_builder.filter_type(),
511                PbSstableFilterLayout::Plain,
512            )
513        };
514
515        let total_key_count = self
516            .block_metas
517            .iter()
518            .map(|block_meta| block_meta.total_key_count as u64)
519            .sum::<u64>();
520        let stale_key_count = self
521            .block_metas
522            .iter()
523            .map(|block_meta| block_meta.stale_key_count as u64)
524            .sum::<u64>();
525        let uncompressed_file_size = self
526            .block_metas
527            .iter()
528            .map(|block_meta| block_meta.uncompressed_size as u64)
529            .sum::<u64>();
530
531        #[expect(deprecated)]
532        let mut meta = SstableMeta {
533            block_metas: self.block_metas,
534            bloom_filter: filter_data.unwrap_or_default(),
535            estimated_size: 0,
536            key_count: utils::checked_into_u32(total_key_count).unwrap_or_else(|_| {
537                panic!(
538                    "WARN overflow can't convert total_key_count {} into u32 tables {:?}",
539                    total_key_count, self.table_ids,
540                )
541            }),
542            smallest_key,
543            largest_key,
544            version: VERSION,
545            meta_offset,
546            monotonic_tombstone_events: vec![],
547        };
548
549        let meta_encode_size = meta.encoded_size();
550        let encoded_size_u32 = utils::checked_into_u32(meta_encode_size).unwrap_or_else(|_| {
551            panic!(
552                "WARN overflow can't convert meta_encoded_size {} into u32 tables {:?}",
553                meta_encode_size, self.table_ids,
554            )
555        });
556        let meta_offset_u32 = utils::checked_into_u32(meta_offset).unwrap_or_else(|_| {
557            panic!(
558                "WARN overflow can't convert meta_offset {} into u32 tables {:?}",
559                meta_offset, self.table_ids,
560            )
561        });
562        meta.estimated_size = encoded_size_u32
563            .checked_add(meta_offset_u32)
564            .unwrap_or_else(|| {
565                panic!(
566                    "WARN overflow encoded_size_u32 {} meta_offset_u32 {} table_id {:?} table_ids {:?}",
567                    encoded_size_u32, meta_offset_u32, self.last_table_id, self.table_ids
568                )
569            });
570
571        let (avg_key_size, avg_value_size) = if self.table_stats.is_empty() {
572            (0, 0)
573        } else {
574            let total_key_count: usize = self
575                .table_stats
576                .values()
577                .map(|s| s.total_key_count as usize)
578                .sum();
579
580            let total_key_size: usize = self
581                .table_stats
582                .values()
583                .map(|s| s.total_key_size as usize)
584                .sum();
585
586            let total_value_size: usize = self
587                .table_stats
588                .values()
589                .map(|s| s.total_value_size as usize)
590                .sum();
591
592            (
593                total_key_size.checked_div(total_key_count).unwrap_or(0),
594                total_value_size.checked_div(total_key_count).unwrap_or(0),
595            )
596        };
597
598        let (min_epoch, max_epoch) = {
599            if self.epoch_set.is_empty() {
600                (HummockEpoch::MAX, u64::MIN)
601            } else {
602                (
603                    *self.epoch_set.first().unwrap(),
604                    *self.epoch_set.last().unwrap(),
605                )
606            }
607        };
608
609        let vnode_user_key_ranges = self
610            .vnode_range_collector
611            .take()
612            .and_then(|collector| collector.finish(&self.last_full_key));
613
614        let sst_info: SstableInfo = SstableInfoInner {
615            object_id: self.sst_object_id,
616            // use the same sst_id as object_id for initial sst
617            sst_id: self.sst_object_id.as_raw_id().into(),
618            key_range: KeyRange {
619                left: Bytes::from(meta.smallest_key.clone()),
620                right: Bytes::from(meta.largest_key.clone()),
621                right_exclusive,
622            },
623            file_size: meta.estimated_size as u64,
624            table_ids: self.table_ids.into_iter().collect(),
625            meta_offset: meta.meta_offset,
626            stale_key_count,
627            total_key_count,
628            uncompressed_file_size: uncompressed_file_size + meta.encoded_size() as u64,
629            min_epoch,
630            max_epoch,
631            range_tombstone_count: 0,
632            sst_size: meta.estimated_size as u64,
633            filter_type,
634            filter_layout,
635            vnode_statistics: vnode_user_key_ranges,
636        }
637        .into();
638
639        tracing::trace!(
640            "meta_size {} filter_size {} add_key_counts {} stale_key_count {} min_epoch {} max_epoch {} epoch_count {}",
641            meta.encoded_size(),
642            meta.bloom_filter.len(),
643            total_key_count,
644            stale_key_count,
645            min_epoch,
646            max_epoch,
647            self.epoch_set.len()
648        );
649        let filter_size = meta.bloom_filter.len();
650        let sstable_file_size = sst_info.file_size as usize;
651
652        if !meta.block_metas.is_empty() {
653            // fill total_compressed_size
654            let mut last_table_id = meta.block_metas[0].table_id();
655            let mut last_table_stats = self.table_stats.get_mut(&last_table_id).unwrap();
656            for block_meta in &meta.block_metas {
657                let block_table_id = block_meta.table_id();
658                if last_table_id != block_table_id {
659                    last_table_id = block_table_id;
660                    last_table_stats = self.table_stats.get_mut(&last_table_id).unwrap();
661                }
662
663                last_table_stats.total_compressed_size += block_meta.len as u64;
664            }
665        }
666
667        let writer_output = self.writer.finish(meta).await?;
668        // The timestamp is only used during full GC.
669        //
670        // Ideally object store object's last_modified should be used.
671        // However, it'll incur additional IO overhead since S3 lacks an interface to retrieve the last_modified timestamp after the PUT operation on an object.
672        //
673        // The local timestamp below is expected to precede the last_modified of object store object, given that the object store object is created afterward.
674        // It should help alleviate the clock drift issue.
675
676        let now = SystemTime::now()
677            .duration_since(SystemTime::UNIX_EPOCH)
678            .expect("Clock may have gone backwards")
679            .as_secs();
680        Ok(SstableBuilderOutput::<W::Output> {
681            sst_info: LocalSstableInfo::new(sst_info, self.table_stats, now),
682            writer_output,
683            stats: SstableBuilderOutputStats {
684                filter_size,
685                avg_key_size,
686                avg_value_size,
687                epoch_count: self.epoch_set.len(),
688                block_size_vec: self.block_size_vec,
689                sstable_file_size,
690            },
691        })
692    }
693
694    pub fn approximate_len(&self) -> usize {
695        self.writer.data_len()
696            + self.block_builder.approximate_len()
697            + self.filter_builder.approximate_len()
698    }
699
700    pub async fn build_block(&mut self) -> HummockResult<()> {
701        // Skip empty block.
702        if self.block_builder.is_empty() {
703            return Ok(());
704        }
705
706        let block_meta = self.block_metas.last_mut().unwrap();
707        let uncompressed_block_size = self.block_builder.uncompressed_block_size();
708        block_meta.uncompressed_size = utils::checked_into_u32(uncompressed_block_size)
709            .unwrap_or_else(|_| {
710                panic!(
711                    "WARN overflow can't convert uncompressed_block_size {} into u32 table {:?}",
712                    uncompressed_block_size,
713                    self.block_builder.table_id(),
714                )
715            });
716        let block = self.block_builder.build();
717        self.writer.write_block(block, block_meta).await?;
718        self.block_size_vec.push(block.len());
719        let data_len = utils::checked_into_u32(self.writer.data_len()).unwrap_or_else(|_| {
720            panic!(
721                "WARN overflow can't convert writer_data_len {} into u32 table {:?}",
722                self.writer.data_len(),
723                self.block_builder.table_id(),
724            )
725        });
726        block_meta.len = data_len.checked_sub(block_meta.offset).unwrap_or_else(|| {
727            panic!(
728                "data_len should >= meta_offset, found data_len={}, meta_offset={}",
729                data_len, block_meta.offset
730            )
731        });
732
733        self.filter_builder
734            .switch_block(self.memory_limiter.clone());
735
736        if data_len as usize > self.options.capacity * 2 {
737            tracing::warn!(
738                "WARN unexpected block size {} table {:?}",
739                data_len,
740                self.block_builder.table_id()
741            );
742        }
743
744        self.block_builder.clear();
745        Ok(())
746    }
747
748    pub fn is_empty(&self) -> bool {
749        self.writer.data_len() > 0
750    }
751
752    /// Returns true if we roughly reached capacity
753    pub fn reach_capacity(&self) -> bool {
754        self.approximate_len() >= self.options.capacity
755    }
756
757    fn finalize_last_table_stats(&mut self) {
758        if self.table_ids.is_empty() || self.last_table_id.is_none() {
759            return;
760        }
761        self.table_stats.insert(
762            self.last_table_id.unwrap(),
763            std::mem::take(&mut self.last_table_stats),
764        );
765    }
766}
767
768/// Collects vnode key-range hints during SST building.
769struct VnodeUserKeyRangeCollector {
770    max_bytes: usize,
771    current_size: usize,
772    ranges: BTreeMap<VirtualNode, (UserKey<Bytes>, UserKey<Bytes>)>,
773    current_vnode: VirtualNode,
774    range_start_key: Vec<u8>,
775}
776
777impl VnodeUserKeyRangeCollector {
778    fn new(max_bytes: usize) -> Self {
779        Self {
780            max_bytes,
781            current_size: 0,
782            ranges: BTreeMap::new(),
783            current_vnode: VirtualNode::ZERO,
784            range_start_key: Vec::new(),
785        }
786    }
787
788    fn with_limit(max_bytes: Option<usize>) -> Option<Self> {
789        max_bytes.filter(|&n| n > 0).map(Self::new)
790    }
791
792    /// Track vnode boundaries. On vnode switch, seals previous range with `prev_key` as right bound.
793    /// Range: `[first_key_of_vnode, last_key_of_vnode]` (inclusive).
794    fn observe_key(&mut self, vnode: VirtualNode, key: &[u8], prev_key: &[u8]) {
795        if self.current_size >= self.max_bytes {
796            return;
797        }
798
799        // First key
800        if self.range_start_key.is_empty() {
801            self.current_vnode = vnode;
802            self.range_start_key = key.to_vec();
803            return;
804        }
805
806        // Same vnode, nothing to do
807        if vnode == self.current_vnode {
808            return;
809        }
810
811        // Vnode changed: seal previous range
812        self.seal_range(prev_key);
813
814        // Check if budget exhausted after sealing
815        if self.current_size >= self.max_bytes {
816            return;
817        }
818
819        // Start new vnode
820        self.current_vnode = vnode;
821        self.range_start_key = key.to_vec();
822    }
823
824    /// Seal current range. Asserts `vnode/table_id` consistency between left and right keys.
825    fn seal_range(&mut self, right_key: &[u8]) {
826        let left_key = mem::take(&mut self.range_start_key);
827        self.current_size += mem::size_of::<VirtualNode>() + left_key.len() + right_key.len();
828
829        let left_full_key = FullKey::decode(&left_key);
830        let right_full_key = FullKey::decode(right_key);
831        let left_user_key = left_full_key.user_key.copy_into();
832        let right_user_key = right_full_key.user_key.copy_into();
833
834        // Sanity checks for data correctness:
835        // 1. left and right keys have same `vnode`
836        // 2. vnode matches `current_vnode` being tracked
837        // 3. left and right keys have same `table_id`
838        assert_eq!(
839            left_user_key.get_vnode_id(),
840            right_user_key.get_vnode_id(),
841            "vnode changed within range: left_user {:?}, right_user {:?}",
842            left_user_key,
843            right_user_key
844        );
845        assert_eq!(
846            left_user_key.get_vnode_id(),
847            self.current_vnode.to_index(),
848            "vnode mismatch: left {:?}, right {:?}, expected vnode {}",
849            left_user_key,
850            right_user_key,
851            self.current_vnode.to_index()
852        );
853        assert_eq!(
854            left_user_key.table_id, right_user_key.table_id,
855            "table_id changed within range: left {:?}, right {:?}",
856            left_user_key, right_user_key
857        );
858
859        self.ranges
860            .insert(self.current_vnode, (left_user_key, right_user_key));
861    }
862
863    /// Returns `Some` if >1 vnodes collected, `None` otherwise (single-vnode needs no hints).
864    fn finish(mut self, last_key: &[u8]) -> Option<VnodeStatistics> {
865        if !self.range_start_key.is_empty() {
866            self.seal_range(last_key);
867        }
868
869        if self.ranges.len() > 1 {
870            // Validate all ranges belong to the same table_id before building VnodeStatistics
871            let mut table_ids = self
872                .ranges
873                .values()
874                .flat_map(|(left, right)| [left.table_id, right.table_id]);
875            if let Some(first_table_id) = table_ids.next() {
876                for table_id in table_ids {
877                    assert_eq!(
878                        table_id, first_table_id,
879                        "all vnode ranges must belong to the same table_id, found {:?} and {:?}",
880                        table_id, first_table_id
881                    );
882                }
883            }
884
885            Some(VnodeStatistics::from_map(self.ranges))
886        } else {
887            None
888        }
889    }
890}
891
/// Size and count statistics of a built SST, which `report_stats` publishes to the compactor
/// metrics.
pub struct SstableBuilderOutputStats {
    /// Size of the SST file; observed in `sstable_file_size`.
    sstable_file_size: usize,
    /// Size of the SST's filter; observed in `sstable_bloom_filter_size`.
    filter_size: usize,
    /// Average key size; observed in `sstable_avg_key_size`.
    avg_key_size: usize,
    /// Average value size; observed in `sstable_avg_value_size`.
    avg_value_size: usize,
    /// Epoch count; observed in `sstable_distinct_epoch_count`.
    epoch_count: usize,
    /// Per-block sizes, for statistics; each entry is observed in `sstable_block_size`.
    block_size_vec: Vec<usize>,
}
900
901impl SstableBuilderOutputStats {
902    pub fn report_stats(&self, metrics: &Arc<CompactorMetrics>) {
903        if self.filter_size != 0 {
904            metrics
905                .sstable_bloom_filter_size
906                .observe(self.filter_size as _);
907        }
908
909        if self.sstable_file_size != 0 {
910            metrics
911                .sstable_file_size
912                .observe(self.sstable_file_size as _);
913        }
914
915        if self.avg_key_size != 0 {
916            metrics.sstable_avg_key_size.observe(self.avg_key_size as _);
917        }
918
919        if self.avg_value_size != 0 {
920            metrics
921                .sstable_avg_value_size
922                .observe(self.avg_value_size as _);
923        }
924
925        if self.epoch_count != 0 {
926            metrics
927                .sstable_distinct_epoch_count
928                .observe(self.epoch_count as _);
929        }
930
931        if !self.block_size_vec.is_empty() {
932            for block_size in &self.block_size_vec {
933                metrics.sstable_block_size.observe(*block_size as _);
934            }
935        }
936    }
937}
938
// Unit tests for `SstableBuilder` and `VnodeUserKeyRangeCollector`.
#[cfg(test)]
pub(super) mod tests {
    use std::collections::{Bound, HashMap};

    use risingwave_common::catalog::TableId;
    use risingwave_common::hash::VirtualNode;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_hummock_sdk::key::UserKey;

    use super::*;
    use crate::assert_bytes_eq;
    use crate::compaction_catalog_manager::{
        CompactionCatalogAgent, DummyFilterKeyExtractor, MultiFilterKeyExtractor,
    };
    use crate::hummock::iterator::test_utils::mock_sstable_store;
    use crate::hummock::sstable::xor_filter::BlockedXor16FilterBuilder;
    use crate::hummock::test_utils::{
        TEST_KEYS_COUNT, default_builder_opt_for_test, gen_test_sstable_impl, mock_sst_writer,
        test_key_of, test_value_of,
    };
    use crate::hummock::{CachePolicy, Sstable, SstableWriterOptions, Xor8FilterBuilder};
    use crate::monitor::StoreLocalStatistic;

    // Finishing a builder that never received a key must succeed.
    #[tokio::test]
    async fn test_empty() {
        let opt = SstableBuilderOptions {
            capacity: 0,
            block_capacity: 4096,
            restart_interval: 16,
            bloom_false_positive: 0.001,
            ..Default::default()
        };

        let table_id_to_vnode = HashMap::from_iter(vec![(0, VirtualNode::COUNT_FOR_TEST)]);
        let table_id_to_watermark_serde = HashMap::from_iter(vec![(0, None)]);
        let b = SstableBuilder::for_test(
            0,
            mock_sst_writer(&opt),
            opt,
            table_id_to_vnode,
            table_id_to_watermark_serde,
        );

        b.finish().await.unwrap();
    }

    /// Encodes a full key of `TableId::default()` whose table key is the big-endian `vnode`
    /// followed by `table_key_suffix`. The last `FullKey::for_test` argument (presumably the
    /// epoch) is `0`.
    fn encode_full_key(vnode: VirtualNode, table_key_suffix: &[u8]) -> Vec<u8> {
        let mut table_key = vnode.to_be_bytes().to_vec();
        table_key.extend_from_slice(table_key_suffix);
        FullKey::for_test(TableId::default(), table_key, 0).encode()
    }

    /// Returns the table key `vnode.to_be_bytes() ++ suffix`, i.e. the table-key part of
    /// `encode_full_key(vnode, suffix)`.
    fn table_key_of(vnode: VirtualNode, suffix: &[u8]) -> Vec<u8> {
        let mut key = vnode.to_be_bytes().to_vec();
        key.extend_from_slice(suffix);
        key
    }

    #[test]
    fn test_vnode_user_key_range_basic_collection() {
        // Test basic multi-vnode collection with boundary semantics verification.
        // Validates: vnode switching triggers range sealing, boundaries are inclusive (right_exclusive=false).
        let mut collector = VnodeUserKeyRangeCollector::with_limit(Some(1024)).unwrap();
        let vnode_1 = VirtualNode::from_index(1);
        let vnode_2 = VirtualNode::from_index(2);

        let k1 = encode_full_key(vnode_1, b"k1");
        let k2 = encode_full_key(vnode_1, b"k2");
        let k3 = encode_full_key(vnode_2, b"k3");
        let k4 = encode_full_key(vnode_2, b"k4");

        collector.observe_key(vnode_1, &k1, &[]);
        collector.observe_key(vnode_1, &k2, &k1);
        collector.observe_key(vnode_2, &k3, &k2);
        collector.observe_key(vnode_2, &k4, &k3);

        let info = collector.finish(&k4).unwrap();
        assert_eq!(info.vnode_user_key_ranges().len(), 2);

        // Verify vnode_1: left = first key, right = last key before switch
        let (range1_left, range1_right) = info.get_vnode_user_key_range(vnode_1).unwrap();
        assert_eq!(range1_left.table_key.as_ref(), table_key_of(vnode_1, b"k1"));
        assert_eq!(
            range1_right.table_key.as_ref(),
            table_key_of(vnode_1, b"k2")
        );

        // Verify vnode_2: left = first key, right = SST's last key
        let (range2_left, range2_right) = info.get_vnode_user_key_range(vnode_2).unwrap();
        assert_eq!(range2_left.table_key.as_ref(), table_key_of(vnode_2, b"k3"));
        assert_eq!(
            range2_right.table_key.as_ref(),
            table_key_of(vnode_2, b"k4")
        );
    }

    #[test]
    fn test_vnode_user_key_range_capacity_limit() {
        // Test "allow over-limit write" semantics: the budget is checked only before a new
        // range is opened, so the range that crosses the limit is still recorded in full.
        //
        // Each sealed range is charged sizeof(VirtualNode) + left.len() + right.len() bytes,
        // where left/right are encoded full keys. All keys below have the same encoded length,
        // so every range costs `range_size`. With limit = range_size + estimated_next_size:
        //   - after sealing vnode_1, current_size = range_size < limit, so vnode_2 is opened;
        //   - after sealing vnode_2, current_size = 2 * range_size >= limit (over the limit),
        //     so vnode_3 is never opened and later keys are ignored.
        let vnode_1 = VirtualNode::from_index(1);
        let vnode_2 = VirtualNode::from_index(2);
        let vnode_3 = VirtualNode::from_index(3);
        let vnode_4 = VirtualNode::from_index(4);

        let k1 = encode_full_key(vnode_1, b"k1");
        let k2 = encode_full_key(vnode_1, b"k2");
        let k3 = encode_full_key(vnode_2, b"k3");
        let k4 = encode_full_key(vnode_2, b"k4");
        let k5 = encode_full_key(vnode_3, b"k5");
        let k6 = encode_full_key(vnode_3, b"k6");
        let k7 = encode_full_key(vnode_4, b"k7");

        let range_size = mem::size_of::<VirtualNode>() + k1.len() + k2.len();
        let estimated_next_size = mem::size_of::<VirtualNode>() + k3.len();
        let limit = range_size + estimated_next_size; // allow second range, stop before third
        let mut collector = VnodeUserKeyRangeCollector::with_limit(Some(limit)).unwrap();

        collector.observe_key(vnode_1, &k1, &[]);
        collector.observe_key(vnode_1, &k2, &k1);
        collector.observe_key(vnode_2, &k3, &k2); // Seals vnode_1 (current_size = range_size)
        collector.observe_key(vnode_2, &k4, &k3);
        collector.observe_key(vnode_3, &k5, &k4); // Seals vnode_2 (2 * range_size >= limit); vnode_3 not opened
        collector.observe_key(vnode_3, &k6, &k5); // Ignored
        collector.observe_key(vnode_4, &k7, &k6); // Ignored

        let info = collector.finish(&k7).unwrap();
        assert_eq!(
            info.vnode_user_key_ranges().len(),
            2,
            "Should collect exactly 2 vnodes"
        );

        // Verify collected vnodes have correct boundaries
        let (range1_left, range1_right) = info.get_vnode_user_key_range(vnode_1).unwrap();
        assert_eq!(range1_left.table_key.as_ref(), table_key_of(vnode_1, b"k1"));
        assert_eq!(
            range1_right.table_key.as_ref(),
            table_key_of(vnode_1, b"k2")
        );

        let (range2_left, range2_right) = info.get_vnode_user_key_range(vnode_2).unwrap();
        assert_eq!(range2_left.table_key.as_ref(), table_key_of(vnode_2, b"k3"));
        assert_eq!(
            range2_right.table_key.as_ref(),
            table_key_of(vnode_2, b"k4")
        );

        // Verify stopped vnodes are not collected
        assert!(info.get_vnode_user_key_range(vnode_3).is_none());
        assert!(info.get_vnode_user_key_range(vnode_4).is_none());
    }

    #[test]
    fn test_vnode_user_key_range_sparse_distribution() {
        // Test non-consecutive vnodes (production scenario: hash-based data distribution).
        // Validates: collector handles sparse vnode indices correctly, gaps are not filled.
        let mut collector = VnodeUserKeyRangeCollector::with_limit(Some(1024)).unwrap();
        let vnode_5 = VirtualNode::from_index(5);
        let vnode_10 = VirtualNode::from_index(10);
        let vnode_100 = VirtualNode::from_index(100);

        let keys = vec![
            (vnode_5, encode_full_key(vnode_5, b"key_005_001")),
            (vnode_5, encode_full_key(vnode_5, b"key_005_002")),
            (vnode_5, encode_full_key(vnode_5, b"key_005_999")),
            (vnode_10, encode_full_key(vnode_10, b"key_010_001")),
            (vnode_10, encode_full_key(vnode_10, b"key_010_100")),
            (vnode_100, encode_full_key(vnode_100, b"key_100_001")),
            (vnode_100, encode_full_key(vnode_100, b"key_100_999")),
        ];

        let mut prev_key = Vec::new();
        for (vnode, key) in &keys {
            collector.observe_key(*vnode, key, &prev_key);
            prev_key = key.clone();
        }

        let info = collector.finish(&prev_key).unwrap();
        assert_eq!(info.vnode_user_key_ranges().len(), 3);

        // Verify each collected vnode has correct boundaries
        let (range5_left, range5_right) = info.get_vnode_user_key_range(vnode_5).unwrap();
        assert_eq!(
            range5_left.table_key.as_ref(),
            table_key_of(vnode_5, b"key_005_001")
        );
        assert_eq!(
            range5_right.table_key.as_ref(),
            table_key_of(vnode_5, b"key_005_999")
        );

        let (range10_left, range10_right) = info.get_vnode_user_key_range(vnode_10).unwrap();
        assert_eq!(
            range10_left.table_key.as_ref(),
            table_key_of(vnode_10, b"key_010_001")
        );
        assert_eq!(
            range10_right.table_key.as_ref(),
            table_key_of(vnode_10, b"key_010_100")
        );

        let (range100_left, range100_right) = info.get_vnode_user_key_range(vnode_100).unwrap();
        assert_eq!(
            range100_left.table_key.as_ref(),
            table_key_of(vnode_100, b"key_100_001")
        );
        assert_eq!(
            range100_right.table_key.as_ref(),
            table_key_of(vnode_100, b"key_100_999")
        );

        // Verify gaps are not filled (no data for vnodes 0, 7, 50)
        assert!(
            info.get_vnode_user_key_range(VirtualNode::from_index(0))
                .is_none()
        );
        assert!(
            info.get_vnode_user_key_range(VirtualNode::from_index(7))
                .is_none()
        );
        assert!(
            info.get_vnode_user_key_range(VirtualNode::from_index(50))
                .is_none()
        );
    }

    #[test]
    fn test_vnode_user_key_range_edge_cases() {
        // Test 1: Configuration disabled (None or 0) should return None
        assert!(VnodeUserKeyRangeCollector::with_limit(None).is_none());
        assert!(VnodeUserKeyRangeCollector::with_limit(Some(0)).is_none());

        // Test 2: Empty collector (no keys) should return None
        let collector = VnodeUserKeyRangeCollector::with_limit(Some(1024)).unwrap();
        assert!(
            collector
                .finish(&encode_full_key(VirtualNode::ZERO, b"any"))
                .is_none()
        );

        // Test 3: Single vnode (optimization: don't emit hints for single-vnode SSTs)
        let vnode = VirtualNode::from_index(5);
        let mut collector = VnodeUserKeyRangeCollector::with_limit(Some(1024)).unwrap();
        let a = encode_full_key(vnode, b"a");
        let b = encode_full_key(vnode, b"b");
        let c = encode_full_key(vnode, b"c");
        collector.observe_key(vnode, &a, &[]);
        collector.observe_key(vnode, &b, &a);
        collector.observe_key(vnode, &c, &b);
        assert!(
            collector.finish(&c).is_none(),
            "Single-vnode SST should not emit hints"
        );

        // Test 4: Extreme limit (1 byte) - first vnode exceeds, becomes single-vnode
        // Range size = sizeof(VirtualNode=2) + encoded_key_len * 2
        // With 1-byte limit: vnode_1 range exceeds but allowed (over-limit write)
        // vnode_2 check: current_size >= max_bytes, stop. Result: only vnode_1, returns None (single-vnode)
        let mut collector = VnodeUserKeyRangeCollector::with_limit(Some(1)).unwrap();
        let vnode_1 = VirtualNode::from_index(1);
        let vnode_2 = VirtualNode::from_index(2);
        let key1 = encode_full_key(vnode_1, b"key1");
        let key2 = encode_full_key(vnode_1, b"key2");
        let key3 = encode_full_key(vnode_2, b"key3");
        collector.observe_key(vnode_1, &key1, &[]);
        collector.observe_key(vnode_1, &key2, &key1);
        collector.observe_key(vnode_2, &key3, &key2); // Seals vnode_1, stops before vnode_2
        assert!(
            collector.finish(&key3).is_none(),
            "Only 1 vnode collected, should return None"
        );
    }

    // Writes TEST_KEYS_COUNT keys, then checks the reported key range and file size, and that
    // the meta decoded at `meta_offset` equals the writer's meta.
    #[tokio::test]
    async fn test_basic() {
        let opt = default_builder_opt_for_test();

        let table_id_to_vnode = HashMap::from_iter(vec![(0, VirtualNode::COUNT_FOR_TEST)]);
        let table_id_to_watermark_serde = HashMap::from_iter(vec![(0, None)]);
        let mut b = SstableBuilder::for_test(
            0,
            mock_sst_writer(&opt),
            opt,
            table_id_to_vnode,
            table_id_to_watermark_serde,
        );

        for i in 0..TEST_KEYS_COUNT {
            b.add_for_test(
                test_key_of(i).to_ref(),
                HummockValue::put(&test_value_of(i)),
            )
            .await
            .unwrap();
        }

        let output = b.finish().await.unwrap();
        let info = output.sst_info.sst_info;

        assert_bytes_eq!(test_key_of(0).encode(), info.key_range.left);
        assert_bytes_eq!(
            test_key_of(TEST_KEYS_COUNT - 1).encode(),
            info.key_range.right
        );
        let (data, meta) = output.writer_output;
        assert_eq!(info.file_size, meta.estimated_size as u64);
        let offset = info.meta_offset as usize;
        let meta2 = SstableMeta::decode(&data[offset..]).unwrap();
        assert_eq!(meta2, meta);
    }

    /// Builds an SST with filter builder `F` and checks the recorded filter type/layout, then
    /// checks that every key in `0..key_count` passes `may_match_hash`.
    async fn test_with_xor_filter_builder<F: FilterBuilder>(
        bloom_false_positive: f64,
        expected_filter_type: PbSstableFilterType,
        expected_filter_layout: PbSstableFilterLayout,
    ) {
        // NOTE(review): only keys `0..TEST_KEYS_COUNT` are written below, so this assumes
        // `key_count <= TEST_KEYS_COUNT`.
        let key_count = 1000;

        let opts = SstableBuilderOptions {
            capacity: 0,
            block_capacity: 4096,
            restart_interval: 16,
            bloom_false_positive,
            ..Default::default()
        };

        // build remote table
        let sstable_store = mock_sstable_store().await;
        let table_id_to_vnode = HashMap::from_iter(vec![(0, VirtualNode::COUNT_FOR_TEST)]);
        let table_id_to_watermark_serde = HashMap::from_iter(vec![(0, None)]);
        let sst_info = gen_test_sstable_impl::<Vec<u8>, F>(
            opts,
            0,
            (0..TEST_KEYS_COUNT).map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
            sstable_store.clone(),
            CachePolicy::NotFill,
            table_id_to_vnode,
            table_id_to_watermark_serde,
        )
        .await;
        assert_eq!(sst_info.filter_type, expected_filter_type);
        assert_eq!(sst_info.filter_layout, expected_filter_layout);
        let table = sstable_store
            .sstable(&sst_info, &mut StoreLocalStatistic::default())
            .await
            .unwrap();

        assert!(table.has_filter());
        for i in 0..key_count {
            let full_key = test_key_of(i);
            let hash = Sstable::hash_for_filter(full_key.user_key.encode().as_slice(), 0);
            let key_ref = full_key.user_key.as_ref();
            assert!(
                table.may_match_hash(&(Bound::Included(key_ref), Bound::Included(key_ref)), hash),
                "failed at {}",
                i
            );
        }
    }

    // Each filter builder must record its expected filter type and layout in the SST info.
    #[tokio::test]
    async fn test_xor_filter_builder_output() {
        test_with_xor_filter_builder::<Xor16FilterBuilder>(
            0.0,
            PbSstableFilterType::SstableFilterXor16,
            PbSstableFilterLayout::Plain,
        )
        .await;
        test_with_xor_filter_builder::<Xor16FilterBuilder>(
            0.01,
            PbSstableFilterType::SstableFilterXor16,
            PbSstableFilterLayout::Plain,
        )
        .await;
        test_with_xor_filter_builder::<Xor8FilterBuilder>(
            0.01,
            PbSstableFilterType::SstableFilterXor8,
            PbSstableFilterLayout::Plain,
        )
        .await;
        test_with_xor_filter_builder::<BlockedXor16FilterBuilder>(
            0.01,
            PbSstableFilterType::SstableFilterXor16,
            PbSstableFilterLayout::Blocked,
        )
        .await;
    }

    // Builds one SST that holds keys of tables 1, 2 and 3. Tables 1 and 3 use the dummy filter
    // key extractor and table 2 uses the full-key extractor. Every written key of table 2 must
    // pass `may_match_hash`.
    #[tokio::test]
    async fn test_no_xor_filter_block() {
        let opts = SstableBuilderOptions::default();
        // build remote table
        let sstable_store = mock_sstable_store().await;
        let writer_opts = SstableWriterOptions::default();
        let object_id = 1;
        let writer = sstable_store
            .clone()
            .create_sst_writer(object_id, writer_opts);
        let mut filter = MultiFilterKeyExtractor::default();
        filter.register(
            1.into(),
            FilterKeyExtractorImpl::Dummy(DummyFilterKeyExtractor),
        );
        filter.register(
            2.into(),
            FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor),
        );
        filter.register(
            3.into(),
            FilterKeyExtractorImpl::Dummy(DummyFilterKeyExtractor),
        );

        let table_id_to_vnode = HashMap::from_iter(vec![
            (1.into(), VirtualNode::COUNT_FOR_TEST),
            (2.into(), VirtualNode::COUNT_FOR_TEST),
            (3.into(), VirtualNode::COUNT_FOR_TEST),
        ]);
        let table_id_to_watermark_serde =
            HashMap::from_iter(vec![(1.into(), None), (2.into(), None), (3.into(), None)]);

        let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
            FilterKeyExtractorImpl::Multi(filter),
            table_id_to_vnode,
            table_id_to_watermark_serde,
            HashMap::default(),
        ));

        let mut builder = SstableBuilder::new(
            object_id,
            writer,
            BlockedXor16FilterBuilder::create(opts.filter_builder_options()),
            opts,
            compaction_catalog_agent_ref,
            None,
        );

        let key_count: usize = 10000;
        for table_id in 1..4 {
            let mut table_key = VirtualNode::ZERO.to_be_bytes().to_vec();
            for idx in 0..key_count {
                // Truncate back to the vnode prefix before appending the next suffix.
                table_key.resize(VirtualNode::SIZE, 0);
                table_key.extend_from_slice(format!("key_test_{:05}", idx * 2).as_bytes());
                let k = UserKey::for_test(TableId::new(table_id), table_key.as_ref());
                let v = test_value_of(idx);
                builder
                    .add(
                        FullKey::from_user_key(k, test_epoch(1)),
                        HummockValue::put(v.as_ref()),
                    )
                    .await
                    .unwrap();
            }
        }
        let ret = builder.finish().await.unwrap();
        let sst_info = ret.sst_info.sst_info.clone();
        ret.writer_output.await.unwrap().unwrap();
        let table = sstable_store
            .sstable(&sst_info, &mut StoreLocalStatistic::default())
            .await
            .unwrap();
        let mut table_key = VirtualNode::ZERO.to_be_bytes().to_vec();
        for idx in 0..key_count {
            table_key.resize(VirtualNode::SIZE, 0);
            table_key.extend_from_slice(format!("key_test_{:05}", idx * 2).as_bytes());
            let k = UserKey::for_test(TableId::new(2), table_key.as_slice());
            let hash = Sstable::hash_for_filter(&k.encode(), 2);
            let key_ref = k.as_ref();
            assert!(
                table.may_match_hash(&(Bound::Included(key_ref), Bound::Included(key_ref)), hash)
            );
        }
    }
}