risingwave_storage/
compaction_catalog_manager.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::fmt::Debug;
17use std::sync::Arc;
18
19use itertools::Itertools;
20use parking_lot::RwLock;
21use risingwave_common::catalog::ColumnDesc;
22use risingwave_common::hash::{VirtualNode, VnodeCountCompat};
23use risingwave_common::util::row_serde::OrderedRowSerde;
24use risingwave_common::util::sort_util::OrderType;
25use risingwave_hummock_sdk::compaction_group::StateTableId;
26use risingwave_hummock_sdk::key::{TABLE_PREFIX_LEN, get_table_id};
27use risingwave_pb::catalog::Table;
28use risingwave_rpc_client::MetaClient;
29use risingwave_rpc_client::error::{Result as RpcResult, RpcError};
30use thiserror_ext::AsReport;
31
32use crate::hummock::{HummockError, HummockResult};
33
/// `FilterKeyExtractor` selects the part of a storage key that is stored in the bloom filter.
///
/// The returned slice shares the lifetime of `full_key`; the implementations in this file
/// return either a sub-slice of the input or an empty slice.
pub trait FilterKeyExtractor: Send + Sync {
    /// Returns the filter key for `full_key`.
    fn extract<'a>(&self, full_key: &'a [u8]) -> &'a [u8];
}
38
39pub enum FilterKeyExtractorImpl {
40    Schema(SchemaFilterKeyExtractor),
41    FullKey(FullKeyFilterKeyExtractor),
42    Dummy(DummyFilterKeyExtractor),
43    Multi(MultiFilterKeyExtractor),
44    FixedLength(FixedLengthFilterKeyExtractor),
45}
46
47impl FilterKeyExtractorImpl {
48    pub fn from_table(table_catalog: &Table) -> Self {
49        let read_prefix_len = table_catalog.get_read_prefix_len_hint() as usize;
50
51        if read_prefix_len == 0 || read_prefix_len > table_catalog.get_pk().len() {
52            // for now frontend had not infer the table_id_to_filter_key_extractor, so we
53            // use FullKeyFilterKeyExtractor
54            FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor)
55        } else {
56            FilterKeyExtractorImpl::Schema(SchemaFilterKeyExtractor::new(table_catalog))
57        }
58    }
59}
60
/// Generates the inherent `FilterKeyExtractorImpl::extract`, which forwards the call to the
/// extractor wrapped by whichever variant `self` is.
///
/// The input is a comma-separated list of `{ Variant }` groups.
macro_rules! impl_filter_key_extractor {
    ($( { $variant:ident } ),*) => {
        impl FilterKeyExtractorImpl {
            pub fn extract<'a>(&self, full_key: &'a [u8]) -> &'a [u8] {
                match self {
                    $(
                        FilterKeyExtractorImpl::$variant(extractor) => {
                            extractor.extract(full_key)
                        }
                    )*
                }
            }
        }
    };
}
73
/// Invokes the macro named `$callback` with the list of `FilterKeyExtractorImpl` variants,
/// each wrapped in braces and separated by commas.
macro_rules! for_all_filter_key_extractor_variants {
    ($callback:ident) => {
        $callback! {
            { Schema },
            { FullKey },
            { Dummy },
            { Multi },
            { FixedLength }
        }
    };
}
85
86for_all_filter_key_extractor_variants! { impl_filter_key_extractor }
87
88#[derive(Default)]
89pub struct FullKeyFilterKeyExtractor;
90
91impl FilterKeyExtractor for FullKeyFilterKeyExtractor {
92    fn extract<'a>(&self, user_key: &'a [u8]) -> &'a [u8] {
93        user_key
94    }
95}
96
97#[derive(Default)]
98pub struct DummyFilterKeyExtractor;
99impl FilterKeyExtractor for DummyFilterKeyExtractor {
100    fn extract<'a>(&self, _full_key: &'a [u8]) -> &'a [u8] {
101        &[]
102    }
103}
104
105/// [`SchemaFilterKeyExtractor`] build from `table_catalog` and extract a `full_key` to prefix for
106#[derive(Default)]
107pub struct FixedLengthFilterKeyExtractor {
108    fixed_length: usize,
109}
110
111impl FilterKeyExtractor for FixedLengthFilterKeyExtractor {
112    fn extract<'a>(&self, full_key: &'a [u8]) -> &'a [u8] {
113        &full_key[0..self.fixed_length]
114    }
115}
116
117impl FixedLengthFilterKeyExtractor {
118    pub fn new(fixed_length: usize) -> Self {
119        Self { fixed_length }
120    }
121}
122
123/// [`SchemaFilterKeyExtractor`] build from `table_catalog` and transform a `full_key` to prefix for
124/// `prefix_bloom_filter`
125pub struct SchemaFilterKeyExtractor {
126    /// Each stateful operator has its own read pattern, partly using prefix scan.
127    /// Prefix key length can be decoded through its `DataType` and `OrderType` which obtained from
128    /// `TableCatalog`. `read_pattern_prefix_column` means the count of column to decode prefix
129    /// from storage key.
130    read_prefix_len: usize,
131    deserializer: OrderedRowSerde,
132    // TODO:need some bench test for same prefix case like join (if we need a prefix_cache for same
133    // prefix_key)
134}
135
136impl FilterKeyExtractor for SchemaFilterKeyExtractor {
137    fn extract<'a>(&self, full_key: &'a [u8]) -> &'a [u8] {
138        if full_key.len() < TABLE_PREFIX_LEN + VirtualNode::SIZE {
139            return &[];
140        }
141
142        let (_table_prefix, key) = full_key.split_at(TABLE_PREFIX_LEN);
143        let (_vnode_prefix, pk) = key.split_at(VirtualNode::SIZE);
144
145        // if the key with table_id deserializer fail from schema, that should panic here for early
146        // detection.
147
148        let bloom_filter_key_len = self
149            .deserializer
150            .deserialize_prefix_len(pk, self.read_prefix_len)
151            .unwrap();
152
153        let end_position = TABLE_PREFIX_LEN + VirtualNode::SIZE + bloom_filter_key_len;
154        &full_key[TABLE_PREFIX_LEN + VirtualNode::SIZE..end_position]
155    }
156}
157
158impl SchemaFilterKeyExtractor {
159    pub fn new(table_catalog: &Table) -> Self {
160        let pk_indices: Vec<usize> = table_catalog
161            .pk
162            .iter()
163            .map(|col_order| col_order.column_index as usize)
164            .collect();
165
166        let read_prefix_len = table_catalog.get_read_prefix_len_hint() as usize;
167
168        let data_types = pk_indices
169            .iter()
170            .map(|column_idx| &table_catalog.columns[*column_idx])
171            .map(|col| ColumnDesc::from(col.column_desc.as_ref().unwrap()).data_type)
172            .collect();
173
174        let order_types: Vec<OrderType> = table_catalog
175            .pk
176            .iter()
177            .map(|col_order| OrderType::from_protobuf(col_order.get_order_type().unwrap()))
178            .collect();
179
180        Self {
181            read_prefix_len,
182            deserializer: OrderedRowSerde::new(data_types, order_types),
183        }
184    }
185}
186
187#[derive(Default)]
188pub struct MultiFilterKeyExtractor {
189    id_to_filter_key_extractor: HashMap<u32, FilterKeyExtractorImpl>,
190}
191
192impl MultiFilterKeyExtractor {
193    pub fn register(&mut self, table_id: u32, filter_key_extractor: FilterKeyExtractorImpl) {
194        self.id_to_filter_key_extractor
195            .insert(table_id, filter_key_extractor);
196    }
197
198    pub fn size(&self) -> usize {
199        self.id_to_filter_key_extractor.len()
200    }
201
202    pub fn get_existing_table_ids(&self) -> HashSet<u32> {
203        self.id_to_filter_key_extractor.keys().cloned().collect()
204    }
205}
206
207impl Debug for MultiFilterKeyExtractor {
208    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
209        write!(f, "MultiFilterKeyExtractor size {} ", self.size())
210    }
211}
212
213impl FilterKeyExtractor for MultiFilterKeyExtractor {
214    fn extract<'a>(&self, full_key: &'a [u8]) -> &'a [u8] {
215        if full_key.len() < TABLE_PREFIX_LEN + VirtualNode::SIZE {
216            return full_key;
217        }
218
219        let table_id = get_table_id(full_key);
220        self.id_to_filter_key_extractor
221            .get(&table_id)
222            .unwrap()
223            .extract(full_key)
224    }
225}
226
227#[async_trait::async_trait]
228pub trait StateTableAccessor: Send + Sync {
229    async fn get_tables(&self, table_ids: &[u32]) -> RpcResult<HashMap<u32, Table>>;
230}
231
/// Stand-in accessor with no backing meta client; it is the accessor used by
/// `CompactionCatalogManager::default`.
#[derive(Default)]
pub struct FakeRemoteTableAccessor {}
234
235pub struct RemoteTableAccessor {
236    meta_client: MetaClient,
237}
238
239impl RemoteTableAccessor {
240    pub fn new(meta_client: MetaClient) -> Self {
241        Self { meta_client }
242    }
243}
244
245#[async_trait::async_trait]
246impl StateTableAccessor for RemoteTableAccessor {
247    async fn get_tables(&self, table_ids: &[u32]) -> RpcResult<HashMap<u32, Table>> {
248        self.meta_client.get_tables(table_ids, true).await
249    }
250}
251
252#[async_trait::async_trait]
253impl StateTableAccessor for FakeRemoteTableAccessor {
254    async fn get_tables(&self, _table_ids: &[u32]) -> RpcResult<HashMap<u32, Table>> {
255        Err(RpcError::Internal(anyhow::anyhow!(
256            "fake accessor does not support fetch remote table"
257        )))
258    }
259}
260
261/// `CompactionCatalogManager` is a manager to manage all `Table` which used in compaction
262pub struct CompactionCatalogManager {
263    // `table_id_to_catalog` is a map to store all `Table` which used in compaction
264    table_id_to_catalog: RwLock<HashMap<StateTableId, Table>>,
265    // `table_accessor` is a accessor to fetch `Table` from meta when the table not found
266    table_accessor: Box<dyn StateTableAccessor>,
267}
268
269impl Default for CompactionCatalogManager {
270    fn default() -> Self {
271        Self::new(Box::<FakeRemoteTableAccessor>::default())
272    }
273}
274
275impl CompactionCatalogManager {
276    pub fn new(table_accessor: Box<dyn StateTableAccessor>) -> Self {
277        Self {
278            table_id_to_catalog: Default::default(),
279            table_accessor,
280        }
281    }
282}
283
284impl CompactionCatalogManager {
285    /// `update` is used to update `Table` in `table_id_to_catalog` from notification
286    pub fn update(&self, table_id: u32, catalog: Table) {
287        self.table_id_to_catalog.write().insert(table_id, catalog);
288    }
289
290    /// `sync` is used to sync all `Table` in `table_id_to_catalog` from notification whole snapshot
291    pub fn sync(&self, catalog_map: HashMap<u32, Table>) {
292        let mut guard = self.table_id_to_catalog.write();
293        guard.clear();
294        guard.extend(catalog_map);
295    }
296
297    /// `remove` is used to remove `Table` in `table_id_to_catalog` by `table_id`
298    pub fn remove(&self, table_id: u32) {
299        self.table_id_to_catalog.write().remove(&table_id);
300    }
301
302    /// `acquire` is used to acquire `CompactionCatalogAgent` by `table_ids`
303    /// if the table not found in `table_id_to_catalog`, it will fetch from meta
304    pub async fn acquire(
305        &self,
306        mut table_ids: Vec<StateTableId>,
307    ) -> HummockResult<CompactionCatalogAgentRef> {
308        if table_ids.is_empty() {
309            // table_id_set is empty
310            // the table in sst has been deleted
311
312            // use full key as default
313            return Err(HummockError::other("table_id_set is empty"));
314        }
315
316        let mut multi_filter_key_extractor = MultiFilterKeyExtractor::default();
317        let mut table_id_to_vnode = HashMap::new();
318        let mut table_id_to_watermark_serde = HashMap::new();
319
320        {
321            let guard = self.table_id_to_catalog.read();
322            table_ids.retain(|table_id| match guard.get(table_id) {
323                Some(table_catalog) => {
324                    // filter-key-extractor
325                    multi_filter_key_extractor
326                        .register(*table_id, FilterKeyExtractorImpl::from_table(table_catalog));
327
328                    // vnode
329                    table_id_to_vnode.insert(*table_id, table_catalog.vnode_count());
330
331                    // watermark
332                    table_id_to_watermark_serde
333                        .insert(*table_id, build_watermark_col_serde(table_catalog));
334
335                    false
336                }
337
338                None => true,
339            });
340        }
341
342        if !table_ids.is_empty() {
343            let mut state_tables =
344                self.table_accessor
345                    .get_tables(&table_ids)
346                    .await
347                    .map_err(|e| {
348                        HummockError::other(format!(
349                            "request rpc list_tables for meta failed: {}",
350                            e.as_report()
351                        ))
352                    })?;
353
354            let mut guard = self.table_id_to_catalog.write();
355            for table_id in table_ids {
356                if let Some(table) = state_tables.remove(&table_id) {
357                    let table_id = table.id;
358                    let key_extractor = FilterKeyExtractorImpl::from_table(&table);
359                    let vnode = table.vnode_count();
360                    let watermark_serde = build_watermark_col_serde(&table);
361                    guard.insert(table_id, table);
362                    // filter-key-extractor
363                    multi_filter_key_extractor.register(table_id, key_extractor);
364
365                    // vnode
366                    table_id_to_vnode.insert(table_id, vnode);
367
368                    // watermark
369                    table_id_to_watermark_serde.insert(table_id, watermark_serde);
370                }
371            }
372        }
373
374        Ok(Arc::new(CompactionCatalogAgent::new(
375            FilterKeyExtractorImpl::Multi(multi_filter_key_extractor),
376            table_id_to_vnode,
377            table_id_to_watermark_serde,
378        )))
379    }
380
381    /// `build_compaction_catalog_agent` is used to build `CompactionCatalogAgent` by `table_catalogs`
382    pub fn build_compaction_catalog_agent(
383        table_catalogs: HashMap<StateTableId, Table>,
384    ) -> CompactionCatalogAgentRef {
385        let mut multi_filter_key_extractor = MultiFilterKeyExtractor::default();
386        let mut table_id_to_vnode = HashMap::new();
387        let mut table_id_to_watermark_serde = HashMap::new();
388        for (table_id, table_catalog) in table_catalogs {
389            // filter-key-extractor
390            multi_filter_key_extractor
391                .register(table_id, FilterKeyExtractorImpl::from_table(&table_catalog));
392
393            // vnode
394            table_id_to_vnode.insert(table_id, table_catalog.vnode_count());
395
396            // watermark
397            table_id_to_watermark_serde.insert(table_id, build_watermark_col_serde(&table_catalog));
398        }
399
400        Arc::new(CompactionCatalogAgent::new(
401            FilterKeyExtractorImpl::Multi(multi_filter_key_extractor),
402            table_id_to_vnode,
403            table_id_to_watermark_serde,
404        ))
405    }
406}
407
408/// `CompactionCatalogAgent` is a wrapper of `filter_key_extractor_manager` and `table_id_to_vnode`
409/// The `CompactionCatalogAgent` belongs to a compaction task call, which we will build from the `table_ids` contained in a compact task and use it during the compaction.
410/// The `CompactionCatalogAgent` can act as a agent for the `CompactionCatalogManager`, providing `extract` and `vnode_count` capabilities.
411pub struct CompactionCatalogAgent {
412    filter_key_extractor_manager: FilterKeyExtractorImpl,
413    table_id_to_vnode: HashMap<StateTableId, usize>,
414    // table_id ->(pk_prefix_serde, clean_watermark_col_serde, watermark_col_idx)
415    // cache for reduce serde build
416    table_id_to_watermark_serde:
417        HashMap<StateTableId, Option<(OrderedRowSerde, OrderedRowSerde, usize)>>,
418}
419
420impl CompactionCatalogAgent {
421    pub fn new(
422        filter_key_extractor_manager: FilterKeyExtractorImpl,
423        table_id_to_vnode: HashMap<StateTableId, usize>,
424        table_id_to_watermark_serde: HashMap<
425            StateTableId,
426            Option<(OrderedRowSerde, OrderedRowSerde, usize)>,
427        >,
428    ) -> Self {
429        Self {
430            filter_key_extractor_manager,
431            table_id_to_vnode,
432            table_id_to_watermark_serde,
433        }
434    }
435
436    pub fn dummy() -> Self {
437        Self {
438            filter_key_extractor_manager: FilterKeyExtractorImpl::Dummy(DummyFilterKeyExtractor),
439            table_id_to_vnode: Default::default(),
440            table_id_to_watermark_serde: Default::default(),
441        }
442    }
443
444    pub fn for_test(table_ids: Vec<StateTableId>) -> Arc<Self> {
445        let full_key_filter_key_extractor =
446            FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);
447
448        let table_id_to_vnode: HashMap<u32, usize> = table_ids
449            .into_iter()
450            .map(|table_id| (table_id, VirtualNode::COUNT_FOR_TEST))
451            .collect();
452
453        let table_id_to_watermark_serde = table_id_to_vnode
454            .keys()
455            .map(|table_id| (*table_id, None))
456            .collect();
457
458        Arc::new(CompactionCatalogAgent::new(
459            full_key_filter_key_extractor,
460            table_id_to_vnode,
461            table_id_to_watermark_serde,
462        ))
463    }
464}
465
466impl CompactionCatalogAgent {
467    pub fn extract<'a>(&self, full_key: &'a [u8]) -> &'a [u8] {
468        self.filter_key_extractor_manager.extract(full_key)
469    }
470
471    pub fn vnode_count(&self, table_id: StateTableId) -> usize {
472        *self.table_id_to_vnode.get(&table_id).unwrap_or_else(|| {
473            panic!(
474                "table_id not found {} all_table_ids {:?}",
475                table_id,
476                self.table_id_to_vnode.keys()
477            )
478        })
479    }
480
481    pub fn watermark_serde(
482        &self,
483        table_id: StateTableId,
484    ) -> Option<(OrderedRowSerde, OrderedRowSerde, usize)> {
485        self.table_id_to_watermark_serde
486            .get(&table_id)
487            .unwrap_or_else(|| {
488                panic!(
489                    "table_id not found {} all_table_ids {:?}",
490                    table_id,
491                    self.table_id_to_watermark_serde.keys()
492                )
493            })
494            .clone()
495    }
496
497    pub fn table_id_to_vnode_ref(&self) -> &HashMap<StateTableId, usize> {
498        &self.table_id_to_vnode
499    }
500
501    pub fn table_ids(&self) -> impl Iterator<Item = StateTableId> + '_ {
502        self.table_id_to_vnode.keys().cloned()
503    }
504}
505
506pub type CompactionCatalogManagerRef = Arc<CompactionCatalogManager>;
507pub type CompactionCatalogAgentRef = Arc<CompactionCatalogAgent>;
508
509fn build_watermark_col_serde(
510    table_catalog: &Table,
511) -> Option<(OrderedRowSerde, OrderedRowSerde, usize)> {
512    match table_catalog.clean_watermark_index_in_pk {
513        None => {
514            // non watermark table or watermark column is the first column (pk_prefix_watermark)
515            None
516        }
517
518        Some(clean_watermark_index_in_pk) => {
519            use risingwave_common::types::DataType;
520            let table_columns: Vec<ColumnDesc> = table_catalog
521                .columns
522                .iter()
523                .map(|col| col.column_desc.as_ref().unwrap().into())
524                .collect();
525
526            let pk_data_types: Vec<DataType> = table_catalog
527                .pk
528                .iter()
529                .map(|col_order| {
530                    table_columns[col_order.column_index as usize]
531                        .data_type
532                        .clone()
533                })
534                .collect();
535
536            let pk_order_types = table_catalog
537                .pk
538                .iter()
539                .map(|col_order| OrderType::from_protobuf(col_order.get_order_type().unwrap()))
540                .collect_vec();
541
542            assert_eq!(pk_data_types.len(), pk_order_types.len());
543            let pk_serde = OrderedRowSerde::new(pk_data_types, pk_order_types);
544            let watermark_col_serde = pk_serde
545                .index(clean_watermark_index_in_pk as usize)
546                .into_owned();
547            Some((
548                pk_serde,
549                watermark_col_serde,
550                clean_watermark_index_in_pk as usize,
551            ))
552        }
553    }
554}
555
#[cfg(test)]
mod tests {
    use std::mem;

    use bytes::{BufMut, BytesMut};
    use itertools::Itertools;
    use risingwave_common::catalog::ColumnDesc;
    use risingwave_common::hash::VirtualNode;
    use risingwave_common::row::OwnedRow;
    use risingwave_common::types::DataType;
    use risingwave_common::types::ScalarImpl::{self};
    use risingwave_common::util::row_serde::OrderedRowSerde;
    use risingwave_common::util::sort_util::OrderType;
    use risingwave_hummock_sdk::key::TABLE_PREFIX_LEN;
    use risingwave_pb::catalog::table::{PbEngine, TableType};
    use risingwave_pb::catalog::{PbCreateType, PbStreamJobStatus, PbTable};
    use risingwave_pb::common::{PbColumnOrder, PbDirection, PbNullsAre, PbOrderType};
    use risingwave_pb::plan_common::PbColumnCatalog;

    use super::{DummyFilterKeyExtractor, FilterKeyExtractor, SchemaFilterKeyExtractor};
    use crate::compaction_catalog_manager::{
        FilterKeyExtractorImpl, FullKeyFilterKeyExtractor, MultiFilterKeyExtractor,
    };

    /// Vnode prefix bytes shared by every key built in these tests.
    const fn dummy_vnode() -> [u8; VirtualNode::SIZE] {
        VirtualNode::from_index(233).to_be_bytes()
    }

    /// Serializes the row `(100, "abc")` with an ascending `(Int64, Varchar)` serde.
    fn serialized_pk_row() -> Vec<u8> {
        let serializer = OrderedRowSerde::new(
            vec![DataType::Int64, DataType::Varchar],
            vec![OrderType::ascending(), OrderType::ascending()],
        );
        let row = OwnedRow::new(vec![
            Some(ScalarImpl::Int64(100)),
            Some(ScalarImpl::Utf8("abc".into())),
        ]);

        let mut row_bytes = vec![];
        serializer.serialize(&row, &mut row_bytes);
        row_bytes
    }

    /// Concatenates the table prefix of `table_id`, the dummy vnode prefix and `row_bytes`.
    fn full_key_for(table_id: u32, row_bytes: &[u8]) -> Vec<u8> {
        let mut table_prefix = BytesMut::with_capacity(TABLE_PREFIX_LEN);
        table_prefix.put_u32(table_id);
        let vnode_prefix = dummy_vnode();

        [&table_prefix[..], &vnode_prefix[..], row_bytes].concat()
    }

    #[test]
    fn test_default_filter_key_extractor() {
        let full_key = "full_key".as_bytes();

        // The dummy extractor yields nothing.
        let output_key = DummyFilterKeyExtractor.extract(full_key);
        assert_eq!("".as_bytes(), output_key);

        // The full-key extractor yields its input.
        let output_key = FullKeyFilterKeyExtractor.extract(full_key);
        assert_eq!(full_key, output_key);
    }

    /// Builds a table with pk `(col_1: Int64, col_3: Varchar)` and a read prefix hint of 1;
    /// `column_count` only determines the length of `distribution_key`.
    fn build_table_with_prefix_column_num(column_count: u32) -> PbTable {
        let column = |name: &str, data_type: DataType, is_hidden: bool| PbColumnCatalog {
            column_desc: Some((&ColumnDesc::named(name, 0.into(), data_type)).into()),
            is_hidden,
        };
        let ascending_pk = |column_index| PbColumnOrder {
            column_index,
            order_type: Some(PbOrderType {
                direction: PbDirection::Ascending as _,
                nulls_are: PbNullsAre::Largest as _,
            }),
        };

        PbTable {
            id: 0,
            schema_id: 0,
            database_id: 0,
            name: "test".to_owned(),
            table_type: TableType::Table as i32,
            columns: vec![
                column("_row_id", DataType::Int64, true),
                column("col_1", DataType::Int64, false),
                column("col_2", DataType::Float64, false),
                column("col_3", DataType::Varchar, false),
            ],
            pk: vec![ascending_pk(1), ascending_pk(3)],
            stream_key: vec![0],
            dependent_relations: vec![],
            distribution_key: (0..column_count as i32).collect_vec(),
            optional_associated_source_id: None,
            append_only: false,
            owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
            retention_seconds: Some(300),
            fragment_id: 0,
            dml_fragment_id: None,
            initialized_at_epoch: None,
            vnode_col_index: None,
            row_id_index: Some(0),
            value_indices: vec![0],
            definition: "".into(),
            handle_pk_conflict_behavior: 0,
            version_column_index: None,
            read_prefix_len_hint: 1,
            version: None,
            watermark_indices: vec![],
            dist_key_in_pk: vec![],
            cardinality: None,
            created_at_epoch: None,
            cleaned_by_watermark: false,
            stream_job_status: PbStreamJobStatus::Created.into(),
            create_type: PbCreateType::Foreground.into(),
            description: None,
            incoming_sinks: vec![],
            initialized_at_cluster_version: None,
            created_at_cluster_version: None,
            cdc_table_id: None,
            maybe_vnode_count: None,
            webhook_info: None,
            job_id: None,
            engine: Some(PbEngine::Hummock as i32),
            clean_watermark_index_in_pk: None,
        }
    }

    #[test]
    fn test_schema_filter_key_extractor() {
        let prost_table = build_table_with_prefix_column_num(1);
        let schema_filter_key_extractor = SchemaFilterKeyExtractor::new(&prost_table);

        let full_key = full_key_for(1, &serialized_pk_row());
        let output_key = schema_filter_key_extractor.extract(&full_key);

        assert_eq!(1 + mem::size_of::<i64>(), output_key.len());
    }

    #[test]
    fn test_multi_filter_key_extractor() {
        let row_bytes = serialized_pk_row();
        let mut multi_filter_key_extractor = MultiFilterKeyExtractor::default();

        // (table id, `column_count` for the catalog, schema of the reference deserializer)
        let cases = [
            // test table_id 1
            (1, 1, vec![DataType::Int64]),
            // test table_id 2
            (2, 2, vec![DataType::Int64, DataType::Varchar]),
        ];

        for (table_id, column_count, reference_schema) in cases {
            let prost_table = build_table_with_prefix_column_num(column_count);
            multi_filter_key_extractor.register(
                table_id,
                FilterKeyExtractorImpl::Schema(SchemaFilterKeyExtractor::new(&prost_table)),
            );

            let full_key = full_key_for(table_id, &row_bytes);
            let output_key = multi_filter_key_extractor.extract(&full_key);

            let order_types = vec![OrderType::ascending(); reference_schema.len()];
            let deserializer = OrderedRowSerde::new(reference_schema, order_types);
            let pk_prefix_len = deserializer.deserialize_prefix_len(&row_bytes, 1).unwrap();

            assert_eq!(pk_prefix_len, output_key.len());
        }
    }

    #[tokio::test]
    async fn test_compaction_catalog_manager_exception() {
        let compaction_catalog_manager = super::CompactionCatalogManager::default();

        // An empty id set is rejected up front.
        match compaction_catalog_manager.acquire(vec![]).await {
            Ok(_) => panic!("acquire with no table ids should fail"),
            Err(e) => assert_eq!(e.to_string(), "Other error: table_id_set is empty"),
        }

        // network error with FakeRemoteTableAccessor
        match compaction_catalog_manager.acquire(vec![1]).await {
            Ok(_) => panic!("acquire through the fake accessor should fail"),
            Err(e) => assert_eq!(
                e.to_string(),
                "Other error: request rpc list_tables for meta failed: fake accessor does not support fetch remote table"
            ),
        }
    }
}