risingwave_storage/
compaction_catalog_manager.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::fmt::Debug;
17use std::iter;
18use std::sync::Arc;
19
20use itertools::Itertools;
21use parking_lot::RwLock;
22use risingwave_common::catalog::{ColumnDesc, TableId};
23use risingwave_common::hash::{VirtualNode, VnodeCountCompat};
24use risingwave_common::util::memcmp_encoding;
25use risingwave_common::util::row_serde::OrderedRowSerde;
26use risingwave_common::util::sort_util::OrderType;
27use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde;
28use risingwave_common::util::value_encoding::{BasicSerde, EitherSerde, ValueRowDeserializer};
29use risingwave_hummock_sdk::compaction_group::StateTableId;
30use risingwave_hummock_sdk::key::{TABLE_PREFIX_LEN, get_table_id};
31use risingwave_pb::catalog::Table;
32use risingwave_rpc_client::MetaClient;
33use risingwave_rpc_client::error::{Result as RpcResult, RpcError};
34use thiserror_ext::AsReport;
35
36use crate::hummock::{HummockError, HummockResult};
37use crate::row_serde::value_serde::ValueRowSerdeNew;
38
/// Extracts, from a user key, the bytes that are fed into the bloom filter.
pub trait FilterKeyExtractor: Send + Sync {
    /// Returns the filter key of `user_key`; the result borrows from `user_key` itself.
    fn extract<'key>(&self, user_key: &'key [u8]) -> &'key [u8];
}
43
44pub enum FilterKeyExtractorImpl {
45    Schema(SchemaFilterKeyExtractor),
46    FullKey(FullKeyFilterKeyExtractor),
47    Dummy(DummyFilterKeyExtractor),
48    Multi(MultiFilterKeyExtractor),
49    FixedLength(FixedLengthFilterKeyExtractor),
50}
51
52impl FilterKeyExtractorImpl {
53    pub fn from_table(table_catalog: &Table) -> Self {
54        let read_prefix_len = table_catalog.get_read_prefix_len_hint() as usize;
55
56        if read_prefix_len == 0 {
57            FilterKeyExtractorImpl::Dummy(DummyFilterKeyExtractor)
58        } else if read_prefix_len > table_catalog.get_pk().len() {
59            // for now frontend had not infer the table_id_to_filter_key_extractor, so we
60            // use FullKeyFilterKeyExtractor
61            FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor)
62        } else {
63            FilterKeyExtractorImpl::Schema(SchemaFilterKeyExtractor::new(table_catalog))
64        }
65    }
66}
67
// Generates `FilterKeyExtractorImpl::extract`, which forwards `user_key` to the `extract` of
// whichever variant is active. The list of variant names is supplied by
// `for_all_filter_key_extractor_variants!` below.
macro_rules! impl_filter_key_extractor {
    ($( { $variant_name:ident } ),*) => {
        impl FilterKeyExtractorImpl {
            pub fn extract<'a>(&self, user_key: &'a [u8]) -> &'a [u8]{
                match self {
                    $( Self::$variant_name(inner) => inner.extract(user_key), )*
                }
            }
        }
    }

}

// Invokes `$macro` with every `FilterKeyExtractorImpl` variant name. Keep this list in sync with
// the enum: a missing variant makes the generated `match` non-exhaustive and fails to compile.
macro_rules! for_all_filter_key_extractor_variants {
    ($macro:ident) => {
        $macro! {
            { Schema },
            { FullKey },
            { Dummy },
            { Multi },
            { FixedLength }
        }
    };
}

// Expands to the `FilterKeyExtractorImpl::extract` dispatcher.
for_all_filter_key_extractor_variants! { impl_filter_key_extractor }
94
95#[derive(Default)]
96pub struct FullKeyFilterKeyExtractor;
97
98impl FilterKeyExtractor for FullKeyFilterKeyExtractor {
99    fn extract<'a>(&self, user_key: &'a [u8]) -> &'a [u8] {
100        user_key
101    }
102}
103
104#[derive(Default)]
105pub struct DummyFilterKeyExtractor;
106impl FilterKeyExtractor for DummyFilterKeyExtractor {
107    fn extract<'a>(&self, _user_key: &'a [u8]) -> &'a [u8] {
108        &[]
109    }
110}
111
112/// [`SchemaFilterKeyExtractor`] build from `table_catalog` and extract a `user_key` to prefix for
113#[derive(Default)]
114pub struct FixedLengthFilterKeyExtractor {
115    fixed_length: usize,
116}
117
118impl FilterKeyExtractor for FixedLengthFilterKeyExtractor {
119    fn extract<'a>(&self, user_key: &'a [u8]) -> &'a [u8] {
120        &user_key[0..self.fixed_length]
121    }
122}
123
124impl FixedLengthFilterKeyExtractor {
125    pub fn new(fixed_length: usize) -> Self {
126        Self { fixed_length }
127    }
128}
129
130/// [`SchemaFilterKeyExtractor`] build from `table_catalog` and transform a `user_key` to prefix for
131/// `prefix_bloom_filter`
132pub struct SchemaFilterKeyExtractor {
133    /// Each stateful operator has its own read pattern, partly using prefix scan.
134    /// Prefix key length can be decoded through its `DataType` and `OrderType` which obtained from
135    /// `TableCatalog`. `read_pattern_prefix_column` means the count of column to decode prefix
136    /// from storage key.
137    read_prefix_len: usize,
138    deserializer: OrderedRowSerde,
139    // TODO:need some bench test for same prefix case like join (if we need a prefix_cache for same
140    // prefix_key)
141}
142
143impl FilterKeyExtractor for SchemaFilterKeyExtractor {
144    fn extract<'a>(&self, user_key: &'a [u8]) -> &'a [u8] {
145        if user_key.len() < TABLE_PREFIX_LEN + VirtualNode::SIZE {
146            return &[];
147        }
148
149        let (_table_prefix, key) = user_key.split_at(TABLE_PREFIX_LEN);
150        let (_vnode_prefix, pk) = key.split_at(VirtualNode::SIZE);
151
152        // if the key with table_id deserializer fail from schema, that should panic here for early
153        // detection.
154
155        let bloom_filter_key_len = self
156            .deserializer
157            .deserialize_prefix_len(pk, self.read_prefix_len)
158            .unwrap();
159
160        let end_position = TABLE_PREFIX_LEN + VirtualNode::SIZE + bloom_filter_key_len;
161        &user_key[TABLE_PREFIX_LEN + VirtualNode::SIZE..end_position]
162    }
163}
164
165impl SchemaFilterKeyExtractor {
166    pub fn new(table_catalog: &Table) -> Self {
167        let pk_indices: Vec<usize> = table_catalog
168            .pk
169            .iter()
170            .map(|col_order| col_order.column_index as usize)
171            .collect();
172
173        let read_prefix_len = table_catalog.get_read_prefix_len_hint() as usize;
174
175        let data_types = pk_indices
176            .iter()
177            .map(|column_idx| &table_catalog.columns[*column_idx])
178            .map(|col| ColumnDesc::from(col.column_desc.as_ref().unwrap()).data_type)
179            .collect();
180
181        let order_types: Vec<OrderType> = table_catalog
182            .pk
183            .iter()
184            .map(|col_order| OrderType::from_protobuf(col_order.get_order_type().unwrap()))
185            .collect();
186
187        Self {
188            read_prefix_len,
189            deserializer: OrderedRowSerde::new(data_types, order_types),
190        }
191    }
192}
193
194#[derive(Default)]
195pub struct MultiFilterKeyExtractor {
196    id_to_filter_key_extractor: HashMap<TableId, FilterKeyExtractorImpl>,
197}
198
199impl MultiFilterKeyExtractor {
200    pub fn register(&mut self, table_id: TableId, filter_key_extractor: FilterKeyExtractorImpl) {
201        self.id_to_filter_key_extractor
202            .insert(table_id, filter_key_extractor);
203    }
204
205    pub fn size(&self) -> usize {
206        self.id_to_filter_key_extractor.len()
207    }
208
209    pub fn get_existing_table_ids(&self) -> HashSet<TableId> {
210        self.id_to_filter_key_extractor.keys().cloned().collect()
211    }
212}
213
214impl Debug for MultiFilterKeyExtractor {
215    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
216        write!(f, "MultiFilterKeyExtractor size {} ", self.size())
217    }
218}
219
220impl FilterKeyExtractor for MultiFilterKeyExtractor {
221    fn extract<'a>(&self, user_key: &'a [u8]) -> &'a [u8] {
222        if user_key.len() < TABLE_PREFIX_LEN + VirtualNode::SIZE {
223            return user_key;
224        }
225
226        let table_id = get_table_id(user_key);
227        self.id_to_filter_key_extractor
228            .get(&table_id)
229            .unwrap()
230            .extract(user_key)
231    }
232}
233
234#[async_trait::async_trait]
235pub trait StateTableAccessor: Send + Sync {
236    async fn get_tables(&self, table_ids: &[TableId]) -> RpcResult<HashMap<TableId, Table>>;
237}
238
/// Accessor that cannot fetch anything; every `get_tables` call returns an error.
#[derive(Default)]
pub struct FakeRemoteTableAccessor {}
241
242pub struct RemoteTableAccessor {
243    meta_client: MetaClient,
244}
245
246impl RemoteTableAccessor {
247    pub fn new(meta_client: MetaClient) -> Self {
248        Self { meta_client }
249    }
250}
251
252#[async_trait::async_trait]
253impl StateTableAccessor for RemoteTableAccessor {
254    async fn get_tables(&self, table_ids: &[TableId]) -> RpcResult<HashMap<TableId, Table>> {
255        self.meta_client.get_tables(table_ids.to_vec(), true).await
256    }
257}
258
259#[async_trait::async_trait]
260impl StateTableAccessor for FakeRemoteTableAccessor {
261    async fn get_tables(&self, _table_ids: &[TableId]) -> RpcResult<HashMap<TableId, Table>> {
262        Err(RpcError::Internal(anyhow::anyhow!(
263            "fake accessor does not support fetch remote table"
264        )))
265    }
266}
267
268/// `CompactionCatalogManager` is a manager to manage all `Table` which used in compaction
269pub struct CompactionCatalogManager {
270    // `table_id_to_catalog` is a map to store all `Table` which used in compaction
271    table_id_to_catalog: RwLock<HashMap<StateTableId, Table>>,
272    // `table_accessor` is a accessor to fetch `Table` from meta when the table not found
273    table_accessor: Box<dyn StateTableAccessor>,
274}
275
276impl Default for CompactionCatalogManager {
277    fn default() -> Self {
278        Self::new(Box::<FakeRemoteTableAccessor>::default())
279    }
280}
281
282impl CompactionCatalogManager {
283    pub fn new(table_accessor: Box<dyn StateTableAccessor>) -> Self {
284        Self {
285            table_id_to_catalog: Default::default(),
286            table_accessor,
287        }
288    }
289}
290
291impl CompactionCatalogManager {
292    /// `update` is used to update `Table` in `table_id_to_catalog` from notification
293    pub fn update(&self, table_id: TableId, catalog: Table) {
294        self.table_id_to_catalog.write().insert(table_id, catalog);
295    }
296
297    /// `sync` is used to sync all `Table` in `table_id_to_catalog` from notification whole snapshot
298    pub fn sync(&self, catalog_map: HashMap<TableId, Table>) {
299        let mut guard = self.table_id_to_catalog.write();
300        guard.clear();
301        guard.extend(catalog_map);
302    }
303
304    /// `remove` is used to remove `Table` in `table_id_to_catalog` by `table_id`
305    pub fn remove(&self, table_id: TableId) {
306        self.table_id_to_catalog.write().remove(&table_id);
307    }
308
309    /// `acquire` is used to acquire `CompactionCatalogAgent` by `table_ids`
310    /// if the table not found in `table_id_to_catalog`, it will fetch from meta
311    pub async fn acquire(
312        &self,
313        mut table_ids: Vec<StateTableId>,
314    ) -> HummockResult<CompactionCatalogAgentRef> {
315        if table_ids.is_empty() {
316            // table_id_set is empty
317            // the table in sst has been deleted
318
319            // use full key as default
320            return Err(HummockError::other("table_id_set is empty"));
321        }
322
323        let mut multi_filter_key_extractor = MultiFilterKeyExtractor::default();
324        let mut table_id_to_vnode = HashMap::new();
325        let mut table_id_to_watermark_serde = HashMap::new();
326        let mut table_id_to_value_watermark_serde = HashMap::new();
327
328        {
329            let guard = self.table_id_to_catalog.read();
330            table_ids.retain(|table_id| match guard.get(table_id) {
331                Some(table_catalog) => {
332                    // filter-key-extractor
333                    multi_filter_key_extractor
334                        .register(*table_id, FilterKeyExtractorImpl::from_table(table_catalog));
335
336                    // vnode
337                    table_id_to_vnode.insert(*table_id, table_catalog.vnode_count());
338
339                    // watermark
340                    table_id_to_watermark_serde
341                        .insert(*table_id, build_watermark_col_serde(table_catalog));
342                    table_id_to_value_watermark_serde.insert(
343                        *table_id,
344                        build_value_watermark_col_serde(table_catalog).map(Arc::new),
345                    );
346
347                    false
348                }
349
350                None => true,
351            });
352        }
353
354        if !table_ids.is_empty() {
355            let mut state_tables =
356                self.table_accessor
357                    .get_tables(&table_ids)
358                    .await
359                    .map_err(|e| {
360                        HummockError::other(format!(
361                            "request rpc list_tables for meta failed: {}",
362                            e.as_report()
363                        ))
364                    })?;
365
366            let mut guard = self.table_id_to_catalog.write();
367            for table_id in table_ids {
368                if let Some(table) = state_tables.remove(&table_id) {
369                    let table_id = table.id;
370                    let key_extractor = FilterKeyExtractorImpl::from_table(&table);
371                    let vnode = table.vnode_count();
372                    let watermark_serde = build_watermark_col_serde(&table);
373                    let value_watermark_serde = build_value_watermark_col_serde(&table);
374                    guard.insert(table_id, table);
375                    // filter-key-extractor
376                    multi_filter_key_extractor.register(table_id, key_extractor);
377
378                    // vnode
379                    table_id_to_vnode.insert(table_id, vnode);
380
381                    // watermark
382                    table_id_to_watermark_serde.insert(table_id, watermark_serde);
383                    table_id_to_value_watermark_serde
384                        .insert(table_id, value_watermark_serde.map(Arc::new));
385                }
386            }
387        }
388
389        Ok(Arc::new(CompactionCatalogAgent::new(
390            FilterKeyExtractorImpl::Multi(multi_filter_key_extractor),
391            table_id_to_vnode,
392            table_id_to_watermark_serde,
393            table_id_to_value_watermark_serde,
394        )))
395    }
396
397    /// `build_compaction_catalog_agent` is used to build `CompactionCatalogAgent` by `table_catalogs`
398    pub fn build_compaction_catalog_agent(
399        table_catalogs: HashMap<StateTableId, Table>,
400    ) -> CompactionCatalogAgentRef {
401        let mut multi_filter_key_extractor = MultiFilterKeyExtractor::default();
402        let mut table_id_to_vnode = HashMap::new();
403        let mut table_id_to_watermark_serde = HashMap::new();
404        let mut value_table_id_to_watermark_serde = HashMap::new();
405        for (table_id, table_catalog) in table_catalogs {
406            // filter-key-extractor
407            multi_filter_key_extractor
408                .register(table_id, FilterKeyExtractorImpl::from_table(&table_catalog));
409
410            // vnode
411            table_id_to_vnode.insert(table_id, table_catalog.vnode_count());
412
413            // watermark
414            table_id_to_watermark_serde.insert(table_id, build_watermark_col_serde(&table_catalog));
415            value_table_id_to_watermark_serde.insert(
416                table_id,
417                build_value_watermark_col_serde(&table_catalog).map(Arc::new),
418            );
419        }
420
421        Arc::new(CompactionCatalogAgent::new(
422            FilterKeyExtractorImpl::Multi(multi_filter_key_extractor),
423            table_id_to_vnode,
424            table_id_to_watermark_serde,
425            value_table_id_to_watermark_serde,
426        ))
427    }
428}
429
430/// `CompactionCatalogAgent` is a wrapper of `filter_key_extractor_manager` and `table_id_to_vnode`
431/// The `CompactionCatalogAgent` belongs to a compaction task call, which we will build from the `table_ids` contained in a compact task and use it during the compaction.
432/// The `CompactionCatalogAgent` can act as a agent for the `CompactionCatalogManager`, providing `extract` and `vnode_count` capabilities.
433pub struct CompactionCatalogAgent {
434    filter_key_extractor_manager: FilterKeyExtractorImpl,
435    table_id_to_vnode: HashMap<StateTableId, usize>,
436    // table_id ->(pk_prefix_serde, clean_watermark_col_serde, watermark_col_idx)
437    // cache for reduce serde build
438    table_id_to_watermark_serde:
439        HashMap<StateTableId, Option<(OrderedRowSerde, OrderedRowSerde, usize)>>,
440    value_table_id_to_watermark_serde: HashMap<StateTableId, Option<ValueWatermarkColumnSerdeRef>>,
441}
442
443impl CompactionCatalogAgent {
444    pub fn new(
445        filter_key_extractor_manager: FilterKeyExtractorImpl,
446        table_id_to_vnode: HashMap<StateTableId, usize>,
447        table_id_to_watermark_serde: HashMap<
448            StateTableId,
449            Option<(OrderedRowSerde, OrderedRowSerde, usize)>,
450        >,
451        value_table_id_to_watermark_serde: HashMap<
452            StateTableId,
453            Option<ValueWatermarkColumnSerdeRef>,
454        >,
455    ) -> Self {
456        Self {
457            filter_key_extractor_manager,
458            table_id_to_vnode,
459            table_id_to_watermark_serde,
460            value_table_id_to_watermark_serde,
461        }
462    }
463
464    pub fn dummy() -> Self {
465        Self {
466            filter_key_extractor_manager: FilterKeyExtractorImpl::Dummy(DummyFilterKeyExtractor),
467            table_id_to_vnode: Default::default(),
468            table_id_to_watermark_serde: Default::default(),
469            value_table_id_to_watermark_serde: Default::default(),
470        }
471    }
472
473    pub fn for_test(table_ids: Vec<impl Into<StateTableId>>) -> Arc<Self> {
474        let full_key_filter_key_extractor =
475            FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);
476
477        let table_id_to_vnode: HashMap<TableId, usize> = table_ids
478            .into_iter()
479            .map(|table_id| (table_id.into(), VirtualNode::COUNT_FOR_TEST))
480            .collect();
481
482        let table_id_to_watermark_serde = table_id_to_vnode
483            .keys()
484            .map(|table_id| (*table_id, None))
485            .collect();
486
487        let value_table_id_to_watermark_serde = table_id_to_vnode
488            .keys()
489            .map(|table_id| (*table_id, None))
490            .collect();
491
492        Arc::new(CompactionCatalogAgent::new(
493            full_key_filter_key_extractor,
494            table_id_to_vnode,
495            table_id_to_watermark_serde,
496            value_table_id_to_watermark_serde,
497        ))
498    }
499}
500
501impl CompactionCatalogAgent {
502    pub fn extract<'a>(&self, user_key: &'a [u8]) -> &'a [u8] {
503        self.filter_key_extractor_manager.extract(user_key)
504    }
505
506    pub fn vnode_count(&self, table_id: StateTableId) -> usize {
507        *self.table_id_to_vnode.get(&table_id).unwrap_or_else(|| {
508            panic!(
509                "table_id not found {} all_table_ids {:?}",
510                table_id,
511                self.table_id_to_vnode.keys()
512            )
513        })
514    }
515
516    pub fn watermark_serde(
517        &self,
518        table_id: StateTableId,
519    ) -> Option<(OrderedRowSerde, OrderedRowSerde, usize)> {
520        self.table_id_to_watermark_serde
521            .get(&table_id)
522            .unwrap_or_else(|| {
523                panic!(
524                    "table_id not found {} all_table_ids {:?}",
525                    table_id,
526                    self.table_id_to_watermark_serde.keys()
527                )
528            })
529            .clone()
530    }
531
532    pub fn value_watermark_serde(
533        &self,
534        table_id: StateTableId,
535    ) -> Option<ValueWatermarkColumnSerdeRef> {
536        self.value_table_id_to_watermark_serde
537            .get(&table_id)
538            .unwrap_or_else(|| {
539                panic!(
540                    "table_id not found {} all_table_ids {:?}",
541                    table_id,
542                    self.value_table_id_to_watermark_serde.keys()
543                )
544            })
545            .clone()
546    }
547
548    pub fn table_id_to_vnode_ref(&self) -> &HashMap<StateTableId, usize> {
549        &self.table_id_to_vnode
550    }
551
552    pub fn table_ids(&self) -> impl Iterator<Item = StateTableId> + '_ {
553        self.table_id_to_vnode.keys().cloned()
554    }
555}
556
557pub type CompactionCatalogManagerRef = Arc<CompactionCatalogManager>;
558pub type CompactionCatalogAgentRef = Arc<CompactionCatalogAgent>;
559
560fn build_watermark_col_serde(
561    table_catalog: &Table,
562) -> Option<(OrderedRowSerde, OrderedRowSerde, usize)> {
563    // Get clean watermark PK index using the helper method
564    let clean_watermark_index_in_pk = table_catalog.get_clean_watermark_index_in_pk_compat();
565
566    match clean_watermark_index_in_pk {
567        None => {
568            // non watermark table or watermark column is the first column (pk_prefix_watermark)
569            // TODO(ttl): if the watermark column is in the value, we may also get a `None` here, support it.
570            None
571        }
572
573        Some(clean_watermark_index_in_pk) => {
574            use risingwave_common::types::DataType;
575            let table_columns: Vec<ColumnDesc> = table_catalog
576                .columns
577                .iter()
578                .map(|col| col.column_desc.as_ref().unwrap().into())
579                .collect();
580
581            let pk_data_types: Vec<DataType> = table_catalog
582                .pk
583                .iter()
584                .map(|col_order| {
585                    table_columns[col_order.column_index as usize]
586                        .data_type
587                        .clone()
588                })
589                .collect();
590
591            let pk_order_types = table_catalog
592                .pk
593                .iter()
594                .map(|col_order| OrderType::from_protobuf(col_order.get_order_type().unwrap()))
595                .collect_vec();
596
597            assert_eq!(pk_data_types.len(), pk_order_types.len());
598            let pk_serde = OrderedRowSerde::new(pk_data_types, pk_order_types);
599            let watermark_col_serde = pk_serde.index(clean_watermark_index_in_pk).into_owned();
600            Some((pk_serde, watermark_col_serde, clean_watermark_index_in_pk))
601        }
602    }
603}
604
605fn build_value_watermark_col_serde(table_catalog: &Table) -> Option<ValueWatermarkColumnSerde> {
606    /// Returns the column index of non-pk watermark column.
607    pub fn try_get_non_pk_clean_watermark_column_index(table: &Table) -> Option<usize> {
608        table
609            .get_clean_watermark_column_indices()
610            .iter()
611            .filter_map(|&col_idx| {
612                if table
613                    .pk
614                    .iter()
615                    .any(|col_order| col_order.column_index == col_idx)
616                {
617                    return None;
618                }
619                Some(col_idx as usize)
620            })
621            .at_most_one()
622            .unwrap()
623    }
624
625    let clean_watermark_index = try_get_non_pk_clean_watermark_column_index(table_catalog)?;
626    Some(ValueWatermarkColumnSerde::new(
627        table_catalog,
628        clean_watermark_index,
629    ))
630}
631
632pub struct ValueWatermarkColumnSerde {
633    /// For `ColumnAwareSerde`, only 1 column is deserialized.
634    row_serde: EitherSerde,
635    /// For `ColumnAwareSerde`, index have been rewritten to 0.
636    watermark_index_in_de_row: usize,
637    watermark_column_mem_encoding_order: OrderType,
638}
639
640pub type ValueWatermarkColumnSerdeRef = Arc<ValueWatermarkColumnSerde>;
641
642impl ValueWatermarkColumnSerde {
643    fn new(table_catalog: &Table, clean_watermark_index: usize) -> Self {
644        let table_columns: Vec<ColumnDesc> = table_catalog
645            .columns
646            .iter()
647            .map(|col| col.column_desc.as_ref().unwrap().into())
648            .collect();
649        let pk_order_type = table_catalog
650            .pk
651            .iter()
652            .filter_map(|col_order| {
653                if col_order.column_index as usize == clean_watermark_index {
654                    return Some(OrderType::from_protobuf(
655                        col_order.get_order_type().unwrap(),
656                    ));
657                }
658                None
659            })
660            .at_most_one()
661            .unwrap();
662        // Correctness requires the assumption that a table contains at most one watermark column.
663        let watermark_column_mem_encoding_order = match pk_order_type {
664            Some(o) => o,
665            // Correctness requires the assumption that the order is the same as the one used in StateTable when serializing watermark for value column.
666            None => OrderType::ascending(),
667        };
668        // Correctness requires the assumption that the watermark column is stored in the value. (See comment on Table::value_indices.)
669        let Some(watermark_index_in_value_indices) = table_catalog
670            .value_indices
671            .iter()
672            .position(|p| *p as usize == clean_watermark_index)
673        else {
674            panic!(
675                "Watermark index {} not found in value_indices {:?}.",
676                clean_watermark_index, table_catalog.value_indices
677            );
678        };
679        let (row_serde, watermark_index_in_de_row) = if table_catalog.version.is_none() {
680            let row_serde = BasicSerde::new(
681                Arc::from_iter(table_catalog.value_indices.iter().map(|val| *val as usize)),
682                Arc::from(table_columns.into_boxed_slice()),
683            )
684            .into();
685            (row_serde, watermark_index_in_value_indices)
686        } else {
687            let row_serde = ColumnAwareSerde::new(
688                Arc::from_iter(iter::once(clean_watermark_index)),
689                Arc::from(table_columns.into_boxed_slice()),
690            )
691            .into();
692            // Only one column.
693            (row_serde, 0)
694        };
695        Self {
696            row_serde,
697            watermark_index_in_de_row,
698            watermark_column_mem_encoding_order,
699        }
700    }
701
702    pub fn deserialize(&self, encoded_bytes: &[u8]) -> HummockResult<Option<Vec<u8>>> {
703        let mut row = self
704            .row_serde
705            .deserialize(encoded_bytes)
706            .map_err(HummockError::decode_error)?;
707        if self.watermark_index_in_de_row >= row.len() {
708            // The watermark column has been dropped in column aware row encoding.
709            return Ok(None);
710        }
711        let datum = std::mem::take(&mut row[self.watermark_index_in_de_row]);
712        // Correctness requires on the assumption that the watermark is serialized using memcmp_encoding in StateTable.
713        let bytes = memcmp_encoding::encode_value(datum, self.watermark_column_mem_encoding_order)
714            .map_err(HummockError::encode_error)?;
715        Ok(Some(bytes.into()))
716    }
717}
718
719#[cfg(test)]
720mod tests {
721    use std::mem;
722
723    use bytes::{BufMut, BytesMut};
724    use itertools::Itertools;
725    use risingwave_common::catalog::ColumnDesc;
726    use risingwave_common::hash::VirtualNode;
727    use risingwave_common::row::OwnedRow;
728    use risingwave_common::types::DataType;
729    use risingwave_common::types::ScalarImpl::{self};
730    use risingwave_common::util::row_serde::OrderedRowSerde;
731    use risingwave_common::util::sort_util::OrderType;
732    use risingwave_hummock_sdk::key::TABLE_PREFIX_LEN;
733    use risingwave_pb::catalog::table::{PbEngine, TableType};
734    use risingwave_pb::catalog::{PbCreateType, PbStreamJobStatus, PbTable};
735    use risingwave_pb::common::{PbColumnOrder, PbDirection, PbNullsAre, PbOrderType};
736    use risingwave_pb::plan_common::PbColumnCatalog;
737
738    use super::{DummyFilterKeyExtractor, FilterKeyExtractor, SchemaFilterKeyExtractor};
739    use crate::compaction_catalog_manager::{
740        FilterKeyExtractorImpl, FullKeyFilterKeyExtractor, MultiFilterKeyExtractor,
741    };
742    const fn dummy_vnode() -> [u8; VirtualNode::SIZE] {
743        VirtualNode::from_index(233).to_be_bytes()
744    }
745
746    #[test]
747    fn test_default_filter_key_extractor() {
748        let dummy_filter_key_extractor = DummyFilterKeyExtractor;
749        let full_key = "full_key".as_bytes();
750        let output_key = dummy_filter_key_extractor.extract(full_key);
751
752        assert_eq!("".as_bytes(), output_key);
753
754        let full_key_filter_key_extractor = FullKeyFilterKeyExtractor;
755        let output_key = full_key_filter_key_extractor.extract(full_key);
756
757        assert_eq!(full_key, output_key);
758    }
759
    /// Builds a minimal Hummock-engine `PbTable` fixture for the extractor tests.
    ///
    /// The table has four columns (`_row_id: Int64` hidden, `col_1: Int64`, `col_2: Float64`,
    /// `col_3: Varchar`), a pk on `col_1` and `col_3` (both ascending, nulls largest), and a
    /// `read_prefix_len_hint` of 1.
    ///
    /// `column_count` only affects `distribution_key`, which is set to `0..column_count`. It
    /// does not change the number of columns.
    fn build_table_with_prefix_column_num(column_count: u32) -> PbTable {
        PbTable {
            id: 0.into(),
            schema_id: 0.into(),
            database_id: 0.into(),
            name: "test".to_owned(),
            table_type: TableType::Table as i32,
            columns: vec![
                PbColumnCatalog {
                    column_desc: Some(
                        (&ColumnDesc::named("_row_id", 0.into(), DataType::Int64)).into(),
                    ),
                    is_hidden: true,
                },
                PbColumnCatalog {
                    column_desc: Some(
                        (&ColumnDesc::named("col_1", 0.into(), DataType::Int64)).into(),
                    ),
                    is_hidden: false,
                },
                PbColumnCatalog {
                    column_desc: Some(
                        (&ColumnDesc::named("col_2", 0.into(), DataType::Float64)).into(),
                    ),
                    is_hidden: false,
                },
                PbColumnCatalog {
                    column_desc: Some(
                        (&ColumnDesc::named("col_3", 0.into(), DataType::Varchar)).into(),
                    ),
                    is_hidden: false,
                },
            ],
            // pk = (col_1, col_3); the read prefix hint below covers only col_1.
            pk: vec![
                PbColumnOrder {
                    column_index: 1,
                    order_type: Some(PbOrderType {
                        direction: PbDirection::Ascending as _,
                        nulls_are: PbNullsAre::Largest as _,
                    }),
                },
                PbColumnOrder {
                    column_index: 3,
                    order_type: Some(PbOrderType {
                        direction: PbDirection::Ascending as _,
                        nulls_are: PbNullsAre::Largest as _,
                    }),
                },
            ],
            stream_key: vec![0],
            distribution_key: (0..column_count as i32).collect_vec(),
            optional_associated_source_id: None,
            append_only: false,
            owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
            retention_seconds: Some(300),
            fragment_id: 0.into(),
            dml_fragment_id: None,
            initialized_at_epoch: None,
            vnode_col_index: None,
            row_id_index: Some(0),
            value_indices: vec![0],
            definition: "".into(),
            handle_pk_conflict_behavior: 0,
            version_column_indices: vec![],
            read_prefix_len_hint: 1,
            version: None,
            watermark_indices: vec![],
            dist_key_in_pk: vec![],
            cardinality: None,
            created_at_epoch: None,
            #[expect(deprecated)]
            cleaned_by_watermark: false,
            stream_job_status: PbStreamJobStatus::Created.into(),
            create_type: PbCreateType::Foreground.into(),
            description: None,
            #[expect(deprecated)]
            incoming_sinks: Default::default(),
            initialized_at_cluster_version: None,
            created_at_cluster_version: None,
            cdc_table_id: None,
            maybe_vnode_count: None,
            webhook_info: None,
            job_id: None,
            engine: Some(PbEngine::Hummock as i32),
            #[expect(deprecated)]
            clean_watermark_index_in_pk: None,
            clean_watermark_indices: vec![],
            refreshable: false,
            vector_index_info: None,
            cdc_table_type: None,
        }
    }
852
853    #[test]
854    fn test_zero_read_prefix_len_uses_dummy_extractor() {
855        let mut prost_table = build_table_with_prefix_column_num(1);
856        prost_table.read_prefix_len_hint = 0;
857
858        let extractor = FilterKeyExtractorImpl::from_table(&prost_table);
859        let order_types: Vec<OrderType> = vec![OrderType::ascending(), OrderType::ascending()];
860        let schema = vec![DataType::Int64, DataType::Varchar];
861        let serializer = OrderedRowSerde::new(schema, order_types);
862        let row = OwnedRow::new(vec![
863            Some(ScalarImpl::Int64(100)),
864            Some(ScalarImpl::Utf8("abc".into())),
865        ]);
866        let mut row_bytes = vec![];
867        serializer.serialize(&row, &mut row_bytes);
868
869        let table_prefix = {
870            let mut buf = BytesMut::with_capacity(TABLE_PREFIX_LEN);
871            buf.put_u32(1);
872            buf.to_vec()
873        };
874        let vnode_prefix = &dummy_vnode()[..];
875        let full_key = [&table_prefix, vnode_prefix, &row_bytes].concat();
876
877        assert!(extractor.extract(&full_key).is_empty());
878    }
879
880    #[test]
881    fn test_schema_filter_key_extractor() {
882        let prost_table = build_table_with_prefix_column_num(1);
883        let schema_filter_key_extractor = SchemaFilterKeyExtractor::new(&prost_table);
884
885        let order_types: Vec<OrderType> = vec![OrderType::ascending(), OrderType::ascending()];
886        let schema = vec![DataType::Int64, DataType::Varchar];
887        let serializer = OrderedRowSerde::new(schema, order_types);
888        let row = OwnedRow::new(vec![
889            Some(ScalarImpl::Int64(100)),
890            Some(ScalarImpl::Utf8("abc".into())),
891        ]);
892        let mut row_bytes = vec![];
893        serializer.serialize(&row, &mut row_bytes);
894
895        let table_prefix = {
896            let mut buf = BytesMut::with_capacity(TABLE_PREFIX_LEN);
897            buf.put_u32(1);
898            buf.to_vec()
899        };
900
901        let vnode_prefix = &dummy_vnode()[..];
902
903        let full_key = [&table_prefix, vnode_prefix, &row_bytes].concat();
904        let output_key = schema_filter_key_extractor.extract(&full_key);
905        assert_eq!(1 + mem::size_of::<i64>(), output_key.len());
906    }
907
908    #[test]
909    fn test_multi_filter_key_extractor() {
910        let mut multi_filter_key_extractor = MultiFilterKeyExtractor::default();
911        {
912            // test table_id 1
913            let prost_table = build_table_with_prefix_column_num(1);
914            let schema_filter_key_extractor = SchemaFilterKeyExtractor::new(&prost_table);
915            multi_filter_key_extractor.register(
916                1.into(),
917                FilterKeyExtractorImpl::Schema(schema_filter_key_extractor),
918            );
919            let order_types: Vec<OrderType> = vec![OrderType::ascending(), OrderType::ascending()];
920            let schema = vec![DataType::Int64, DataType::Varchar];
921            let serializer = OrderedRowSerde::new(schema, order_types);
922            let row = OwnedRow::new(vec![
923                Some(ScalarImpl::Int64(100)),
924                Some(ScalarImpl::Utf8("abc".into())),
925            ]);
926            let mut row_bytes = vec![];
927            serializer.serialize(&row, &mut row_bytes);
928
929            let table_prefix = {
930                let mut buf = BytesMut::with_capacity(TABLE_PREFIX_LEN);
931                buf.put_u32(1);
932                buf.to_vec()
933            };
934
935            let vnode_prefix = &dummy_vnode()[..];
936
937            let full_key = [&table_prefix, vnode_prefix, &row_bytes].concat();
938            let output_key = multi_filter_key_extractor.extract(&full_key);
939
940            let data_types = vec![DataType::Int64];
941            let order_types = vec![OrderType::ascending()];
942            let deserializer = OrderedRowSerde::new(data_types, order_types);
943
944            let pk_prefix_len = deserializer.deserialize_prefix_len(&row_bytes, 1).unwrap();
945            assert_eq!(pk_prefix_len, output_key.len());
946        }
947
948        {
949            // test table_id 1
950            let prost_table = build_table_with_prefix_column_num(2);
951            let schema_filter_key_extractor = SchemaFilterKeyExtractor::new(&prost_table);
952            multi_filter_key_extractor.register(
953                2.into(),
954                FilterKeyExtractorImpl::Schema(schema_filter_key_extractor),
955            );
956            let order_types: Vec<OrderType> = vec![OrderType::ascending(), OrderType::ascending()];
957            let schema = vec![DataType::Int64, DataType::Varchar];
958            let serializer = OrderedRowSerde::new(schema, order_types);
959            let row = OwnedRow::new(vec![
960                Some(ScalarImpl::Int64(100)),
961                Some(ScalarImpl::Utf8("abc".into())),
962            ]);
963            let mut row_bytes = vec![];
964            serializer.serialize(&row, &mut row_bytes);
965
966            let table_prefix = {
967                let mut buf = BytesMut::with_capacity(TABLE_PREFIX_LEN);
968                buf.put_u32(2);
969                buf.to_vec()
970            };
971
972            let vnode_prefix = &dummy_vnode()[..];
973
974            let full_key = [&table_prefix, vnode_prefix, &row_bytes].concat();
975            let output_key = multi_filter_key_extractor.extract(&full_key);
976
977            let data_types = vec![DataType::Int64, DataType::Varchar];
978            let order_types = vec![OrderType::ascending(), OrderType::ascending()];
979            let deserializer = OrderedRowSerde::new(data_types, order_types);
980
981            let pk_prefix_len = deserializer.deserialize_prefix_len(&row_bytes, 1).unwrap();
982
983            assert_eq!(pk_prefix_len, output_key.len());
984        }
985    }
986
987    #[tokio::test]
988    async fn test_compaction_catalog_manager_exception() {
989        let compaction_catalog_manager = super::CompactionCatalogManager::default();
990
991        {
992            let ret = compaction_catalog_manager.acquire(vec![]).await;
993            assert!(ret.is_err());
994            if let Err(e) = ret {
995                assert_eq!(e.to_string(), "Other error: table_id_set is empty");
996            }
997        }
998
999        {
1000            // network error with FakeRemoteTableAccessor
1001            let ret = compaction_catalog_manager.acquire(vec![1.into()]).await;
1002            assert!(ret.is_err());
1003            if let Err(e) = ret {
1004                assert_eq!(
1005                    e.to_string(),
1006                    "Other error: request rpc list_tables for meta failed: fake accessor does not support fetch remote table"
1007                );
1008            }
1009        }
1010    }
1011}