1use std::collections::BTreeMap;
16use std::mem::size_of;
17use std::ops::Deref;
18use std::sync::Arc;
19
20use bytes::Bytes;
21use risingwave_common::catalog::TableId;
22use risingwave_common::hash::VirtualNode;
23use risingwave_pb::hummock::{
24 PbBloomFilterType, PbKeyRange, PbSstableFilterLayout, PbSstableFilterType, PbSstableInfo,
25 PbVnodeStatistics, PbVnodeUserKeyRange,
26};
27
28use crate::key::UserKey;
29use crate::key_range::KeyRange;
30use crate::version::{ObjectIdReader, SstableIdReader};
31use crate::{HummockSstableId, HummockSstableObjectId};
32
33pub type VnodeUserKeyRange = (UserKey<Bytes>, UserKey<Bytes>);
34
35#[derive(Debug, PartialEq, Clone, Default)]
36pub struct VnodeStatistics {
37 vnode_user_key_ranges: BTreeMap<VirtualNode, VnodeUserKeyRange>,
39}
40
41#[derive(Debug, PartialEq, Clone)]
42pub struct SstableInfoInner {
43 pub object_id: HummockSstableObjectId,
44 pub sst_id: HummockSstableId,
45 pub key_range: KeyRange,
46 pub file_size: u64,
47 pub table_ids: Vec<TableId>,
48 pub meta_offset: u64,
49 pub stale_key_count: u64,
50 pub total_key_count: u64,
51 pub min_epoch: u64,
52 pub max_epoch: u64,
53 pub uncompressed_file_size: u64,
54 pub range_tombstone_count: u64,
55 pub filter_type: PbSstableFilterType,
56 pub filter_layout: PbSstableFilterLayout,
57 pub sst_size: u64,
58 pub vnode_statistics: Option<VnodeStatistics>,
59}
60
61impl SstableInfoInner {
62 pub fn estimated_encode_len(&self) -> usize {
63 let mut basic = size_of::<u64>() + size_of::<u64>() + size_of::<u64>() + self.table_ids.len() * size_of::<u32>() + size_of::<u64>() + size_of::<u64>() + size_of::<u64>() + size_of::<u64>() + size_of::<u64>() + size_of::<u64>() + size_of::<u64>() + size_of::<u64>(); basic += size_of::<u32>(); if self.filter_layout != PbSstableFilterLayout::Unspecified {
77 basic += size_of::<u32>(); }
79 basic += self.key_range.left.len() + self.key_range.right.len() + size_of::<bool>();
80 if let Some(vnode_statistics) = &self.vnode_statistics {
81 for (min_key, max_key) in vnode_statistics.vnode_user_key_ranges.values() {
82 basic += size_of::<u32>() + min_key.encoded_len() + max_key.encoded_len();
83 }
84 }
85
86 basic
87 }
88
89 pub fn to_protobuf(&self) -> PbSstableInfo {
90 self.into()
91 }
92}
93
/// Test-only default: the value decoded from a default `PbSstableInfo`.
#[cfg(any(test, feature = "test"))]
impl Default for SstableInfoInner {
    fn default() -> Self {
        Self::from(PbSstableInfo::default())
    }
}
100
101fn filter_metadata_from_pb(
102 filter_type: Option<i32>,
103 filter_layout: Option<i32>,
104 bloom_filter_kind: i32,
105) -> (PbSstableFilterType, PbSstableFilterLayout) {
106 let bloom_filter_kind = PbBloomFilterType::try_from(bloom_filter_kind)
107 .expect("invalid legacy bloom_filter_kind in SST info");
108
109 if let Some(filter_type) = filter_type {
112 assert_eq!(
113 bloom_filter_kind,
114 PbBloomFilterType::BloomFilterUnspecified,
115 "new SST filter metadata must not set legacy bloom_filter_kind"
116 );
117 let filter_type =
118 PbSstableFilterType::try_from(filter_type).expect("invalid filter_type in SST info");
119 assert_ne!(
120 filter_type,
121 PbSstableFilterType::SstableFilterUnspecified,
122 "new SST filter metadata must use a resolved filter_type"
123 );
124
125 let filter_layout = filter_layout
126 .map(|filter_layout| {
127 PbSstableFilterLayout::try_from(filter_layout)
128 .expect("invalid filter_layout in SST info")
129 })
130 .unwrap_or(PbSstableFilterLayout::Unspecified);
131 if filter_type == PbSstableFilterType::SstableFilterNone {
132 assert_eq!(
133 filter_layout,
134 PbSstableFilterLayout::Unspecified,
135 "SST filter metadata with no filter must not set filter_layout"
136 );
137 }
138
139 return (filter_type, filter_layout);
140 }
141
142 assert!(
143 filter_layout.is_none(),
144 "legacy SST filter metadata must not set filter_layout"
145 );
146 match bloom_filter_kind {
147 PbBloomFilterType::BloomFilterUnspecified => (
148 PbSstableFilterType::SstableFilterNone,
149 PbSstableFilterLayout::Unspecified,
150 ),
151 PbBloomFilterType::Sstable => (
152 PbSstableFilterType::SstableFilterXor16,
153 PbSstableFilterLayout::Plain,
154 ),
155 PbBloomFilterType::Blocked => (
156 PbSstableFilterType::SstableFilterXor16,
157 PbSstableFilterLayout::Blocked,
158 ),
159 }
160}
161
162fn assert_resolved_filter_metadata(
163 filter_type: PbSstableFilterType,
164 filter_layout: PbSstableFilterLayout,
165) {
166 assert_ne!(
167 filter_type,
168 PbSstableFilterType::SstableFilterUnspecified,
169 "SST filter metadata must use a resolved filter_type"
170 );
171 if filter_type == PbSstableFilterType::SstableFilterNone {
172 assert_eq!(
173 filter_layout,
174 PbSstableFilterLayout::Unspecified,
175 "SST filter metadata with no filter must not set filter_layout"
176 );
177 }
178}
179
180impl From<&PbVnodeStatistics> for VnodeStatistics {
181 fn from(info: &PbVnodeStatistics) -> Self {
182 Self {
183 vnode_user_key_ranges: info
184 .vnode_user_key_ranges
185 .iter()
186 .map(|(&vnode, range)| {
187 let min_key = UserKey::decode(&range.min_key).copy_into();
188 let max_key = UserKey::decode(&range.max_key).copy_into();
189
190 assert_eq!(min_key.table_id, max_key.table_id);
192 assert_eq!(min_key.get_vnode_id(), max_key.get_vnode_id());
193
194 (VirtualNode::from_index(vnode as usize), (min_key, max_key))
195 })
196 .collect(),
197 }
198 }
199}
200
201impl From<PbVnodeStatistics> for VnodeStatistics {
202 fn from(info: PbVnodeStatistics) -> Self {
203 (&info).into()
204 }
205}
206
207impl From<&VnodeStatistics> for PbVnodeStatistics {
208 fn from(info: &VnodeStatistics) -> Self {
209 Self {
210 vnode_user_key_ranges: info
211 .vnode_user_key_ranges
212 .iter()
213 .map(|(vnode, (min_key, max_key))| {
214 (
215 vnode.to_index() as u32,
216 PbVnodeUserKeyRange {
217 min_key: min_key.encode(),
218 max_key: max_key.encode(),
219 },
220 )
221 })
222 .collect(),
223 }
224 }
225}
226
227impl From<VnodeStatistics> for PbVnodeStatistics {
228 fn from(info: VnodeStatistics) -> Self {
229 (&info).into()
230 }
231}
232
233impl From<PbSstableInfo> for SstableInfoInner {
234 #[expect(
235 deprecated,
236 reason = "read legacy SST filter metadata for compatibility"
237 )]
238 fn from(pb_sstable_info: PbSstableInfo) -> Self {
239 assert!(pb_sstable_info.table_ids.is_sorted());
240 let (filter_type, filter_layout) = filter_metadata_from_pb(
241 pb_sstable_info.filter_type,
242 pb_sstable_info.filter_layout,
243 pb_sstable_info.bloom_filter_kind,
244 );
245 Self {
246 object_id: pb_sstable_info.object_id,
247 sst_id: pb_sstable_info.sst_id,
248 key_range: {
249 if let Some(pb_keyrange) = pb_sstable_info.key_range {
251 KeyRange {
252 left: pb_keyrange.left.into(),
253 right: pb_keyrange.right.into(),
254 right_exclusive: pb_keyrange.right_exclusive,
255 }
256 } else {
257 KeyRange::inf()
258 }
259 },
260 file_size: pb_sstable_info.file_size,
261 table_ids: pb_sstable_info.table_ids,
262 meta_offset: pb_sstable_info.meta_offset,
263 stale_key_count: pb_sstable_info.stale_key_count,
264 total_key_count: pb_sstable_info.total_key_count,
265 min_epoch: pb_sstable_info.min_epoch,
266 max_epoch: pb_sstable_info.max_epoch,
267 uncompressed_file_size: pb_sstable_info.uncompressed_file_size,
268 range_tombstone_count: pb_sstable_info.range_tombstone_count,
269 filter_type,
270 filter_layout,
271 sst_size: if pb_sstable_info.sst_size == 0 {
272 pb_sstable_info.file_size
273 } else {
274 pb_sstable_info.sst_size
275 },
276 vnode_statistics: pb_sstable_info
277 .vnode_statistics
278 .as_ref()
279 .map(VnodeStatistics::from),
280 }
281 }
282}
283
284impl From<&PbSstableInfo> for SstableInfoInner {
285 #[expect(
286 deprecated,
287 reason = "read legacy SST filter metadata for compatibility"
288 )]
289 fn from(pb_sstable_info: &PbSstableInfo) -> Self {
290 assert!(pb_sstable_info.table_ids.is_sorted());
291 let (filter_type, filter_layout) = filter_metadata_from_pb(
292 pb_sstable_info.filter_type,
293 pb_sstable_info.filter_layout,
294 pb_sstable_info.bloom_filter_kind,
295 );
296 Self {
297 object_id: pb_sstable_info.object_id,
298 sst_id: pb_sstable_info.sst_id,
299 key_range: {
300 if let Some(pb_keyrange) = &pb_sstable_info.key_range {
301 KeyRange {
302 left: pb_keyrange.left.clone().into(),
303 right: pb_keyrange.right.clone().into(),
304 right_exclusive: pb_keyrange.right_exclusive,
305 }
306 } else {
307 KeyRange::inf()
308 }
309 },
310 file_size: pb_sstable_info.file_size,
311 table_ids: pb_sstable_info.table_ids.clone(),
312 meta_offset: pb_sstable_info.meta_offset,
313 stale_key_count: pb_sstable_info.stale_key_count,
314 total_key_count: pb_sstable_info.total_key_count,
315 min_epoch: pb_sstable_info.min_epoch,
316 max_epoch: pb_sstable_info.max_epoch,
317 uncompressed_file_size: pb_sstable_info.uncompressed_file_size,
318 range_tombstone_count: pb_sstable_info.range_tombstone_count,
319 filter_type,
320 filter_layout,
321 sst_size: if pb_sstable_info.sst_size == 0 {
322 pb_sstable_info.file_size
323 } else {
324 pb_sstable_info.sst_size
325 },
326 vnode_statistics: pb_sstable_info
327 .vnode_statistics
328 .as_ref()
329 .map(VnodeStatistics::from),
330 }
331 }
332}
333
334impl From<SstableInfoInner> for PbSstableInfo {
335 #[expect(deprecated, reason = "write legacy SST filter metadata as unset")]
336 fn from(sstable_info: SstableInfoInner) -> Self {
337 assert!(sstable_info.table_ids.is_sorted());
338 assert_resolved_filter_metadata(sstable_info.filter_type, sstable_info.filter_layout);
339 PbSstableInfo {
340 object_id: sstable_info.object_id,
341 sst_id: sstable_info.sst_id,
342 key_range: {
343 let keyrange = sstable_info.key_range;
344 if keyrange.inf_key_range() {
345 None
349 } else {
350 let pb_key_range = PbKeyRange {
351 left: keyrange.left.into(),
352 right: keyrange.right.into(),
353 right_exclusive: keyrange.right_exclusive,
354 };
355 Some(pb_key_range)
356 }
357 },
358
359 file_size: sstable_info.file_size,
360 table_ids: sstable_info.table_ids,
361 meta_offset: sstable_info.meta_offset,
362 stale_key_count: sstable_info.stale_key_count,
363 total_key_count: sstable_info.total_key_count,
364 min_epoch: sstable_info.min_epoch,
365 max_epoch: sstable_info.max_epoch,
366 uncompressed_file_size: sstable_info.uncompressed_file_size,
367 range_tombstone_count: sstable_info.range_tombstone_count,
368 bloom_filter_kind: PbBloomFilterType::BloomFilterUnspecified as i32,
369 filter_type: Some(sstable_info.filter_type.into()),
370 filter_layout: (sstable_info.filter_layout != PbSstableFilterLayout::Unspecified)
371 .then_some(sstable_info.filter_layout.into()),
372 sst_size: sstable_info.sst_size,
373 vnode_statistics: sstable_info
374 .vnode_statistics
375 .as_ref()
376 .map(PbVnodeStatistics::from),
377 }
378 }
379}
380
381impl From<&SstableInfoInner> for PbSstableInfo {
382 #[expect(deprecated, reason = "write legacy SST filter metadata as unset")]
383 fn from(sstable_info: &SstableInfoInner) -> Self {
384 assert!(sstable_info.table_ids.is_sorted());
385 assert_resolved_filter_metadata(sstable_info.filter_type, sstable_info.filter_layout);
386 PbSstableInfo {
387 object_id: sstable_info.object_id,
388 sst_id: sstable_info.sst_id,
389 key_range: {
390 let keyrange = &sstable_info.key_range;
391 if keyrange.inf_key_range() {
392 None
393 } else {
394 let pb_key_range = PbKeyRange {
395 left: keyrange.left.to_vec(),
396 right: keyrange.right.to_vec(),
397 right_exclusive: keyrange.right_exclusive,
398 };
399 Some(pb_key_range)
400 }
401 },
402
403 file_size: sstable_info.file_size,
404 table_ids: sstable_info.table_ids.clone(),
405 meta_offset: sstable_info.meta_offset,
406 stale_key_count: sstable_info.stale_key_count,
407 total_key_count: sstable_info.total_key_count,
408 min_epoch: sstable_info.min_epoch,
409 max_epoch: sstable_info.max_epoch,
410 uncompressed_file_size: sstable_info.uncompressed_file_size,
411 range_tombstone_count: sstable_info.range_tombstone_count,
412 bloom_filter_kind: PbBloomFilterType::BloomFilterUnspecified as i32,
413 filter_type: Some(sstable_info.filter_type.into()),
414 filter_layout: (sstable_info.filter_layout != PbSstableFilterLayout::Unspecified)
415 .then_some(sstable_info.filter_layout.into()),
416 sst_size: sstable_info.sst_size,
417 vnode_statistics: sstable_info
418 .vnode_statistics
419 .as_ref()
420 .map(PbVnodeStatistics::from),
421 }
422 }
423}
424
425impl VnodeStatistics {
426 pub fn from_map(vnode_user_key_ranges: BTreeMap<VirtualNode, VnodeUserKeyRange>) -> Self {
427 Self {
428 vnode_user_key_ranges,
429 }
430 }
431
432 pub fn get_vnode_user_key_range(&self, vnode: VirtualNode) -> Option<&VnodeUserKeyRange> {
434 self.vnode_user_key_ranges.get(&vnode)
435 }
436
437 #[cfg(any(test, feature = "test"))]
438 pub fn vnode_user_key_ranges(&self) -> &BTreeMap<VirtualNode, VnodeUserKeyRange> {
439 &self.vnode_user_key_ranges
440 }
441}
442
443impl SstableInfo {
444 pub fn remove_key_range(&mut self) {
445 let mut sst = self.get_inner();
446 sst.key_range = KeyRange::default();
447 *self = sst.into()
448 }
449}
450
451impl SstableIdReader for SstableInfoInner {
452 fn sst_id(&self) -> HummockSstableId {
453 self.sst_id
454 }
455}
456
457impl ObjectIdReader for SstableInfoInner {
458 fn object_id(&self) -> HummockSstableObjectId {
459 self.object_id
460 }
461}
462
463#[derive(Debug, PartialEq, Clone)]
464#[cfg_attr(any(test, feature = "test"), derive(Default))]
465pub struct SstableInfo(Arc<SstableInfoInner>);
466
467impl From<&PbSstableInfo> for SstableInfo {
468 fn from(s: &PbSstableInfo) -> Self {
469 SstableInfo(SstableInfoInner::from(s).into())
470 }
471}
472
473impl From<PbSstableInfo> for SstableInfo {
474 fn from(s: PbSstableInfo) -> Self {
475 SstableInfo(SstableInfoInner::from(s).into())
476 }
477}
478
479impl From<SstableInfo> for PbSstableInfo {
480 fn from(s: SstableInfo) -> Self {
481 (&s).into()
482 }
483}
484
485impl From<SstableInfoInner> for SstableInfo {
486 fn from(s: SstableInfoInner) -> Self {
487 Self(s.into())
488 }
489}
490
491impl From<&SstableInfo> for PbSstableInfo {
492 fn from(s: &SstableInfo) -> Self {
493 s.0.as_ref().into()
494 }
495}
496
497impl Deref for SstableInfo {
498 type Target = SstableInfoInner;
499
500 fn deref(&self) -> &Self::Target {
501 &self.0
502 }
503}
504
505impl SstableInfo {
506 pub fn get_inner(&self) -> SstableInfoInner {
507 (*self.0).clone()
508 }
509
510 pub fn set_inner(&mut self, inner: SstableInfoInner) {
511 self.0 = Arc::new(inner);
512 }
513}
514
515impl SstableIdReader for SstableInfo {
516 fn sst_id(&self) -> HummockSstableId {
517 self.sst_id
518 }
519}
520
521impl ObjectIdReader for SstableInfo {
522 fn object_id(&self) -> HummockSstableObjectId {
523 self.object_id
524 }
525}
526
#[cfg(test)]
mod tests {
    use super::*;

    /// Covers decoding of new-style and legacy filter metadata, and checks
    /// that encoding writes the legacy field as unspecified.
    #[test]
    #[expect(deprecated, reason = "test legacy SST filter metadata compatibility")]
    fn test_filter_metadata_from_pb() {
        /// One decoding scenario: raw protobuf inputs and the expected result.
        struct Case {
            name: &'static str,
            legacy_kind: Option<PbBloomFilterType>,
            filter_type: Option<PbSstableFilterType>,
            filter_layout: Option<PbSstableFilterLayout>,
            expected_type: PbSstableFilterType,
            expected_layout: PbSstableFilterLayout,
        }

        let cases = [
            Case {
                name: "new filter metadata",
                legacy_kind: None,
                filter_type: Some(PbSstableFilterType::SstableFilterXor8),
                filter_layout: Some(PbSstableFilterLayout::Plain),
                expected_type: PbSstableFilterType::SstableFilterXor8,
                expected_layout: PbSstableFilterLayout::Plain,
            },
            Case {
                name: "legacy blocked bloom kind",
                legacy_kind: Some(PbBloomFilterType::Blocked),
                filter_type: None,
                filter_layout: None,
                expected_type: PbSstableFilterType::SstableFilterXor16,
                expected_layout: PbSstableFilterLayout::Blocked,
            },
            Case {
                name: "legacy plain bloom kind",
                legacy_kind: Some(PbBloomFilterType::Sstable),
                filter_type: None,
                filter_layout: None,
                expected_type: PbSstableFilterType::SstableFilterXor16,
                expected_layout: PbSstableFilterLayout::Plain,
            },
            Case {
                name: "explicit no filter",
                legacy_kind: None,
                filter_type: Some(PbSstableFilterType::SstableFilterNone),
                filter_layout: None,
                expected_type: PbSstableFilterType::SstableFilterNone,
                expected_layout: PbSstableFilterLayout::Unspecified,
            },
        ];

        for case in cases {
            let legacy_kind = case
                .legacy_kind
                .unwrap_or(PbBloomFilterType::BloomFilterUnspecified);
            let decoded = SstableInfoInner::from(PbSstableInfo {
                bloom_filter_kind: i32::from(legacy_kind),
                filter_type: case.filter_type.map(i32::from),
                filter_layout: case.filter_layout.map(i32::from),
                ..Default::default()
            });

            assert_eq!(decoded.filter_type, case.expected_type, "{}", case.name);
            assert_eq!(decoded.filter_layout, case.expected_layout, "{}", case.name);
        }

        // Encoding: the legacy field stays unspecified and an unspecified
        // layout is omitted.
        let no_filter = SstableInfoInner {
            filter_type: PbSstableFilterType::SstableFilterNone,
            filter_layout: PbSstableFilterLayout::Unspecified,
            ..Default::default()
        };
        let encoded = PbSstableInfo::from(no_filter);
        assert_eq!(
            encoded.bloom_filter_kind,
            PbBloomFilterType::BloomFilterUnspecified as i32
        );
        assert_eq!(
            encoded.filter_type,
            Some(PbSstableFilterType::SstableFilterNone as i32)
        );
        assert_eq!(encoded.filter_layout, None);
    }
}