risingwave_storage/
compaction_catalog_manager.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::fmt::Debug;
17use std::sync::Arc;
18
19use itertools::Itertools;
20use parking_lot::RwLock;
21use risingwave_common::catalog::{ColumnDesc, TableId};
22use risingwave_common::hash::{VirtualNode, VnodeCountCompat};
23use risingwave_common::util::row_serde::OrderedRowSerde;
24use risingwave_common::util::sort_util::OrderType;
25use risingwave_hummock_sdk::compaction_group::StateTableId;
26use risingwave_hummock_sdk::key::{TABLE_PREFIX_LEN, get_table_id};
27use risingwave_pb::catalog::Table;
28use risingwave_rpc_client::MetaClient;
29use risingwave_rpc_client::error::{Result as RpcResult, RpcError};
30use thiserror_ext::AsReport;
31
32use crate::hummock::{HummockError, HummockResult};
33
/// Maps a full storage key to the slice of it that is stored in the bloom filter.
///
/// Implementations must be shareable across threads, hence the `Send + Sync` supertraits.
pub trait FilterKeyExtractor: Send + Sync {
    /// Returns the portion of `full_key` used as the bloom-filter key.
    ///
    /// The returned slice always borrows from `full_key`.
    fn extract<'k>(&self, full_key: &'k [u8]) -> &'k [u8];
}
38
39pub enum FilterKeyExtractorImpl {
40    Schema(SchemaFilterKeyExtractor),
41    FullKey(FullKeyFilterKeyExtractor),
42    Dummy(DummyFilterKeyExtractor),
43    Multi(MultiFilterKeyExtractor),
44    FixedLength(FixedLengthFilterKeyExtractor),
45}
46
47impl FilterKeyExtractorImpl {
48    pub fn from_table(table_catalog: &Table) -> Self {
49        let read_prefix_len = table_catalog.get_read_prefix_len_hint() as usize;
50
51        if read_prefix_len == 0 || read_prefix_len > table_catalog.get_pk().len() {
52            // for now frontend had not infer the table_id_to_filter_key_extractor, so we
53            // use FullKeyFilterKeyExtractor
54            FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor)
55        } else {
56            FilterKeyExtractorImpl::Schema(SchemaFilterKeyExtractor::new(table_catalog))
57        }
58    }
59}
60
// Generates `FilterKeyExtractorImpl::extract`, which forwards to the extractor held by whichever
// of the listed variants is active.
macro_rules! impl_filter_key_extractor {
    ($( { $variant:ident } ),*) => {
        impl FilterKeyExtractorImpl {
            /// Extracts the bloom-filter key from `full_key` using the wrapped extractor.
            pub fn extract<'a>(&self, full_key: &'a [u8]) -> &'a [u8] {
                match self {
                    $( Self::$variant(extractor) => extractor.extract(full_key), )*
                }
            }
        }
    };
}
73
// Invokes `$callback` with every `FilterKeyExtractorImpl` variant name, each wrapped in braces and
// comma-separated, so per-variant code can be generated from this single list.
macro_rules! for_all_filter_key_extractor_variants {
    ($callback:ident) => {
        $callback! {
            { Schema },
            { FullKey },
            { Dummy },
            { Multi },
            { FixedLength }
        }
    };
}
85
86for_all_filter_key_extractor_variants! { impl_filter_key_extractor }
87
88#[derive(Default)]
89pub struct FullKeyFilterKeyExtractor;
90
91impl FilterKeyExtractor for FullKeyFilterKeyExtractor {
92    fn extract<'a>(&self, user_key: &'a [u8]) -> &'a [u8] {
93        user_key
94    }
95}
96
97#[derive(Default)]
98pub struct DummyFilterKeyExtractor;
99impl FilterKeyExtractor for DummyFilterKeyExtractor {
100    fn extract<'a>(&self, _full_key: &'a [u8]) -> &'a [u8] {
101        &[]
102    }
103}
104
105/// [`SchemaFilterKeyExtractor`] build from `table_catalog` and extract a `full_key` to prefix for
106#[derive(Default)]
107pub struct FixedLengthFilterKeyExtractor {
108    fixed_length: usize,
109}
110
111impl FilterKeyExtractor for FixedLengthFilterKeyExtractor {
112    fn extract<'a>(&self, full_key: &'a [u8]) -> &'a [u8] {
113        &full_key[0..self.fixed_length]
114    }
115}
116
117impl FixedLengthFilterKeyExtractor {
118    pub fn new(fixed_length: usize) -> Self {
119        Self { fixed_length }
120    }
121}
122
123/// [`SchemaFilterKeyExtractor`] build from `table_catalog` and transform a `full_key` to prefix for
124/// `prefix_bloom_filter`
125pub struct SchemaFilterKeyExtractor {
126    /// Each stateful operator has its own read pattern, partly using prefix scan.
127    /// Prefix key length can be decoded through its `DataType` and `OrderType` which obtained from
128    /// `TableCatalog`. `read_pattern_prefix_column` means the count of column to decode prefix
129    /// from storage key.
130    read_prefix_len: usize,
131    deserializer: OrderedRowSerde,
132    // TODO:need some bench test for same prefix case like join (if we need a prefix_cache for same
133    // prefix_key)
134}
135
136impl FilterKeyExtractor for SchemaFilterKeyExtractor {
137    fn extract<'a>(&self, full_key: &'a [u8]) -> &'a [u8] {
138        if full_key.len() < TABLE_PREFIX_LEN + VirtualNode::SIZE {
139            return &[];
140        }
141
142        let (_table_prefix, key) = full_key.split_at(TABLE_PREFIX_LEN);
143        let (_vnode_prefix, pk) = key.split_at(VirtualNode::SIZE);
144
145        // if the key with table_id deserializer fail from schema, that should panic here for early
146        // detection.
147
148        let bloom_filter_key_len = self
149            .deserializer
150            .deserialize_prefix_len(pk, self.read_prefix_len)
151            .unwrap();
152
153        let end_position = TABLE_PREFIX_LEN + VirtualNode::SIZE + bloom_filter_key_len;
154        &full_key[TABLE_PREFIX_LEN + VirtualNode::SIZE..end_position]
155    }
156}
157
158impl SchemaFilterKeyExtractor {
159    pub fn new(table_catalog: &Table) -> Self {
160        let pk_indices: Vec<usize> = table_catalog
161            .pk
162            .iter()
163            .map(|col_order| col_order.column_index as usize)
164            .collect();
165
166        let read_prefix_len = table_catalog.get_read_prefix_len_hint() as usize;
167
168        let data_types = pk_indices
169            .iter()
170            .map(|column_idx| &table_catalog.columns[*column_idx])
171            .map(|col| ColumnDesc::from(col.column_desc.as_ref().unwrap()).data_type)
172            .collect();
173
174        let order_types: Vec<OrderType> = table_catalog
175            .pk
176            .iter()
177            .map(|col_order| OrderType::from_protobuf(col_order.get_order_type().unwrap()))
178            .collect();
179
180        Self {
181            read_prefix_len,
182            deserializer: OrderedRowSerde::new(data_types, order_types),
183        }
184    }
185}
186
187#[derive(Default)]
188pub struct MultiFilterKeyExtractor {
189    id_to_filter_key_extractor: HashMap<TableId, FilterKeyExtractorImpl>,
190}
191
192impl MultiFilterKeyExtractor {
193    pub fn register(&mut self, table_id: TableId, filter_key_extractor: FilterKeyExtractorImpl) {
194        self.id_to_filter_key_extractor
195            .insert(table_id, filter_key_extractor);
196    }
197
198    pub fn size(&self) -> usize {
199        self.id_to_filter_key_extractor.len()
200    }
201
202    pub fn get_existing_table_ids(&self) -> HashSet<TableId> {
203        self.id_to_filter_key_extractor.keys().cloned().collect()
204    }
205}
206
207impl Debug for MultiFilterKeyExtractor {
208    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
209        write!(f, "MultiFilterKeyExtractor size {} ", self.size())
210    }
211}
212
213impl FilterKeyExtractor for MultiFilterKeyExtractor {
214    fn extract<'a>(&self, full_key: &'a [u8]) -> &'a [u8] {
215        if full_key.len() < TABLE_PREFIX_LEN + VirtualNode::SIZE {
216            return full_key;
217        }
218
219        let table_id = get_table_id(full_key).into();
220        self.id_to_filter_key_extractor
221            .get(&table_id)
222            .unwrap()
223            .extract(full_key)
224    }
225}
226
227#[async_trait::async_trait]
228pub trait StateTableAccessor: Send + Sync {
229    async fn get_tables(&self, table_ids: &[TableId]) -> RpcResult<HashMap<TableId, Table>>;
230}
231
/// Accessor with no meta service behind it: every fetch fails. Backs the default
/// `CompactionCatalogManager`.
#[derive(Default)]
pub struct FakeRemoteTableAccessor {}
234
235pub struct RemoteTableAccessor {
236    meta_client: MetaClient,
237}
238
239impl RemoteTableAccessor {
240    pub fn new(meta_client: MetaClient) -> Self {
241        Self { meta_client }
242    }
243}
244
245#[async_trait::async_trait]
246impl StateTableAccessor for RemoteTableAccessor {
247    async fn get_tables(&self, table_ids: &[TableId]) -> RpcResult<HashMap<TableId, Table>> {
248        self.meta_client.get_tables(table_ids.to_vec(), true).await
249    }
250}
251
252#[async_trait::async_trait]
253impl StateTableAccessor for FakeRemoteTableAccessor {
254    async fn get_tables(&self, _table_ids: &[TableId]) -> RpcResult<HashMap<TableId, Table>> {
255        Err(RpcError::Internal(anyhow::anyhow!(
256            "fake accessor does not support fetch remote table"
257        )))
258    }
259}
260
261/// `CompactionCatalogManager` is a manager to manage all `Table` which used in compaction
262pub struct CompactionCatalogManager {
263    // `table_id_to_catalog` is a map to store all `Table` which used in compaction
264    table_id_to_catalog: RwLock<HashMap<StateTableId, Table>>,
265    // `table_accessor` is a accessor to fetch `Table` from meta when the table not found
266    table_accessor: Box<dyn StateTableAccessor>,
267}
268
269impl Default for CompactionCatalogManager {
270    fn default() -> Self {
271        Self::new(Box::<FakeRemoteTableAccessor>::default())
272    }
273}
274
275impl CompactionCatalogManager {
276    pub fn new(table_accessor: Box<dyn StateTableAccessor>) -> Self {
277        Self {
278            table_id_to_catalog: Default::default(),
279            table_accessor,
280        }
281    }
282}
283
284impl CompactionCatalogManager {
285    /// `update` is used to update `Table` in `table_id_to_catalog` from notification
286    pub fn update(&self, table_id: TableId, catalog: Table) {
287        self.table_id_to_catalog.write().insert(table_id, catalog);
288    }
289
290    /// `sync` is used to sync all `Table` in `table_id_to_catalog` from notification whole snapshot
291    pub fn sync(&self, catalog_map: HashMap<TableId, Table>) {
292        let mut guard = self.table_id_to_catalog.write();
293        guard.clear();
294        guard.extend(catalog_map);
295    }
296
297    /// `remove` is used to remove `Table` in `table_id_to_catalog` by `table_id`
298    pub fn remove(&self, table_id: TableId) {
299        self.table_id_to_catalog.write().remove(&table_id);
300    }
301
302    /// `acquire` is used to acquire `CompactionCatalogAgent` by `table_ids`
303    /// if the table not found in `table_id_to_catalog`, it will fetch from meta
304    pub async fn acquire(
305        &self,
306        mut table_ids: Vec<StateTableId>,
307    ) -> HummockResult<CompactionCatalogAgentRef> {
308        if table_ids.is_empty() {
309            // table_id_set is empty
310            // the table in sst has been deleted
311
312            // use full key as default
313            return Err(HummockError::other("table_id_set is empty"));
314        }
315
316        let mut multi_filter_key_extractor = MultiFilterKeyExtractor::default();
317        let mut table_id_to_vnode = HashMap::new();
318        let mut table_id_to_watermark_serde = HashMap::new();
319
320        {
321            let guard = self.table_id_to_catalog.read();
322            table_ids.retain(|table_id| match guard.get(table_id) {
323                Some(table_catalog) => {
324                    // filter-key-extractor
325                    multi_filter_key_extractor
326                        .register(*table_id, FilterKeyExtractorImpl::from_table(table_catalog));
327
328                    // vnode
329                    table_id_to_vnode.insert(*table_id, table_catalog.vnode_count());
330
331                    // watermark
332                    table_id_to_watermark_serde
333                        .insert(*table_id, build_watermark_col_serde(table_catalog));
334
335                    false
336                }
337
338                None => true,
339            });
340        }
341
342        if !table_ids.is_empty() {
343            let mut state_tables =
344                self.table_accessor
345                    .get_tables(&table_ids)
346                    .await
347                    .map_err(|e| {
348                        HummockError::other(format!(
349                            "request rpc list_tables for meta failed: {}",
350                            e.as_report()
351                        ))
352                    })?;
353
354            let mut guard = self.table_id_to_catalog.write();
355            for table_id in table_ids {
356                if let Some(table) = state_tables.remove(&table_id) {
357                    let table_id = table.id;
358                    let key_extractor = FilterKeyExtractorImpl::from_table(&table);
359                    let vnode = table.vnode_count();
360                    let watermark_serde = build_watermark_col_serde(&table);
361                    guard.insert(table_id, table);
362                    // filter-key-extractor
363                    multi_filter_key_extractor.register(table_id, key_extractor);
364
365                    // vnode
366                    table_id_to_vnode.insert(table_id, vnode);
367
368                    // watermark
369                    table_id_to_watermark_serde.insert(table_id, watermark_serde);
370                }
371            }
372        }
373
374        Ok(Arc::new(CompactionCatalogAgent::new(
375            FilterKeyExtractorImpl::Multi(multi_filter_key_extractor),
376            table_id_to_vnode,
377            table_id_to_watermark_serde,
378        )))
379    }
380
381    /// `build_compaction_catalog_agent` is used to build `CompactionCatalogAgent` by `table_catalogs`
382    pub fn build_compaction_catalog_agent(
383        table_catalogs: HashMap<StateTableId, Table>,
384    ) -> CompactionCatalogAgentRef {
385        let mut multi_filter_key_extractor = MultiFilterKeyExtractor::default();
386        let mut table_id_to_vnode = HashMap::new();
387        let mut table_id_to_watermark_serde = HashMap::new();
388        for (table_id, table_catalog) in table_catalogs {
389            // filter-key-extractor
390            multi_filter_key_extractor
391                .register(table_id, FilterKeyExtractorImpl::from_table(&table_catalog));
392
393            // vnode
394            table_id_to_vnode.insert(table_id, table_catalog.vnode_count());
395
396            // watermark
397            table_id_to_watermark_serde.insert(table_id, build_watermark_col_serde(&table_catalog));
398        }
399
400        Arc::new(CompactionCatalogAgent::new(
401            FilterKeyExtractorImpl::Multi(multi_filter_key_extractor),
402            table_id_to_vnode,
403            table_id_to_watermark_serde,
404        ))
405    }
406}
407
408/// `CompactionCatalogAgent` is a wrapper of `filter_key_extractor_manager` and `table_id_to_vnode`
409/// The `CompactionCatalogAgent` belongs to a compaction task call, which we will build from the `table_ids` contained in a compact task and use it during the compaction.
410/// The `CompactionCatalogAgent` can act as a agent for the `CompactionCatalogManager`, providing `extract` and `vnode_count` capabilities.
411pub struct CompactionCatalogAgent {
412    filter_key_extractor_manager: FilterKeyExtractorImpl,
413    table_id_to_vnode: HashMap<StateTableId, usize>,
414    // table_id ->(pk_prefix_serde, clean_watermark_col_serde, watermark_col_idx)
415    // cache for reduce serde build
416    table_id_to_watermark_serde:
417        HashMap<StateTableId, Option<(OrderedRowSerde, OrderedRowSerde, usize)>>,
418}
419
420impl CompactionCatalogAgent {
421    pub fn new(
422        filter_key_extractor_manager: FilterKeyExtractorImpl,
423        table_id_to_vnode: HashMap<StateTableId, usize>,
424        table_id_to_watermark_serde: HashMap<
425            StateTableId,
426            Option<(OrderedRowSerde, OrderedRowSerde, usize)>,
427        >,
428    ) -> Self {
429        Self {
430            filter_key_extractor_manager,
431            table_id_to_vnode,
432            table_id_to_watermark_serde,
433        }
434    }
435
436    pub fn dummy() -> Self {
437        Self {
438            filter_key_extractor_manager: FilterKeyExtractorImpl::Dummy(DummyFilterKeyExtractor),
439            table_id_to_vnode: Default::default(),
440            table_id_to_watermark_serde: Default::default(),
441        }
442    }
443
444    pub fn for_test(table_ids: Vec<impl Into<StateTableId>>) -> Arc<Self> {
445        let full_key_filter_key_extractor =
446            FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);
447
448        let table_id_to_vnode: HashMap<TableId, usize> = table_ids
449            .into_iter()
450            .map(|table_id| (table_id.into(), VirtualNode::COUNT_FOR_TEST))
451            .collect();
452
453        let table_id_to_watermark_serde = table_id_to_vnode
454            .keys()
455            .map(|table_id| (*table_id, None))
456            .collect();
457
458        Arc::new(CompactionCatalogAgent::new(
459            full_key_filter_key_extractor,
460            table_id_to_vnode,
461            table_id_to_watermark_serde,
462        ))
463    }
464}
465
466impl CompactionCatalogAgent {
467    pub fn extract<'a>(&self, full_key: &'a [u8]) -> &'a [u8] {
468        self.filter_key_extractor_manager.extract(full_key)
469    }
470
471    pub fn vnode_count(&self, table_id: StateTableId) -> usize {
472        *self.table_id_to_vnode.get(&table_id).unwrap_or_else(|| {
473            panic!(
474                "table_id not found {} all_table_ids {:?}",
475                table_id,
476                self.table_id_to_vnode.keys()
477            )
478        })
479    }
480
481    pub fn watermark_serde(
482        &self,
483        table_id: StateTableId,
484    ) -> Option<(OrderedRowSerde, OrderedRowSerde, usize)> {
485        self.table_id_to_watermark_serde
486            .get(&table_id)
487            .unwrap_or_else(|| {
488                panic!(
489                    "table_id not found {} all_table_ids {:?}",
490                    table_id,
491                    self.table_id_to_watermark_serde.keys()
492                )
493            })
494            .clone()
495    }
496
497    pub fn table_id_to_vnode_ref(&self) -> &HashMap<StateTableId, usize> {
498        &self.table_id_to_vnode
499    }
500
501    pub fn table_ids(&self) -> impl Iterator<Item = StateTableId> + '_ {
502        self.table_id_to_vnode.keys().cloned()
503    }
504}
505
506pub type CompactionCatalogManagerRef = Arc<CompactionCatalogManager>;
507pub type CompactionCatalogAgentRef = Arc<CompactionCatalogAgent>;
508
509fn build_watermark_col_serde(
510    table_catalog: &Table,
511) -> Option<(OrderedRowSerde, OrderedRowSerde, usize)> {
512    // Get clean watermark PK index using the helper method
513    let clean_watermark_index_in_pk = table_catalog.get_clean_watermark_index_in_pk_compat();
514
515    match clean_watermark_index_in_pk {
516        None => {
517            // non watermark table or watermark column is the first column (pk_prefix_watermark)
518            None
519        }
520
521        Some(clean_watermark_index_in_pk) => {
522            use risingwave_common::types::DataType;
523            let table_columns: Vec<ColumnDesc> = table_catalog
524                .columns
525                .iter()
526                .map(|col| col.column_desc.as_ref().unwrap().into())
527                .collect();
528
529            let pk_data_types: Vec<DataType> = table_catalog
530                .pk
531                .iter()
532                .map(|col_order| {
533                    table_columns[col_order.column_index as usize]
534                        .data_type
535                        .clone()
536                })
537                .collect();
538
539            let pk_order_types = table_catalog
540                .pk
541                .iter()
542                .map(|col_order| OrderType::from_protobuf(col_order.get_order_type().unwrap()))
543                .collect_vec();
544
545            assert_eq!(pk_data_types.len(), pk_order_types.len());
546            let pk_serde = OrderedRowSerde::new(pk_data_types, pk_order_types);
547            let watermark_col_serde = pk_serde.index(clean_watermark_index_in_pk).into_owned();
548            Some((pk_serde, watermark_col_serde, clean_watermark_index_in_pk))
549        }
550    }
551}
552
#[cfg(test)]
mod tests {
    use std::mem;

    use bytes::{BufMut, BytesMut};
    use itertools::Itertools;
    use risingwave_common::catalog::ColumnDesc;
    use risingwave_common::hash::VirtualNode;
    use risingwave_common::row::OwnedRow;
    use risingwave_common::types::DataType;
    use risingwave_common::types::ScalarImpl::{self};
    use risingwave_common::util::row_serde::OrderedRowSerde;
    use risingwave_common::util::sort_util::OrderType;
    use risingwave_hummock_sdk::key::TABLE_PREFIX_LEN;
    use risingwave_pb::catalog::table::{PbEngine, TableType};
    use risingwave_pb::catalog::{PbCreateType, PbStreamJobStatus, PbTable};
    use risingwave_pb::common::{PbColumnOrder, PbDirection, PbNullsAre, PbOrderType};
    use risingwave_pb::plan_common::PbColumnCatalog;

    use super::{DummyFilterKeyExtractor, FilterKeyExtractor, SchemaFilterKeyExtractor};
    use crate::compaction_catalog_manager::{
        FilterKeyExtractorImpl, FullKeyFilterKeyExtractor, MultiFilterKeyExtractor,
    };

    const fn dummy_vnode() -> [u8; VirtualNode::SIZE] {
        VirtualNode::from_index(233).to_be_bytes()
    }

    /// Serializes the pk row `(100_i64, "abc")`, both columns ascending. This matches the pk
    /// layout (`Int64`, `Varchar`) of the table built by `build_table_with_prefix_column_num`.
    fn serialize_test_row() -> Vec<u8> {
        let order_types: Vec<OrderType> = vec![OrderType::ascending(), OrderType::ascending()];
        let schema = vec![DataType::Int64, DataType::Varchar];
        let serializer = OrderedRowSerde::new(schema, order_types);
        let row = OwnedRow::new(vec![
            Some(ScalarImpl::Int64(100)),
            Some(ScalarImpl::Utf8("abc".into())),
        ]);
        let mut row_bytes = vec![];
        serializer.serialize(&row, &mut row_bytes);
        row_bytes
    }

    /// Builds a full key laid out as `table_id | dummy vnode | row_bytes`.
    fn build_full_key(table_id: u32, row_bytes: &[u8]) -> Vec<u8> {
        let mut buf =
            BytesMut::with_capacity(TABLE_PREFIX_LEN + VirtualNode::SIZE + row_bytes.len());
        buf.put_u32(table_id);
        buf.put_slice(&dummy_vnode());
        buf.put_slice(row_bytes);
        buf.to_vec()
    }

    #[test]
    fn test_default_filter_key_extractor() {
        let dummy_filter_key_extractor = DummyFilterKeyExtractor;
        let full_key = "full_key".as_bytes();
        let output_key = dummy_filter_key_extractor.extract(full_key);

        assert_eq!("".as_bytes(), output_key);

        let full_key_filter_key_extractor = FullKeyFilterKeyExtractor;
        let output_key = full_key_filter_key_extractor.extract(full_key);

        assert_eq!(full_key, output_key);
    }

    fn build_table_with_prefix_column_num(column_count: u32) -> PbTable {
        PbTable {
            id: 0.into(),
            schema_id: 0.into(),
            database_id: 0.into(),
            name: "test".to_owned(),
            table_type: TableType::Table as i32,
            columns: vec![
                PbColumnCatalog {
                    column_desc: Some(
                        (&ColumnDesc::named("_row_id", 0.into(), DataType::Int64)).into(),
                    ),
                    is_hidden: true,
                },
                PbColumnCatalog {
                    column_desc: Some(
                        (&ColumnDesc::named("col_1", 0.into(), DataType::Int64)).into(),
                    ),
                    is_hidden: false,
                },
                PbColumnCatalog {
                    column_desc: Some(
                        (&ColumnDesc::named("col_2", 0.into(), DataType::Float64)).into(),
                    ),
                    is_hidden: false,
                },
                PbColumnCatalog {
                    column_desc: Some(
                        (&ColumnDesc::named("col_3", 0.into(), DataType::Varchar)).into(),
                    ),
                    is_hidden: false,
                },
            ],
            pk: vec![
                PbColumnOrder {
                    column_index: 1,
                    order_type: Some(PbOrderType {
                        direction: PbDirection::Ascending as _,
                        nulls_are: PbNullsAre::Largest as _,
                    }),
                },
                PbColumnOrder {
                    column_index: 3,
                    order_type: Some(PbOrderType {
                        direction: PbDirection::Ascending as _,
                        nulls_are: PbNullsAre::Largest as _,
                    }),
                },
            ],
            stream_key: vec![0],
            distribution_key: (0..column_count as i32).collect_vec(),
            optional_associated_source_id: None,
            append_only: false,
            owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
            retention_seconds: Some(300),
            fragment_id: 0.into(),
            dml_fragment_id: None,
            initialized_at_epoch: None,
            vnode_col_index: None,
            row_id_index: Some(0),
            value_indices: vec![0],
            definition: "".into(),
            handle_pk_conflict_behavior: 0,
            version_column_indices: vec![],
            read_prefix_len_hint: 1,
            version: None,
            watermark_indices: vec![],
            dist_key_in_pk: vec![],
            cardinality: None,
            created_at_epoch: None,
            cleaned_by_watermark: false,
            stream_job_status: PbStreamJobStatus::Created.into(),
            create_type: PbCreateType::Foreground.into(),
            description: None,
            #[expect(deprecated)]
            incoming_sinks: Default::default(),
            initialized_at_cluster_version: None,
            created_at_cluster_version: None,
            cdc_table_id: None,
            maybe_vnode_count: None,
            webhook_info: None,
            job_id: None,
            engine: Some(PbEngine::Hummock as i32),
            #[expect(deprecated)]
            clean_watermark_index_in_pk: None,
            clean_watermark_indices: vec![],
            refreshable: false,
            vector_index_info: None,
            cdc_table_type: None,
        }
    }

    #[test]
    fn test_schema_filter_key_extractor() {
        let prost_table = build_table_with_prefix_column_num(1);
        let schema_filter_key_extractor = SchemaFilterKeyExtractor::new(&prost_table);

        let full_key = build_full_key(1, &serialize_test_row());
        let output_key = schema_filter_key_extractor.extract(&full_key);
        // `read_prefix_len_hint` is 1, so only the leading Int64 pk column is extracted.
        assert_eq!(1 + mem::size_of::<i64>(), output_key.len());
    }

    #[test]
    fn test_multi_filter_key_extractor() {
        let mut multi_filter_key_extractor = MultiFilterKeyExtractor::default();
        let row_bytes = serialize_test_row();

        {
            // test table_id 1
            let prost_table = build_table_with_prefix_column_num(1);
            let schema_filter_key_extractor = SchemaFilterKeyExtractor::new(&prost_table);
            multi_filter_key_extractor.register(
                1.into(),
                FilterKeyExtractorImpl::Schema(schema_filter_key_extractor),
            );

            let full_key = build_full_key(1, &row_bytes);
            let output_key = multi_filter_key_extractor.extract(&full_key);

            let data_types = vec![DataType::Int64];
            let order_types = vec![OrderType::ascending()];
            let deserializer = OrderedRowSerde::new(data_types, order_types);

            let pk_prefix_len = deserializer.deserialize_prefix_len(&row_bytes, 1).unwrap();
            assert_eq!(pk_prefix_len, output_key.len());
        }

        {
            // test table_id 2
            let prost_table = build_table_with_prefix_column_num(2);
            let schema_filter_key_extractor = SchemaFilterKeyExtractor::new(&prost_table);
            multi_filter_key_extractor.register(
                2.into(),
                FilterKeyExtractorImpl::Schema(schema_filter_key_extractor),
            );

            let full_key = build_full_key(2, &row_bytes);
            let output_key = multi_filter_key_extractor.extract(&full_key);

            let data_types = vec![DataType::Int64, DataType::Varchar];
            let order_types = vec![OrderType::ascending(), OrderType::ascending()];
            let deserializer = OrderedRowSerde::new(data_types, order_types);

            let pk_prefix_len = deserializer.deserialize_prefix_len(&row_bytes, 1).unwrap();

            assert_eq!(pk_prefix_len, output_key.len());
        }
    }

    #[tokio::test]
    async fn test_compaction_catalog_manager_exception() {
        let compaction_catalog_manager = super::CompactionCatalogManager::default();

        {
            let Err(e) = compaction_catalog_manager.acquire(vec![]).await else {
                panic!("acquire with an empty table id set should fail");
            };
            assert_eq!(e.to_string(), "Other error: table_id_set is empty");
        }

        {
            // network error with FakeRemoteTableAccessor
            let Err(e) = compaction_catalog_manager.acquire(vec![1.into()]).await else {
                panic!("acquire through FakeRemoteTableAccessor should fail");
            };
            assert_eq!(
                e.to_string(),
                "Other error: request rpc list_tables for meta failed: fake accessor does not support fetch remote table"
            );
        }
    }
}