1use std::collections::{BTreeMap, BTreeSet, HashMap};
16use std::mem;
17use std::sync::Arc;
18use std::time::SystemTime;
19
20use bytes::{Bytes, BytesMut};
21use risingwave_common::catalog::TableId;
22use risingwave_common::hash::VirtualNode;
23use risingwave_common::util::row_serde::OrderedRowSerde;
24use risingwave_hummock_sdk::key::{FullKey, MAX_KEY_LEN, TABLE_PREFIX_LEN, UserKey, user_key};
25use risingwave_hummock_sdk::key_range::KeyRange;
26use risingwave_hummock_sdk::sstable_info::{SstableInfo, SstableInfoInner, VnodeStatistics};
27use risingwave_hummock_sdk::table_stats::{TableStats, TableStatsMap};
28use risingwave_hummock_sdk::{HummockEpoch, HummockSstableObjectId, LocalSstableInfo};
29use risingwave_pb::hummock::{PbSstableFilterLayout, PbSstableFilterType};
30
31use super::utils::CompressionAlgorithm;
32use super::{
33 BlockBuilder, BlockBuilderOptions, BlockMeta, DEFAULT_BLOCK_SIZE, DEFAULT_ENTRY_SIZE,
34 DEFAULT_RESTART_INTERVAL, SstableMeta, SstableWriter, VERSION,
35};
36use crate::compaction_catalog_manager::{
37 CompactionCatalogAgent, CompactionCatalogAgentRef, FilterKeyExtractorImpl,
38 FullKeyFilterKeyExtractor,
39};
40use crate::hummock::sstable::{
41 DEFAULT_FILTER_HASH_PREALLOC_KEY_COUNT_CAP, FilterBuilder, FilterBuilderOptions, utils,
42};
43use crate::hummock::value::HummockValue;
44use crate::hummock::{
45 Block, BlockHolder, BlockIterator, HummockResult, MemoryLimiter, Xor16FilterBuilder,
46 try_shorten_block_smallest_key,
47};
48use crate::monitor::CompactorMetrics;
49use crate::opts::StorageOpts;
50
/// Default target size of a single SST, in bytes (4 MiB).
pub const DEFAULT_SSTABLE_SIZE: usize = 4 << 20;
/// Default value for `SstableBuilderOptions::bloom_false_positive`.
pub const DEFAULT_BLOOM_FALSE_POSITIVE: f64 = 0.001;
/// Default value for `SstableBuilderOptions::max_sst_size`, in bytes (512 MiB).
pub const DEFAULT_MAX_SST_SIZE: u64 = 512 << 20;
/// Upper bound, in bytes (8 KiB), on the "pending block is too small" threshold
/// used when a raw block is appended.
pub const MIN_BLOCK_SIZE: usize = 8 << 10;
55
/// Configuration for an `SstableBuilder`.
#[derive(Clone, Debug)]
pub struct SstableBuilderOptions {
    /// Target SST size in bytes; `reach_capacity` compares the builder's
    /// approximate length against this value.
    pub capacity: usize,
    /// Target size of one block in bytes; forwarded to the block builder and
    /// used to decide when the pending block counts as full.
    pub block_capacity: usize,
    /// Restart interval forwarded to the block builder.
    pub restart_interval: usize,
    /// False-positive rate for the filter.
    // NOTE(review): not read anywhere in this file — presumably consumed by
    // filter builders elsewhere; confirm.
    pub bloom_false_positive: f64,
    /// Compression algorithm forwarded to the block builder.
    pub compression_algorithm: CompressionAlgorithm,
    /// Maximum SST size in bytes. In this file it only drives the
    /// "large key/value" warning thresholds (1/10 and 1/2 of this value).
    pub max_sst_size: u64,
    /// When `Some(threshold)`, the smallest key recorded in a block's meta may
    /// be shortened if the key's encoded length is at least `threshold`.
    pub shorten_block_meta_key_threshold: Option<usize>,
    /// Byte budget for per-vnode user-key range hints. `None` or `Some(0)`
    /// disables collection.
    pub max_vnode_key_range_bytes: Option<usize>,
    /// Explicit estimate of the number of keys the SST will contain. When
    /// `None`, the estimate is derived from `capacity`.
    pub estimated_output_key_count: Option<usize>,
    /// Forwarded to `FilterBuilderOptions::hash_prealloc_key_count_cap`.
    pub filter_hash_prealloc_key_count_cap: usize,
}
78
79impl From<&StorageOpts> for SstableBuilderOptions {
80 fn from(options: &StorageOpts) -> SstableBuilderOptions {
81 let capacity: usize = (options.sstable_size_mb as usize) * (1 << 20);
82 SstableBuilderOptions {
83 capacity,
84 block_capacity: (options.block_size_kb as usize) * (1 << 10),
85 restart_interval: DEFAULT_RESTART_INTERVAL,
86 bloom_false_positive: options.bloom_false_positive,
87 compression_algorithm: CompressionAlgorithm::None,
88 max_sst_size: options.compactor_max_sst_size,
89 shorten_block_meta_key_threshold: options.shorten_block_meta_key_threshold,
90 max_vnode_key_range_bytes: None,
91 estimated_output_key_count: None,
92 filter_hash_prealloc_key_count_cap: DEFAULT_FILTER_HASH_PREALLOC_KEY_COUNT_CAP,
93 }
94 }
95}
96
97impl Default for SstableBuilderOptions {
98 fn default() -> Self {
99 Self {
100 capacity: DEFAULT_SSTABLE_SIZE,
101 block_capacity: DEFAULT_BLOCK_SIZE,
102 restart_interval: DEFAULT_RESTART_INTERVAL,
103 bloom_false_positive: DEFAULT_BLOOM_FALSE_POSITIVE,
104 compression_algorithm: CompressionAlgorithm::None,
105 max_sst_size: DEFAULT_MAX_SST_SIZE,
106 shorten_block_meta_key_threshold: None,
107 max_vnode_key_range_bytes: None,
108 estimated_output_key_count: None,
109 filter_hash_prealloc_key_count_cap: DEFAULT_FILTER_HASH_PREALLOC_KEY_COUNT_CAP,
110 }
111 }
112}
113
114impl SstableBuilderOptions {
115 pub fn estimated_output_key_count(&self) -> usize {
116 self.estimated_output_key_count
117 .unwrap_or(self.capacity / DEFAULT_ENTRY_SIZE + 1)
118 }
119
120 pub fn filter_builder_options(&self) -> FilterBuilderOptions {
121 FilterBuilderOptions {
122 estimated_key_count: self.estimated_output_key_count(),
123 estimated_block_count: self.capacity / self.block_capacity + 1,
124 hash_prealloc_key_count_cap: self.filter_hash_prealloc_key_count_cap,
125 }
126 }
127}
128
/// Everything produced by `SstableBuilder::finish`.
pub struct SstableBuilderOutput<WO> {
    /// Metadata of the finished SST together with its per-table statistics.
    pub sst_info: LocalSstableInfo,
    /// Whatever the underlying writer returned from its own `finish`.
    pub writer_output: WO,
    /// Build-time statistics that can be reported to compactor metrics.
    pub stats: SstableBuilderOutputStats,
}
134
/// Incrementally builds one SST: entries are grouped into blocks, blocks are
/// handed to the writer `W`, and filter data is accumulated by `F`.
pub struct SstableBuilder<W: SstableWriter, F: FilterBuilder> {
    /// Options the builder was created with.
    options: SstableBuilderOptions,
    /// Destination of built blocks; its `data_len()` is used as the block offset.
    writer: W,
    /// Builder of the block currently being filled.
    block_builder: BlockBuilder,

    /// Used to extract the filter key from each user key.
    compaction_catalog_agent_ref: CompactionCatalogAgentRef,
    /// One meta per block, pushed when a block receives its first entry or
    /// when a raw block is appended.
    block_metas: Vec<BlockMeta>,

    /// All table ids that have been added to this SST.
    table_ids: BTreeSet<TableId>,
    /// Encoded full key of the most recently added entry (or the largest key
    /// of the most recently appended raw block).
    last_full_key: Vec<u8>,
    /// Scratch buffer holding the encoded key of the entry being added.
    raw_key: BytesMut,
    /// Scratch buffer holding the encoded value of the entry being added.
    raw_value: BytesMut,
    /// Table id of the most recently added entry or raw block.
    last_table_id: Option<TableId>,
    /// Object id of the SST being built; also used to derive `sst_id`.
    sst_object_id: HummockSstableObjectId,

    /// Finalized statistics, keyed by table id.
    table_stats: TableStatsMap,
    /// Statistics accumulated for `last_table_id`; moved into `table_stats`
    /// by `finalize_last_table_stats`.
    last_table_stats: TableStats,

    /// Accumulates filter keys / raw filter data for the SST.
    filter_builder: F,

    /// Distinct pure epochs of all added entries.
    epoch_set: BTreeSet<u64>,
    /// Passed to the filter builder when switching blocks and when finishing.
    memory_limiter: Option<Arc<MemoryLimiter>>,

    /// Sizes of the blocks built by `build_block`, kept for statistics.
    block_size_vec: Vec<usize>,
    /// Collects per-vnode user-key ranges when a byte budget is configured.
    vnode_range_collector: Option<VnodeUserKeyRangeCollector>,
}
170
171impl<W: SstableWriter> SstableBuilder<W, Xor16FilterBuilder> {
172 pub fn for_test(
173 sstable_id: u64,
174 writer: W,
175 options: SstableBuilderOptions,
176 table_id_to_vnode: HashMap<impl Into<TableId>, usize>,
177 table_id_to_watermark_serde: HashMap<
178 impl Into<TableId>,
179 Option<(OrderedRowSerde, OrderedRowSerde, usize)>,
180 >,
181 ) -> Self {
182 let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
183 FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor),
184 table_id_to_vnode
185 .into_iter()
186 .map(|(table_id, v)| (table_id.into(), v))
187 .collect(),
188 table_id_to_watermark_serde
189 .into_iter()
190 .map(|(table_id, v)| (table_id.into(), v))
191 .collect(),
192 HashMap::default(),
193 ));
194
195 Self::new(
196 sstable_id,
197 writer,
198 Xor16FilterBuilder::create(options.filter_builder_options()),
199 options,
200 compaction_catalog_agent_ref,
201 None,
202 )
203 }
204}
205
206impl<W: SstableWriter, F: FilterBuilder> SstableBuilder<W, F> {
207 pub fn new(
208 sst_object_id: impl Into<HummockSstableObjectId>,
209 writer: W,
210 filter_builder: F,
211 options: SstableBuilderOptions,
212 compaction_catalog_agent_ref: CompactionCatalogAgentRef,
213 memory_limiter: Option<Arc<MemoryLimiter>>,
214 ) -> Self {
215 let sst_object_id = sst_object_id.into();
216 Self {
217 vnode_range_collector: VnodeUserKeyRangeCollector::with_limit(
218 options.max_vnode_key_range_bytes,
219 ),
220 options: options.clone(),
221 writer,
222 block_builder: BlockBuilder::new(BlockBuilderOptions {
223 capacity: options.block_capacity,
224 restart_interval: options.restart_interval,
225 compression_algorithm: options.compression_algorithm,
226 }),
227 filter_builder,
228 block_metas: Vec::with_capacity(options.capacity / options.block_capacity + 1),
229 table_ids: BTreeSet::new(),
230 last_table_id: None,
231 raw_key: BytesMut::new(),
232 raw_value: BytesMut::new(),
233 last_full_key: vec![],
234 sst_object_id,
235 compaction_catalog_agent_ref,
236 table_stats: Default::default(),
237 last_table_stats: Default::default(),
238 epoch_set: BTreeSet::default(),
239 memory_limiter,
240 block_size_vec: Vec::new(),
241 }
242 }
243
244 pub async fn add_for_test(
246 &mut self,
247 full_key: FullKey<&[u8]>,
248 value: HummockValue<&[u8]>,
249 ) -> HummockResult<()> {
250 self.add(full_key, value).await
251 }
252
253 pub fn current_block_size(&self) -> usize {
254 self.block_builder.approximate_len()
255 }
256
257 pub async fn add_raw_block(
259 &mut self,
260 buf: Bytes,
261 filter_data: Vec<u8>,
262 smallest_key: FullKey<Vec<u8>>,
263 largest_key: Vec<u8>,
264 mut meta: BlockMeta,
265 ) -> HummockResult<bool> {
266 let table_id = smallest_key.user_key.table_id;
267 if self.last_table_id.is_none() || self.last_table_id.unwrap() != table_id {
268 if !self.block_builder.is_empty() {
269 self.build_block().await?;
271 }
272
273 self.table_ids.insert(table_id);
274 self.finalize_last_table_stats();
275 self.last_table_id = Some(table_id);
276 }
277
278 if !self.block_builder.is_empty() {
279 let min_block_size = std::cmp::min(MIN_BLOCK_SIZE, self.options.block_capacity / 4);
280
281 if self.block_builder.approximate_len() < min_block_size {
283 let block = Block::decode(buf, meta.uncompressed_size as usize)?;
284 let mut iter = BlockIterator::new(BlockHolder::from_owned_block(Box::new(block)));
285 iter.seek_to_first();
286 while iter.is_valid() {
287 let value = HummockValue::from_slice(iter.value()).unwrap_or_else(|_| {
288 panic!(
289 "decode failed for fast compact sst_id {} block_idx {} last_table_id {:?}",
290 self.sst_object_id, self.block_metas.len(), self.last_table_id
291 )
292 });
293 self.add_impl(iter.key(), value, false).await?;
294 iter.next();
295 }
296 return Ok(false);
297 }
298
299 self.build_block().await?;
300 }
301 self.last_full_key = largest_key;
302 assert_eq!(
303 meta.len as usize,
304 buf.len(),
305 "meta {} buf {} last_table_id {:?}",
306 meta.len,
307 buf.len(),
308 self.last_table_id
309 );
310 meta.offset = self.writer.data_len() as u32;
311 self.block_metas.push(meta);
312 self.filter_builder.add_raw_data(filter_data);
313 let block_meta = self.block_metas.last_mut().unwrap();
314 self.writer.write_block_bytes(buf, block_meta).await?;
315
316 Ok(true)
317 }
318
319 pub async fn add(
321 &mut self,
322 full_key: FullKey<&[u8]>,
323 value: HummockValue<&[u8]>,
324 ) -> HummockResult<()> {
325 self.add_impl(full_key, value, true).await
326 }
327
    /// Encodes one key/value entry and appends it to the block under
    /// construction, updating block metas, table statistics, the filter and
    /// (if enabled) the vnode key-range collector.
    ///
    /// `could_switch_block` controls whether a full pending block may be sealed
    /// before the entry is appended.
    ///
    /// # Panics
    ///
    /// Panics if the entry belongs to a different table than the previous one
    /// while `could_switch_block` is `false`, or if the writer position does
    /// not fit into a `u32` block offset.
    async fn add_impl(
        &mut self,
        full_key: FullKey<&[u8]>,
        value: HummockValue<&[u8]>,
        could_switch_block: bool,
    ) -> HummockResult<()> {
        const LARGE_KEY_LEN: usize = MAX_KEY_LEN >> 1;

        // Warn (but still accept) when the key or value is unusually large
        // relative to `MAX_KEY_LEN` / `max_sst_size`.
        let table_key_len = full_key.user_key.table_key.as_ref().len();
        let table_value_len = match &value {
            HummockValue::Put(t) => t.len(),
            HummockValue::Delete => 0,
        };
        let large_value_len = self.options.max_sst_size as usize / 10;
        let large_key_value_len = self.options.max_sst_size as usize / 2;
        if table_key_len >= LARGE_KEY_LEN
            || table_value_len > large_value_len
            || table_key_len + table_value_len > large_key_value_len
        {
            let table_id = full_key.user_key.table_id;
            tracing::warn!(
                "A large key/value (table_id={}, key len={}, value len={}, epoch={}, spill offset={}) is added to block",
                table_id,
                table_key_len,
                table_value_len,
                full_key.epoch_with_gap.pure_epoch(),
                full_key.epoch_with_gap.offset(),
            );
        }

        // Encode into the reusable scratch buffers (cleared at the end of this call).
        full_key.encode_into(&mut self.raw_key);
        value.encode(&mut self.raw_value);
        let is_new_user_key = self.last_full_key.is_empty()
            || !user_key(&self.raw_key).eq(user_key(self.last_full_key.as_slice()));
        let table_id = full_key.user_key.table_id;
        let is_new_table = self.last_table_id != Some(table_id);
        let current_block_size = self.current_block_size();
        // The block is full once it reaches capacity, or once it is more than
        // 3/4 full and this entry would push it over capacity.
        let is_block_full = current_block_size >= self.options.block_capacity
            || (current_block_size > self.options.block_capacity / 4 * 3
                && current_block_size + self.raw_value.len() + self.raw_key.len()
                    > self.options.block_capacity);

        if is_new_table {
            // A table switch always seals the pending block, so the caller
            // must have allowed block switching.
            assert!(
                could_switch_block,
                "is_new_user_key {} sst_id {} block_idx {} table_id {} last_table_id {:?} full_key {:?}",
                is_new_user_key,
                self.sst_object_id,
                self.block_metas.len(),
                table_id,
                self.last_table_id,
                full_key
            );
            self.table_ids.insert(table_id);
            self.finalize_last_table_stats();
            self.last_table_id = Some(table_id);
            if !self.block_builder.is_empty() {
                self.build_block().await?;
            }
        } else if is_block_full && could_switch_block {
            self.build_block().await?;
        }
        self.last_table_stats.total_key_count += 1;
        self.epoch_set.insert(full_key.epoch_with_gap.pure_epoch());

        // An empty block builder means this entry starts a new block, so a
        // fresh meta is registered for it.
        if self.block_builder.is_empty() {
            // Optionally record a shortened smallest key, derived from the
            // previous key and this one, when the key is at least `threshold` long.
            let smallest_key = if let Some(threshold) =
                self.options.shorten_block_meta_key_threshold
                && !self.last_full_key.is_empty()
                && full_key.encoded_len() >= threshold
            {
                let prev = FullKey::decode(&self.last_full_key);
                if let Some(shortened) = try_shorten_block_smallest_key(&prev, &full_key) {
                    shortened.encode()
                } else {
                    full_key.encode()
                }
            } else {
                full_key.encode()
            };

            // `len`, `uncompressed_size` and the counters start at zero; the
            // sizes are filled in by `build_block`.
            self.block_metas.push(BlockMeta {
                offset: utils::checked_into_u32(self.writer.data_len()).unwrap_or_else(|_| {
                    panic!(
                        "WARN overflow can't convert writer_data_len {} into u32 sst_id {} block_idx {} tables {:?}",
                        self.writer.data_len(),
                        self.sst_object_id,
                        self.block_metas.len(),
                        self.table_ids,
                    )
                }),
                len: 0,
                smallest_key,
                uncompressed_size: 0,
                total_key_count: 0,
                stale_key_count: 0,
            });
        }

        let filter_key = self
            .compaction_catalog_agent_ref
            .extract(user_key(&self.raw_key));
        // An empty extracted key contributes nothing to the filter.
        if !filter_key.is_empty() {
            self.filter_builder
                .add_key(filter_key, table_id.as_raw_id());
        }
        // The table-id prefix is passed separately, so it is stripped from the key bytes.
        self.block_builder.add(
            table_id,
            &self.raw_key[TABLE_PREFIX_LEN..],
            self.raw_value.as_ref(),
        );
        self.block_metas.last_mut().unwrap().total_key_count += 1;
        // Repeated user keys and deletes are counted as stale.
        if !is_new_user_key || value.is_delete() {
            self.block_metas.last_mut().unwrap().stale_key_count += 1;
        }
        self.last_table_stats.total_key_size += full_key.encoded_len() as i64;
        self.last_table_stats.total_value_size += value.encoded_len() as i64;

        // `last_full_key` still holds the previous entry's key at this point.
        if let Some(collector) = self.vnode_range_collector.as_mut() {
            collector.observe_key(
                VirtualNode::from_index(full_key.user_key.get_vnode_id()),
                &self.raw_key,
                &self.last_full_key,
            );
        }

        self.last_full_key.clear();
        self.last_full_key.extend_from_slice(&self.raw_key);

        self.raw_key.clear();
        self.raw_value.clear();
        Ok(())
    }
466
    /// Seals the pending block, assembles the SST meta, hands it to the writer
    /// and returns the resulting SST info together with build statistics.
    ///
    /// # Panics
    ///
    /// Panics if vnode key-range collection is enabled while more than one
    /// table was added, if the key count / meta size / meta offset (or the sum
    /// of the latter two) does not fit into a `u32`, or if the system clock is
    /// before the Unix epoch.
    pub async fn finish(mut self) -> HummockResult<SstableBuilderOutput<W::Output>> {
        // The SST's smallest key is the smallest key of its first block, if any.
        let smallest_key = if self.block_metas.is_empty() {
            vec![]
        } else {
            self.block_metas[0].smallest_key.clone()
        };
        let largest_key = self.last_full_key.clone();
        self.finalize_last_table_stats();

        assert!(
            self.table_ids.len() <= 1 || self.vnode_range_collector.is_none(),
            "vnode key-range hints are only supported for single-table SSTs, found {} tables",
            self.table_ids.len()
        );

        self.build_block().await?;
        let right_exclusive = false;
        // All block data has been written, so the meta starts at the current writer position.
        let meta_offset = self.writer.data_len() as u64;

        let filter_data = self.filter_builder.finish(self.memory_limiter.clone());
        // No filter data -> no filter; otherwise the layout depends on whether
        // the builder supports blocked raw data.
        let (filter_type, filter_layout) = if filter_data.is_none() {
            (
                PbSstableFilterType::SstableFilterNone,
                PbSstableFilterLayout::Unspecified,
            )
        } else if self.filter_builder.support_blocked_raw_data() {
            (
                self.filter_builder.filter_type(),
                PbSstableFilterLayout::Blocked,
            )
        } else {
            (
                self.filter_builder.filter_type(),
                PbSstableFilterLayout::Plain,
            )
        };

        // Aggregate the per-block counters.
        let total_key_count = self
            .block_metas
            .iter()
            .map(|block_meta| block_meta.total_key_count as u64)
            .sum::<u64>();
        let stale_key_count = self
            .block_metas
            .iter()
            .map(|block_meta| block_meta.stale_key_count as u64)
            .sum::<u64>();
        let uncompressed_file_size = self
            .block_metas
            .iter()
            .map(|block_meta| block_meta.uncompressed_size as u64)
            .sum::<u64>();

        #[expect(deprecated)]
        let mut meta = SstableMeta {
            block_metas: self.block_metas,
            bloom_filter: filter_data.unwrap_or_default(),
            // Filled in below, once the encoded meta size is known.
            estimated_size: 0,
            key_count: utils::checked_into_u32(total_key_count).unwrap_or_else(|_| {
                panic!(
                    "WARN overflow can't convert total_key_count {} into u32 tables {:?}",
                    total_key_count, self.table_ids,
                )
            }),
            smallest_key,
            largest_key,
            version: VERSION,
            meta_offset,
            monotonic_tombstone_events: vec![],
        };

        // estimated_size = encoded meta size + meta offset, each checked to fit into u32.
        let meta_encode_size = meta.encoded_size();
        let encoded_size_u32 = utils::checked_into_u32(meta_encode_size).unwrap_or_else(|_| {
            panic!(
                "WARN overflow can't convert meta_encoded_size {} into u32 tables {:?}",
                meta_encode_size, self.table_ids,
            )
        });
        let meta_offset_u32 = utils::checked_into_u32(meta_offset).unwrap_or_else(|_| {
            panic!(
                "WARN overflow can't convert meta_offset {} into u32 tables {:?}",
                meta_offset, self.table_ids,
            )
        });
        meta.estimated_size = encoded_size_u32
            .checked_add(meta_offset_u32)
            .unwrap_or_else(|| {
                panic!(
                    "WARN overflow encoded_size_u32 {} meta_offset_u32 {} table_id {:?} table_ids {:?}",
                    encoded_size_u32, meta_offset_u32, self.last_table_id, self.table_ids
                )
            });

        // Average key/value sizes over all tables; zero when there are no
        // stats or no keys.
        let (avg_key_size, avg_value_size) = if self.table_stats.is_empty() {
            (0, 0)
        } else {
            let total_key_count: usize = self
                .table_stats
                .values()
                .map(|s| s.total_key_count as usize)
                .sum();

            let total_key_size: usize = self
                .table_stats
                .values()
                .map(|s| s.total_key_size as usize)
                .sum();

            let total_value_size: usize = self
                .table_stats
                .values()
                .map(|s| s.total_value_size as usize)
                .sum();

            (
                total_key_size.checked_div(total_key_count).unwrap_or(0),
                total_value_size.checked_div(total_key_count).unwrap_or(0),
            )
        };

        // With no entries the range is the inverted pair (MAX, MIN).
        let (min_epoch, max_epoch) = {
            if self.epoch_set.is_empty() {
                (HummockEpoch::MAX, u64::MIN)
            } else {
                (
                    *self.epoch_set.first().unwrap(),
                    *self.epoch_set.last().unwrap(),
                )
            }
        };

        // Seal the last open vnode range with the last key that was added.
        let vnode_user_key_ranges = self
            .vnode_range_collector
            .take()
            .and_then(|collector| collector.finish(&self.last_full_key));

        let sst_info: SstableInfo = SstableInfoInner {
            object_id: self.sst_object_id,
            sst_id: self.sst_object_id.as_raw_id().into(),
            key_range: KeyRange {
                left: Bytes::from(meta.smallest_key.clone()),
                right: Bytes::from(meta.largest_key.clone()),
                right_exclusive,
            },
            file_size: meta.estimated_size as u64,
            table_ids: self.table_ids.into_iter().collect(),
            meta_offset: meta.meta_offset,
            stale_key_count,
            total_key_count,
            uncompressed_file_size: uncompressed_file_size + meta.encoded_size() as u64,
            min_epoch,
            max_epoch,
            range_tombstone_count: 0,
            sst_size: meta.estimated_size as u64,
            filter_type,
            filter_layout,
            vnode_statistics: vnode_user_key_ranges,
        }
        .into();

        tracing::trace!(
            "meta_size {} filter_size {} add_key_counts {} stale_key_count {} min_epoch {} max_epoch {} epoch_count {}",
            meta.encoded_size(),
            meta.bloom_filter.len(),
            total_key_count,
            stale_key_count,
            min_epoch,
            max_epoch,
            self.epoch_set.len()
        );
        let filter_size = meta.bloom_filter.len();
        let sstable_file_size = sst_info.file_size as usize;

        // Attribute each block's on-disk length to the table it belongs to.
        // The stats entry is only looked up again when the table id changes
        // between consecutive blocks.
        if !meta.block_metas.is_empty() {
            let mut last_table_id = meta.block_metas[0].table_id();
            let mut last_table_stats = self.table_stats.get_mut(&last_table_id).unwrap();
            for block_meta in &meta.block_metas {
                let block_table_id = block_meta.table_id();
                if last_table_id != block_table_id {
                    last_table_id = block_table_id;
                    last_table_stats = self.table_stats.get_mut(&last_table_id).unwrap();
                }

                last_table_stats.total_compressed_size += block_meta.len as u64;
            }
        }

        let writer_output = self.writer.finish(meta).await?;
        // Current wall-clock time in seconds since the Unix epoch.
        let now = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .expect("Clock may have gone backwards")
            .as_secs();
        Ok(SstableBuilderOutput::<W::Output> {
            sst_info: LocalSstableInfo::new(sst_info, self.table_stats, now),
            writer_output,
            stats: SstableBuilderOutputStats {
                filter_size,
                avg_key_size,
                avg_value_size,
                epoch_count: self.epoch_set.len(),
                block_size_vec: self.block_size_vec,
                sstable_file_size,
            },
        })
    }
693
694 pub fn approximate_len(&self) -> usize {
695 self.writer.data_len()
696 + self.block_builder.approximate_len()
697 + self.filter_builder.approximate_len()
698 }
699
    /// Seals the pending block: fills in the sizes of its meta, writes it to
    /// the writer, notifies the filter builder and resets the block builder.
    ///
    /// Does nothing when the pending block is empty.
    ///
    /// # Panics
    ///
    /// Panics if the uncompressed block size or the writer position does not
    /// fit into a `u32`, or if the writer position is smaller than the block's
    /// recorded offset.
    pub async fn build_block(&mut self) -> HummockResult<()> {
        if self.block_builder.is_empty() {
            return Ok(());
        }

        // The meta for the pending block is the last one that was pushed.
        let block_meta = self.block_metas.last_mut().unwrap();
        let uncompressed_block_size = self.block_builder.uncompressed_block_size();
        block_meta.uncompressed_size = utils::checked_into_u32(uncompressed_block_size)
            .unwrap_or_else(|_| {
                panic!(
                    "WARN overflow can't convert uncompressed_block_size {} into u32 table {:?}",
                    uncompressed_block_size,
                    self.block_builder.table_id(),
                )
            });
        let block = self.block_builder.build();
        self.writer.write_block(block, block_meta).await?;
        self.block_size_vec.push(block.len());
        let data_len = utils::checked_into_u32(self.writer.data_len()).unwrap_or_else(|_| {
            panic!(
                "WARN overflow can't convert writer_data_len {} into u32 table {:?}",
                self.writer.data_len(),
                self.block_builder.table_id(),
            )
        });
        // The block's length is how far the writer advanced past the recorded offset.
        block_meta.len = data_len.checked_sub(block_meta.offset).unwrap_or_else(|| {
            panic!(
                "data_len should >= meta_offset, found data_len={}, meta_offset={}",
                data_len, block_meta.offset
            )
        });

        self.filter_builder
            .switch_block(self.memory_limiter.clone());

        // Only a warning: the total written size exceeds twice the configured capacity.
        if data_len as usize > self.options.capacity * 2 {
            tracing::warn!(
                "WARN unexpected block size {} table {:?}",
                data_len,
                self.block_builder.table_id()
            );
        }

        self.block_builder.clear();
        Ok(())
    }
747
748 pub fn is_empty(&self) -> bool {
749 self.writer.data_len() > 0
750 }
751
752 pub fn reach_capacity(&self) -> bool {
754 self.approximate_len() >= self.options.capacity
755 }
756
757 fn finalize_last_table_stats(&mut self) {
758 if self.table_ids.is_empty() || self.last_table_id.is_none() {
759 return;
760 }
761 self.table_stats.insert(
762 self.last_table_id.unwrap(),
763 std::mem::take(&mut self.last_table_stats),
764 );
765 }
766}
767
/// Collects, per vnode, the first and last user key observed, within a byte budget.
struct VnodeUserKeyRangeCollector {
    /// Byte budget; collection stops once `current_size` reaches it.
    max_bytes: usize,
    /// Bytes accounted for by the ranges sealed so far.
    current_size: usize,
    /// Sealed ranges: vnode -> (first user key, last user key).
    ranges: BTreeMap<VirtualNode, (UserKey<Bytes>, UserKey<Bytes>)>,
    /// Vnode of the range currently being tracked.
    current_vnode: VirtualNode,
    /// Encoded full key that opened the current range; empty when no range is open.
    range_start_key: Vec<u8>,
}
776
777impl VnodeUserKeyRangeCollector {
778 fn new(max_bytes: usize) -> Self {
779 Self {
780 max_bytes,
781 current_size: 0,
782 ranges: BTreeMap::new(),
783 current_vnode: VirtualNode::ZERO,
784 range_start_key: Vec::new(),
785 }
786 }
787
788 fn with_limit(max_bytes: Option<usize>) -> Option<Self> {
789 max_bytes.filter(|&n| n > 0).map(Self::new)
790 }
791
792 fn observe_key(&mut self, vnode: VirtualNode, key: &[u8], prev_key: &[u8]) {
795 if self.current_size >= self.max_bytes {
796 return;
797 }
798
799 if self.range_start_key.is_empty() {
801 self.current_vnode = vnode;
802 self.range_start_key = key.to_vec();
803 return;
804 }
805
806 if vnode == self.current_vnode {
808 return;
809 }
810
811 self.seal_range(prev_key);
813
814 if self.current_size >= self.max_bytes {
816 return;
817 }
818
819 self.current_vnode = vnode;
821 self.range_start_key = key.to_vec();
822 }
823
824 fn seal_range(&mut self, right_key: &[u8]) {
826 let left_key = mem::take(&mut self.range_start_key);
827 self.current_size += mem::size_of::<VirtualNode>() + left_key.len() + right_key.len();
828
829 let left_full_key = FullKey::decode(&left_key);
830 let right_full_key = FullKey::decode(right_key);
831 let left_user_key = left_full_key.user_key.copy_into();
832 let right_user_key = right_full_key.user_key.copy_into();
833
834 assert_eq!(
839 left_user_key.get_vnode_id(),
840 right_user_key.get_vnode_id(),
841 "vnode changed within range: left_user {:?}, right_user {:?}",
842 left_user_key,
843 right_user_key
844 );
845 assert_eq!(
846 left_user_key.get_vnode_id(),
847 self.current_vnode.to_index(),
848 "vnode mismatch: left {:?}, right {:?}, expected vnode {}",
849 left_user_key,
850 right_user_key,
851 self.current_vnode.to_index()
852 );
853 assert_eq!(
854 left_user_key.table_id, right_user_key.table_id,
855 "table_id changed within range: left {:?}, right {:?}",
856 left_user_key, right_user_key
857 );
858
859 self.ranges
860 .insert(self.current_vnode, (left_user_key, right_user_key));
861 }
862
863 fn finish(mut self, last_key: &[u8]) -> Option<VnodeStatistics> {
865 if !self.range_start_key.is_empty() {
866 self.seal_range(last_key);
867 }
868
869 if self.ranges.len() > 1 {
870 let mut table_ids = self
872 .ranges
873 .values()
874 .flat_map(|(left, right)| [left.table_id, right.table_id]);
875 if let Some(first_table_id) = table_ids.next() {
876 for table_id in table_ids {
877 assert_eq!(
878 table_id, first_table_id,
879 "all vnode ranges must belong to the same table_id, found {:?} and {:?}",
880 table_id, first_table_id
881 );
882 }
883 }
884
885 Some(VnodeStatistics::from_map(self.ranges))
886 } else {
887 None
888 }
889 }
890}
891
/// Statistics gathered while building one SST, reported via `report_stats`.
pub struct SstableBuilderOutputStats {
    /// Size in bytes of the encoded filter stored in the SST meta.
    filter_size: usize,
    /// Average encoded key size across all tables (0 when there are no keys).
    avg_key_size: usize,
    /// Average encoded value size across all tables (0 when there are no keys).
    avg_value_size: usize,
    /// Number of distinct epochs among the added entries.
    epoch_count: usize,
    /// Sizes of the blocks built by the builder.
    block_size_vec: Vec<usize>,
    /// File size recorded in the SST info.
    sstable_file_size: usize,
}
900
901impl SstableBuilderOutputStats {
902 pub fn report_stats(&self, metrics: &Arc<CompactorMetrics>) {
903 if self.filter_size != 0 {
904 metrics
905 .sstable_bloom_filter_size
906 .observe(self.filter_size as _);
907 }
908
909 if self.sstable_file_size != 0 {
910 metrics
911 .sstable_file_size
912 .observe(self.sstable_file_size as _);
913 }
914
915 if self.avg_key_size != 0 {
916 metrics.sstable_avg_key_size.observe(self.avg_key_size as _);
917 }
918
919 if self.avg_value_size != 0 {
920 metrics
921 .sstable_avg_value_size
922 .observe(self.avg_value_size as _);
923 }
924
925 if self.epoch_count != 0 {
926 metrics
927 .sstable_distinct_epoch_count
928 .observe(self.epoch_count as _);
929 }
930
931 if !self.block_size_vec.is_empty() {
932 for block_size in &self.block_size_vec {
933 metrics.sstable_block_size.observe(*block_size as _);
934 }
935 }
936 }
937}
938
939#[cfg(test)]
940pub(super) mod tests {
941 use std::collections::{Bound, HashMap};
942
943 use risingwave_common::catalog::TableId;
944 use risingwave_common::hash::VirtualNode;
945 use risingwave_common::util::epoch::test_epoch;
946 use risingwave_hummock_sdk::key::UserKey;
947
948 use super::*;
949 use crate::assert_bytes_eq;
950 use crate::compaction_catalog_manager::{
951 CompactionCatalogAgent, DummyFilterKeyExtractor, MultiFilterKeyExtractor,
952 };
953 use crate::hummock::iterator::test_utils::mock_sstable_store;
954 use crate::hummock::sstable::xor_filter::BlockedXor16FilterBuilder;
955 use crate::hummock::test_utils::{
956 TEST_KEYS_COUNT, default_builder_opt_for_test, gen_test_sstable_impl, mock_sst_writer,
957 test_key_of, test_value_of,
958 };
959 use crate::hummock::{CachePolicy, Sstable, SstableWriterOptions, Xor8FilterBuilder};
960 use crate::monitor::StoreLocalStatistic;
961
962 #[tokio::test]
963 async fn test_empty() {
964 let opt = SstableBuilderOptions {
965 capacity: 0,
966 block_capacity: 4096,
967 restart_interval: 16,
968 bloom_false_positive: 0.001,
969 ..Default::default()
970 };
971
972 let table_id_to_vnode = HashMap::from_iter(vec![(0, VirtualNode::COUNT_FOR_TEST)]);
973 let table_id_to_watermark_serde = HashMap::from_iter(vec![(0, None)]);
974 let b = SstableBuilder::for_test(
975 0,
976 mock_sst_writer(&opt),
977 opt,
978 table_id_to_vnode,
979 table_id_to_watermark_serde,
980 );
981
982 b.finish().await.unwrap();
983 }
984
985 fn encode_full_key(vnode: VirtualNode, table_key_suffix: &[u8]) -> Vec<u8> {
986 let mut table_key = vnode.to_be_bytes().to_vec();
987 table_key.extend_from_slice(table_key_suffix);
988 FullKey::for_test(TableId::default(), table_key, 0).encode()
989 }
990
991 fn table_key_of(vnode: VirtualNode, suffix: &[u8]) -> Vec<u8> {
992 let mut key = vnode.to_be_bytes().to_vec();
993 key.extend_from_slice(suffix);
994 key
995 }
996
997 #[test]
998 fn test_vnode_user_key_range_basic_collection() {
999 let mut collector = VnodeUserKeyRangeCollector::with_limit(Some(1024)).unwrap();
1002 let vnode_1 = VirtualNode::from_index(1);
1003 let vnode_2 = VirtualNode::from_index(2);
1004
1005 let k1 = encode_full_key(vnode_1, b"k1");
1006 let k2 = encode_full_key(vnode_1, b"k2");
1007 let k3 = encode_full_key(vnode_2, b"k3");
1008 let k4 = encode_full_key(vnode_2, b"k4");
1009
1010 collector.observe_key(vnode_1, &k1, &[]);
1011 collector.observe_key(vnode_1, &k2, &k1);
1012 collector.observe_key(vnode_2, &k3, &k2);
1013 collector.observe_key(vnode_2, &k4, &k3);
1014
1015 let info = collector.finish(&k4).unwrap();
1016 assert_eq!(info.vnode_user_key_ranges().len(), 2);
1017
1018 let (range1_left, range1_right) = info.get_vnode_user_key_range(vnode_1).unwrap();
1020 assert_eq!(range1_left.table_key.as_ref(), table_key_of(vnode_1, b"k1"));
1021 assert_eq!(
1022 range1_right.table_key.as_ref(),
1023 table_key_of(vnode_1, b"k2")
1024 );
1025
1026 let (range2_left, range2_right) = info.get_vnode_user_key_range(vnode_2).unwrap();
1028 assert_eq!(range2_left.table_key.as_ref(), table_key_of(vnode_2, b"k3"));
1029 assert_eq!(
1030 range2_right.table_key.as_ref(),
1031 table_key_of(vnode_2, b"k4")
1032 );
1033 }
1034
1035 #[test]
1036 fn test_vnode_user_key_range_capacity_limit() {
1037 let vnode_1 = VirtualNode::from_index(1);
1047 let vnode_2 = VirtualNode::from_index(2);
1048 let vnode_3 = VirtualNode::from_index(3);
1049 let vnode_4 = VirtualNode::from_index(4);
1050
1051 let k1 = encode_full_key(vnode_1, b"k1");
1052 let k2 = encode_full_key(vnode_1, b"k2");
1053 let k3 = encode_full_key(vnode_2, b"k3");
1054 let k4 = encode_full_key(vnode_2, b"k4");
1055 let k5 = encode_full_key(vnode_3, b"k5");
1056 let k6 = encode_full_key(vnode_3, b"k6");
1057 let k7 = encode_full_key(vnode_4, b"k7");
1058
1059 let range_size = mem::size_of::<VirtualNode>() + k1.len() + k2.len();
1060 let estimated_next_size = mem::size_of::<VirtualNode>() + k3.len();
1061 let limit = range_size + estimated_next_size; let mut collector = VnodeUserKeyRangeCollector::with_limit(Some(limit)).unwrap();
1063
1064 collector.observe_key(vnode_1, &k1, &[]);
1065 collector.observe_key(vnode_1, &k2, &k1);
1066 collector.observe_key(vnode_2, &k3, &k2); collector.observe_key(vnode_2, &k4, &k3);
1068 collector.observe_key(vnode_3, &k5, &k4); collector.observe_key(vnode_3, &k6, &k5); collector.observe_key(vnode_4, &k7, &k6); let info = collector.finish(&k7).unwrap();
1073 assert_eq!(
1074 info.vnode_user_key_ranges().len(),
1075 2,
1076 "Should collect exactly 2 vnodes"
1077 );
1078
1079 let (range1_left, range1_right) = info.get_vnode_user_key_range(vnode_1).unwrap();
1081 assert_eq!(range1_left.table_key.as_ref(), table_key_of(vnode_1, b"k1"));
1082 assert_eq!(
1083 range1_right.table_key.as_ref(),
1084 table_key_of(vnode_1, b"k2")
1085 );
1086
1087 let (range2_left, range2_right) = info.get_vnode_user_key_range(vnode_2).unwrap();
1088 assert_eq!(range2_left.table_key.as_ref(), table_key_of(vnode_2, b"k3"));
1089 assert_eq!(
1090 range2_right.table_key.as_ref(),
1091 table_key_of(vnode_2, b"k4")
1092 );
1093
1094 assert!(info.get_vnode_user_key_range(vnode_3).is_none());
1096 assert!(info.get_vnode_user_key_range(vnode_4).is_none());
1097 }
1098
/// Observes keys belonging to three widely spaced vnodes (5, 10 and 100) and
/// checks that every observed vnode reports the first and last table key it
/// saw, while vnodes that were never observed report no range at all.
#[test]
fn test_vnode_user_key_range_sparse_distribution() {
    let vnode_5 = VirtualNode::from_index(5);
    let vnode_10 = VirtualNode::from_index(10);
    let vnode_100 = VirtualNode::from_index(100);

    // Keys are listed in the order they are fed to the collector.
    let observed = [
        (vnode_5, encode_full_key(vnode_5, b"key_005_001")),
        (vnode_5, encode_full_key(vnode_5, b"key_005_002")),
        (vnode_5, encode_full_key(vnode_5, b"key_005_999")),
        (vnode_10, encode_full_key(vnode_10, b"key_010_001")),
        (vnode_10, encode_full_key(vnode_10, b"key_010_100")),
        (vnode_100, encode_full_key(vnode_100, b"key_100_001")),
        (vnode_100, encode_full_key(vnode_100, b"key_100_999")),
    ];

    let mut collector = VnodeUserKeyRangeCollector::with_limit(Some(1024)).unwrap();

    // The very first key has no predecessor, so it is paired with an empty slice.
    let mut preceding: &[u8] = &[];
    for (vnode, full_key) in &observed {
        collector.observe_key(*vnode, full_key, preceding);
        preceding = full_key.as_slice();
    }

    // The collector is finished with the last key that was observed.
    let (_, final_key) = observed.last().unwrap();
    let info = collector.finish(final_key).unwrap();
    assert_eq!(info.vnode_user_key_ranges().len(), 3);

    let (first_5, last_5) = info.get_vnode_user_key_range(vnode_5).unwrap();
    assert_eq!(
        first_5.table_key.as_ref(),
        table_key_of(vnode_5, b"key_005_001")
    );
    assert_eq!(
        last_5.table_key.as_ref(),
        table_key_of(vnode_5, b"key_005_999")
    );

    let (first_10, last_10) = info.get_vnode_user_key_range(vnode_10).unwrap();
    assert_eq!(
        first_10.table_key.as_ref(),
        table_key_of(vnode_10, b"key_010_001")
    );
    assert_eq!(
        last_10.table_key.as_ref(),
        table_key_of(vnode_10, b"key_010_100")
    );

    let (first_100, last_100) = info.get_vnode_user_key_range(vnode_100).unwrap();
    assert_eq!(
        first_100.table_key.as_ref(),
        table_key_of(vnode_100, b"key_100_001")
    );
    assert_eq!(
        last_100.table_key.as_ref(),
        table_key_of(vnode_100, b"key_100_999")
    );

    // Vnodes that never appeared in the input must not have a range.
    for absent_index in [0, 7, 50] {
        assert!(
            info.get_vnode_user_key_range(VirtualNode::from_index(absent_index))
                .is_none()
        );
    }
}
1172
/// Covers the degenerate inputs of `VnodeUserKeyRangeCollector`: a disabled
/// limit, a collector that never observed a key, an input consisting of a
/// single vnode, and a limit of `Some(1)` with keys from two vnodes.
#[test]
fn test_vnode_user_key_range_edge_cases() {
    // Neither a missing limit nor a zero limit yields a collector.
    for disabled_limit in [None, Some(0)] {
        assert!(VnodeUserKeyRangeCollector::with_limit(disabled_limit).is_none());
    }

    // Finishing without having observed any key produces nothing.
    {
        let untouched = VnodeUserKeyRangeCollector::with_limit(Some(1024)).unwrap();
        let probe = encode_full_key(VirtualNode::ZERO, b"any");
        assert!(untouched.finish(&probe).is_none());
    }

    // Every key belongs to the same vnode.
    {
        let vnode = VirtualNode::from_index(5);
        let first = encode_full_key(vnode, b"a");
        let second = encode_full_key(vnode, b"b");
        let third = encode_full_key(vnode, b"c");

        let mut single_vnode = VnodeUserKeyRangeCollector::with_limit(Some(1024)).unwrap();
        single_vnode.observe_key(vnode, &first, &[]);
        single_vnode.observe_key(vnode, &second, &first);
        single_vnode.observe_key(vnode, &third, &second);
        assert!(
            single_vnode.finish(&third).is_none(),
            "Single-vnode SST should not emit hints"
        );
    }

    // Two vnodes are observed, but the collector was created with a limit of 1.
    {
        let vnode_1 = VirtualNode::from_index(1);
        let vnode_2 = VirtualNode::from_index(2);
        let key1 = encode_full_key(vnode_1, b"key1");
        let key2 = encode_full_key(vnode_1, b"key2");
        let key3 = encode_full_key(vnode_2, b"key3");

        let mut tight = VnodeUserKeyRangeCollector::with_limit(Some(1)).unwrap();
        tight.observe_key(vnode_1, &key1, &[]);
        tight.observe_key(vnode_1, &key2, &key1);
        tight.observe_key(vnode_2, &key3, &key2);
        assert!(
            tight.finish(&key3).is_none(),
            "Only 1 vnode collected, should return None"
        );
    }
}
1219
1220 #[tokio::test]
1221 async fn test_basic() {
1222 let opt = default_builder_opt_for_test();
1223
1224 let table_id_to_vnode = HashMap::from_iter(vec![(0, VirtualNode::COUNT_FOR_TEST)]);
1225 let table_id_to_watermark_serde = HashMap::from_iter(vec![(0, None)]);
1226 let mut b = SstableBuilder::for_test(
1227 0,
1228 mock_sst_writer(&opt),
1229 opt,
1230 table_id_to_vnode,
1231 table_id_to_watermark_serde,
1232 );
1233
1234 for i in 0..TEST_KEYS_COUNT {
1235 b.add_for_test(
1236 test_key_of(i).to_ref(),
1237 HummockValue::put(&test_value_of(i)),
1238 )
1239 .await
1240 .unwrap();
1241 }
1242
1243 let output = b.finish().await.unwrap();
1244 let info = output.sst_info.sst_info;
1245
1246 assert_bytes_eq!(test_key_of(0).encode(), info.key_range.left);
1247 assert_bytes_eq!(
1248 test_key_of(TEST_KEYS_COUNT - 1).encode(),
1249 info.key_range.right
1250 );
1251 let (data, meta) = output.writer_output;
1252 assert_eq!(info.file_size, meta.estimated_size as u64);
1253 let offset = info.meta_offset as usize;
1254 let meta2 = SstableMeta::decode(&data[offset..]).unwrap();
1255 assert_eq!(meta2, meta);
1256 }
1257
1258 async fn test_with_xor_filter_builder<F: FilterBuilder>(
1259 bloom_false_positive: f64,
1260 expected_filter_type: PbSstableFilterType,
1261 expected_filter_layout: PbSstableFilterLayout,
1262 ) {
1263 let key_count = 1000;
1264
1265 let opts = SstableBuilderOptions {
1266 capacity: 0,
1267 block_capacity: 4096,
1268 restart_interval: 16,
1269 bloom_false_positive,
1270 ..Default::default()
1271 };
1272
1273 let sstable_store = mock_sstable_store().await;
1275 let table_id_to_vnode = HashMap::from_iter(vec![(0, VirtualNode::COUNT_FOR_TEST)]);
1276 let table_id_to_watermark_serde = HashMap::from_iter(vec![(0, None)]);
1277 let sst_info = gen_test_sstable_impl::<Vec<u8>, F>(
1278 opts,
1279 0,
1280 (0..TEST_KEYS_COUNT).map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
1281 sstable_store.clone(),
1282 CachePolicy::NotFill,
1283 table_id_to_vnode,
1284 table_id_to_watermark_serde,
1285 )
1286 .await;
1287 assert_eq!(sst_info.filter_type, expected_filter_type);
1288 assert_eq!(sst_info.filter_layout, expected_filter_layout);
1289 let table = sstable_store
1290 .sstable(&sst_info, &mut StoreLocalStatistic::default())
1291 .await
1292 .unwrap();
1293
1294 assert!(table.has_filter());
1295 for i in 0..key_count {
1296 let full_key = test_key_of(i);
1297 let hash = Sstable::hash_for_filter(full_key.user_key.encode().as_slice(), 0);
1298 let key_ref = full_key.user_key.as_ref();
1299 assert!(
1300 table.may_match_hash(&(Bound::Included(key_ref), Bound::Included(key_ref)), hash),
1301 "failed at {}",
1302 i
1303 );
1304 }
1305 }
1306
1307 #[tokio::test]
1308 async fn test_xor_filter_builder_output() {
1309 test_with_xor_filter_builder::<Xor16FilterBuilder>(
1310 0.0,
1311 PbSstableFilterType::SstableFilterXor16,
1312 PbSstableFilterLayout::Plain,
1313 )
1314 .await;
1315 test_with_xor_filter_builder::<Xor16FilterBuilder>(
1316 0.01,
1317 PbSstableFilterType::SstableFilterXor16,
1318 PbSstableFilterLayout::Plain,
1319 )
1320 .await;
1321 test_with_xor_filter_builder::<Xor8FilterBuilder>(
1322 0.01,
1323 PbSstableFilterType::SstableFilterXor8,
1324 PbSstableFilterLayout::Plain,
1325 )
1326 .await;
1327 test_with_xor_filter_builder::<BlockedXor16FilterBuilder>(
1328 0.01,
1329 PbSstableFilterType::SstableFilterXor16,
1330 PbSstableFilterLayout::Blocked,
1331 )
1332 .await;
1333 }
1334
1335 #[tokio::test]
1336 async fn test_no_xor_filter_block() {
1337 let opts = SstableBuilderOptions::default();
1338 let sstable_store = mock_sstable_store().await;
1340 let writer_opts = SstableWriterOptions::default();
1341 let object_id = 1;
1342 let writer = sstable_store
1343 .clone()
1344 .create_sst_writer(object_id, writer_opts);
1345 let mut filter = MultiFilterKeyExtractor::default();
1346 filter.register(
1347 1.into(),
1348 FilterKeyExtractorImpl::Dummy(DummyFilterKeyExtractor),
1349 );
1350 filter.register(
1351 2.into(),
1352 FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor),
1353 );
1354 filter.register(
1355 3.into(),
1356 FilterKeyExtractorImpl::Dummy(DummyFilterKeyExtractor),
1357 );
1358
1359 let table_id_to_vnode = HashMap::from_iter(vec![
1360 (1.into(), VirtualNode::COUNT_FOR_TEST),
1361 (2.into(), VirtualNode::COUNT_FOR_TEST),
1362 (3.into(), VirtualNode::COUNT_FOR_TEST),
1363 ]);
1364 let table_id_to_watermark_serde =
1365 HashMap::from_iter(vec![(1.into(), None), (2.into(), None), (3.into(), None)]);
1366
1367 let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
1368 FilterKeyExtractorImpl::Multi(filter),
1369 table_id_to_vnode,
1370 table_id_to_watermark_serde,
1371 HashMap::default(),
1372 ));
1373
1374 let mut builder = SstableBuilder::new(
1375 object_id,
1376 writer,
1377 BlockedXor16FilterBuilder::create(opts.filter_builder_options()),
1378 opts,
1379 compaction_catalog_agent_ref,
1380 None,
1381 );
1382
1383 let key_count: usize = 10000;
1384 for table_id in 1..4 {
1385 let mut table_key = VirtualNode::ZERO.to_be_bytes().to_vec();
1386 for idx in 0..key_count {
1387 table_key.resize(VirtualNode::SIZE, 0);
1388 table_key.extend_from_slice(format!("key_test_{:05}", idx * 2).as_bytes());
1389 let k = UserKey::for_test(TableId::new(table_id), table_key.as_ref());
1390 let v = test_value_of(idx);
1391 builder
1392 .add(
1393 FullKey::from_user_key(k, test_epoch(1)),
1394 HummockValue::put(v.as_ref()),
1395 )
1396 .await
1397 .unwrap();
1398 }
1399 }
1400 let ret = builder.finish().await.unwrap();
1401 let sst_info = ret.sst_info.sst_info.clone();
1402 ret.writer_output.await.unwrap().unwrap();
1403 let table = sstable_store
1404 .sstable(&sst_info, &mut StoreLocalStatistic::default())
1405 .await
1406 .unwrap();
1407 let mut table_key = VirtualNode::ZERO.to_be_bytes().to_vec();
1408 for idx in 0..key_count {
1409 table_key.resize(VirtualNode::SIZE, 0);
1410 table_key.extend_from_slice(format!("key_test_{:05}", idx * 2).as_bytes());
1411 let k = UserKey::for_test(TableId::new(2), table_key.as_slice());
1412 let hash = Sstable::hash_for_filter(&k.encode(), 2);
1413 let key_ref = k.as_ref();
1414 assert!(
1415 table.may_match_hash(&(Bound::Included(key_ref), Bound::Included(key_ref)), hash)
1416 );
1417 }
1418 }
1419}