1use std::collections::{HashMap, HashSet};
16use std::fmt::Debug;
17use std::iter;
18use std::sync::Arc;
19
20use itertools::Itertools;
21use parking_lot::RwLock;
22use risingwave_common::catalog::{ColumnDesc, TableId};
23use risingwave_common::hash::{VirtualNode, VnodeCountCompat};
24use risingwave_common::util::memcmp_encoding;
25use risingwave_common::util::row_serde::OrderedRowSerde;
26use risingwave_common::util::sort_util::OrderType;
27use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde;
28use risingwave_common::util::value_encoding::{BasicSerde, EitherSerde, ValueRowDeserializer};
29use risingwave_hummock_sdk::compaction_group::StateTableId;
30use risingwave_hummock_sdk::key::{TABLE_PREFIX_LEN, get_table_id};
31use risingwave_pb::catalog::Table;
32use risingwave_rpc_client::MetaClient;
33use risingwave_rpc_client::error::{Result as RpcResult, RpcError};
34use thiserror_ext::AsReport;
35
36use crate::hummock::{HummockError, HummockResult};
37use crate::row_serde::value_serde::ValueRowSerdeNew;
38
/// Maps a full storage key to the byte slice that is used as its filter key.
///
/// The returned slice carries the lifetime of `full_key`, so implementations hand back a
/// sub-slice (or an empty slice) without allocating.
pub trait FilterKeyExtractor: Send + Sync {
    /// Returns the part of `full_key` selected by this extractor.
    fn extract<'a>(&self, full_key: &'a [u8]) -> &'a [u8];
}
43
/// Closed set of the concrete [`FilterKeyExtractor`] implementations defined in this file.
///
/// The inherent `extract` method that dispatches to the wrapped extractor is generated by
/// the `impl_filter_key_extractor` macro.
pub enum FilterKeyExtractorImpl {
    /// Extracts a primary-key prefix using the table schema.
    Schema(SchemaFilterKeyExtractor),
    /// Returns the key unchanged.
    FullKey(FullKeyFilterKeyExtractor),
    /// Always returns an empty slice.
    Dummy(DummyFilterKeyExtractor),
    /// Dispatches to a per-table extractor chosen by the table id found in the key.
    Multi(MultiFilterKeyExtractor),
    /// Returns a fixed number of leading bytes.
    FixedLength(FixedLengthFilterKeyExtractor),
}
51
52impl FilterKeyExtractorImpl {
53 pub fn from_table(table_catalog: &Table) -> Self {
54 let read_prefix_len = table_catalog.get_read_prefix_len_hint() as usize;
55
56 if read_prefix_len == 0 || read_prefix_len > table_catalog.get_pk().len() {
57 FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor)
60 } else {
61 FilterKeyExtractorImpl::Schema(SchemaFilterKeyExtractor::new(table_catalog))
62 }
63 }
64}
65
/// Generates the inherent `extract` method of `FilterKeyExtractorImpl`.
///
/// Takes a comma-separated list of brace-wrapped variant names and emits one `match` arm per
/// variant that forwards `full_key` to the wrapped extractor.
macro_rules! impl_filter_key_extractor {
    ($( { $variant:ident } ),*) => {
        impl FilterKeyExtractorImpl {
            pub fn extract<'a>(&self, full_key: &'a [u8]) -> &'a [u8] {
                match self {
                    $(
                        Self::$variant(extractor) => extractor.extract(full_key),
                    )*
                }
            }
        }
    };
}
78
/// Invokes `$macro` with the brace-wrapped name of every `FilterKeyExtractorImpl` variant.
///
/// The list below is written by hand; it has to name each variant of the enum for the
/// generated `match` to be exhaustive.
macro_rules! for_all_filter_key_extractor_variants {
    ($macro:ident) => {
        $macro! {
            { Schema },
            { FullKey },
            { Dummy },
            { Multi },
            { FixedLength }
        }
    };
}

// Expands to the `extract` dispatch method on `FilterKeyExtractorImpl`.
for_all_filter_key_extractor_variants! { impl_filter_key_extractor }
92
/// Extractor that uses the whole key as the filter key.
#[derive(Default)]
pub struct FullKeyFilterKeyExtractor;

impl FilterKeyExtractor for FullKeyFilterKeyExtractor {
    /// Returns `user_key` unchanged.
    fn extract<'a>(&self, user_key: &'a [u8]) -> &'a [u8] {
        user_key
    }
}
101
/// Extractor that ignores its input and always yields an empty filter key.
#[derive(Default)]
pub struct DummyFilterKeyExtractor;
impl FilterKeyExtractor for DummyFilterKeyExtractor {
    /// Returns an empty slice regardless of `_full_key`.
    fn extract<'a>(&self, _full_key: &'a [u8]) -> &'a [u8] {
        &[]
    }
}
109
110#[derive(Default)]
112pub struct FixedLengthFilterKeyExtractor {
113 fixed_length: usize,
114}
115
116impl FilterKeyExtractor for FixedLengthFilterKeyExtractor {
117 fn extract<'a>(&self, full_key: &'a [u8]) -> &'a [u8] {
118 &full_key[0..self.fixed_length]
119 }
120}
121
122impl FixedLengthFilterKeyExtractor {
123 pub fn new(fixed_length: usize) -> Self {
124 Self { fixed_length }
125 }
126}
127
/// Extractor that returns an encoded primary-key prefix, located with the table schema.
pub struct SchemaFilterKeyExtractor {
    /// Prefix length passed to `deserialize_prefix_len`; taken from the catalog's
    /// read-prefix-length hint.
    read_prefix_len: usize,
    /// Serde built from the data types and order types of the primary-key columns.
    deserializer: OrderedRowSerde,
}
140
141impl FilterKeyExtractor for SchemaFilterKeyExtractor {
142 fn extract<'a>(&self, full_key: &'a [u8]) -> &'a [u8] {
143 if full_key.len() < TABLE_PREFIX_LEN + VirtualNode::SIZE {
144 return &[];
145 }
146
147 let (_table_prefix, key) = full_key.split_at(TABLE_PREFIX_LEN);
148 let (_vnode_prefix, pk) = key.split_at(VirtualNode::SIZE);
149
150 let bloom_filter_key_len = self
154 .deserializer
155 .deserialize_prefix_len(pk, self.read_prefix_len)
156 .unwrap();
157
158 let end_position = TABLE_PREFIX_LEN + VirtualNode::SIZE + bloom_filter_key_len;
159 &full_key[TABLE_PREFIX_LEN + VirtualNode::SIZE..end_position]
160 }
161}
162
163impl SchemaFilterKeyExtractor {
164 pub fn new(table_catalog: &Table) -> Self {
165 let pk_indices: Vec<usize> = table_catalog
166 .pk
167 .iter()
168 .map(|col_order| col_order.column_index as usize)
169 .collect();
170
171 let read_prefix_len = table_catalog.get_read_prefix_len_hint() as usize;
172
173 let data_types = pk_indices
174 .iter()
175 .map(|column_idx| &table_catalog.columns[*column_idx])
176 .map(|col| ColumnDesc::from(col.column_desc.as_ref().unwrap()).data_type)
177 .collect();
178
179 let order_types: Vec<OrderType> = table_catalog
180 .pk
181 .iter()
182 .map(|col_order| OrderType::from_protobuf(col_order.get_order_type().unwrap()))
183 .collect();
184
185 Self {
186 read_prefix_len,
187 deserializer: OrderedRowSerde::new(data_types, order_types),
188 }
189 }
190}
191
/// Extractor that holds one [`FilterKeyExtractorImpl`] per table and picks the right one from
/// the table id encoded in the key.
#[derive(Default)]
pub struct MultiFilterKeyExtractor {
    /// Registered extractors, keyed by table id.
    id_to_filter_key_extractor: HashMap<TableId, FilterKeyExtractorImpl>,
}
196
197impl MultiFilterKeyExtractor {
198 pub fn register(&mut self, table_id: TableId, filter_key_extractor: FilterKeyExtractorImpl) {
199 self.id_to_filter_key_extractor
200 .insert(table_id, filter_key_extractor);
201 }
202
203 pub fn size(&self) -> usize {
204 self.id_to_filter_key_extractor.len()
205 }
206
207 pub fn get_existing_table_ids(&self) -> HashSet<TableId> {
208 self.id_to_filter_key_extractor.keys().cloned().collect()
209 }
210}
211
212impl Debug for MultiFilterKeyExtractor {
213 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
214 write!(f, "MultiFilterKeyExtractor size {} ", self.size())
215 }
216}
217
218impl FilterKeyExtractor for MultiFilterKeyExtractor {
219 fn extract<'a>(&self, full_key: &'a [u8]) -> &'a [u8] {
220 if full_key.len() < TABLE_PREFIX_LEN + VirtualNode::SIZE {
221 return full_key;
222 }
223
224 let table_id = get_table_id(full_key);
225 self.id_to_filter_key_extractor
226 .get(&table_id)
227 .unwrap()
228 .extract(full_key)
229 }
230}
231
/// Source of table catalogs that can be queried by table id.
#[async_trait::async_trait]
pub trait StateTableAccessor: Send + Sync {
    /// Fetches the catalogs of `table_ids`, returned as a map keyed by table id.
    async fn get_tables(&self, table_ids: &[TableId]) -> RpcResult<HashMap<TableId, Table>>;
}
236
/// [`StateTableAccessor`] whose `get_tables` always returns an error; used as the accessor of
/// the default [`CompactionCatalogManager`].
#[derive(Default)]
pub struct FakeRemoteTableAccessor {}
239
/// [`StateTableAccessor`] that fetches table catalogs through a [`MetaClient`].
pub struct RemoteTableAccessor {
    /// Client used to issue the `get_tables` request.
    meta_client: MetaClient,
}

impl RemoteTableAccessor {
    /// Wraps `meta_client` in an accessor.
    pub fn new(meta_client: MetaClient) -> Self {
        Self { meta_client }
    }
}
249
#[async_trait::async_trait]
impl StateTableAccessor for RemoteTableAccessor {
    /// Forwards the request to `MetaClient::get_tables` with an owned copy of `table_ids`.
    async fn get_tables(&self, table_ids: &[TableId]) -> RpcResult<HashMap<TableId, Table>> {
        // NOTE(review): the meaning of the `true` flag is defined by `MetaClient::get_tables`
        // and is not visible in this file; confirm against the client.
        self.meta_client.get_tables(table_ids.to_vec(), true).await
    }
}
256
257#[async_trait::async_trait]
258impl StateTableAccessor for FakeRemoteTableAccessor {
259 async fn get_tables(&self, _table_ids: &[TableId]) -> RpcResult<HashMap<TableId, Table>> {
260 Err(RpcError::Internal(anyhow::anyhow!(
261 "fake accessor does not support fetch remote table"
262 )))
263 }
264}
265
/// Caches table catalogs and builds [`CompactionCatalogAgent`]s from them.
pub struct CompactionCatalogManager {
    /// Cached catalogs keyed by table id, guarded by a read-write lock.
    table_id_to_catalog: RwLock<HashMap<StateTableId, Table>>,
    /// Accessor queried by `acquire` for table ids that are missing from the cache.
    table_accessor: Box<dyn StateTableAccessor>,
}
273
impl Default for CompactionCatalogManager {
    /// Creates a manager backed by [`FakeRemoteTableAccessor`], so any lookup that misses the
    /// cache fails.
    fn default() -> Self {
        Self::new(Box::<FakeRemoteTableAccessor>::default())
    }
}
279
impl CompactionCatalogManager {
    /// Creates a manager with an empty cache that resolves cache misses via `table_accessor`.
    pub fn new(table_accessor: Box<dyn StateTableAccessor>) -> Self {
        Self {
            table_id_to_catalog: Default::default(),
            table_accessor,
        }
    }
}
288
289impl CompactionCatalogManager {
290 pub fn update(&self, table_id: TableId, catalog: Table) {
292 self.table_id_to_catalog.write().insert(table_id, catalog);
293 }
294
295 pub fn sync(&self, catalog_map: HashMap<TableId, Table>) {
297 let mut guard = self.table_id_to_catalog.write();
298 guard.clear();
299 guard.extend(catalog_map);
300 }
301
302 pub fn remove(&self, table_id: TableId) {
304 self.table_id_to_catalog.write().remove(&table_id);
305 }
306
307 pub async fn acquire(
310 &self,
311 mut table_ids: Vec<StateTableId>,
312 ) -> HummockResult<CompactionCatalogAgentRef> {
313 if table_ids.is_empty() {
314 return Err(HummockError::other("table_id_set is empty"));
319 }
320
321 let mut multi_filter_key_extractor = MultiFilterKeyExtractor::default();
322 let mut table_id_to_vnode = HashMap::new();
323 let mut table_id_to_watermark_serde = HashMap::new();
324 let mut table_id_to_value_watermark_serde = HashMap::new();
325
326 {
327 let guard = self.table_id_to_catalog.read();
328 table_ids.retain(|table_id| match guard.get(table_id) {
329 Some(table_catalog) => {
330 multi_filter_key_extractor
332 .register(*table_id, FilterKeyExtractorImpl::from_table(table_catalog));
333
334 table_id_to_vnode.insert(*table_id, table_catalog.vnode_count());
336
337 table_id_to_watermark_serde
339 .insert(*table_id, build_watermark_col_serde(table_catalog));
340 table_id_to_value_watermark_serde.insert(
341 *table_id,
342 build_value_watermark_col_serde(table_catalog).map(Arc::new),
343 );
344
345 false
346 }
347
348 None => true,
349 });
350 }
351
352 if !table_ids.is_empty() {
353 let mut state_tables =
354 self.table_accessor
355 .get_tables(&table_ids)
356 .await
357 .map_err(|e| {
358 HummockError::other(format!(
359 "request rpc list_tables for meta failed: {}",
360 e.as_report()
361 ))
362 })?;
363
364 let mut guard = self.table_id_to_catalog.write();
365 for table_id in table_ids {
366 if let Some(table) = state_tables.remove(&table_id) {
367 let table_id = table.id;
368 let key_extractor = FilterKeyExtractorImpl::from_table(&table);
369 let vnode = table.vnode_count();
370 let watermark_serde = build_watermark_col_serde(&table);
371 let value_watermark_serde = build_value_watermark_col_serde(&table);
372 guard.insert(table_id, table);
373 multi_filter_key_extractor.register(table_id, key_extractor);
375
376 table_id_to_vnode.insert(table_id, vnode);
378
379 table_id_to_watermark_serde.insert(table_id, watermark_serde);
381 table_id_to_value_watermark_serde
382 .insert(table_id, value_watermark_serde.map(Arc::new));
383 }
384 }
385 }
386
387 Ok(Arc::new(CompactionCatalogAgent::new(
388 FilterKeyExtractorImpl::Multi(multi_filter_key_extractor),
389 table_id_to_vnode,
390 table_id_to_watermark_serde,
391 table_id_to_value_watermark_serde,
392 )))
393 }
394
395 pub fn build_compaction_catalog_agent(
397 table_catalogs: HashMap<StateTableId, Table>,
398 ) -> CompactionCatalogAgentRef {
399 let mut multi_filter_key_extractor = MultiFilterKeyExtractor::default();
400 let mut table_id_to_vnode = HashMap::new();
401 let mut table_id_to_watermark_serde = HashMap::new();
402 let mut value_table_id_to_watermark_serde = HashMap::new();
403 for (table_id, table_catalog) in table_catalogs {
404 multi_filter_key_extractor
406 .register(table_id, FilterKeyExtractorImpl::from_table(&table_catalog));
407
408 table_id_to_vnode.insert(table_id, table_catalog.vnode_count());
410
411 table_id_to_watermark_serde.insert(table_id, build_watermark_col_serde(&table_catalog));
413 value_table_id_to_watermark_serde.insert(
414 table_id,
415 build_value_watermark_col_serde(&table_catalog).map(Arc::new),
416 );
417 }
418
419 Arc::new(CompactionCatalogAgent::new(
420 FilterKeyExtractorImpl::Multi(multi_filter_key_extractor),
421 table_id_to_vnode,
422 table_id_to_watermark_serde,
423 value_table_id_to_watermark_serde,
424 ))
425 }
426}
427
/// Snapshot of per-table catalog information, looked up by table id.
pub struct CompactionCatalogAgent {
    /// Extractor used by `extract` to obtain the filter key from a full key.
    filter_key_extractor_manager: FilterKeyExtractorImpl,
    /// Vnode count of each table.
    table_id_to_vnode: HashMap<StateTableId, usize>,
    /// Per table: `None`, or the primary-key serde, the serde of the watermark column and the
    /// index of the watermark column within the primary key.
    table_id_to_watermark_serde:
        HashMap<StateTableId, Option<(OrderedRowSerde, OrderedRowSerde, usize)>>,
    /// Per table: `None`, or the serde for a watermark column that is read from the value row.
    value_table_id_to_watermark_serde: HashMap<StateTableId, Option<ValueWatermarkColumnSerdeRef>>,
}
440
441impl CompactionCatalogAgent {
442 pub fn new(
443 filter_key_extractor_manager: FilterKeyExtractorImpl,
444 table_id_to_vnode: HashMap<StateTableId, usize>,
445 table_id_to_watermark_serde: HashMap<
446 StateTableId,
447 Option<(OrderedRowSerde, OrderedRowSerde, usize)>,
448 >,
449 value_table_id_to_watermark_serde: HashMap<
450 StateTableId,
451 Option<ValueWatermarkColumnSerdeRef>,
452 >,
453 ) -> Self {
454 Self {
455 filter_key_extractor_manager,
456 table_id_to_vnode,
457 table_id_to_watermark_serde,
458 value_table_id_to_watermark_serde,
459 }
460 }
461
462 pub fn dummy() -> Self {
463 Self {
464 filter_key_extractor_manager: FilterKeyExtractorImpl::Dummy(DummyFilterKeyExtractor),
465 table_id_to_vnode: Default::default(),
466 table_id_to_watermark_serde: Default::default(),
467 value_table_id_to_watermark_serde: Default::default(),
468 }
469 }
470
471 pub fn for_test(table_ids: Vec<impl Into<StateTableId>>) -> Arc<Self> {
472 let full_key_filter_key_extractor =
473 FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);
474
475 let table_id_to_vnode: HashMap<TableId, usize> = table_ids
476 .into_iter()
477 .map(|table_id| (table_id.into(), VirtualNode::COUNT_FOR_TEST))
478 .collect();
479
480 let table_id_to_watermark_serde = table_id_to_vnode
481 .keys()
482 .map(|table_id| (*table_id, None))
483 .collect();
484
485 let value_table_id_to_watermark_serde = table_id_to_vnode
486 .keys()
487 .map(|table_id| (*table_id, None))
488 .collect();
489
490 Arc::new(CompactionCatalogAgent::new(
491 full_key_filter_key_extractor,
492 table_id_to_vnode,
493 table_id_to_watermark_serde,
494 value_table_id_to_watermark_serde,
495 ))
496 }
497}
498
499impl CompactionCatalogAgent {
500 pub fn extract<'a>(&self, full_key: &'a [u8]) -> &'a [u8] {
501 self.filter_key_extractor_manager.extract(full_key)
502 }
503
504 pub fn vnode_count(&self, table_id: StateTableId) -> usize {
505 *self.table_id_to_vnode.get(&table_id).unwrap_or_else(|| {
506 panic!(
507 "table_id not found {} all_table_ids {:?}",
508 table_id,
509 self.table_id_to_vnode.keys()
510 )
511 })
512 }
513
514 pub fn watermark_serde(
515 &self,
516 table_id: StateTableId,
517 ) -> Option<(OrderedRowSerde, OrderedRowSerde, usize)> {
518 self.table_id_to_watermark_serde
519 .get(&table_id)
520 .unwrap_or_else(|| {
521 panic!(
522 "table_id not found {} all_table_ids {:?}",
523 table_id,
524 self.table_id_to_watermark_serde.keys()
525 )
526 })
527 .clone()
528 }
529
530 pub fn value_watermark_serde(
531 &self,
532 table_id: StateTableId,
533 ) -> Option<ValueWatermarkColumnSerdeRef> {
534 self.value_table_id_to_watermark_serde
535 .get(&table_id)
536 .unwrap_or_else(|| {
537 panic!(
538 "table_id not found {} all_table_ids {:?}",
539 table_id,
540 self.value_table_id_to_watermark_serde.keys()
541 )
542 })
543 .clone()
544 }
545
546 pub fn table_id_to_vnode_ref(&self) -> &HashMap<StateTableId, usize> {
547 &self.table_id_to_vnode
548 }
549
550 pub fn table_ids(&self) -> impl Iterator<Item = StateTableId> + '_ {
551 self.table_id_to_vnode.keys().cloned()
552 }
553}
554
/// Shared, reference-counted handle to a [`CompactionCatalogManager`].
pub type CompactionCatalogManagerRef = Arc<CompactionCatalogManager>;
/// Shared, reference-counted handle to a [`CompactionCatalogAgent`].
pub type CompactionCatalogAgentRef = Arc<CompactionCatalogAgent>;
557
558fn build_watermark_col_serde(
559 table_catalog: &Table,
560) -> Option<(OrderedRowSerde, OrderedRowSerde, usize)> {
561 let clean_watermark_index_in_pk = table_catalog.get_clean_watermark_index_in_pk_compat();
563
564 match clean_watermark_index_in_pk {
565 None => {
566 None
569 }
570
571 Some(clean_watermark_index_in_pk) => {
572 use risingwave_common::types::DataType;
573 let table_columns: Vec<ColumnDesc> = table_catalog
574 .columns
575 .iter()
576 .map(|col| col.column_desc.as_ref().unwrap().into())
577 .collect();
578
579 let pk_data_types: Vec<DataType> = table_catalog
580 .pk
581 .iter()
582 .map(|col_order| {
583 table_columns[col_order.column_index as usize]
584 .data_type
585 .clone()
586 })
587 .collect();
588
589 let pk_order_types = table_catalog
590 .pk
591 .iter()
592 .map(|col_order| OrderType::from_protobuf(col_order.get_order_type().unwrap()))
593 .collect_vec();
594
595 assert_eq!(pk_data_types.len(), pk_order_types.len());
596 let pk_serde = OrderedRowSerde::new(pk_data_types, pk_order_types);
597 let watermark_col_serde = pk_serde.index(clean_watermark_index_in_pk).into_owned();
598 Some((pk_serde, watermark_col_serde, clean_watermark_index_in_pk))
599 }
600 }
601}
602
603fn build_value_watermark_col_serde(table_catalog: &Table) -> Option<ValueWatermarkColumnSerde> {
604 pub fn try_get_non_pk_clean_watermark_column_index(table: &Table) -> Option<usize> {
606 table
607 .get_clean_watermark_column_indices()
608 .iter()
609 .filter_map(|&col_idx| {
610 if table
611 .pk
612 .iter()
613 .any(|col_order| col_order.column_index == col_idx)
614 {
615 return None;
616 }
617 Some(col_idx as usize)
618 })
619 .at_most_one()
620 .unwrap()
621 }
622
623 let clean_watermark_index = try_get_non_pk_clean_watermark_column_index(table_catalog)?;
624 Some(ValueWatermarkColumnSerde::new(
625 table_catalog,
626 clean_watermark_index,
627 ))
628}
629
/// Reads a watermark column out of an encoded value row and re-encodes it in memcomparable
/// form.
pub struct ValueWatermarkColumnSerde {
    /// Deserializer for the encoded value row.
    row_serde: EitherSerde,
    /// Position of the watermark column within the row produced by `row_serde`.
    watermark_index_in_de_row: usize,
    /// Order type used when the watermark datum is memcmp-encoded.
    watermark_column_mem_encoding_order: OrderType,
}

/// Shared, reference-counted handle to a [`ValueWatermarkColumnSerde`].
pub type ValueWatermarkColumnSerdeRef = Arc<ValueWatermarkColumnSerde>;
639
640impl ValueWatermarkColumnSerde {
641 fn new(table_catalog: &Table, clean_watermark_index: usize) -> Self {
642 let table_columns: Vec<ColumnDesc> = table_catalog
643 .columns
644 .iter()
645 .map(|col| col.column_desc.as_ref().unwrap().into())
646 .collect();
647 let pk_order_type = table_catalog
648 .pk
649 .iter()
650 .filter_map(|col_order| {
651 if col_order.column_index as usize == clean_watermark_index {
652 return Some(OrderType::from_protobuf(
653 col_order.get_order_type().unwrap(),
654 ));
655 }
656 None
657 })
658 .at_most_one()
659 .unwrap();
660 let watermark_column_mem_encoding_order = match pk_order_type {
662 Some(o) => o,
663 None => OrderType::ascending(),
665 };
666 let Some(watermark_index_in_value_indices) = table_catalog
668 .value_indices
669 .iter()
670 .position(|p| *p as usize == clean_watermark_index)
671 else {
672 panic!(
673 "Watermark index {} not found in value_indices {:?}.",
674 clean_watermark_index, table_catalog.value_indices
675 );
676 };
677 let (row_serde, watermark_index_in_de_row) = if table_catalog.version.is_none() {
678 let row_serde = BasicSerde::new(
679 Arc::from_iter(table_catalog.value_indices.iter().map(|val| *val as usize)),
680 Arc::from(table_columns.into_boxed_slice()),
681 )
682 .into();
683 (row_serde, watermark_index_in_value_indices)
684 } else {
685 let row_serde = ColumnAwareSerde::new(
686 Arc::from_iter(iter::once(clean_watermark_index)),
687 Arc::from(table_columns.into_boxed_slice()),
688 )
689 .into();
690 (row_serde, 0)
692 };
693 Self {
694 row_serde,
695 watermark_index_in_de_row,
696 watermark_column_mem_encoding_order,
697 }
698 }
699
700 pub fn deserialize(&self, encoded_bytes: &[u8]) -> HummockResult<Option<Vec<u8>>> {
701 let mut row = self
702 .row_serde
703 .deserialize(encoded_bytes)
704 .map_err(HummockError::decode_error)?;
705 if self.watermark_index_in_de_row >= row.len() {
706 return Ok(None);
708 }
709 let datum = std::mem::take(&mut row[self.watermark_index_in_de_row]);
710 let bytes = memcmp_encoding::encode_value(datum, self.watermark_column_mem_encoding_order)
712 .map_err(HummockError::encode_error)?;
713 Ok(Some(bytes.into()))
714 }
715}
716
717#[cfg(test)]
718mod tests {
719 use std::mem;
720
721 use bytes::{BufMut, BytesMut};
722 use itertools::Itertools;
723 use risingwave_common::catalog::ColumnDesc;
724 use risingwave_common::hash::VirtualNode;
725 use risingwave_common::row::OwnedRow;
726 use risingwave_common::types::DataType;
727 use risingwave_common::types::ScalarImpl::{self};
728 use risingwave_common::util::row_serde::OrderedRowSerde;
729 use risingwave_common::util::sort_util::OrderType;
730 use risingwave_hummock_sdk::key::TABLE_PREFIX_LEN;
731 use risingwave_pb::catalog::table::{PbEngine, TableType};
732 use risingwave_pb::catalog::{PbCreateType, PbStreamJobStatus, PbTable};
733 use risingwave_pb::common::{PbColumnOrder, PbDirection, PbNullsAre, PbOrderType};
734 use risingwave_pb::plan_common::PbColumnCatalog;
735
736 use super::{DummyFilterKeyExtractor, FilterKeyExtractor, SchemaFilterKeyExtractor};
737 use crate::compaction_catalog_manager::{
738 FilterKeyExtractorImpl, FullKeyFilterKeyExtractor, MultiFilterKeyExtractor,
739 };
/// Big-endian bytes of the virtual node with index 233, used as the vnode prefix of test keys.
const fn dummy_vnode() -> [u8; VirtualNode::SIZE] {
    VirtualNode::from_index(233).to_be_bytes()
}
743
/// The dummy extractor yields an empty key and the full-key extractor yields its input.
#[test]
fn test_default_filter_key_extractor() {
    let full_key = "full_key".as_bytes();

    let dummy_output = DummyFilterKeyExtractor.extract(full_key);
    assert_eq!("".as_bytes(), dummy_output);

    let full_key_output = FullKeyFilterKeyExtractor.extract(full_key);
    assert_eq!(full_key, full_key_output);
}
757
/// Builds a test `PbTable` with four columns (`_row_id`, `col_1`, `col_2`, `col_3`), a primary
/// key over column indices 1 and 3 (both ascending) and a `read_prefix_len_hint` of 1.
///
/// `column_count` only controls `distribution_key`, which becomes `0..column_count`.
fn build_table_with_prefix_column_num(column_count: u32) -> PbTable {
    PbTable {
        id: 0.into(),
        schema_id: 0.into(),
        database_id: 0.into(),
        name: "test".to_owned(),
        table_type: TableType::Table as i32,
        columns: vec![
            PbColumnCatalog {
                column_desc: Some(
                    (&ColumnDesc::named("_row_id", 0.into(), DataType::Int64)).into(),
                ),
                is_hidden: true,
            },
            PbColumnCatalog {
                column_desc: Some(
                    (&ColumnDesc::named("col_1", 0.into(), DataType::Int64)).into(),
                ),
                is_hidden: false,
            },
            PbColumnCatalog {
                column_desc: Some(
                    (&ColumnDesc::named("col_2", 0.into(), DataType::Float64)).into(),
                ),
                is_hidden: false,
            },
            PbColumnCatalog {
                column_desc: Some(
                    (&ColumnDesc::named("col_3", 0.into(), DataType::Varchar)).into(),
                ),
                is_hidden: false,
            },
        ],
        // Primary key: `col_1` (Int64) followed by `col_3` (Varchar).
        pk: vec![
            PbColumnOrder {
                column_index: 1,
                order_type: Some(PbOrderType {
                    direction: PbDirection::Ascending as _,
                    nulls_are: PbNullsAre::Largest as _,
                }),
            },
            PbColumnOrder {
                column_index: 3,
                order_type: Some(PbOrderType {
                    direction: PbDirection::Ascending as _,
                    nulls_are: PbNullsAre::Largest as _,
                }),
            },
        ],
        stream_key: vec![0],
        distribution_key: (0..column_count as i32).collect_vec(),
        optional_associated_source_id: None,
        append_only: false,
        owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
        retention_seconds: Some(300),
        fragment_id: 0.into(),
        dml_fragment_id: None,
        initialized_at_epoch: None,
        vnode_col_index: None,
        row_id_index: Some(0),
        value_indices: vec![0],
        definition: "".into(),
        handle_pk_conflict_behavior: 0,
        version_column_indices: vec![],
        // Only the first primary-key column is used as the read prefix.
        read_prefix_len_hint: 1,
        version: None,
        watermark_indices: vec![],
        dist_key_in_pk: vec![],
        cardinality: None,
        created_at_epoch: None,
        #[expect(deprecated)]
        cleaned_by_watermark: false,
        stream_job_status: PbStreamJobStatus::Created.into(),
        create_type: PbCreateType::Foreground.into(),
        description: None,
        #[expect(deprecated)]
        incoming_sinks: Default::default(),
        initialized_at_cluster_version: None,
        created_at_cluster_version: None,
        cdc_table_id: None,
        maybe_vnode_count: None,
        webhook_info: None,
        job_id: None,
        engine: Some(PbEngine::Hummock as i32),
        #[expect(deprecated)]
        clean_watermark_index_in_pk: None,
        clean_watermark_indices: vec![],
        refreshable: false,
        vector_index_info: None,
        cdc_table_type: None,
    }
}
850
/// With a read prefix of one Int64 column, the extracted key is one marker byte plus the
/// eight bytes of the integer.
#[test]
fn test_schema_filter_key_extractor() {
    let extractor = SchemaFilterKeyExtractor::new(&build_table_with_prefix_column_num(1));

    // Encode a two-column primary key `(100, "abc")`.
    let serializer = OrderedRowSerde::new(
        vec![DataType::Int64, DataType::Varchar],
        vec![OrderType::ascending(), OrderType::ascending()],
    );
    let row = OwnedRow::new(vec![
        Some(ScalarImpl::Int64(100)),
        Some(ScalarImpl::Utf8("abc".into())),
    ]);
    let mut row_bytes = vec![];
    serializer.serialize(&row, &mut row_bytes);

    // Full key layout: table prefix, vnode prefix, encoded primary key.
    let mut prefix_buf = BytesMut::with_capacity(TABLE_PREFIX_LEN);
    prefix_buf.put_u32(1);
    let table_prefix = prefix_buf.to_vec();
    let vnode_prefix = &dummy_vnode()[..];
    let full_key = [&table_prefix, vnode_prefix, &row_bytes].concat();

    let output_key = extractor.extract(&full_key);
    assert_eq!(1 + mem::size_of::<i64>(), output_key.len());
}
878
/// Registers schema extractors for two tables and checks that keys of each table are cut to
/// the length of the first primary-key column.
#[test]
fn test_multi_filter_key_extractor() {
    /// Encodes the row `(100, "abc")` and returns the full key for `table_id` together with
    /// the encoded row bytes.
    fn encode_sample_key(table_id: u32) -> (Vec<u8>, Vec<u8>) {
        let order_types: Vec<OrderType> = vec![OrderType::ascending(), OrderType::ascending()];
        let schema = vec![DataType::Int64, DataType::Varchar];
        let serializer = OrderedRowSerde::new(schema, order_types);
        let row = OwnedRow::new(vec![
            Some(ScalarImpl::Int64(100)),
            Some(ScalarImpl::Utf8("abc".into())),
        ]);
        let mut row_bytes = vec![];
        serializer.serialize(&row, &mut row_bytes);

        let table_prefix = {
            let mut buf = BytesMut::with_capacity(TABLE_PREFIX_LEN);
            buf.put_u32(table_id);
            buf.to_vec()
        };
        let vnode_prefix = &dummy_vnode()[..];

        let full_key = [&table_prefix, vnode_prefix, &row_bytes].concat();
        (full_key, row_bytes)
    }

    let mut multi_filter_key_extractor = MultiFilterKeyExtractor::default();

    // Table 1: expected length computed with a serde over the first pk column only.
    {
        let prost_table = build_table_with_prefix_column_num(1);
        multi_filter_key_extractor.register(
            1.into(),
            FilterKeyExtractorImpl::Schema(SchemaFilterKeyExtractor::new(&prost_table)),
        );

        let (full_key, row_bytes) = encode_sample_key(1);
        let output_key = multi_filter_key_extractor.extract(&full_key);

        let deserializer =
            OrderedRowSerde::new(vec![DataType::Int64], vec![OrderType::ascending()]);
        let pk_prefix_len = deserializer.deserialize_prefix_len(&row_bytes, 1).unwrap();
        assert_eq!(pk_prefix_len, output_key.len());
    }

    // Table 2: expected length computed with a serde over both pk columns.
    {
        let prost_table = build_table_with_prefix_column_num(2);
        multi_filter_key_extractor.register(
            2.into(),
            FilterKeyExtractorImpl::Schema(SchemaFilterKeyExtractor::new(&prost_table)),
        );

        let (full_key, row_bytes) = encode_sample_key(2);
        let output_key = multi_filter_key_extractor.extract(&full_key);

        let deserializer = OrderedRowSerde::new(
            vec![DataType::Int64, DataType::Varchar],
            vec![OrderType::ascending(), OrderType::ascending()],
        );
        let pk_prefix_len = deserializer.deserialize_prefix_len(&row_bytes, 1).unwrap();
        assert_eq!(pk_prefix_len, output_key.len());
    }
}
957
958 #[tokio::test]
959 async fn test_compaction_catalog_manager_exception() {
960 let compaction_catalog_manager = super::CompactionCatalogManager::default();
961
962 {
963 let ret = compaction_catalog_manager.acquire(vec![]).await;
964 assert!(ret.is_err());
965 if let Err(e) = ret {
966 assert_eq!(e.to_string(), "Other error: table_id_set is empty");
967 }
968 }
969
970 {
971 let ret = compaction_catalog_manager.acquire(vec![1.into()]).await;
973 assert!(ret.is_err());
974 if let Err(e) = ret {
975 assert_eq!(
976 e.to_string(),
977 "Other error: request rpc list_tables for meta failed: fake accessor does not support fetch remote table"
978 );
979 }
980 }
981 }
982}