1use std::collections::{HashMap, HashSet};
16use std::fmt::Debug;
17use std::sync::Arc;
18
19use itertools::Itertools;
20use parking_lot::RwLock;
21use risingwave_common::catalog::ColumnDesc;
22use risingwave_common::hash::{VirtualNode, VnodeCountCompat};
23use risingwave_common::util::row_serde::OrderedRowSerde;
24use risingwave_common::util::sort_util::OrderType;
25use risingwave_hummock_sdk::compaction_group::StateTableId;
26use risingwave_hummock_sdk::key::{TABLE_PREFIX_LEN, get_table_id};
27use risingwave_pb::catalog::Table;
28use risingwave_rpc_client::MetaClient;
29use risingwave_rpc_client::error::{Result as RpcResult, RpcError};
30use thiserror_ext::AsReport;
31
32use crate::hummock::{HummockError, HummockResult};
33
34pub trait FilterKeyExtractor: Send + Sync {
36 fn extract<'a>(&self, full_key: &'a [u8]) -> &'a [u8];
37}
38
39pub enum FilterKeyExtractorImpl {
40 Schema(SchemaFilterKeyExtractor),
41 FullKey(FullKeyFilterKeyExtractor),
42 Dummy(DummyFilterKeyExtractor),
43 Multi(MultiFilterKeyExtractor),
44 FixedLength(FixedLengthFilterKeyExtractor),
45}
46
47impl FilterKeyExtractorImpl {
48 pub fn from_table(table_catalog: &Table) -> Self {
49 let read_prefix_len = table_catalog.get_read_prefix_len_hint() as usize;
50
51 if read_prefix_len == 0 || read_prefix_len > table_catalog.get_pk().len() {
52 FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor)
55 } else {
56 FilterKeyExtractorImpl::Schema(SchemaFilterKeyExtractor::new(table_catalog))
57 }
58 }
59}
60
61macro_rules! impl_filter_key_extractor {
62 ($( { $variant_name:ident } ),*) => {
63 impl FilterKeyExtractorImpl {
64 pub fn extract<'a>(&self, full_key: &'a [u8]) -> &'a [u8]{
65 match self {
66 $( Self::$variant_name(inner) => inner.extract(full_key), )*
67 }
68 }
69 }
70 }
71
72}
73
74macro_rules! for_all_filter_key_extractor_variants {
75 ($macro:ident) => {
76 $macro! {
77 { Schema },
78 { FullKey },
79 { Dummy },
80 { Multi },
81 { FixedLength }
82 }
83 };
84}
85
86for_all_filter_key_extractor_variants! { impl_filter_key_extractor }
87
88#[derive(Default)]
89pub struct FullKeyFilterKeyExtractor;
90
91impl FilterKeyExtractor for FullKeyFilterKeyExtractor {
92 fn extract<'a>(&self, user_key: &'a [u8]) -> &'a [u8] {
93 user_key
94 }
95}
96
97#[derive(Default)]
98pub struct DummyFilterKeyExtractor;
99impl FilterKeyExtractor for DummyFilterKeyExtractor {
100 fn extract<'a>(&self, _full_key: &'a [u8]) -> &'a [u8] {
101 &[]
102 }
103}
104
105#[derive(Default)]
107pub struct FixedLengthFilterKeyExtractor {
108 fixed_length: usize,
109}
110
111impl FilterKeyExtractor for FixedLengthFilterKeyExtractor {
112 fn extract<'a>(&self, full_key: &'a [u8]) -> &'a [u8] {
113 &full_key[0..self.fixed_length]
114 }
115}
116
117impl FixedLengthFilterKeyExtractor {
118 pub fn new(fixed_length: usize) -> Self {
119 Self { fixed_length }
120 }
121}
122
123pub struct SchemaFilterKeyExtractor {
126 read_prefix_len: usize,
131 deserializer: OrderedRowSerde,
132 }
135
136impl FilterKeyExtractor for SchemaFilterKeyExtractor {
137 fn extract<'a>(&self, full_key: &'a [u8]) -> &'a [u8] {
138 if full_key.len() < TABLE_PREFIX_LEN + VirtualNode::SIZE {
139 return &[];
140 }
141
142 let (_table_prefix, key) = full_key.split_at(TABLE_PREFIX_LEN);
143 let (_vnode_prefix, pk) = key.split_at(VirtualNode::SIZE);
144
145 let bloom_filter_key_len = self
149 .deserializer
150 .deserialize_prefix_len(pk, self.read_prefix_len)
151 .unwrap();
152
153 let end_position = TABLE_PREFIX_LEN + VirtualNode::SIZE + bloom_filter_key_len;
154 &full_key[TABLE_PREFIX_LEN + VirtualNode::SIZE..end_position]
155 }
156}
157
158impl SchemaFilterKeyExtractor {
159 pub fn new(table_catalog: &Table) -> Self {
160 let pk_indices: Vec<usize> = table_catalog
161 .pk
162 .iter()
163 .map(|col_order| col_order.column_index as usize)
164 .collect();
165
166 let read_prefix_len = table_catalog.get_read_prefix_len_hint() as usize;
167
168 let data_types = pk_indices
169 .iter()
170 .map(|column_idx| &table_catalog.columns[*column_idx])
171 .map(|col| ColumnDesc::from(col.column_desc.as_ref().unwrap()).data_type)
172 .collect();
173
174 let order_types: Vec<OrderType> = table_catalog
175 .pk
176 .iter()
177 .map(|col_order| OrderType::from_protobuf(col_order.get_order_type().unwrap()))
178 .collect();
179
180 Self {
181 read_prefix_len,
182 deserializer: OrderedRowSerde::new(data_types, order_types),
183 }
184 }
185}
186
187#[derive(Default)]
188pub struct MultiFilterKeyExtractor {
189 id_to_filter_key_extractor: HashMap<u32, FilterKeyExtractorImpl>,
190}
191
192impl MultiFilterKeyExtractor {
193 pub fn register(&mut self, table_id: u32, filter_key_extractor: FilterKeyExtractorImpl) {
194 self.id_to_filter_key_extractor
195 .insert(table_id, filter_key_extractor);
196 }
197
198 pub fn size(&self) -> usize {
199 self.id_to_filter_key_extractor.len()
200 }
201
202 pub fn get_existing_table_ids(&self) -> HashSet<u32> {
203 self.id_to_filter_key_extractor.keys().cloned().collect()
204 }
205}
206
207impl Debug for MultiFilterKeyExtractor {
208 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
209 write!(f, "MultiFilterKeyExtractor size {} ", self.size())
210 }
211}
212
213impl FilterKeyExtractor for MultiFilterKeyExtractor {
214 fn extract<'a>(&self, full_key: &'a [u8]) -> &'a [u8] {
215 if full_key.len() < TABLE_PREFIX_LEN + VirtualNode::SIZE {
216 return full_key;
217 }
218
219 let table_id = get_table_id(full_key);
220 self.id_to_filter_key_extractor
221 .get(&table_id)
222 .unwrap()
223 .extract(full_key)
224 }
225}
226
227#[async_trait::async_trait]
228pub trait StateTableAccessor: Send + Sync {
229 async fn get_tables(&self, table_ids: &[u32]) -> RpcResult<HashMap<u32, Table>>;
230}
231
232#[derive(Default)]
233pub struct FakeRemoteTableAccessor {}
234
235pub struct RemoteTableAccessor {
236 meta_client: MetaClient,
237}
238
239impl RemoteTableAccessor {
240 pub fn new(meta_client: MetaClient) -> Self {
241 Self { meta_client }
242 }
243}
244
245#[async_trait::async_trait]
246impl StateTableAccessor for RemoteTableAccessor {
247 async fn get_tables(&self, table_ids: &[u32]) -> RpcResult<HashMap<u32, Table>> {
248 self.meta_client.get_tables(table_ids, true).await
249 }
250}
251
252#[async_trait::async_trait]
253impl StateTableAccessor for FakeRemoteTableAccessor {
254 async fn get_tables(&self, _table_ids: &[u32]) -> RpcResult<HashMap<u32, Table>> {
255 Err(RpcError::Internal(anyhow::anyhow!(
256 "fake accessor does not support fetch remote table"
257 )))
258 }
259}
260
261pub struct CompactionCatalogManager {
263 table_id_to_catalog: RwLock<HashMap<StateTableId, Table>>,
265 table_accessor: Box<dyn StateTableAccessor>,
267}
268
269impl Default for CompactionCatalogManager {
270 fn default() -> Self {
271 Self::new(Box::<FakeRemoteTableAccessor>::default())
272 }
273}
274
275impl CompactionCatalogManager {
276 pub fn new(table_accessor: Box<dyn StateTableAccessor>) -> Self {
277 Self {
278 table_id_to_catalog: Default::default(),
279 table_accessor,
280 }
281 }
282}
283
284impl CompactionCatalogManager {
285 pub fn update(&self, table_id: u32, catalog: Table) {
287 self.table_id_to_catalog.write().insert(table_id, catalog);
288 }
289
290 pub fn sync(&self, catalog_map: HashMap<u32, Table>) {
292 let mut guard = self.table_id_to_catalog.write();
293 guard.clear();
294 guard.extend(catalog_map);
295 }
296
297 pub fn remove(&self, table_id: u32) {
299 self.table_id_to_catalog.write().remove(&table_id);
300 }
301
302 pub async fn acquire(
305 &self,
306 mut table_ids: Vec<StateTableId>,
307 ) -> HummockResult<CompactionCatalogAgentRef> {
308 if table_ids.is_empty() {
309 return Err(HummockError::other("table_id_set is empty"));
314 }
315
316 let mut multi_filter_key_extractor = MultiFilterKeyExtractor::default();
317 let mut table_id_to_vnode = HashMap::new();
318 let mut table_id_to_watermark_serde = HashMap::new();
319
320 {
321 let guard = self.table_id_to_catalog.read();
322 table_ids.retain(|table_id| match guard.get(table_id) {
323 Some(table_catalog) => {
324 multi_filter_key_extractor
326 .register(*table_id, FilterKeyExtractorImpl::from_table(table_catalog));
327
328 table_id_to_vnode.insert(*table_id, table_catalog.vnode_count());
330
331 table_id_to_watermark_serde
333 .insert(*table_id, build_watermark_col_serde(table_catalog));
334
335 false
336 }
337
338 None => true,
339 });
340 }
341
342 if !table_ids.is_empty() {
343 let mut state_tables =
344 self.table_accessor
345 .get_tables(&table_ids)
346 .await
347 .map_err(|e| {
348 HummockError::other(format!(
349 "request rpc list_tables for meta failed: {}",
350 e.as_report()
351 ))
352 })?;
353
354 let mut guard = self.table_id_to_catalog.write();
355 for table_id in table_ids {
356 if let Some(table) = state_tables.remove(&table_id) {
357 let table_id = table.id;
358 let key_extractor = FilterKeyExtractorImpl::from_table(&table);
359 let vnode = table.vnode_count();
360 let watermark_serde = build_watermark_col_serde(&table);
361 guard.insert(table_id, table);
362 multi_filter_key_extractor.register(table_id, key_extractor);
364
365 table_id_to_vnode.insert(table_id, vnode);
367
368 table_id_to_watermark_serde.insert(table_id, watermark_serde);
370 }
371 }
372 }
373
374 Ok(Arc::new(CompactionCatalogAgent::new(
375 FilterKeyExtractorImpl::Multi(multi_filter_key_extractor),
376 table_id_to_vnode,
377 table_id_to_watermark_serde,
378 )))
379 }
380
381 pub fn build_compaction_catalog_agent(
383 table_catalogs: HashMap<StateTableId, Table>,
384 ) -> CompactionCatalogAgentRef {
385 let mut multi_filter_key_extractor = MultiFilterKeyExtractor::default();
386 let mut table_id_to_vnode = HashMap::new();
387 let mut table_id_to_watermark_serde = HashMap::new();
388 for (table_id, table_catalog) in table_catalogs {
389 multi_filter_key_extractor
391 .register(table_id, FilterKeyExtractorImpl::from_table(&table_catalog));
392
393 table_id_to_vnode.insert(table_id, table_catalog.vnode_count());
395
396 table_id_to_watermark_serde.insert(table_id, build_watermark_col_serde(&table_catalog));
398 }
399
400 Arc::new(CompactionCatalogAgent::new(
401 FilterKeyExtractorImpl::Multi(multi_filter_key_extractor),
402 table_id_to_vnode,
403 table_id_to_watermark_serde,
404 ))
405 }
406}
407
408pub struct CompactionCatalogAgent {
412 filter_key_extractor_manager: FilterKeyExtractorImpl,
413 table_id_to_vnode: HashMap<StateTableId, usize>,
414 table_id_to_watermark_serde:
417 HashMap<StateTableId, Option<(OrderedRowSerde, OrderedRowSerde, usize)>>,
418}
419
420impl CompactionCatalogAgent {
421 pub fn new(
422 filter_key_extractor_manager: FilterKeyExtractorImpl,
423 table_id_to_vnode: HashMap<StateTableId, usize>,
424 table_id_to_watermark_serde: HashMap<
425 StateTableId,
426 Option<(OrderedRowSerde, OrderedRowSerde, usize)>,
427 >,
428 ) -> Self {
429 Self {
430 filter_key_extractor_manager,
431 table_id_to_vnode,
432 table_id_to_watermark_serde,
433 }
434 }
435
436 pub fn dummy() -> Self {
437 Self {
438 filter_key_extractor_manager: FilterKeyExtractorImpl::Dummy(DummyFilterKeyExtractor),
439 table_id_to_vnode: Default::default(),
440 table_id_to_watermark_serde: Default::default(),
441 }
442 }
443
444 pub fn for_test(table_ids: Vec<StateTableId>) -> Arc<Self> {
445 let full_key_filter_key_extractor =
446 FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);
447
448 let table_id_to_vnode: HashMap<u32, usize> = table_ids
449 .into_iter()
450 .map(|table_id| (table_id, VirtualNode::COUNT_FOR_TEST))
451 .collect();
452
453 let table_id_to_watermark_serde = table_id_to_vnode
454 .keys()
455 .map(|table_id| (*table_id, None))
456 .collect();
457
458 Arc::new(CompactionCatalogAgent::new(
459 full_key_filter_key_extractor,
460 table_id_to_vnode,
461 table_id_to_watermark_serde,
462 ))
463 }
464}
465
466impl CompactionCatalogAgent {
467 pub fn extract<'a>(&self, full_key: &'a [u8]) -> &'a [u8] {
468 self.filter_key_extractor_manager.extract(full_key)
469 }
470
471 pub fn vnode_count(&self, table_id: StateTableId) -> usize {
472 *self.table_id_to_vnode.get(&table_id).unwrap_or_else(|| {
473 panic!(
474 "table_id not found {} all_table_ids {:?}",
475 table_id,
476 self.table_id_to_vnode.keys()
477 )
478 })
479 }
480
481 pub fn watermark_serde(
482 &self,
483 table_id: StateTableId,
484 ) -> Option<(OrderedRowSerde, OrderedRowSerde, usize)> {
485 self.table_id_to_watermark_serde
486 .get(&table_id)
487 .unwrap_or_else(|| {
488 panic!(
489 "table_id not found {} all_table_ids {:?}",
490 table_id,
491 self.table_id_to_watermark_serde.keys()
492 )
493 })
494 .clone()
495 }
496
497 pub fn table_id_to_vnode_ref(&self) -> &HashMap<StateTableId, usize> {
498 &self.table_id_to_vnode
499 }
500
501 pub fn table_ids(&self) -> impl Iterator<Item = StateTableId> + '_ {
502 self.table_id_to_vnode.keys().cloned()
503 }
504}
505
506pub type CompactionCatalogManagerRef = Arc<CompactionCatalogManager>;
507pub type CompactionCatalogAgentRef = Arc<CompactionCatalogAgent>;
508
509fn build_watermark_col_serde(
510 table_catalog: &Table,
511) -> Option<(OrderedRowSerde, OrderedRowSerde, usize)> {
512 match table_catalog.clean_watermark_index_in_pk {
513 None => {
514 None
516 }
517
518 Some(clean_watermark_index_in_pk) => {
519 use risingwave_common::types::DataType;
520 let table_columns: Vec<ColumnDesc> = table_catalog
521 .columns
522 .iter()
523 .map(|col| col.column_desc.as_ref().unwrap().into())
524 .collect();
525
526 let pk_data_types: Vec<DataType> = table_catalog
527 .pk
528 .iter()
529 .map(|col_order| {
530 table_columns[col_order.column_index as usize]
531 .data_type
532 .clone()
533 })
534 .collect();
535
536 let pk_order_types = table_catalog
537 .pk
538 .iter()
539 .map(|col_order| OrderType::from_protobuf(col_order.get_order_type().unwrap()))
540 .collect_vec();
541
542 assert_eq!(pk_data_types.len(), pk_order_types.len());
543 let pk_serde = OrderedRowSerde::new(pk_data_types, pk_order_types);
544 let watermark_col_serde = pk_serde
545 .index(clean_watermark_index_in_pk as usize)
546 .into_owned();
547 Some((
548 pk_serde,
549 watermark_col_serde,
550 clean_watermark_index_in_pk as usize,
551 ))
552 }
553 }
554}
555
556#[cfg(test)]
557mod tests {
558 use std::mem;
559
560 use bytes::{BufMut, BytesMut};
561 use itertools::Itertools;
562 use risingwave_common::catalog::ColumnDesc;
563 use risingwave_common::hash::VirtualNode;
564 use risingwave_common::row::OwnedRow;
565 use risingwave_common::types::DataType;
566 use risingwave_common::types::ScalarImpl::{self};
567 use risingwave_common::util::row_serde::OrderedRowSerde;
568 use risingwave_common::util::sort_util::OrderType;
569 use risingwave_hummock_sdk::key::TABLE_PREFIX_LEN;
570 use risingwave_pb::catalog::table::{PbEngine, TableType};
571 use risingwave_pb::catalog::{PbCreateType, PbStreamJobStatus, PbTable};
572 use risingwave_pb::common::{PbColumnOrder, PbDirection, PbNullsAre, PbOrderType};
573 use risingwave_pb::plan_common::PbColumnCatalog;
574
575 use super::{DummyFilterKeyExtractor, FilterKeyExtractor, SchemaFilterKeyExtractor};
576 use crate::compaction_catalog_manager::{
577 FilterKeyExtractorImpl, FullKeyFilterKeyExtractor, MultiFilterKeyExtractor,
578 };
579 const fn dummy_vnode() -> [u8; VirtualNode::SIZE] {
580 VirtualNode::from_index(233).to_be_bytes()
581 }
582
583 #[test]
584 fn test_default_filter_key_extractor() {
585 let dummy_filter_key_extractor = DummyFilterKeyExtractor;
586 let full_key = "full_key".as_bytes();
587 let output_key = dummy_filter_key_extractor.extract(full_key);
588
589 assert_eq!("".as_bytes(), output_key);
590
591 let full_key_filter_key_extractor = FullKeyFilterKeyExtractor;
592 let output_key = full_key_filter_key_extractor.extract(full_key);
593
594 assert_eq!(full_key, output_key);
595 }
596
597 fn build_table_with_prefix_column_num(column_count: u32) -> PbTable {
598 PbTable {
599 id: 0,
600 schema_id: 0,
601 database_id: 0,
602 name: "test".to_owned(),
603 table_type: TableType::Table as i32,
604 columns: vec![
605 PbColumnCatalog {
606 column_desc: Some(
607 (&ColumnDesc::named("_row_id", 0.into(), DataType::Int64)).into(),
608 ),
609 is_hidden: true,
610 },
611 PbColumnCatalog {
612 column_desc: Some(
613 (&ColumnDesc::named("col_1", 0.into(), DataType::Int64)).into(),
614 ),
615 is_hidden: false,
616 },
617 PbColumnCatalog {
618 column_desc: Some(
619 (&ColumnDesc::named("col_2", 0.into(), DataType::Float64)).into(),
620 ),
621 is_hidden: false,
622 },
623 PbColumnCatalog {
624 column_desc: Some(
625 (&ColumnDesc::named("col_3", 0.into(), DataType::Varchar)).into(),
626 ),
627 is_hidden: false,
628 },
629 ],
630 pk: vec![
631 PbColumnOrder {
632 column_index: 1,
633 order_type: Some(PbOrderType {
634 direction: PbDirection::Ascending as _,
635 nulls_are: PbNullsAre::Largest as _,
636 }),
637 },
638 PbColumnOrder {
639 column_index: 3,
640 order_type: Some(PbOrderType {
641 direction: PbDirection::Ascending as _,
642 nulls_are: PbNullsAre::Largest as _,
643 }),
644 },
645 ],
646 stream_key: vec![0],
647 dependent_relations: vec![],
648 distribution_key: (0..column_count as i32).collect_vec(),
649 optional_associated_source_id: None,
650 append_only: false,
651 owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
652 retention_seconds: Some(300),
653 fragment_id: 0,
654 dml_fragment_id: None,
655 initialized_at_epoch: None,
656 vnode_col_index: None,
657 row_id_index: Some(0),
658 value_indices: vec![0],
659 definition: "".into(),
660 handle_pk_conflict_behavior: 0,
661 version_column_index: None,
662 read_prefix_len_hint: 1,
663 version: None,
664 watermark_indices: vec![],
665 dist_key_in_pk: vec![],
666 cardinality: None,
667 created_at_epoch: None,
668 cleaned_by_watermark: false,
669 stream_job_status: PbStreamJobStatus::Created.into(),
670 create_type: PbCreateType::Foreground.into(),
671 description: None,
672 incoming_sinks: vec![],
673 initialized_at_cluster_version: None,
674 created_at_cluster_version: None,
675 cdc_table_id: None,
676 maybe_vnode_count: None,
677 webhook_info: None,
678 job_id: None,
679 engine: Some(PbEngine::Hummock as i32),
680 clean_watermark_index_in_pk: None,
681 }
682 }
683
684 #[test]
685 fn test_schema_filter_key_extractor() {
686 let prost_table = build_table_with_prefix_column_num(1);
687 let schema_filter_key_extractor = SchemaFilterKeyExtractor::new(&prost_table);
688
689 let order_types: Vec<OrderType> = vec![OrderType::ascending(), OrderType::ascending()];
690 let schema = vec![DataType::Int64, DataType::Varchar];
691 let serializer = OrderedRowSerde::new(schema, order_types);
692 let row = OwnedRow::new(vec![
693 Some(ScalarImpl::Int64(100)),
694 Some(ScalarImpl::Utf8("abc".into())),
695 ]);
696 let mut row_bytes = vec![];
697 serializer.serialize(&row, &mut row_bytes);
698
699 let table_prefix = {
700 let mut buf = BytesMut::with_capacity(TABLE_PREFIX_LEN);
701 buf.put_u32(1);
702 buf.to_vec()
703 };
704
705 let vnode_prefix = &dummy_vnode()[..];
706
707 let full_key = [&table_prefix, vnode_prefix, &row_bytes].concat();
708 let output_key = schema_filter_key_extractor.extract(&full_key);
709 assert_eq!(1 + mem::size_of::<i64>(), output_key.len());
710 }
711
712 #[test]
713 fn test_multi_filter_key_extractor() {
714 let mut multi_filter_key_extractor = MultiFilterKeyExtractor::default();
715 {
716 let prost_table = build_table_with_prefix_column_num(1);
718 let schema_filter_key_extractor = SchemaFilterKeyExtractor::new(&prost_table);
719 multi_filter_key_extractor.register(
720 1,
721 FilterKeyExtractorImpl::Schema(schema_filter_key_extractor),
722 );
723 let order_types: Vec<OrderType> = vec![OrderType::ascending(), OrderType::ascending()];
724 let schema = vec![DataType::Int64, DataType::Varchar];
725 let serializer = OrderedRowSerde::new(schema, order_types);
726 let row = OwnedRow::new(vec![
727 Some(ScalarImpl::Int64(100)),
728 Some(ScalarImpl::Utf8("abc".into())),
729 ]);
730 let mut row_bytes = vec![];
731 serializer.serialize(&row, &mut row_bytes);
732
733 let table_prefix = {
734 let mut buf = BytesMut::with_capacity(TABLE_PREFIX_LEN);
735 buf.put_u32(1);
736 buf.to_vec()
737 };
738
739 let vnode_prefix = &dummy_vnode()[..];
740
741 let full_key = [&table_prefix, vnode_prefix, &row_bytes].concat();
742 let output_key = multi_filter_key_extractor.extract(&full_key);
743
744 let data_types = vec![DataType::Int64];
745 let order_types = vec![OrderType::ascending()];
746 let deserializer = OrderedRowSerde::new(data_types, order_types);
747
748 let pk_prefix_len = deserializer.deserialize_prefix_len(&row_bytes, 1).unwrap();
749 assert_eq!(pk_prefix_len, output_key.len());
750 }
751
752 {
753 let prost_table = build_table_with_prefix_column_num(2);
755 let schema_filter_key_extractor = SchemaFilterKeyExtractor::new(&prost_table);
756 multi_filter_key_extractor.register(
757 2,
758 FilterKeyExtractorImpl::Schema(schema_filter_key_extractor),
759 );
760 let order_types: Vec<OrderType> = vec![OrderType::ascending(), OrderType::ascending()];
761 let schema = vec![DataType::Int64, DataType::Varchar];
762 let serializer = OrderedRowSerde::new(schema, order_types);
763 let row = OwnedRow::new(vec![
764 Some(ScalarImpl::Int64(100)),
765 Some(ScalarImpl::Utf8("abc".into())),
766 ]);
767 let mut row_bytes = vec![];
768 serializer.serialize(&row, &mut row_bytes);
769
770 let table_prefix = {
771 let mut buf = BytesMut::with_capacity(TABLE_PREFIX_LEN);
772 buf.put_u32(2);
773 buf.to_vec()
774 };
775
776 let vnode_prefix = &dummy_vnode()[..];
777
778 let full_key = [&table_prefix, vnode_prefix, &row_bytes].concat();
779 let output_key = multi_filter_key_extractor.extract(&full_key);
780
781 let data_types = vec![DataType::Int64, DataType::Varchar];
782 let order_types = vec![OrderType::ascending(), OrderType::ascending()];
783 let deserializer = OrderedRowSerde::new(data_types, order_types);
784
785 let pk_prefix_len = deserializer.deserialize_prefix_len(&row_bytes, 1).unwrap();
786
787 assert_eq!(pk_prefix_len, output_key.len());
788 }
789 }
790
791 #[tokio::test]
792 async fn test_compaction_catalog_manager_exception() {
793 let compaction_catalog_manager = super::CompactionCatalogManager::default();
794
795 {
796 let ret = compaction_catalog_manager.acquire(vec![]).await;
797 assert!(ret.is_err());
798 if let Err(e) = ret {
799 assert_eq!(e.to_string(), "Other error: table_id_set is empty");
800 }
801 }
802
803 {
804 let ret = compaction_catalog_manager.acquire(vec![1]).await;
806 assert!(ret.is_err());
807 if let Err(e) = ret {
808 assert_eq!(
809 e.to_string(),
810 "Other error: request rpc list_tables for meta failed: fake accessor does not support fetch remote table"
811 );
812 }
813 }
814 }
815}