1use std::collections::{HashMap, HashSet};
16use std::fmt::Debug;
17use std::sync::Arc;
18
19use itertools::Itertools;
20use parking_lot::RwLock;
21use risingwave_common::catalog::{ColumnDesc, TableId};
22use risingwave_common::hash::{VirtualNode, VnodeCountCompat};
23use risingwave_common::util::row_serde::OrderedRowSerde;
24use risingwave_common::util::sort_util::OrderType;
25use risingwave_hummock_sdk::compaction_group::StateTableId;
26use risingwave_hummock_sdk::key::{TABLE_PREFIX_LEN, get_table_id};
27use risingwave_pb::catalog::Table;
28use risingwave_rpc_client::MetaClient;
29use risingwave_rpc_client::error::{Result as RpcResult, RpcError};
30use thiserror_ext::AsReport;
31
32use crate::hummock::{HummockError, HummockResult};
33
34pub trait FilterKeyExtractor: Send + Sync {
36 fn extract<'a>(&self, full_key: &'a [u8]) -> &'a [u8];
37}
38
39pub enum FilterKeyExtractorImpl {
40 Schema(SchemaFilterKeyExtractor),
41 FullKey(FullKeyFilterKeyExtractor),
42 Dummy(DummyFilterKeyExtractor),
43 Multi(MultiFilterKeyExtractor),
44 FixedLength(FixedLengthFilterKeyExtractor),
45}
46
47impl FilterKeyExtractorImpl {
48 pub fn from_table(table_catalog: &Table) -> Self {
49 let read_prefix_len = table_catalog.get_read_prefix_len_hint() as usize;
50
51 if read_prefix_len == 0 || read_prefix_len > table_catalog.get_pk().len() {
52 FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor)
55 } else {
56 FilterKeyExtractorImpl::Schema(SchemaFilterKeyExtractor::new(table_catalog))
57 }
58 }
59}
60
61macro_rules! impl_filter_key_extractor {
62 ($( { $variant_name:ident } ),*) => {
63 impl FilterKeyExtractorImpl {
64 pub fn extract<'a>(&self, full_key: &'a [u8]) -> &'a [u8]{
65 match self {
66 $( Self::$variant_name(inner) => inner.extract(full_key), )*
67 }
68 }
69 }
70 }
71
72}
73
74macro_rules! for_all_filter_key_extractor_variants {
75 ($macro:ident) => {
76 $macro! {
77 { Schema },
78 { FullKey },
79 { Dummy },
80 { Multi },
81 { FixedLength }
82 }
83 };
84}
85
86for_all_filter_key_extractor_variants! { impl_filter_key_extractor }
87
88#[derive(Default)]
89pub struct FullKeyFilterKeyExtractor;
90
91impl FilterKeyExtractor for FullKeyFilterKeyExtractor {
92 fn extract<'a>(&self, user_key: &'a [u8]) -> &'a [u8] {
93 user_key
94 }
95}
96
97#[derive(Default)]
98pub struct DummyFilterKeyExtractor;
99impl FilterKeyExtractor for DummyFilterKeyExtractor {
100 fn extract<'a>(&self, _full_key: &'a [u8]) -> &'a [u8] {
101 &[]
102 }
103}
104
105#[derive(Default)]
107pub struct FixedLengthFilterKeyExtractor {
108 fixed_length: usize,
109}
110
111impl FilterKeyExtractor for FixedLengthFilterKeyExtractor {
112 fn extract<'a>(&self, full_key: &'a [u8]) -> &'a [u8] {
113 &full_key[0..self.fixed_length]
114 }
115}
116
117impl FixedLengthFilterKeyExtractor {
118 pub fn new(fixed_length: usize) -> Self {
119 Self { fixed_length }
120 }
121}
122
123pub struct SchemaFilterKeyExtractor {
126 read_prefix_len: usize,
131 deserializer: OrderedRowSerde,
132 }
135
136impl FilterKeyExtractor for SchemaFilterKeyExtractor {
137 fn extract<'a>(&self, full_key: &'a [u8]) -> &'a [u8] {
138 if full_key.len() < TABLE_PREFIX_LEN + VirtualNode::SIZE {
139 return &[];
140 }
141
142 let (_table_prefix, key) = full_key.split_at(TABLE_PREFIX_LEN);
143 let (_vnode_prefix, pk) = key.split_at(VirtualNode::SIZE);
144
145 let bloom_filter_key_len = self
149 .deserializer
150 .deserialize_prefix_len(pk, self.read_prefix_len)
151 .unwrap();
152
153 let end_position = TABLE_PREFIX_LEN + VirtualNode::SIZE + bloom_filter_key_len;
154 &full_key[TABLE_PREFIX_LEN + VirtualNode::SIZE..end_position]
155 }
156}
157
158impl SchemaFilterKeyExtractor {
159 pub fn new(table_catalog: &Table) -> Self {
160 let pk_indices: Vec<usize> = table_catalog
161 .pk
162 .iter()
163 .map(|col_order| col_order.column_index as usize)
164 .collect();
165
166 let read_prefix_len = table_catalog.get_read_prefix_len_hint() as usize;
167
168 let data_types = pk_indices
169 .iter()
170 .map(|column_idx| &table_catalog.columns[*column_idx])
171 .map(|col| ColumnDesc::from(col.column_desc.as_ref().unwrap()).data_type)
172 .collect();
173
174 let order_types: Vec<OrderType> = table_catalog
175 .pk
176 .iter()
177 .map(|col_order| OrderType::from_protobuf(col_order.get_order_type().unwrap()))
178 .collect();
179
180 Self {
181 read_prefix_len,
182 deserializer: OrderedRowSerde::new(data_types, order_types),
183 }
184 }
185}
186
187#[derive(Default)]
188pub struct MultiFilterKeyExtractor {
189 id_to_filter_key_extractor: HashMap<TableId, FilterKeyExtractorImpl>,
190}
191
192impl MultiFilterKeyExtractor {
193 pub fn register(&mut self, table_id: TableId, filter_key_extractor: FilterKeyExtractorImpl) {
194 self.id_to_filter_key_extractor
195 .insert(table_id, filter_key_extractor);
196 }
197
198 pub fn size(&self) -> usize {
199 self.id_to_filter_key_extractor.len()
200 }
201
202 pub fn get_existing_table_ids(&self) -> HashSet<TableId> {
203 self.id_to_filter_key_extractor.keys().cloned().collect()
204 }
205}
206
207impl Debug for MultiFilterKeyExtractor {
208 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
209 write!(f, "MultiFilterKeyExtractor size {} ", self.size())
210 }
211}
212
213impl FilterKeyExtractor for MultiFilterKeyExtractor {
214 fn extract<'a>(&self, full_key: &'a [u8]) -> &'a [u8] {
215 if full_key.len() < TABLE_PREFIX_LEN + VirtualNode::SIZE {
216 return full_key;
217 }
218
219 let table_id = get_table_id(full_key).into();
220 self.id_to_filter_key_extractor
221 .get(&table_id)
222 .unwrap()
223 .extract(full_key)
224 }
225}
226
227#[async_trait::async_trait]
228pub trait StateTableAccessor: Send + Sync {
229 async fn get_tables(&self, table_ids: &[TableId]) -> RpcResult<HashMap<TableId, Table>>;
230}
231
232#[derive(Default)]
233pub struct FakeRemoteTableAccessor {}
234
235pub struct RemoteTableAccessor {
236 meta_client: MetaClient,
237}
238
239impl RemoteTableAccessor {
240 pub fn new(meta_client: MetaClient) -> Self {
241 Self { meta_client }
242 }
243}
244
245#[async_trait::async_trait]
246impl StateTableAccessor for RemoteTableAccessor {
247 async fn get_tables(&self, table_ids: &[TableId]) -> RpcResult<HashMap<TableId, Table>> {
248 self.meta_client.get_tables(table_ids.to_vec(), true).await
249 }
250}
251
252#[async_trait::async_trait]
253impl StateTableAccessor for FakeRemoteTableAccessor {
254 async fn get_tables(&self, _table_ids: &[TableId]) -> RpcResult<HashMap<TableId, Table>> {
255 Err(RpcError::Internal(anyhow::anyhow!(
256 "fake accessor does not support fetch remote table"
257 )))
258 }
259}
260
261pub struct CompactionCatalogManager {
263 table_id_to_catalog: RwLock<HashMap<StateTableId, Table>>,
265 table_accessor: Box<dyn StateTableAccessor>,
267}
268
269impl Default for CompactionCatalogManager {
270 fn default() -> Self {
271 Self::new(Box::<FakeRemoteTableAccessor>::default())
272 }
273}
274
275impl CompactionCatalogManager {
276 pub fn new(table_accessor: Box<dyn StateTableAccessor>) -> Self {
277 Self {
278 table_id_to_catalog: Default::default(),
279 table_accessor,
280 }
281 }
282}
283
284impl CompactionCatalogManager {
285 pub fn update(&self, table_id: TableId, catalog: Table) {
287 self.table_id_to_catalog.write().insert(table_id, catalog);
288 }
289
290 pub fn sync(&self, catalog_map: HashMap<TableId, Table>) {
292 let mut guard = self.table_id_to_catalog.write();
293 guard.clear();
294 guard.extend(catalog_map);
295 }
296
297 pub fn remove(&self, table_id: TableId) {
299 self.table_id_to_catalog.write().remove(&table_id);
300 }
301
302 pub async fn acquire(
305 &self,
306 mut table_ids: Vec<StateTableId>,
307 ) -> HummockResult<CompactionCatalogAgentRef> {
308 if table_ids.is_empty() {
309 return Err(HummockError::other("table_id_set is empty"));
314 }
315
316 let mut multi_filter_key_extractor = MultiFilterKeyExtractor::default();
317 let mut table_id_to_vnode = HashMap::new();
318 let mut table_id_to_watermark_serde = HashMap::new();
319
320 {
321 let guard = self.table_id_to_catalog.read();
322 table_ids.retain(|table_id| match guard.get(table_id) {
323 Some(table_catalog) => {
324 multi_filter_key_extractor
326 .register(*table_id, FilterKeyExtractorImpl::from_table(table_catalog));
327
328 table_id_to_vnode.insert(*table_id, table_catalog.vnode_count());
330
331 table_id_to_watermark_serde
333 .insert(*table_id, build_watermark_col_serde(table_catalog));
334
335 false
336 }
337
338 None => true,
339 });
340 }
341
342 if !table_ids.is_empty() {
343 let mut state_tables =
344 self.table_accessor
345 .get_tables(&table_ids)
346 .await
347 .map_err(|e| {
348 HummockError::other(format!(
349 "request rpc list_tables for meta failed: {}",
350 e.as_report()
351 ))
352 })?;
353
354 let mut guard = self.table_id_to_catalog.write();
355 for table_id in table_ids {
356 if let Some(table) = state_tables.remove(&table_id) {
357 let table_id = table.id;
358 let key_extractor = FilterKeyExtractorImpl::from_table(&table);
359 let vnode = table.vnode_count();
360 let watermark_serde = build_watermark_col_serde(&table);
361 guard.insert(table_id, table);
362 multi_filter_key_extractor.register(table_id, key_extractor);
364
365 table_id_to_vnode.insert(table_id, vnode);
367
368 table_id_to_watermark_serde.insert(table_id, watermark_serde);
370 }
371 }
372 }
373
374 Ok(Arc::new(CompactionCatalogAgent::new(
375 FilterKeyExtractorImpl::Multi(multi_filter_key_extractor),
376 table_id_to_vnode,
377 table_id_to_watermark_serde,
378 )))
379 }
380
381 pub fn build_compaction_catalog_agent(
383 table_catalogs: HashMap<StateTableId, Table>,
384 ) -> CompactionCatalogAgentRef {
385 let mut multi_filter_key_extractor = MultiFilterKeyExtractor::default();
386 let mut table_id_to_vnode = HashMap::new();
387 let mut table_id_to_watermark_serde = HashMap::new();
388 for (table_id, table_catalog) in table_catalogs {
389 multi_filter_key_extractor
391 .register(table_id, FilterKeyExtractorImpl::from_table(&table_catalog));
392
393 table_id_to_vnode.insert(table_id, table_catalog.vnode_count());
395
396 table_id_to_watermark_serde.insert(table_id, build_watermark_col_serde(&table_catalog));
398 }
399
400 Arc::new(CompactionCatalogAgent::new(
401 FilterKeyExtractorImpl::Multi(multi_filter_key_extractor),
402 table_id_to_vnode,
403 table_id_to_watermark_serde,
404 ))
405 }
406}
407
408pub struct CompactionCatalogAgent {
412 filter_key_extractor_manager: FilterKeyExtractorImpl,
413 table_id_to_vnode: HashMap<StateTableId, usize>,
414 table_id_to_watermark_serde:
417 HashMap<StateTableId, Option<(OrderedRowSerde, OrderedRowSerde, usize)>>,
418}
419
420impl CompactionCatalogAgent {
421 pub fn new(
422 filter_key_extractor_manager: FilterKeyExtractorImpl,
423 table_id_to_vnode: HashMap<StateTableId, usize>,
424 table_id_to_watermark_serde: HashMap<
425 StateTableId,
426 Option<(OrderedRowSerde, OrderedRowSerde, usize)>,
427 >,
428 ) -> Self {
429 Self {
430 filter_key_extractor_manager,
431 table_id_to_vnode,
432 table_id_to_watermark_serde,
433 }
434 }
435
436 pub fn dummy() -> Self {
437 Self {
438 filter_key_extractor_manager: FilterKeyExtractorImpl::Dummy(DummyFilterKeyExtractor),
439 table_id_to_vnode: Default::default(),
440 table_id_to_watermark_serde: Default::default(),
441 }
442 }
443
444 pub fn for_test(table_ids: Vec<impl Into<StateTableId>>) -> Arc<Self> {
445 let full_key_filter_key_extractor =
446 FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);
447
448 let table_id_to_vnode: HashMap<TableId, usize> = table_ids
449 .into_iter()
450 .map(|table_id| (table_id.into(), VirtualNode::COUNT_FOR_TEST))
451 .collect();
452
453 let table_id_to_watermark_serde = table_id_to_vnode
454 .keys()
455 .map(|table_id| (*table_id, None))
456 .collect();
457
458 Arc::new(CompactionCatalogAgent::new(
459 full_key_filter_key_extractor,
460 table_id_to_vnode,
461 table_id_to_watermark_serde,
462 ))
463 }
464}
465
466impl CompactionCatalogAgent {
467 pub fn extract<'a>(&self, full_key: &'a [u8]) -> &'a [u8] {
468 self.filter_key_extractor_manager.extract(full_key)
469 }
470
471 pub fn vnode_count(&self, table_id: StateTableId) -> usize {
472 *self.table_id_to_vnode.get(&table_id).unwrap_or_else(|| {
473 panic!(
474 "table_id not found {} all_table_ids {:?}",
475 table_id,
476 self.table_id_to_vnode.keys()
477 )
478 })
479 }
480
481 pub fn watermark_serde(
482 &self,
483 table_id: StateTableId,
484 ) -> Option<(OrderedRowSerde, OrderedRowSerde, usize)> {
485 self.table_id_to_watermark_serde
486 .get(&table_id)
487 .unwrap_or_else(|| {
488 panic!(
489 "table_id not found {} all_table_ids {:?}",
490 table_id,
491 self.table_id_to_watermark_serde.keys()
492 )
493 })
494 .clone()
495 }
496
497 pub fn table_id_to_vnode_ref(&self) -> &HashMap<StateTableId, usize> {
498 &self.table_id_to_vnode
499 }
500
501 pub fn table_ids(&self) -> impl Iterator<Item = StateTableId> + '_ {
502 self.table_id_to_vnode.keys().cloned()
503 }
504}
505
506pub type CompactionCatalogManagerRef = Arc<CompactionCatalogManager>;
507pub type CompactionCatalogAgentRef = Arc<CompactionCatalogAgent>;
508
509fn build_watermark_col_serde(
510 table_catalog: &Table,
511) -> Option<(OrderedRowSerde, OrderedRowSerde, usize)> {
512 let clean_watermark_index_in_pk = table_catalog.get_clean_watermark_index_in_pk_compat();
514
515 match clean_watermark_index_in_pk {
516 None => {
517 None
519 }
520
521 Some(clean_watermark_index_in_pk) => {
522 use risingwave_common::types::DataType;
523 let table_columns: Vec<ColumnDesc> = table_catalog
524 .columns
525 .iter()
526 .map(|col| col.column_desc.as_ref().unwrap().into())
527 .collect();
528
529 let pk_data_types: Vec<DataType> = table_catalog
530 .pk
531 .iter()
532 .map(|col_order| {
533 table_columns[col_order.column_index as usize]
534 .data_type
535 .clone()
536 })
537 .collect();
538
539 let pk_order_types = table_catalog
540 .pk
541 .iter()
542 .map(|col_order| OrderType::from_protobuf(col_order.get_order_type().unwrap()))
543 .collect_vec();
544
545 assert_eq!(pk_data_types.len(), pk_order_types.len());
546 let pk_serde = OrderedRowSerde::new(pk_data_types, pk_order_types);
547 let watermark_col_serde = pk_serde.index(clean_watermark_index_in_pk).into_owned();
548 Some((pk_serde, watermark_col_serde, clean_watermark_index_in_pk))
549 }
550 }
551}
552
553#[cfg(test)]
554mod tests {
555 use std::mem;
556
557 use bytes::{BufMut, BytesMut};
558 use itertools::Itertools;
559 use risingwave_common::catalog::ColumnDesc;
560 use risingwave_common::hash::VirtualNode;
561 use risingwave_common::row::OwnedRow;
562 use risingwave_common::types::DataType;
563 use risingwave_common::types::ScalarImpl::{self};
564 use risingwave_common::util::row_serde::OrderedRowSerde;
565 use risingwave_common::util::sort_util::OrderType;
566 use risingwave_hummock_sdk::key::TABLE_PREFIX_LEN;
567 use risingwave_pb::catalog::table::{PbEngine, TableType};
568 use risingwave_pb::catalog::{PbCreateType, PbStreamJobStatus, PbTable};
569 use risingwave_pb::common::{PbColumnOrder, PbDirection, PbNullsAre, PbOrderType};
570 use risingwave_pb::plan_common::PbColumnCatalog;
571
572 use super::{DummyFilterKeyExtractor, FilterKeyExtractor, SchemaFilterKeyExtractor};
573 use crate::compaction_catalog_manager::{
574 FilterKeyExtractorImpl, FullKeyFilterKeyExtractor, MultiFilterKeyExtractor,
575 };
576 const fn dummy_vnode() -> [u8; VirtualNode::SIZE] {
577 VirtualNode::from_index(233).to_be_bytes()
578 }
579
580 #[test]
581 fn test_default_filter_key_extractor() {
582 let dummy_filter_key_extractor = DummyFilterKeyExtractor;
583 let full_key = "full_key".as_bytes();
584 let output_key = dummy_filter_key_extractor.extract(full_key);
585
586 assert_eq!("".as_bytes(), output_key);
587
588 let full_key_filter_key_extractor = FullKeyFilterKeyExtractor;
589 let output_key = full_key_filter_key_extractor.extract(full_key);
590
591 assert_eq!(full_key, output_key);
592 }
593
594 fn build_table_with_prefix_column_num(column_count: u32) -> PbTable {
595 PbTable {
596 id: 0.into(),
597 schema_id: 0.into(),
598 database_id: 0.into(),
599 name: "test".to_owned(),
600 table_type: TableType::Table as i32,
601 columns: vec![
602 PbColumnCatalog {
603 column_desc: Some(
604 (&ColumnDesc::named("_row_id", 0.into(), DataType::Int64)).into(),
605 ),
606 is_hidden: true,
607 },
608 PbColumnCatalog {
609 column_desc: Some(
610 (&ColumnDesc::named("col_1", 0.into(), DataType::Int64)).into(),
611 ),
612 is_hidden: false,
613 },
614 PbColumnCatalog {
615 column_desc: Some(
616 (&ColumnDesc::named("col_2", 0.into(), DataType::Float64)).into(),
617 ),
618 is_hidden: false,
619 },
620 PbColumnCatalog {
621 column_desc: Some(
622 (&ColumnDesc::named("col_3", 0.into(), DataType::Varchar)).into(),
623 ),
624 is_hidden: false,
625 },
626 ],
627 pk: vec![
628 PbColumnOrder {
629 column_index: 1,
630 order_type: Some(PbOrderType {
631 direction: PbDirection::Ascending as _,
632 nulls_are: PbNullsAre::Largest as _,
633 }),
634 },
635 PbColumnOrder {
636 column_index: 3,
637 order_type: Some(PbOrderType {
638 direction: PbDirection::Ascending as _,
639 nulls_are: PbNullsAre::Largest as _,
640 }),
641 },
642 ],
643 stream_key: vec![0],
644 distribution_key: (0..column_count as i32).collect_vec(),
645 optional_associated_source_id: None,
646 append_only: false,
647 owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
648 retention_seconds: Some(300),
649 fragment_id: 0.into(),
650 dml_fragment_id: None,
651 initialized_at_epoch: None,
652 vnode_col_index: None,
653 row_id_index: Some(0),
654 value_indices: vec![0],
655 definition: "".into(),
656 handle_pk_conflict_behavior: 0,
657 version_column_indices: vec![],
658 read_prefix_len_hint: 1,
659 version: None,
660 watermark_indices: vec![],
661 dist_key_in_pk: vec![],
662 cardinality: None,
663 created_at_epoch: None,
664 cleaned_by_watermark: false,
665 stream_job_status: PbStreamJobStatus::Created.into(),
666 create_type: PbCreateType::Foreground.into(),
667 description: None,
668 #[expect(deprecated)]
669 incoming_sinks: Default::default(),
670 initialized_at_cluster_version: None,
671 created_at_cluster_version: None,
672 cdc_table_id: None,
673 maybe_vnode_count: None,
674 webhook_info: None,
675 job_id: None,
676 engine: Some(PbEngine::Hummock as i32),
677 #[expect(deprecated)]
678 clean_watermark_index_in_pk: None,
679 clean_watermark_indices: vec![],
680 refreshable: false,
681 vector_index_info: None,
682 cdc_table_type: None,
683 }
684 }
685
686 #[test]
687 fn test_schema_filter_key_extractor() {
688 let prost_table = build_table_with_prefix_column_num(1);
689 let schema_filter_key_extractor = SchemaFilterKeyExtractor::new(&prost_table);
690
691 let order_types: Vec<OrderType> = vec![OrderType::ascending(), OrderType::ascending()];
692 let schema = vec![DataType::Int64, DataType::Varchar];
693 let serializer = OrderedRowSerde::new(schema, order_types);
694 let row = OwnedRow::new(vec![
695 Some(ScalarImpl::Int64(100)),
696 Some(ScalarImpl::Utf8("abc".into())),
697 ]);
698 let mut row_bytes = vec![];
699 serializer.serialize(&row, &mut row_bytes);
700
701 let table_prefix = {
702 let mut buf = BytesMut::with_capacity(TABLE_PREFIX_LEN);
703 buf.put_u32(1);
704 buf.to_vec()
705 };
706
707 let vnode_prefix = &dummy_vnode()[..];
708
709 let full_key = [&table_prefix, vnode_prefix, &row_bytes].concat();
710 let output_key = schema_filter_key_extractor.extract(&full_key);
711 assert_eq!(1 + mem::size_of::<i64>(), output_key.len());
712 }
713
714 #[test]
715 fn test_multi_filter_key_extractor() {
716 let mut multi_filter_key_extractor = MultiFilterKeyExtractor::default();
717 {
718 let prost_table = build_table_with_prefix_column_num(1);
720 let schema_filter_key_extractor = SchemaFilterKeyExtractor::new(&prost_table);
721 multi_filter_key_extractor.register(
722 1.into(),
723 FilterKeyExtractorImpl::Schema(schema_filter_key_extractor),
724 );
725 let order_types: Vec<OrderType> = vec![OrderType::ascending(), OrderType::ascending()];
726 let schema = vec![DataType::Int64, DataType::Varchar];
727 let serializer = OrderedRowSerde::new(schema, order_types);
728 let row = OwnedRow::new(vec![
729 Some(ScalarImpl::Int64(100)),
730 Some(ScalarImpl::Utf8("abc".into())),
731 ]);
732 let mut row_bytes = vec![];
733 serializer.serialize(&row, &mut row_bytes);
734
735 let table_prefix = {
736 let mut buf = BytesMut::with_capacity(TABLE_PREFIX_LEN);
737 buf.put_u32(1);
738 buf.to_vec()
739 };
740
741 let vnode_prefix = &dummy_vnode()[..];
742
743 let full_key = [&table_prefix, vnode_prefix, &row_bytes].concat();
744 let output_key = multi_filter_key_extractor.extract(&full_key);
745
746 let data_types = vec![DataType::Int64];
747 let order_types = vec![OrderType::ascending()];
748 let deserializer = OrderedRowSerde::new(data_types, order_types);
749
750 let pk_prefix_len = deserializer.deserialize_prefix_len(&row_bytes, 1).unwrap();
751 assert_eq!(pk_prefix_len, output_key.len());
752 }
753
754 {
755 let prost_table = build_table_with_prefix_column_num(2);
757 let schema_filter_key_extractor = SchemaFilterKeyExtractor::new(&prost_table);
758 multi_filter_key_extractor.register(
759 2.into(),
760 FilterKeyExtractorImpl::Schema(schema_filter_key_extractor),
761 );
762 let order_types: Vec<OrderType> = vec![OrderType::ascending(), OrderType::ascending()];
763 let schema = vec![DataType::Int64, DataType::Varchar];
764 let serializer = OrderedRowSerde::new(schema, order_types);
765 let row = OwnedRow::new(vec![
766 Some(ScalarImpl::Int64(100)),
767 Some(ScalarImpl::Utf8("abc".into())),
768 ]);
769 let mut row_bytes = vec![];
770 serializer.serialize(&row, &mut row_bytes);
771
772 let table_prefix = {
773 let mut buf = BytesMut::with_capacity(TABLE_PREFIX_LEN);
774 buf.put_u32(2);
775 buf.to_vec()
776 };
777
778 let vnode_prefix = &dummy_vnode()[..];
779
780 let full_key = [&table_prefix, vnode_prefix, &row_bytes].concat();
781 let output_key = multi_filter_key_extractor.extract(&full_key);
782
783 let data_types = vec![DataType::Int64, DataType::Varchar];
784 let order_types = vec![OrderType::ascending(), OrderType::ascending()];
785 let deserializer = OrderedRowSerde::new(data_types, order_types);
786
787 let pk_prefix_len = deserializer.deserialize_prefix_len(&row_bytes, 1).unwrap();
788
789 assert_eq!(pk_prefix_len, output_key.len());
790 }
791 }
792
793 #[tokio::test]
794 async fn test_compaction_catalog_manager_exception() {
795 let compaction_catalog_manager = super::CompactionCatalogManager::default();
796
797 {
798 let ret = compaction_catalog_manager.acquire(vec![]).await;
799 assert!(ret.is_err());
800 if let Err(e) = ret {
801 assert_eq!(e.to_string(), "Other error: table_id_set is empty");
802 }
803 }
804
805 {
806 let ret = compaction_catalog_manager.acquire(vec![1.into()]).await;
808 assert!(ret.is_err());
809 if let Err(e) = ret {
810 assert_eq!(
811 e.to_string(),
812 "Other error: request rpc list_tables for meta failed: fake accessor does not support fetch remote table"
813 );
814 }
815 }
816 }
817}