// risingwave_storage/hummock/sstable/builder.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeSet, HashMap};
16use std::sync::Arc;
17use std::time::SystemTime;
18
19use bytes::{Bytes, BytesMut};
20use risingwave_common::catalog::TableId;
21use risingwave_common::util::row_serde::OrderedRowSerde;
22use risingwave_hummock_sdk::key::{FullKey, MAX_KEY_LEN, TABLE_PREFIX_LEN, user_key};
23use risingwave_hummock_sdk::key_range::KeyRange;
24use risingwave_hummock_sdk::sstable_info::{SstableInfo, SstableInfoInner};
25use risingwave_hummock_sdk::table_stats::{TableStats, TableStatsMap};
26use risingwave_hummock_sdk::{HummockEpoch, HummockSstableObjectId, LocalSstableInfo};
27use risingwave_pb::hummock::BloomFilterType;
28
29use super::utils::CompressionAlgorithm;
30use super::{
31    BlockBuilder, BlockBuilderOptions, BlockMeta, DEFAULT_BLOCK_SIZE, DEFAULT_ENTRY_SIZE,
32    DEFAULT_RESTART_INTERVAL, SstableMeta, SstableWriter, VERSION,
33};
34use crate::compaction_catalog_manager::{
35    CompactionCatalogAgent, CompactionCatalogAgentRef, FilterKeyExtractorImpl,
36    FullKeyFilterKeyExtractor,
37};
38use crate::hummock::sstable::{FilterBuilder, utils};
39use crate::hummock::value::HummockValue;
40use crate::hummock::{
41    Block, BlockHolder, BlockIterator, HummockResult, MemoryLimiter, Xor16FilterBuilder,
42    try_shorten_block_smallest_key,
43};
44use crate::monitor::CompactorMetrics;
45use crate::opts::StorageOpts;
46
/// Default target size of an sstable: 4 MiB.
pub const DEFAULT_SSTABLE_SIZE: usize = 4 * 1024 * 1024;
/// Default bloom filter false-positive rate: 0.1%.
pub const DEFAULT_BLOOM_FALSE_POSITIVE: f64 = 0.001;
/// Default for `SstableBuilderOptions::max_sst_size`: 512 MiB.
pub const DEFAULT_MAX_SST_SIZE: u64 = 512 * 1024 * 1024;
/// Lower bound (8 KiB) used in `add_raw_block` to decide whether the pending
/// hand-built block is too small to emit as its own block.
pub const MIN_BLOCK_SIZE: usize = 8 * 1024;
51
#[derive(Clone, Debug)]
pub struct SstableBuilderOptions {
    /// Approximate sstable capacity.
    pub capacity: usize,
    /// Approximate block capacity.
    pub block_capacity: usize,
    /// Restart point interval.
    pub restart_interval: usize,
    /// False positive probability of bloom filter.
    /// A value <= 0.0 disables the filter entirely (see `SstableBuilder::finish`).
    pub bloom_false_positive: f64,
    /// Compression algorithm.
    pub compression_algorithm: CompressionAlgorithm,
    /// Upper bound on sstable size; also used by `SstableBuilder::add_impl` to derive
    /// the "large key/value" warning thresholds.
    pub max_sst_size: u64,
    /// If set, block metadata keys will be shortened when their length exceeds this threshold.
    pub shorten_block_meta_key_threshold: Option<usize>,
}
68
69impl From<&StorageOpts> for SstableBuilderOptions {
70    fn from(options: &StorageOpts) -> SstableBuilderOptions {
71        let capacity: usize = (options.sstable_size_mb as usize) * (1 << 20);
72        SstableBuilderOptions {
73            capacity,
74            block_capacity: (options.block_size_kb as usize) * (1 << 10),
75            restart_interval: DEFAULT_RESTART_INTERVAL,
76            bloom_false_positive: options.bloom_false_positive,
77            compression_algorithm: CompressionAlgorithm::None,
78            max_sst_size: options.compactor_max_sst_size,
79            shorten_block_meta_key_threshold: options.shorten_block_meta_key_threshold,
80        }
81    }
82}
83
impl Default for SstableBuilderOptions {
    /// Defaults used outside of config-driven construction: 4 MiB capacity,
    /// 0.1% bloom false-positive rate, no compression, and no block-meta key
    /// shortening.
    fn default() -> Self {
        Self {
            capacity: DEFAULT_SSTABLE_SIZE,
            block_capacity: DEFAULT_BLOCK_SIZE,
            restart_interval: DEFAULT_RESTART_INTERVAL,
            bloom_false_positive: DEFAULT_BLOOM_FALSE_POSITIVE,
            compression_algorithm: CompressionAlgorithm::None,
            max_sst_size: DEFAULT_MAX_SST_SIZE,
            shorten_block_meta_key_threshold: None,
        }
    }
}
97
/// Result of `SstableBuilder::finish`: the sstable's info/stats plus the writer's
/// output value (its type depends on the `SstableWriter` implementation).
pub struct SstableBuilderOutput<WO> {
    pub sst_info: LocalSstableInfo,
    pub writer_output: WO,
    pub stats: SstableBuilderOutputStats,
}
103
pub struct SstableBuilder<W: SstableWriter, F: FilterBuilder> {
    /// Options.
    options: SstableBuilderOptions,
    /// Data writer.
    writer: W,
    /// Current block builder.
    block_builder: BlockBuilder,

    /// Supplies filter-key extraction and per-table catalog info for added keys.
    compaction_catalog_agent_ref: CompactionCatalogAgentRef,
    /// Block metadata vec.
    block_metas: Vec<BlockMeta>,

    /// `table_id` of added keys.
    table_ids: BTreeSet<TableId>,
    /// Encoded full key of the most recently added entry; empty before the first key.
    last_full_key: Vec<u8>,
    /// Buffer for encoded key and value to avoid allocation.
    raw_key: BytesMut,
    raw_value: BytesMut,
    /// `table_id` of the most recently added key; `None` before the first key.
    last_table_id: Option<TableId>,
    sst_object_id: HummockSstableObjectId,

    /// Per table stats.
    table_stats: TableStatsMap,
    /// `last_table_stats` accumulates stats for `last_table_id` and finalizes it in `table_stats`
    /// by `finalize_last_table_stats`
    last_table_stats: TableStats,

    /// Builds the bloom/xor filter over the keys added so far.
    filter_builder: F,

    /// Distinct pure epochs seen among added keys; min/max are read in `finish`.
    epoch_set: BTreeSet<u64>,
    /// Passed through to the filter builder on block switch / finish for memory accounting.
    memory_limiter: Option<Arc<MemoryLimiter>>,

    block_size_vec: Vec<usize>, // for statistics
}
138
139impl<W: SstableWriter> SstableBuilder<W, Xor16FilterBuilder> {
140    pub fn for_test(
141        sstable_id: u64,
142        writer: W,
143        options: SstableBuilderOptions,
144        table_id_to_vnode: HashMap<impl Into<TableId>, usize>,
145        table_id_to_watermark_serde: HashMap<
146            impl Into<TableId>,
147            Option<(OrderedRowSerde, OrderedRowSerde, usize)>,
148        >,
149    ) -> Self {
150        let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
151            FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor),
152            table_id_to_vnode
153                .into_iter()
154                .map(|(table_id, v)| (table_id.into(), v))
155                .collect(),
156            table_id_to_watermark_serde
157                .into_iter()
158                .map(|(table_id, v)| (table_id.into(), v))
159                .collect(),
160            HashMap::default(),
161        ));
162
163        Self::new(
164            sstable_id,
165            writer,
166            Xor16FilterBuilder::new(options.capacity / DEFAULT_ENTRY_SIZE + 1),
167            options,
168            compaction_catalog_agent_ref,
169            None,
170        )
171    }
172}
173
174impl<W: SstableWriter, F: FilterBuilder> SstableBuilder<W, F> {
175    pub fn new(
176        sst_object_id: impl Into<HummockSstableObjectId>,
177        writer: W,
178        filter_builder: F,
179        options: SstableBuilderOptions,
180        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
181        memory_limiter: Option<Arc<MemoryLimiter>>,
182    ) -> Self {
183        let sst_object_id = sst_object_id.into();
184        Self {
185            options: options.clone(),
186            writer,
187            block_builder: BlockBuilder::new(BlockBuilderOptions {
188                capacity: options.block_capacity,
189                restart_interval: options.restart_interval,
190                compression_algorithm: options.compression_algorithm,
191            }),
192            filter_builder,
193            block_metas: Vec::with_capacity(options.capacity / options.block_capacity + 1),
194            table_ids: BTreeSet::new(),
195            last_table_id: None,
196            raw_key: BytesMut::new(),
197            raw_value: BytesMut::new(),
198            last_full_key: vec![],
199            sst_object_id,
200            compaction_catalog_agent_ref,
201            table_stats: Default::default(),
202            last_table_stats: Default::default(),
203            epoch_set: BTreeSet::default(),
204            memory_limiter,
205            block_size_vec: Vec::new(),
206        }
207    }
208
    /// Add kv pair to sstable.
    ///
    /// Test-only alias of [`Self::add`].
    pub async fn add_for_test(
        &mut self,
        full_key: FullKey<&[u8]>,
        value: HummockValue<&[u8]>,
    ) -> HummockResult<()> {
        self.add(full_key, value).await
    }
217
    /// Approximate encoded size, in bytes, of the block currently being built.
    pub fn current_block_size(&self) -> usize {
        self.block_builder.approximate_len()
    }
221
    /// Add raw data of block to sstable. return false means fallback
    ///
    /// Appends an already-encoded block (fast-compaction path) without re-encoding its
    /// entries. Returns `Ok(false)` when the pending hand-built block was too small: in
    /// that case `buf` is decoded and its entries are re-added one by one via `add_impl`
    /// instead of being written verbatim.
    pub async fn add_raw_block(
        &mut self,
        buf: Bytes,
        filter_data: Vec<u8>,
        smallest_key: FullKey<Vec<u8>>,
        largest_key: Vec<u8>,
        mut meta: BlockMeta,
    ) -> HummockResult<bool> {
        let table_id = smallest_key.user_key.table_id;
        if self.last_table_id.is_none() || self.last_table_id.unwrap() != table_id {
            if !self.block_builder.is_empty() {
                // Try to finish the previous `Block` when the `table_id` is switched, making
                // sure that the data in the `Block` doesn't span two `table_ids`.
                self.build_block().await?;
            }

            self.table_ids.insert(table_id);
            self.finalize_last_table_stats();
            self.last_table_id = Some(table_id);
        }

        if !self.block_builder.is_empty() {
            let min_block_size = std::cmp::min(MIN_BLOCK_SIZE, self.options.block_capacity / 4);

            // If the pending block is too small, decode the raw block and merge its entries
            // into the pending block instead of emitting a tiny block (fallback path).
            if self.block_builder.approximate_len() < min_block_size {
                let block = Block::decode(buf, meta.uncompressed_size as usize)?;
                let mut iter = BlockIterator::new(BlockHolder::from_owned_block(Box::new(block)));
                iter.seek_to_first();
                while iter.is_valid() {
                    let value = HummockValue::from_slice(iter.value()).unwrap_or_else(|_| {
                        panic!(
                            "decode failed for fast compact sst_id {} block_idx {} last_table_id {:?}",
                            self.sst_object_id, self.block_metas.len(), self.last_table_id
                        )
                    });
                    // `could_switch_block = false`: the merged entries must stay in the
                    // current block and table.
                    self.add_impl(iter.key(), value, false).await?;
                    iter.next();
                }
                return Ok(false);
            }

            self.build_block().await?;
        }
        self.last_full_key = largest_key;
        assert_eq!(
            meta.len as usize,
            buf.len(),
            "meta {} buf {} last_table_id {:?}",
            meta.len,
            buf.len(),
            self.last_table_id
        );
        // Re-anchor the incoming meta at the current write position, then stream the
        // raw bytes and the pre-built filter data out unchanged.
        meta.offset = self.writer.data_len() as u32;
        self.block_metas.push(meta);
        self.filter_builder.add_raw_data(filter_data);
        let block_meta = self.block_metas.last_mut().unwrap();
        self.writer.write_block_bytes(buf, block_meta).await?;

        Ok(true)
    }
283
    /// Add kv pair to sstable.
    ///
    /// Assumes keys arrive in ascending order: the smallest/largest key tracking in
    /// `finish` (first block's smallest key, last added key) relies on it.
    pub async fn add(
        &mut self,
        full_key: FullKey<&[u8]>,
        value: HummockValue<&[u8]>,
    ) -> HummockResult<()> {
        self.add_impl(full_key, value, true).await
    }
292
    /// Add kv pair to sstable.
    ///
    /// Core insertion path shared by `add` and the `add_raw_block` fallback.
    /// `could_switch_block = false` (fallback path) forbids switching tables and
    /// rotating to a new block mid-merge.
    async fn add_impl(
        &mut self,
        full_key: FullKey<&[u8]>,
        value: HummockValue<&[u8]>,
        could_switch_block: bool,
    ) -> HummockResult<()> {
        const LARGE_KEY_LEN: usize = MAX_KEY_LEN >> 1;

        let table_key_len = full_key.user_key.table_key.as_ref().len();
        let table_value_len = match &value {
            HummockValue::Put(t) => t.len(),
            HummockValue::Delete => 0,
        };
        // Diagnostic-only thresholds derived from `max_sst_size`; oversized entries are
        // still accepted, just logged.
        let large_value_len = self.options.max_sst_size as usize / 10;
        let large_key_value_len = self.options.max_sst_size as usize / 2;
        if table_key_len >= LARGE_KEY_LEN
            || table_value_len > large_value_len
            || table_key_len + table_value_len > large_key_value_len
        {
            let table_id = full_key.user_key.table_id;
            tracing::warn!(
                "A large key/value (table_id={}, key len={}, value len={}, epoch={}, spill offset={}) is added to block",
                table_id,
                table_key_len,
                table_value_len,
                full_key.epoch_with_gap.pure_epoch(),
                full_key.epoch_with_gap.offset(),
            );
        }

        // TODO: refine me
        full_key.encode_into(&mut self.raw_key);
        value.encode(&mut self.raw_value);
        let is_new_user_key = self.last_full_key.is_empty()
            || !user_key(&self.raw_key).eq(user_key(&self.last_full_key));
        let table_id = full_key.user_key.table_id;
        let is_new_table = self.last_table_id.is_none() || self.last_table_id.unwrap() != table_id;
        let current_block_size = self.current_block_size();
        // A block is "full" when it reached capacity, or when it is more than 3/4 full
        // and this entry would push it past capacity.
        let is_block_full = current_block_size >= self.options.block_capacity
            || (current_block_size > self.options.block_capacity / 4 * 3
                && current_block_size + self.raw_value.len() + self.raw_key.len()
                    > self.options.block_capacity);

        if is_new_table {
            // The raw-block fallback must never cross a table boundary.
            assert!(
                could_switch_block,
                "is_new_user_key {} sst_id {} block_idx {} table_id {} last_table_id {:?} full_key {:?}",
                is_new_user_key,
                self.sst_object_id,
                self.block_metas.len(),
                table_id,
                self.last_table_id,
                full_key
            );
            self.table_ids.insert(table_id);
            self.finalize_last_table_stats();
            self.last_table_id = Some(table_id);
            // Blocks never span two tables: seal the pending one on a table switch.
            if !self.block_builder.is_empty() {
                self.build_block().await?;
            }
        } else if is_block_full && could_switch_block {
            self.build_block().await?;
        }
        self.last_table_stats.total_key_count += 1;
        self.epoch_set.insert(full_key.epoch_with_gap.pure_epoch());

        // Rotate block builder if the previous one has been built.
        if self.block_builder.is_empty() {
            // Optionally shorten the block meta's smallest key: any separator between the
            // previous key and this one identifies the block boundary equally well.
            let smallest_key = if let Some(threshold) =
                self.options.shorten_block_meta_key_threshold
                && !self.last_full_key.is_empty()
                && full_key.encoded_len() >= threshold
            {
                let prev = FullKey::decode(&self.last_full_key);
                if let Some(shortened) = try_shorten_block_smallest_key(&prev, &full_key) {
                    shortened.encode()
                } else {
                    full_key.encode()
                }
            } else {
                full_key.encode()
            };

            // New block meta starts at the current write position; `len` and
            // `uncompressed_size` are filled in by `build_block`.
            self.block_metas.push(BlockMeta {
                offset: utils::checked_into_u32(self.writer.data_len()).unwrap_or_else(|_| {
                    panic!(
                        "WARN overflow can't convert writer_data_len {} into u32 sst_id {} block_idx {} tables {:?}",
                        self.writer.data_len(),
                        self.sst_object_id,
                        self.block_metas.len(),
                        self.table_ids,
                    )
                }),
                len: 0,
                smallest_key,
                uncompressed_size: 0,
                total_key_count: 0,
                stale_key_count: 0,
            });
        }

        let table_id = full_key.user_key.table_id;
        let mut extract_key = user_key(&self.raw_key);
        extract_key = self.compaction_catalog_agent_ref.extract(extract_key);
        // add bloom_filter check
        if !extract_key.is_empty() {
            self.filter_builder
                .add_key(extract_key, table_id.as_raw_id());
        }
        // Use pre-encoded key to avoid redundant encoding
        self.block_builder.add(
            table_id,
            &self.raw_key[TABLE_PREFIX_LEN..],
            self.raw_value.as_ref(),
        );
        self.block_metas.last_mut().unwrap().total_key_count += 1;
        // A repeated user key (older version) or a delete is a stale entry.
        if !is_new_user_key || value.is_delete() {
            self.block_metas.last_mut().unwrap().stale_key_count += 1;
        }
        self.last_table_stats.total_key_size += full_key.encoded_len() as i64;
        self.last_table_stats.total_value_size += value.encoded_len() as i64;

        self.last_full_key.clear();
        self.last_full_key.extend_from_slice(&self.raw_key);

        self.raw_key.clear();
        self.raw_value.clear();
        Ok(())
    }
423
    /// Finish building sst.
    ///
    /// Seals the pending block, builds the bloom filter and `SstableMeta`, writes the
    /// meta through the writer, and returns the `SstableInfo`, the writer's output,
    /// and the collected builder statistics.
    ///
    /// # Format
    ///
    /// data:
    ///
    /// ```plain
    /// | Block 0 | ... | Block N-1 | N (4B) |
    /// ```
    pub async fn finish(mut self) -> HummockResult<SstableBuilderOutput<W::Output>> {
        // Key range: first block's smallest key .. last key added (assumes sorted input).
        let smallest_key = if self.block_metas.is_empty() {
            vec![]
        } else {
            self.block_metas[0].smallest_key.clone()
        };
        let largest_key = self.last_full_key.clone();
        self.finalize_last_table_stats();

        self.build_block().await?;
        let right_exclusive = false;
        let meta_offset = self.writer.data_len() as u64;

        let bloom_filter_kind = if self.filter_builder.support_blocked_raw_data() {
            BloomFilterType::Blocked
        } else {
            BloomFilterType::Sstable
        };
        // A non-positive false-positive rate disables the bloom filter entirely.
        let bloom_filter = if self.options.bloom_false_positive > 0.0 {
            self.filter_builder.finish(self.memory_limiter.clone())
        } else {
            vec![]
        };

        // Aggregate per-block counters into sstable-level totals.
        let total_key_count = self
            .block_metas
            .iter()
            .map(|block_meta| block_meta.total_key_count as u64)
            .sum::<u64>();
        let stale_key_count = self
            .block_metas
            .iter()
            .map(|block_meta| block_meta.stale_key_count as u64)
            .sum::<u64>();
        let uncompressed_file_size = self
            .block_metas
            .iter()
            .map(|block_meta| block_meta.uncompressed_size as u64)
            .sum::<u64>();

        #[expect(deprecated)]
        let mut meta = SstableMeta {
            block_metas: self.block_metas,
            bloom_filter,
            // Filled in below once the encoded meta size is known.
            estimated_size: 0,
            key_count: utils::checked_into_u32(total_key_count).unwrap_or_else(|_| {
                panic!(
                    "WARN overflow can't convert total_key_count {} into u32 tables {:?}",
                    total_key_count, self.table_ids,
                )
            }),
            smallest_key,
            largest_key,
            version: VERSION,
            meta_offset,
            monotonic_tombstone_events: vec![],
        };

        // estimated_size = encoded meta size + data size, with checked u32 arithmetic
        // so overflow panics loudly instead of wrapping.
        let meta_encode_size = meta.encoded_size();
        let encoded_size_u32 = utils::checked_into_u32(meta_encode_size).unwrap_or_else(|_| {
            panic!(
                "WARN overflow can't convert meta_encoded_size {} into u32 tables {:?}",
                meta_encode_size, self.table_ids,
            )
        });
        let meta_offset_u32 = utils::checked_into_u32(meta_offset).unwrap_or_else(|_| {
            panic!(
                "WARN overflow can't convert meta_offset {} into u32 tables {:?}",
                meta_offset, self.table_ids,
            )
        });
        meta.estimated_size = encoded_size_u32
            .checked_add(meta_offset_u32)
            .unwrap_or_else(|| {
                panic!(
                    "WARN overflow encoded_size_u32 {} meta_offset_u32 {} table_id {:?} table_ids {:?}",
                    encoded_size_u32, meta_offset_u32, self.last_table_id, self.table_ids
                )
            });

        // Average key/value sizes over all tables, guarding both "no tables" and
        // "tables but zero keys" against division by zero.
        let (avg_key_size, avg_value_size) = if self.table_stats.is_empty() {
            (0, 0)
        } else {
            let total_key_count: usize = self
                .table_stats
                .values()
                .map(|s| s.total_key_count as usize)
                .sum();

            if total_key_count == 0 {
                (0, 0)
            } else {
                let total_key_size: usize = self
                    .table_stats
                    .values()
                    .map(|s| s.total_key_size as usize)
                    .sum();

                let total_value_size: usize = self
                    .table_stats
                    .values()
                    .map(|s| s.total_value_size as usize)
                    .sum();

                (
                    total_key_size / total_key_count,
                    total_value_size / total_key_count,
                )
            }
        };

        // `epoch_set` is a BTreeSet, so first/last are min/max. The empty-set sentinel
        // is (MAX, MIN) so that min/max merging with other ssts stays correct.
        let (min_epoch, max_epoch) = {
            if self.epoch_set.is_empty() {
                (HummockEpoch::MAX, u64::MIN)
            } else {
                (
                    *self.epoch_set.first().unwrap(),
                    *self.epoch_set.last().unwrap(),
                )
            }
        };

        let sst_info: SstableInfo = SstableInfoInner {
            object_id: self.sst_object_id,
            // use the same sst_id as object_id for initial sst
            sst_id: self.sst_object_id.inner().into(),
            bloom_filter_kind,
            key_range: KeyRange {
                left: Bytes::from(meta.smallest_key.clone()),
                right: Bytes::from(meta.largest_key.clone()),
                right_exclusive,
            },
            file_size: meta.estimated_size as u64,
            table_ids: self.table_ids.into_iter().collect(),
            meta_offset: meta.meta_offset,
            stale_key_count,
            total_key_count,
            uncompressed_file_size: uncompressed_file_size + meta.encoded_size() as u64,
            min_epoch,
            max_epoch,
            range_tombstone_count: 0,
            sst_size: meta.estimated_size as u64,
        }
        .into();
        tracing::trace!(
            "meta_size {} bloom_filter_size {}  add_key_counts {} stale_key_count {} min_epoch {} max_epoch {} epoch_count {}",
            meta.encoded_size(),
            meta.bloom_filter.len(),
            total_key_count,
            stale_key_count,
            min_epoch,
            max_epoch,
            self.epoch_set.len()
        );
        let bloom_filter_size = meta.bloom_filter.len();
        let sstable_file_size = sst_info.file_size as usize;

        if !meta.block_metas.is_empty() {
            // fill total_compressed_size
            // NOTE(review): `get_mut(..).unwrap()` assumes every table_id appearing in a
            // block meta has a stats entry — upheld because every table switch calls
            // `finalize_last_table_stats`; confirm this also holds for raw-block ingestion.
            let mut last_table_id = meta.block_metas[0].table_id();
            let mut last_table_stats = self.table_stats.get_mut(&last_table_id).unwrap();
            for block_meta in &meta.block_metas {
                let block_table_id = block_meta.table_id();
                if last_table_id != block_table_id {
                    last_table_id = block_table_id;
                    last_table_stats = self.table_stats.get_mut(&last_table_id).unwrap();
                }

                last_table_stats.total_compressed_size += block_meta.len as u64;
            }
        }

        let writer_output = self.writer.finish(meta).await?;
        // The timestamp is only used during full GC.
        //
        // Ideally object store object's last_modified should be used.
        // However, it'll incur additional IO overhead since S3 lacks an interface to retrieve the last_modified timestamp after the PUT operation on an object.
        //
        // The local timestamp below is expected to precede the last_modified of object store object, given that the object store object is created afterward.
        // It should help alleviate the clock drift issue.

        let now = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .expect("Clock may have gone backwards")
            .as_secs();
        Ok(SstableBuilderOutput::<W::Output> {
            sst_info: LocalSstableInfo::new(sst_info, self.table_stats, now),
            writer_output,
            stats: SstableBuilderOutputStats {
                bloom_filter_size,
                avg_key_size,
                avg_value_size,
                epoch_count: self.epoch_set.len(),
                block_size_vec: self.block_size_vec,
                sstable_file_size,
            },
        })
    }
631
632    pub fn approximate_len(&self) -> usize {
633        self.writer.data_len()
634            + self.block_builder.approximate_len()
635            + self.filter_builder.approximate_len()
636    }
637
    /// Seal the in-progress block: record its uncompressed size, write the (possibly
    /// compressed) bytes through the writer, and fix up the pending `BlockMeta`'s `len`.
    /// No-op when the block builder is empty.
    pub async fn build_block(&mut self) -> HummockResult<()> {
        // Skip empty block.
        if self.block_builder.is_empty() {
            return Ok(());
        }

        // The meta for this block was pushed when its first key was added (`add_impl`)
        // or when the raw block was accepted (`add_raw_block`).
        let block_meta = self.block_metas.last_mut().unwrap();
        let uncompressed_block_size = self.block_builder.uncompressed_block_size();
        block_meta.uncompressed_size = utils::checked_into_u32(uncompressed_block_size)
            .unwrap_or_else(|_| {
                panic!(
                    "WARN overflow can't convert uncompressed_block_size {} into u32 table {:?}",
                    uncompressed_block_size,
                    self.block_builder.table_id(),
                )
            });
        let block = self.block_builder.build();
        self.writer.write_block(block, block_meta).await?;
        self.block_size_vec.push(block.len());
        // Filter builders that keep per-block state roll over here.
        self.filter_builder
            .switch_block(self.memory_limiter.clone());
        let data_len = utils::checked_into_u32(self.writer.data_len()).unwrap_or_else(|_| {
            panic!(
                "WARN overflow can't convert writer_data_len {} into u32 table {:?}",
                self.writer.data_len(),
                self.block_builder.table_id(),
            )
        });
        // Block length = bytes written since this block's offset was recorded.
        block_meta.len = data_len.checked_sub(block_meta.offset).unwrap_or_else(|| {
            panic!(
                "data_len should >= meta_offset, found data_len={}, meta_offset={}",
                data_len, block_meta.offset
            )
        });

        // NOTE(review): `data_len` is the TOTAL bytes written so far, not this block's
        // size — this warning fires for any sstable larger than 2x capacity, not just
        // for an oversized block. Confirm whether `block_meta.len` was intended here.
        if data_len as usize > self.options.capacity * 2 {
            tracing::warn!(
                "WARN unexpected block size {} table {:?}",
                data_len,
                self.block_builder.table_id()
            );
        }

        self.block_builder.clear();
        Ok(())
    }
684
685    pub fn is_empty(&self) -> bool {
686        self.writer.data_len() > 0
687    }
688
    /// Returns true if we roughly reached capacity
    ///
    /// Based on `approximate_len`, so the check is approximate by design.
    pub fn reach_capacity(&self) -> bool {
        self.approximate_len() >= self.options.capacity
    }
693
694    fn finalize_last_table_stats(&mut self) {
695        if self.table_ids.is_empty() || self.last_table_id.is_none() {
696            return;
697        }
698        self.table_stats.insert(
699            self.last_table_id.unwrap(),
700            std::mem::take(&mut self.last_table_stats),
701        );
702    }
703}
704
/// Build-time statistics collected by `SstableBuilder`, reported to
/// `CompactorMetrics` via [`SstableBuilderOutputStats::report_stats`].
pub struct SstableBuilderOutputStats {
    /// Encoded filter size in bytes (0 when the bloom filter is disabled).
    bloom_filter_size: usize,
    /// Average encoded key size over all entries (0 when there are none).
    avg_key_size: usize,
    /// Average encoded value size over all entries (0 when there are none).
    avg_value_size: usize,
    /// Number of distinct pure epochs observed among added keys.
    epoch_count: usize,
    block_size_vec: Vec<usize>, // for statistics
    /// Total sstable file size in bytes.
    sstable_file_size: usize,
}
713
714impl SstableBuilderOutputStats {
715    pub fn report_stats(&self, metrics: &Arc<CompactorMetrics>) {
716        if self.bloom_filter_size != 0 {
717            metrics
718                .sstable_bloom_filter_size
719                .observe(self.bloom_filter_size as _);
720        }
721
722        if self.sstable_file_size != 0 {
723            metrics
724                .sstable_file_size
725                .observe(self.sstable_file_size as _);
726        }
727
728        if self.avg_key_size != 0 {
729            metrics.sstable_avg_key_size.observe(self.avg_key_size as _);
730        }
731
732        if self.avg_value_size != 0 {
733            metrics
734                .sstable_avg_value_size
735                .observe(self.avg_value_size as _);
736        }
737
738        if self.epoch_count != 0 {
739            metrics
740                .sstable_distinct_epoch_count
741                .observe(self.epoch_count as _);
742        }
743
744        if !self.block_size_vec.is_empty() {
745            for block_size in &self.block_size_vec {
746                metrics.sstable_block_size.observe(*block_size as _);
747            }
748        }
749    }
750}
751
752#[cfg(test)]
753pub(super) mod tests {
754    use std::collections::{Bound, HashMap};
755
756    use risingwave_common::catalog::TableId;
757    use risingwave_common::hash::VirtualNode;
758    use risingwave_common::util::epoch::test_epoch;
759    use risingwave_hummock_sdk::key::UserKey;
760
761    use super::*;
762    use crate::assert_bytes_eq;
763    use crate::compaction_catalog_manager::{
764        CompactionCatalogAgent, DummyFilterKeyExtractor, MultiFilterKeyExtractor,
765    };
766    use crate::hummock::iterator::test_utils::mock_sstable_store;
767    use crate::hummock::sstable::xor_filter::BlockedXor16FilterBuilder;
768    use crate::hummock::test_utils::{
769        TEST_KEYS_COUNT, default_builder_opt_for_test, gen_test_sstable_impl, mock_sst_writer,
770        test_key_of, test_value_of,
771    };
772    use crate::hummock::{CachePolicy, Sstable, SstableWriterOptions, Xor8FilterBuilder};
773    use crate::monitor::StoreLocalStatistic;
774
    /// `finish` on a builder to which no key was ever added must still succeed.
    #[tokio::test]
    async fn test_empty() {
        let opt = SstableBuilderOptions {
            capacity: 0,
            block_capacity: 4096,
            restart_interval: 16,
            bloom_false_positive: 0.001,
            ..Default::default()
        };

        // Single test table 0 with the default test vnode count and no watermark serde.
        let table_id_to_vnode = HashMap::from_iter(vec![(0, VirtualNode::COUNT_FOR_TEST)]);
        let table_id_to_watermark_serde = HashMap::from_iter(vec![(0, None)]);
        let b = SstableBuilder::for_test(
            0,
            mock_sst_writer(&opt),
            opt,
            table_id_to_vnode,
            table_id_to_watermark_serde,
        );

        // Finishing without any data must not panic or error.
        b.finish().await.unwrap();
    }
797
    /// End-to-end: add `TEST_KEYS_COUNT` sorted keys, finish, and verify the reported
    /// key range, file size, and that the encoded meta round-trips through `decode`.
    #[tokio::test]
    async fn test_basic() {
        let opt = default_builder_opt_for_test();

        let table_id_to_vnode = HashMap::from_iter(vec![(0, VirtualNode::COUNT_FOR_TEST)]);
        let table_id_to_watermark_serde = HashMap::from_iter(vec![(0, None)]);
        let mut b = SstableBuilder::for_test(
            0,
            mock_sst_writer(&opt),
            opt,
            table_id_to_vnode,
            table_id_to_watermark_serde,
        );

        for i in 0..TEST_KEYS_COUNT {
            b.add_for_test(
                test_key_of(i).to_ref(),
                HummockValue::put(&test_value_of(i)),
            )
            .await
            .unwrap();
        }

        let output = b.finish().await.unwrap();
        let info = output.sst_info.sst_info;

        // Key range must span exactly the first and last added keys.
        assert_bytes_eq!(test_key_of(0).encode(), info.key_range.left);
        assert_bytes_eq!(
            test_key_of(TEST_KEYS_COUNT - 1).encode(),
            info.key_range.right
        );
        let (data, meta) = output.writer_output;
        assert_eq!(info.file_size, meta.estimated_size as u64);
        // The meta written at `meta_offset` must decode back to the in-memory meta.
        let offset = info.meta_offset as usize;
        let meta2 = SstableMeta::decode(&data[offset..]).unwrap();
        assert_eq!(meta2, meta);
    }
835
836    async fn test_with_bloom_filter<F: FilterBuilder>(with_blooms: bool) {
837        let key_count = 1000;
838
839        let opts = SstableBuilderOptions {
840            capacity: 0,
841            block_capacity: 4096,
842            restart_interval: 16,
843            bloom_false_positive: if with_blooms { 0.01 } else { 0.0 },
844            ..Default::default()
845        };
846
847        // build remote table
848        let sstable_store = mock_sstable_store().await;
849        let table_id_to_vnode = HashMap::from_iter(vec![(0, VirtualNode::COUNT_FOR_TEST)]);
850        let table_id_to_watermark_serde = HashMap::from_iter(vec![(0, None)]);
851        let sst_info = gen_test_sstable_impl::<Vec<u8>, F>(
852            opts,
853            0,
854            (0..TEST_KEYS_COUNT).map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
855            sstable_store.clone(),
856            CachePolicy::NotFill,
857            table_id_to_vnode,
858            table_id_to_watermark_serde,
859        )
860        .await;
861        let table = sstable_store
862            .sstable(&sst_info, &mut StoreLocalStatistic::default())
863            .await
864            .unwrap();
865
866        assert_eq!(table.has_bloom_filter(), with_blooms);
867        for i in 0..key_count {
868            let full_key = test_key_of(i);
869            if table.has_bloom_filter() {
870                let hash = Sstable::hash_for_bloom_filter(full_key.user_key.encode().as_slice(), 0);
871                let key_ref = full_key.user_key.as_ref();
872                assert!(
873                    table.may_match_hash(
874                        &(Bound::Included(key_ref), Bound::Included(key_ref)),
875                        hash
876                    ),
877                    "failed at {}",
878                    i
879                );
880            }
881        }
882    }
883
    /// Exercises `test_with_bloom_filter` across the supported filter-builder
    /// implementations, with the filter both disabled and enabled.
    #[tokio::test]
    async fn test_bloom_filter() {
        // Disabled filter: only the `has_bloom_filter() == false` path is checked.
        test_with_bloom_filter::<Xor16FilterBuilder>(false).await;
        // Enabled filter, one run per builder implementation.
        test_with_bloom_filter::<Xor16FilterBuilder>(true).await;
        test_with_bloom_filter::<Xor8FilterBuilder>(true).await;
        test_with_bloom_filter::<BlockedXor16FilterBuilder>(true).await;
    }
891
    /// Builds one SST spanning three table ids whose filter-key extractors
    /// differ — tables 1 and 3 are registered with `DummyFilterKeyExtractor`,
    /// table 2 with `FullKeyFilterKeyExtractor` — then verifies that every key
    /// written for table 2 is still reported as a possible match by the
    /// blocked bloom filter.
    #[tokio::test]
    async fn test_no_bloom_filter_block() {
        let opts = SstableBuilderOptions::default();
        // build remote table
        let sstable_store = mock_sstable_store().await;
        let writer_opts = SstableWriterOptions::default();
        let object_id = 1;
        let writer = sstable_store
            .clone()
            .create_sst_writer(object_id, writer_opts);
        // Per-table filter-key extractors. NOTE(review): the Dummy extractor
        // presumably contributes no filter keys for tables 1 and 3, so only
        // table 2 feeds the bloom filter — confirm against the extractor impls.
        let mut filter = MultiFilterKeyExtractor::default();
        filter.register(
            1.into(),
            FilterKeyExtractorImpl::Dummy(DummyFilterKeyExtractor),
        );
        filter.register(
            2.into(),
            FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor),
        );
        filter.register(
            3.into(),
            FilterKeyExtractorImpl::Dummy(DummyFilterKeyExtractor),
        );

        let table_id_to_vnode = HashMap::from_iter(vec![
            (1.into(), VirtualNode::COUNT_FOR_TEST),
            (2.into(), VirtualNode::COUNT_FOR_TEST),
            (3.into(), VirtualNode::COUNT_FOR_TEST),
        ]);
        let table_id_to_watermark_serde =
            HashMap::from_iter(vec![(1.into(), None), (2.into(), None), (3.into(), None)]);

        let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
            FilterKeyExtractorImpl::Multi(filter),
            table_id_to_vnode,
            table_id_to_watermark_serde,
            HashMap::default(),
        ));

        // Use the blocked Xor16 filter variant directly (not `for_test`).
        let mut builder = SstableBuilder::new(
            object_id,
            writer,
            BlockedXor16FilterBuilder::new(1024),
            opts,
            compaction_catalog_agent_ref,
            None,
        );

        // Write `key_count` keys per table id, in table-id then key order.
        // Each table key is a vnode prefix (VirtualNode::ZERO) followed by
        // "key_test_<2*idx>"; `resize` truncates back to the vnode prefix
        // before appending the next key suffix.
        let key_count: usize = 10000;
        for table_id in 1..4 {
            let mut table_key = VirtualNode::ZERO.to_be_bytes().to_vec();
            for idx in 0..key_count {
                table_key.resize(VirtualNode::SIZE, 0);
                table_key.extend_from_slice(format!("key_test_{:05}", idx * 2).as_bytes());
                let k = UserKey::for_test(TableId::new(table_id), table_key.as_ref());
                let v = test_value_of(idx);
                builder
                    .add(
                        FullKey::from_user_key(k, test_epoch(1)),
                        HummockValue::put(v.as_ref()),
                    )
                    .await
                    .unwrap();
            }
        }
        let ret = builder.finish().await.unwrap();
        let sst_info = ret.sst_info.sst_info.clone();
        // Wait for the writer's upload future before reading the table back.
        ret.writer_output.await.unwrap().unwrap();
        let table = sstable_store
            .sstable(&sst_info, &mut StoreLocalStatistic::default())
            .await
            .unwrap();
        // Re-derive the keys of table 2 and assert the bloom filter never
        // reports a false negative for them (hashes are computed with
        // table_id 2, matching the keys written above).
        let mut table_key = VirtualNode::ZERO.to_be_bytes().to_vec();
        for idx in 0..key_count {
            table_key.resize(VirtualNode::SIZE, 0);
            table_key.extend_from_slice(format!("key_test_{:05}", idx * 2).as_bytes());
            let k = UserKey::for_test(TableId::new(2), table_key.as_slice());
            let hash = Sstable::hash_for_bloom_filter(&k.encode(), 2);
            let key_ref = k.as_ref();
            assert!(
                table.may_match_hash(&(Bound::Included(key_ref), Bound::Included(key_ref)), hash)
            );
        }
    }
976}