1use std::collections::{HashMap, HashSet};
16use std::fmt::Debug;
17use std::iter;
18use std::sync::Arc;
19
20use itertools::Itertools;
21use parking_lot::RwLock;
22use risingwave_common::catalog::{ColumnDesc, TableId};
23use risingwave_common::hash::{VirtualNode, VnodeCountCompat};
24use risingwave_common::util::memcmp_encoding;
25use risingwave_common::util::row_serde::OrderedRowSerde;
26use risingwave_common::util::sort_util::OrderType;
27use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde;
28use risingwave_common::util::value_encoding::{BasicSerde, EitherSerde, ValueRowDeserializer};
29use risingwave_hummock_sdk::compaction_group::StateTableId;
30use risingwave_hummock_sdk::key::{TABLE_PREFIX_LEN, get_table_id};
31use risingwave_pb::catalog::Table;
32use risingwave_rpc_client::MetaClient;
33use risingwave_rpc_client::error::{Result as RpcResult, RpcError};
34use thiserror_ext::AsReport;
35
36use crate::hummock::{HummockError, HummockResult};
37use crate::row_serde::value_serde::ValueRowSerdeNew;
38
/// Extracts the filter key (the bytes a bloom filter is keyed on) from a user key.
pub trait FilterKeyExtractor: Send + Sync {
    /// Returns the filter key of `user_key`; the result borrows from `user_key`.
    fn extract<'a>(&self, user_key: &'a [u8]) -> &'a [u8];
}
43
/// Closed set of filter-key extractors. `extract` is generated for every variant by
/// `for_all_filter_key_extractor_variants! { impl_filter_key_extractor }`.
pub enum FilterKeyExtractorImpl {
    /// Filter key = encoded bytes of the leading pk columns (see [`SchemaFilterKeyExtractor`]).
    Schema(SchemaFilterKeyExtractor),
    /// Filter key = the whole user key.
    FullKey(FullKeyFilterKeyExtractor),
    /// Filter key = an empty slice.
    Dummy(DummyFilterKeyExtractor),
    /// Dispatches per table using the table id in the key prefix.
    Multi(MultiFilterKeyExtractor),
    /// Filter key = a fixed-length prefix of the user key.
    FixedLength(FixedLengthFilterKeyExtractor),
}
51
52impl FilterKeyExtractorImpl {
53 pub fn from_table(table_catalog: &Table) -> Self {
54 let read_prefix_len = table_catalog.get_read_prefix_len_hint() as usize;
55
56 if read_prefix_len == 0 {
57 FilterKeyExtractorImpl::Dummy(DummyFilterKeyExtractor)
58 } else if read_prefix_len > table_catalog.get_pk().len() {
59 FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor)
62 } else {
63 FilterKeyExtractorImpl::Schema(SchemaFilterKeyExtractor::new(table_catalog))
64 }
65 }
66}
67
/// Generates the inherent `FilterKeyExtractorImpl::extract`, which forwards to the wrapped
/// extractor of each listed variant. Invoked through `for_all_filter_key_extractor_variants!`.
macro_rules! impl_filter_key_extractor {
    ($( { $variant_name:ident } ),*) => {
        impl FilterKeyExtractorImpl {
            pub fn extract<'a>(&self, user_key: &'a [u8]) -> &'a [u8]{
                match self {
                    $( Self::$variant_name(inner) => inner.extract(user_key), )*
                }
            }
        }
    }

}
80
/// Calls `$macro` with the list of all `FilterKeyExtractorImpl` variants.
/// Keep this list in sync with the enum so the generated `match` stays exhaustive.
macro_rules! for_all_filter_key_extractor_variants {
    ($macro:ident) => {
        $macro! {
            { Schema },
            { FullKey },
            { Dummy },
            { Multi },
            { FixedLength }
        }
    };
}
92
// Generate `FilterKeyExtractorImpl::extract` covering every variant.
for_all_filter_key_extractor_variants! { impl_filter_key_extractor }
94
/// Extractor that uses the entire user key as the filter key.
#[derive(Default)]
pub struct FullKeyFilterKeyExtractor;

impl FilterKeyExtractor for FullKeyFilterKeyExtractor {
    fn extract<'a>(&self, user_key: &'a [u8]) -> &'a [u8] {
        user_key
    }
}
103
104#[derive(Default)]
105pub struct DummyFilterKeyExtractor;
106impl FilterKeyExtractor for DummyFilterKeyExtractor {
107 fn extract<'a>(&self, _user_key: &'a [u8]) -> &'a [u8] {
108 &[]
109 }
110}
111
112#[derive(Default)]
114pub struct FixedLengthFilterKeyExtractor {
115 fixed_length: usize,
116}
117
118impl FilterKeyExtractor for FixedLengthFilterKeyExtractor {
119 fn extract<'a>(&self, user_key: &'a [u8]) -> &'a [u8] {
120 &user_key[0..self.fixed_length]
121 }
122}
123
124impl FixedLengthFilterKeyExtractor {
125 pub fn new(fixed_length: usize) -> Self {
126 Self { fixed_length }
127 }
128}
129
/// Extractor whose filter key is the encoded bytes of the first `read_prefix_len` pk columns
/// (after the table-id prefix and vnode).
pub struct SchemaFilterKeyExtractor {
    /// Number of leading pk columns forming the filter key (the table's `read_prefix_len_hint`).
    read_prefix_len: usize,
    /// Ordered row serde over all pk columns, used to measure the encoded prefix length.
    deserializer: OrderedRowSerde,
}
142
143impl FilterKeyExtractor for SchemaFilterKeyExtractor {
144 fn extract<'a>(&self, user_key: &'a [u8]) -> &'a [u8] {
145 if user_key.len() < TABLE_PREFIX_LEN + VirtualNode::SIZE {
146 return &[];
147 }
148
149 let (_table_prefix, key) = user_key.split_at(TABLE_PREFIX_LEN);
150 let (_vnode_prefix, pk) = key.split_at(VirtualNode::SIZE);
151
152 let bloom_filter_key_len = self
156 .deserializer
157 .deserialize_prefix_len(pk, self.read_prefix_len)
158 .unwrap();
159
160 let end_position = TABLE_PREFIX_LEN + VirtualNode::SIZE + bloom_filter_key_len;
161 &user_key[TABLE_PREFIX_LEN + VirtualNode::SIZE..end_position]
162 }
163}
164
165impl SchemaFilterKeyExtractor {
166 pub fn new(table_catalog: &Table) -> Self {
167 let pk_indices: Vec<usize> = table_catalog
168 .pk
169 .iter()
170 .map(|col_order| col_order.column_index as usize)
171 .collect();
172
173 let read_prefix_len = table_catalog.get_read_prefix_len_hint() as usize;
174
175 let data_types = pk_indices
176 .iter()
177 .map(|column_idx| &table_catalog.columns[*column_idx])
178 .map(|col| ColumnDesc::from(col.column_desc.as_ref().unwrap()).data_type)
179 .collect();
180
181 let order_types: Vec<OrderType> = table_catalog
182 .pk
183 .iter()
184 .map(|col_order| OrderType::from_protobuf(col_order.get_order_type().unwrap()))
185 .collect();
186
187 Self {
188 read_prefix_len,
189 deserializer: OrderedRowSerde::new(data_types, order_types),
190 }
191 }
192}
193
/// Extractor that routes each key to the extractor registered for the table id in its prefix.
#[derive(Default)]
pub struct MultiFilterKeyExtractor {
    /// Registered per-table extractors.
    id_to_filter_key_extractor: HashMap<TableId, FilterKeyExtractorImpl>,
}
198
199impl MultiFilterKeyExtractor {
200 pub fn register(&mut self, table_id: TableId, filter_key_extractor: FilterKeyExtractorImpl) {
201 self.id_to_filter_key_extractor
202 .insert(table_id, filter_key_extractor);
203 }
204
205 pub fn size(&self) -> usize {
206 self.id_to_filter_key_extractor.len()
207 }
208
209 pub fn get_existing_table_ids(&self) -> HashSet<TableId> {
210 self.id_to_filter_key_extractor.keys().cloned().collect()
211 }
212}
213
214impl Debug for MultiFilterKeyExtractor {
215 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
216 write!(f, "MultiFilterKeyExtractor size {} ", self.size())
217 }
218}
219
220impl FilterKeyExtractor for MultiFilterKeyExtractor {
221 fn extract<'a>(&self, user_key: &'a [u8]) -> &'a [u8] {
222 if user_key.len() < TABLE_PREFIX_LEN + VirtualNode::SIZE {
223 return user_key;
224 }
225
226 let table_id = get_table_id(user_key);
227 self.id_to_filter_key_extractor
228 .get(&table_id)
229 .unwrap()
230 .extract(user_key)
231 }
232}
233
/// Source of table catalogs by id, used when a catalog is not cached locally.
#[async_trait::async_trait]
pub trait StateTableAccessor: Send + Sync {
    /// Fetches the catalogs of `table_ids`; ids unknown to the source may be absent from the map.
    async fn get_tables(&self, table_ids: &[TableId]) -> RpcResult<HashMap<TableId, Table>>;
}
238
/// [`StateTableAccessor`] whose lookups always fail. Backs `CompactionCatalogManager::default`.
#[derive(Default)]
pub struct FakeRemoteTableAccessor {}
241
242pub struct RemoteTableAccessor {
243 meta_client: MetaClient,
244}
245
246impl RemoteTableAccessor {
247 pub fn new(meta_client: MetaClient) -> Self {
248 Self { meta_client }
249 }
250}
251
252#[async_trait::async_trait]
253impl StateTableAccessor for RemoteTableAccessor {
254 async fn get_tables(&self, table_ids: &[TableId]) -> RpcResult<HashMap<TableId, Table>> {
255 self.meta_client.get_tables(table_ids.to_vec(), true).await
256 }
257}
258
259#[async_trait::async_trait]
260impl StateTableAccessor for FakeRemoteTableAccessor {
261 async fn get_tables(&self, _table_ids: &[TableId]) -> RpcResult<HashMap<TableId, Table>> {
262 Err(RpcError::Internal(anyhow::anyhow!(
263 "fake accessor does not support fetch remote table"
264 )))
265 }
266}
267
/// Caches table catalogs and builds [`CompactionCatalogAgent`]s from them.
pub struct CompactionCatalogManager {
    /// Cached catalogs keyed by state table id.
    table_id_to_catalog: RwLock<HashMap<StateTableId, Table>>,
    /// Fallback source for catalogs missing from the cache (used by `acquire`).
    table_accessor: Box<dyn StateTableAccessor>,
}
275
276impl Default for CompactionCatalogManager {
277 fn default() -> Self {
278 Self::new(Box::<FakeRemoteTableAccessor>::default())
279 }
280}
281
282impl CompactionCatalogManager {
283 pub fn new(table_accessor: Box<dyn StateTableAccessor>) -> Self {
284 Self {
285 table_id_to_catalog: Default::default(),
286 table_accessor,
287 }
288 }
289}
290
291impl CompactionCatalogManager {
292 pub fn update(&self, table_id: TableId, catalog: Table) {
294 self.table_id_to_catalog.write().insert(table_id, catalog);
295 }
296
297 pub fn sync(&self, catalog_map: HashMap<TableId, Table>) {
299 let mut guard = self.table_id_to_catalog.write();
300 guard.clear();
301 guard.extend(catalog_map);
302 }
303
304 pub fn remove(&self, table_id: TableId) {
306 self.table_id_to_catalog.write().remove(&table_id);
307 }
308
309 pub async fn acquire(
312 &self,
313 mut table_ids: Vec<StateTableId>,
314 ) -> HummockResult<CompactionCatalogAgentRef> {
315 if table_ids.is_empty() {
316 return Err(HummockError::other("table_id_set is empty"));
321 }
322
323 let mut multi_filter_key_extractor = MultiFilterKeyExtractor::default();
324 let mut table_id_to_vnode = HashMap::new();
325 let mut table_id_to_watermark_serde = HashMap::new();
326 let mut table_id_to_value_watermark_serde = HashMap::new();
327
328 {
329 let guard = self.table_id_to_catalog.read();
330 table_ids.retain(|table_id| match guard.get(table_id) {
331 Some(table_catalog) => {
332 multi_filter_key_extractor
334 .register(*table_id, FilterKeyExtractorImpl::from_table(table_catalog));
335
336 table_id_to_vnode.insert(*table_id, table_catalog.vnode_count());
338
339 table_id_to_watermark_serde
341 .insert(*table_id, build_watermark_col_serde(table_catalog));
342 table_id_to_value_watermark_serde.insert(
343 *table_id,
344 build_value_watermark_col_serde(table_catalog).map(Arc::new),
345 );
346
347 false
348 }
349
350 None => true,
351 });
352 }
353
354 if !table_ids.is_empty() {
355 let mut state_tables =
356 self.table_accessor
357 .get_tables(&table_ids)
358 .await
359 .map_err(|e| {
360 HummockError::other(format!(
361 "request rpc list_tables for meta failed: {}",
362 e.as_report()
363 ))
364 })?;
365
366 let mut guard = self.table_id_to_catalog.write();
367 for table_id in table_ids {
368 if let Some(table) = state_tables.remove(&table_id) {
369 let table_id = table.id;
370 let key_extractor = FilterKeyExtractorImpl::from_table(&table);
371 let vnode = table.vnode_count();
372 let watermark_serde = build_watermark_col_serde(&table);
373 let value_watermark_serde = build_value_watermark_col_serde(&table);
374 guard.insert(table_id, table);
375 multi_filter_key_extractor.register(table_id, key_extractor);
377
378 table_id_to_vnode.insert(table_id, vnode);
380
381 table_id_to_watermark_serde.insert(table_id, watermark_serde);
383 table_id_to_value_watermark_serde
384 .insert(table_id, value_watermark_serde.map(Arc::new));
385 }
386 }
387 }
388
389 Ok(Arc::new(CompactionCatalogAgent::new(
390 FilterKeyExtractorImpl::Multi(multi_filter_key_extractor),
391 table_id_to_vnode,
392 table_id_to_watermark_serde,
393 table_id_to_value_watermark_serde,
394 )))
395 }
396
397 pub fn build_compaction_catalog_agent(
399 table_catalogs: HashMap<StateTableId, Table>,
400 ) -> CompactionCatalogAgentRef {
401 let mut multi_filter_key_extractor = MultiFilterKeyExtractor::default();
402 let mut table_id_to_vnode = HashMap::new();
403 let mut table_id_to_watermark_serde = HashMap::new();
404 let mut value_table_id_to_watermark_serde = HashMap::new();
405 for (table_id, table_catalog) in table_catalogs {
406 multi_filter_key_extractor
408 .register(table_id, FilterKeyExtractorImpl::from_table(&table_catalog));
409
410 table_id_to_vnode.insert(table_id, table_catalog.vnode_count());
412
413 table_id_to_watermark_serde.insert(table_id, build_watermark_col_serde(&table_catalog));
415 value_table_id_to_watermark_serde.insert(
416 table_id,
417 build_value_watermark_col_serde(&table_catalog).map(Arc::new),
418 );
419 }
420
421 Arc::new(CompactionCatalogAgent::new(
422 FilterKeyExtractorImpl::Multi(multi_filter_key_extractor),
423 table_id_to_vnode,
424 table_id_to_watermark_serde,
425 value_table_id_to_watermark_serde,
426 ))
427 }
428}
429
/// Per-compaction snapshot of the catalog-derived state for a set of tables.
pub struct CompactionCatalogAgent {
    /// Extractor applied by `extract` (a `Multi` extractor when built by the manager).
    filter_key_extractor_manager: FilterKeyExtractorImpl,
    /// Vnode count per table; its keys are also the agent's table set (`table_ids`).
    table_id_to_vnode: HashMap<StateTableId, usize>,
    /// Per table: `(pk_serde, watermark_col_serde, clean_watermark_index_in_pk)`, or `None`
    /// when the table has no pk clean-watermark column.
    table_id_to_watermark_serde:
        HashMap<StateTableId, Option<(OrderedRowSerde, OrderedRowSerde, usize)>>,
    /// Per table: serde for a clean-watermark column stored outside the pk, if any.
    value_table_id_to_watermark_serde: HashMap<StateTableId, Option<ValueWatermarkColumnSerdeRef>>,
}
442
443impl CompactionCatalogAgent {
444 pub fn new(
445 filter_key_extractor_manager: FilterKeyExtractorImpl,
446 table_id_to_vnode: HashMap<StateTableId, usize>,
447 table_id_to_watermark_serde: HashMap<
448 StateTableId,
449 Option<(OrderedRowSerde, OrderedRowSerde, usize)>,
450 >,
451 value_table_id_to_watermark_serde: HashMap<
452 StateTableId,
453 Option<ValueWatermarkColumnSerdeRef>,
454 >,
455 ) -> Self {
456 Self {
457 filter_key_extractor_manager,
458 table_id_to_vnode,
459 table_id_to_watermark_serde,
460 value_table_id_to_watermark_serde,
461 }
462 }
463
464 pub fn dummy() -> Self {
465 Self {
466 filter_key_extractor_manager: FilterKeyExtractorImpl::Dummy(DummyFilterKeyExtractor),
467 table_id_to_vnode: Default::default(),
468 table_id_to_watermark_serde: Default::default(),
469 value_table_id_to_watermark_serde: Default::default(),
470 }
471 }
472
473 pub fn for_test(table_ids: Vec<impl Into<StateTableId>>) -> Arc<Self> {
474 let full_key_filter_key_extractor =
475 FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);
476
477 let table_id_to_vnode: HashMap<TableId, usize> = table_ids
478 .into_iter()
479 .map(|table_id| (table_id.into(), VirtualNode::COUNT_FOR_TEST))
480 .collect();
481
482 let table_id_to_watermark_serde = table_id_to_vnode
483 .keys()
484 .map(|table_id| (*table_id, None))
485 .collect();
486
487 let value_table_id_to_watermark_serde = table_id_to_vnode
488 .keys()
489 .map(|table_id| (*table_id, None))
490 .collect();
491
492 Arc::new(CompactionCatalogAgent::new(
493 full_key_filter_key_extractor,
494 table_id_to_vnode,
495 table_id_to_watermark_serde,
496 value_table_id_to_watermark_serde,
497 ))
498 }
499}
500
501impl CompactionCatalogAgent {
502 pub fn extract<'a>(&self, user_key: &'a [u8]) -> &'a [u8] {
503 self.filter_key_extractor_manager.extract(user_key)
504 }
505
506 pub fn vnode_count(&self, table_id: StateTableId) -> usize {
507 *self.table_id_to_vnode.get(&table_id).unwrap_or_else(|| {
508 panic!(
509 "table_id not found {} all_table_ids {:?}",
510 table_id,
511 self.table_id_to_vnode.keys()
512 )
513 })
514 }
515
516 pub fn watermark_serde(
517 &self,
518 table_id: StateTableId,
519 ) -> Option<(OrderedRowSerde, OrderedRowSerde, usize)> {
520 self.table_id_to_watermark_serde
521 .get(&table_id)
522 .unwrap_or_else(|| {
523 panic!(
524 "table_id not found {} all_table_ids {:?}",
525 table_id,
526 self.table_id_to_watermark_serde.keys()
527 )
528 })
529 .clone()
530 }
531
532 pub fn value_watermark_serde(
533 &self,
534 table_id: StateTableId,
535 ) -> Option<ValueWatermarkColumnSerdeRef> {
536 self.value_table_id_to_watermark_serde
537 .get(&table_id)
538 .unwrap_or_else(|| {
539 panic!(
540 "table_id not found {} all_table_ids {:?}",
541 table_id,
542 self.value_table_id_to_watermark_serde.keys()
543 )
544 })
545 .clone()
546 }
547
548 pub fn table_id_to_vnode_ref(&self) -> &HashMap<StateTableId, usize> {
549 &self.table_id_to_vnode
550 }
551
552 pub fn table_ids(&self) -> impl Iterator<Item = StateTableId> + '_ {
553 self.table_id_to_vnode.keys().cloned()
554 }
555}
556
/// Shared handle to a [`CompactionCatalogManager`].
pub type CompactionCatalogManagerRef = Arc<CompactionCatalogManager>;
/// Shared handle to a [`CompactionCatalogAgent`].
pub type CompactionCatalogAgentRef = Arc<CompactionCatalogAgent>;
559
560fn build_watermark_col_serde(
561 table_catalog: &Table,
562) -> Option<(OrderedRowSerde, OrderedRowSerde, usize)> {
563 let clean_watermark_index_in_pk = table_catalog.get_clean_watermark_index_in_pk_compat();
565
566 match clean_watermark_index_in_pk {
567 None => {
568 None
571 }
572
573 Some(clean_watermark_index_in_pk) => {
574 use risingwave_common::types::DataType;
575 let table_columns: Vec<ColumnDesc> = table_catalog
576 .columns
577 .iter()
578 .map(|col| col.column_desc.as_ref().unwrap().into())
579 .collect();
580
581 let pk_data_types: Vec<DataType> = table_catalog
582 .pk
583 .iter()
584 .map(|col_order| {
585 table_columns[col_order.column_index as usize]
586 .data_type
587 .clone()
588 })
589 .collect();
590
591 let pk_order_types = table_catalog
592 .pk
593 .iter()
594 .map(|col_order| OrderType::from_protobuf(col_order.get_order_type().unwrap()))
595 .collect_vec();
596
597 assert_eq!(pk_data_types.len(), pk_order_types.len());
598 let pk_serde = OrderedRowSerde::new(pk_data_types, pk_order_types);
599 let watermark_col_serde = pk_serde.index(clean_watermark_index_in_pk).into_owned();
600 Some((pk_serde, watermark_col_serde, clean_watermark_index_in_pk))
601 }
602 }
603}
604
605fn build_value_watermark_col_serde(table_catalog: &Table) -> Option<ValueWatermarkColumnSerde> {
606 pub fn try_get_non_pk_clean_watermark_column_index(table: &Table) -> Option<usize> {
608 table
609 .get_clean_watermark_column_indices()
610 .iter()
611 .filter_map(|&col_idx| {
612 if table
613 .pk
614 .iter()
615 .any(|col_order| col_order.column_index == col_idx)
616 {
617 return None;
618 }
619 Some(col_idx as usize)
620 })
621 .at_most_one()
622 .unwrap()
623 }
624
625 let clean_watermark_index = try_get_non_pk_clean_watermark_column_index(table_catalog)?;
626 Some(ValueWatermarkColumnSerde::new(
627 table_catalog,
628 clean_watermark_index,
629 ))
630}
631
/// Decodes a table's value bytes and memcmp-encodes its clean-watermark column.
pub struct ValueWatermarkColumnSerde {
    /// Row serde for the value part (basic or column-aware, chosen from the catalog version).
    row_serde: EitherSerde,
    /// Position of the watermark column in the row produced by `row_serde`.
    watermark_index_in_de_row: usize,
    /// Order type passed to `memcmp_encoding::encode_value` for the watermark datum.
    watermark_column_mem_encoding_order: OrderType,
}
639
/// Shared handle to a [`ValueWatermarkColumnSerde`].
pub type ValueWatermarkColumnSerdeRef = Arc<ValueWatermarkColumnSerde>;
641
642impl ValueWatermarkColumnSerde {
643 fn new(table_catalog: &Table, clean_watermark_index: usize) -> Self {
644 let table_columns: Vec<ColumnDesc> = table_catalog
645 .columns
646 .iter()
647 .map(|col| col.column_desc.as_ref().unwrap().into())
648 .collect();
649 let pk_order_type = table_catalog
650 .pk
651 .iter()
652 .filter_map(|col_order| {
653 if col_order.column_index as usize == clean_watermark_index {
654 return Some(OrderType::from_protobuf(
655 col_order.get_order_type().unwrap(),
656 ));
657 }
658 None
659 })
660 .at_most_one()
661 .unwrap();
662 let watermark_column_mem_encoding_order = match pk_order_type {
664 Some(o) => o,
665 None => OrderType::ascending(),
667 };
668 let Some(watermark_index_in_value_indices) = table_catalog
670 .value_indices
671 .iter()
672 .position(|p| *p as usize == clean_watermark_index)
673 else {
674 panic!(
675 "Watermark index {} not found in value_indices {:?}.",
676 clean_watermark_index, table_catalog.value_indices
677 );
678 };
679 let (row_serde, watermark_index_in_de_row) = if table_catalog.version.is_none() {
680 let row_serde = BasicSerde::new(
681 Arc::from_iter(table_catalog.value_indices.iter().map(|val| *val as usize)),
682 Arc::from(table_columns.into_boxed_slice()),
683 )
684 .into();
685 (row_serde, watermark_index_in_value_indices)
686 } else {
687 let row_serde = ColumnAwareSerde::new(
688 Arc::from_iter(iter::once(clean_watermark_index)),
689 Arc::from(table_columns.into_boxed_slice()),
690 )
691 .into();
692 (row_serde, 0)
694 };
695 Self {
696 row_serde,
697 watermark_index_in_de_row,
698 watermark_column_mem_encoding_order,
699 }
700 }
701
702 pub fn deserialize(&self, encoded_bytes: &[u8]) -> HummockResult<Option<Vec<u8>>> {
703 let mut row = self
704 .row_serde
705 .deserialize(encoded_bytes)
706 .map_err(HummockError::decode_error)?;
707 if self.watermark_index_in_de_row >= row.len() {
708 return Ok(None);
710 }
711 let datum = std::mem::take(&mut row[self.watermark_index_in_de_row]);
712 let bytes = memcmp_encoding::encode_value(datum, self.watermark_column_mem_encoding_order)
714 .map_err(HummockError::encode_error)?;
715 Ok(Some(bytes.into()))
716 }
717}
718
#[cfg(test)]
mod tests {
    use std::mem;

    use bytes::{BufMut, BytesMut};
    use itertools::Itertools;
    use risingwave_common::catalog::ColumnDesc;
    use risingwave_common::hash::VirtualNode;
    use risingwave_common::row::OwnedRow;
    use risingwave_common::types::DataType;
    use risingwave_common::types::ScalarImpl::{self};
    use risingwave_common::util::row_serde::OrderedRowSerde;
    use risingwave_common::util::sort_util::OrderType;
    use risingwave_hummock_sdk::key::TABLE_PREFIX_LEN;
    use risingwave_pb::catalog::table::{PbEngine, TableType};
    use risingwave_pb::catalog::{PbCreateType, PbStreamJobStatus, PbTable};
    use risingwave_pb::common::{PbColumnOrder, PbDirection, PbNullsAre, PbOrderType};
    use risingwave_pb::plan_common::PbColumnCatalog;

    use super::{DummyFilterKeyExtractor, FilterKeyExtractor, SchemaFilterKeyExtractor};
    use crate::compaction_catalog_manager::{
        FilterKeyExtractorImpl, FullKeyFilterKeyExtractor, MultiFilterKeyExtractor,
    };
    /// Fixed vnode prefix (vnode 233) used when building test keys.
    const fn dummy_vnode() -> [u8; VirtualNode::SIZE] {
        VirtualNode::from_index(233).to_be_bytes()
    }

    /// The dummy extractor yields an empty key; the full-key extractor yields its input.
    #[test]
    fn test_default_filter_key_extractor() {
        let dummy_filter_key_extractor = DummyFilterKeyExtractor;
        let full_key = "full_key".as_bytes();
        let output_key = dummy_filter_key_extractor.extract(full_key);

        assert_eq!("".as_bytes(), output_key);

        let full_key_filter_key_extractor = FullKeyFilterKeyExtractor;
        let output_key = full_key_filter_key_extractor.extract(full_key);

        assert_eq!(full_key, output_key);
    }

    /// Builds a 4-column test table with pk `(col_1 Int64, col_3 Varchar)` and
    /// `read_prefix_len_hint = 1`.
    ///
    /// NOTE(review): `column_count` only drives `distribution_key`; the read prefix hint
    /// is always 1 regardless of the argument.
    fn build_table_with_prefix_column_num(column_count: u32) -> PbTable {
        PbTable {
            id: 0.into(),
            schema_id: 0.into(),
            database_id: 0.into(),
            name: "test".to_owned(),
            table_type: TableType::Table as i32,
            columns: vec![
                PbColumnCatalog {
                    column_desc: Some(
                        (&ColumnDesc::named("_row_id", 0.into(), DataType::Int64)).into(),
                    ),
                    is_hidden: true,
                },
                PbColumnCatalog {
                    column_desc: Some(
                        (&ColumnDesc::named("col_1", 0.into(), DataType::Int64)).into(),
                    ),
                    is_hidden: false,
                },
                PbColumnCatalog {
                    column_desc: Some(
                        (&ColumnDesc::named("col_2", 0.into(), DataType::Float64)).into(),
                    ),
                    is_hidden: false,
                },
                PbColumnCatalog {
                    column_desc: Some(
                        (&ColumnDesc::named("col_3", 0.into(), DataType::Varchar)).into(),
                    ),
                    is_hidden: false,
                },
            ],
            pk: vec![
                PbColumnOrder {
                    column_index: 1,
                    order_type: Some(PbOrderType {
                        direction: PbDirection::Ascending as _,
                        nulls_are: PbNullsAre::Largest as _,
                    }),
                },
                PbColumnOrder {
                    column_index: 3,
                    order_type: Some(PbOrderType {
                        direction: PbDirection::Ascending as _,
                        nulls_are: PbNullsAre::Largest as _,
                    }),
                },
            ],
            stream_key: vec![0],
            distribution_key: (0..column_count as i32).collect_vec(),
            optional_associated_source_id: None,
            append_only: false,
            owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
            retention_seconds: Some(300),
            fragment_id: 0.into(),
            dml_fragment_id: None,
            initialized_at_epoch: None,
            vnode_col_index: None,
            row_id_index: Some(0),
            value_indices: vec![0],
            definition: "".into(),
            handle_pk_conflict_behavior: 0,
            version_column_indices: vec![],
            read_prefix_len_hint: 1,
            version: None,
            watermark_indices: vec![],
            dist_key_in_pk: vec![],
            cardinality: None,
            created_at_epoch: None,
            #[expect(deprecated)]
            cleaned_by_watermark: false,
            stream_job_status: PbStreamJobStatus::Created.into(),
            create_type: PbCreateType::Foreground.into(),
            description: None,
            #[expect(deprecated)]
            incoming_sinks: Default::default(),
            initialized_at_cluster_version: None,
            created_at_cluster_version: None,
            cdc_table_id: None,
            maybe_vnode_count: None,
            webhook_info: None,
            job_id: None,
            engine: Some(PbEngine::Hummock as i32),
            #[expect(deprecated)]
            clean_watermark_index_in_pk: None,
            clean_watermark_indices: vec![],
            refreshable: false,
            vector_index_info: None,
            cdc_table_type: None,
        }
    }

    /// A zero read prefix hint makes `from_table` pick the dummy extractor (empty key).
    #[test]
    fn test_zero_read_prefix_len_uses_dummy_extractor() {
        let mut prost_table = build_table_with_prefix_column_num(1);
        prost_table.read_prefix_len_hint = 0;

        let extractor = FilterKeyExtractorImpl::from_table(&prost_table);
        let order_types: Vec<OrderType> = vec![OrderType::ascending(), OrderType::ascending()];
        let schema = vec![DataType::Int64, DataType::Varchar];
        let serializer = OrderedRowSerde::new(schema, order_types);
        let row = OwnedRow::new(vec![
            Some(ScalarImpl::Int64(100)),
            Some(ScalarImpl::Utf8("abc".into())),
        ]);
        let mut row_bytes = vec![];
        serializer.serialize(&row, &mut row_bytes);

        let table_prefix = {
            let mut buf = BytesMut::with_capacity(TABLE_PREFIX_LEN);
            buf.put_u32(1);
            buf.to_vec()
        };
        let vnode_prefix = &dummy_vnode()[..];
        let full_key = [&table_prefix, vnode_prefix, &row_bytes].concat();

        assert!(extractor.extract(&full_key).is_empty());
    }

    /// With a prefix hint of 1, the schema extractor returns only the first pk column's
    /// encoding (Int64: a 1-byte null tag plus 8 value bytes).
    #[test]
    fn test_schema_filter_key_extractor() {
        let prost_table = build_table_with_prefix_column_num(1);
        let schema_filter_key_extractor = SchemaFilterKeyExtractor::new(&prost_table);

        let order_types: Vec<OrderType> = vec![OrderType::ascending(), OrderType::ascending()];
        let schema = vec![DataType::Int64, DataType::Varchar];
        let serializer = OrderedRowSerde::new(schema, order_types);
        let row = OwnedRow::new(vec![
            Some(ScalarImpl::Int64(100)),
            Some(ScalarImpl::Utf8("abc".into())),
        ]);
        let mut row_bytes = vec![];
        serializer.serialize(&row, &mut row_bytes);

        let table_prefix = {
            let mut buf = BytesMut::with_capacity(TABLE_PREFIX_LEN);
            buf.put_u32(1);
            buf.to_vec()
        };

        let vnode_prefix = &dummy_vnode()[..];

        let full_key = [&table_prefix, vnode_prefix, &row_bytes].concat();
        let output_key = schema_filter_key_extractor.extract(&full_key);
        assert_eq!(1 + mem::size_of::<i64>(), output_key.len());
    }

    /// The multi extractor routes keys by the table id in their prefix to the registered
    /// per-table extractor.
    #[test]
    fn test_multi_filter_key_extractor() {
        let mut multi_filter_key_extractor = MultiFilterKeyExtractor::default();
        {
            // Table 1: schema extractor with a one-column prefix.
            let prost_table = build_table_with_prefix_column_num(1);
            let schema_filter_key_extractor = SchemaFilterKeyExtractor::new(&prost_table);
            multi_filter_key_extractor.register(
                1.into(),
                FilterKeyExtractorImpl::Schema(schema_filter_key_extractor),
            );
            let order_types: Vec<OrderType> = vec![OrderType::ascending(), OrderType::ascending()];
            let schema = vec![DataType::Int64, DataType::Varchar];
            let serializer = OrderedRowSerde::new(schema, order_types);
            let row = OwnedRow::new(vec![
                Some(ScalarImpl::Int64(100)),
                Some(ScalarImpl::Utf8("abc".into())),
            ]);
            let mut row_bytes = vec![];
            serializer.serialize(&row, &mut row_bytes);

            let table_prefix = {
                let mut buf = BytesMut::with_capacity(TABLE_PREFIX_LEN);
                buf.put_u32(1);
                buf.to_vec()
            };

            let vnode_prefix = &dummy_vnode()[..];

            let full_key = [&table_prefix, vnode_prefix, &row_bytes].concat();
            let output_key = multi_filter_key_extractor.extract(&full_key);

            let data_types = vec![DataType::Int64];
            let order_types = vec![OrderType::ascending()];
            let deserializer = OrderedRowSerde::new(data_types, order_types);

            let pk_prefix_len = deserializer.deserialize_prefix_len(&row_bytes, 1).unwrap();
            assert_eq!(pk_prefix_len, output_key.len());
        }

        {
            // Table 2: keys carrying table id 2 must be routed to this extractor. Its prefix
            // hint is also 1 (see `build_table_with_prefix_column_num`).
            let prost_table = build_table_with_prefix_column_num(2);
            let schema_filter_key_extractor = SchemaFilterKeyExtractor::new(&prost_table);
            multi_filter_key_extractor.register(
                2.into(),
                FilterKeyExtractorImpl::Schema(schema_filter_key_extractor),
            );
            let order_types: Vec<OrderType> = vec![OrderType::ascending(), OrderType::ascending()];
            let schema = vec![DataType::Int64, DataType::Varchar];
            let serializer = OrderedRowSerde::new(schema, order_types);
            let row = OwnedRow::new(vec![
                Some(ScalarImpl::Int64(100)),
                Some(ScalarImpl::Utf8("abc".into())),
            ]);
            let mut row_bytes = vec![];
            serializer.serialize(&row, &mut row_bytes);

            let table_prefix = {
                let mut buf = BytesMut::with_capacity(TABLE_PREFIX_LEN);
                buf.put_u32(2);
                buf.to_vec()
            };

            let vnode_prefix = &dummy_vnode()[..];

            let full_key = [&table_prefix, vnode_prefix, &row_bytes].concat();
            let output_key = multi_filter_key_extractor.extract(&full_key);

            let data_types = vec![DataType::Int64, DataType::Varchar];
            let order_types = vec![OrderType::ascending(), OrderType::ascending()];
            let deserializer = OrderedRowSerde::new(data_types, order_types);

            let pk_prefix_len = deserializer.deserialize_prefix_len(&row_bytes, 1).unwrap();

            assert_eq!(pk_prefix_len, output_key.len());
        }
    }

    /// `acquire` rejects an empty id list and surfaces accessor failures for uncached ids.
    #[tokio::test]
    async fn test_compaction_catalog_manager_exception() {
        let compaction_catalog_manager = super::CompactionCatalogManager::default();

        {
            let ret = compaction_catalog_manager.acquire(vec![]).await;
            assert!(ret.is_err());
            if let Err(e) = ret {
                assert_eq!(e.to_string(), "Other error: table_id_set is empty");
            }
        }

        {
            // Table 1 is not cached, and the default (fake) accessor always fails.
            let ret = compaction_catalog_manager.acquire(vec![1.into()]).await;
            assert!(ret.is_err());
            if let Err(e) = ret {
                assert_eq!(
                    e.to_string(),
                    "Other error: request rpc list_tables for meta failed: fake accessor does not support fetch remote table"
                );
            }
        }
    }
}