// risingwave_storage/compaction_catalog_manager.rs

// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::fmt::Debug;
17use std::iter;
18use std::sync::Arc;
19
20use itertools::Itertools;
21use parking_lot::RwLock;
22use risingwave_common::catalog::{ColumnDesc, TableId};
23use risingwave_common::hash::{VirtualNode, VnodeCountCompat};
24use risingwave_common::util::memcmp_encoding;
25use risingwave_common::util::row_serde::OrderedRowSerde;
26use risingwave_common::util::sort_util::OrderType;
27use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde;
28use risingwave_common::util::value_encoding::{BasicSerde, EitherSerde, ValueRowDeserializer};
29use risingwave_hummock_sdk::compaction_group::StateTableId;
30use risingwave_hummock_sdk::key::{TABLE_PREFIX_LEN, get_table_id};
31use risingwave_pb::catalog::Table;
32use risingwave_rpc_client::MetaClient;
33use risingwave_rpc_client::error::{Result as RpcResult, RpcError};
34use thiserror_ext::AsReport;
35
36use crate::hummock::{HummockError, HummockResult};
37use crate::row_serde::value_serde::ValueRowSerdeNew;
38
/// Selects the part of a storage key that is inserted into / probed against the bloom filter.
///
/// The returned slice borrows from the input key, so extraction never allocates. Depending on
/// the implementation it may be the whole key, a prefix of it, or empty.
pub trait FilterKeyExtractor: Send + Sync {
    /// Returns the bloom-filter key for `full_key`.
    fn extract<'a>(&self, full_key: &'a [u8]) -> &'a [u8];
}
43
44pub enum FilterKeyExtractorImpl {
45    Schema(SchemaFilterKeyExtractor),
46    FullKey(FullKeyFilterKeyExtractor),
47    Dummy(DummyFilterKeyExtractor),
48    Multi(MultiFilterKeyExtractor),
49    FixedLength(FixedLengthFilterKeyExtractor),
50}
51
52impl FilterKeyExtractorImpl {
53    pub fn from_table(table_catalog: &Table) -> Self {
54        let read_prefix_len = table_catalog.get_read_prefix_len_hint() as usize;
55
56        if read_prefix_len == 0 || read_prefix_len > table_catalog.get_pk().len() {
57            // for now frontend had not infer the table_id_to_filter_key_extractor, so we
58            // use FullKeyFilterKeyExtractor
59            FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor)
60        } else {
61            FilterKeyExtractorImpl::Schema(SchemaFilterKeyExtractor::new(table_catalog))
62        }
63    }
64}
65
66macro_rules! impl_filter_key_extractor {
67    ($( { $variant_name:ident } ),*) => {
68        impl FilterKeyExtractorImpl {
69            pub fn extract<'a>(&self, full_key: &'a [u8]) -> &'a [u8]{
70                match self {
71                    $( Self::$variant_name(inner) => inner.extract(full_key), )*
72                }
73            }
74        }
75    }
76
77}
78
79macro_rules! for_all_filter_key_extractor_variants {
80    ($macro:ident) => {
81        $macro! {
82            { Schema },
83            { FullKey },
84            { Dummy },
85            { Multi },
86            { FixedLength }
87        }
88    };
89}
90
91for_all_filter_key_extractor_variants! { impl_filter_key_extractor }
92
93#[derive(Default)]
94pub struct FullKeyFilterKeyExtractor;
95
96impl FilterKeyExtractor for FullKeyFilterKeyExtractor {
97    fn extract<'a>(&self, user_key: &'a [u8]) -> &'a [u8] {
98        user_key
99    }
100}
101
102#[derive(Default)]
103pub struct DummyFilterKeyExtractor;
104impl FilterKeyExtractor for DummyFilterKeyExtractor {
105    fn extract<'a>(&self, _full_key: &'a [u8]) -> &'a [u8] {
106        &[]
107    }
108}
109
110/// [`SchemaFilterKeyExtractor`] build from `table_catalog` and extract a `full_key` to prefix for
111#[derive(Default)]
112pub struct FixedLengthFilterKeyExtractor {
113    fixed_length: usize,
114}
115
116impl FilterKeyExtractor for FixedLengthFilterKeyExtractor {
117    fn extract<'a>(&self, full_key: &'a [u8]) -> &'a [u8] {
118        &full_key[0..self.fixed_length]
119    }
120}
121
122impl FixedLengthFilterKeyExtractor {
123    pub fn new(fixed_length: usize) -> Self {
124        Self { fixed_length }
125    }
126}
127
128/// [`SchemaFilterKeyExtractor`] build from `table_catalog` and transform a `full_key` to prefix for
129/// `prefix_bloom_filter`
130pub struct SchemaFilterKeyExtractor {
131    /// Each stateful operator has its own read pattern, partly using prefix scan.
132    /// Prefix key length can be decoded through its `DataType` and `OrderType` which obtained from
133    /// `TableCatalog`. `read_pattern_prefix_column` means the count of column to decode prefix
134    /// from storage key.
135    read_prefix_len: usize,
136    deserializer: OrderedRowSerde,
137    // TODO:need some bench test for same prefix case like join (if we need a prefix_cache for same
138    // prefix_key)
139}
140
141impl FilterKeyExtractor for SchemaFilterKeyExtractor {
142    fn extract<'a>(&self, full_key: &'a [u8]) -> &'a [u8] {
143        if full_key.len() < TABLE_PREFIX_LEN + VirtualNode::SIZE {
144            return &[];
145        }
146
147        let (_table_prefix, key) = full_key.split_at(TABLE_PREFIX_LEN);
148        let (_vnode_prefix, pk) = key.split_at(VirtualNode::SIZE);
149
150        // if the key with table_id deserializer fail from schema, that should panic here for early
151        // detection.
152
153        let bloom_filter_key_len = self
154            .deserializer
155            .deserialize_prefix_len(pk, self.read_prefix_len)
156            .unwrap();
157
158        let end_position = TABLE_PREFIX_LEN + VirtualNode::SIZE + bloom_filter_key_len;
159        &full_key[TABLE_PREFIX_LEN + VirtualNode::SIZE..end_position]
160    }
161}
162
163impl SchemaFilterKeyExtractor {
164    pub fn new(table_catalog: &Table) -> Self {
165        let pk_indices: Vec<usize> = table_catalog
166            .pk
167            .iter()
168            .map(|col_order| col_order.column_index as usize)
169            .collect();
170
171        let read_prefix_len = table_catalog.get_read_prefix_len_hint() as usize;
172
173        let data_types = pk_indices
174            .iter()
175            .map(|column_idx| &table_catalog.columns[*column_idx])
176            .map(|col| ColumnDesc::from(col.column_desc.as_ref().unwrap()).data_type)
177            .collect();
178
179        let order_types: Vec<OrderType> = table_catalog
180            .pk
181            .iter()
182            .map(|col_order| OrderType::from_protobuf(col_order.get_order_type().unwrap()))
183            .collect();
184
185        Self {
186            read_prefix_len,
187            deserializer: OrderedRowSerde::new(data_types, order_types),
188        }
189    }
190}
191
192#[derive(Default)]
193pub struct MultiFilterKeyExtractor {
194    id_to_filter_key_extractor: HashMap<TableId, FilterKeyExtractorImpl>,
195}
196
197impl MultiFilterKeyExtractor {
198    pub fn register(&mut self, table_id: TableId, filter_key_extractor: FilterKeyExtractorImpl) {
199        self.id_to_filter_key_extractor
200            .insert(table_id, filter_key_extractor);
201    }
202
203    pub fn size(&self) -> usize {
204        self.id_to_filter_key_extractor.len()
205    }
206
207    pub fn get_existing_table_ids(&self) -> HashSet<TableId> {
208        self.id_to_filter_key_extractor.keys().cloned().collect()
209    }
210}
211
212impl Debug for MultiFilterKeyExtractor {
213    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
214        write!(f, "MultiFilterKeyExtractor size {} ", self.size())
215    }
216}
217
218impl FilterKeyExtractor for MultiFilterKeyExtractor {
219    fn extract<'a>(&self, full_key: &'a [u8]) -> &'a [u8] {
220        if full_key.len() < TABLE_PREFIX_LEN + VirtualNode::SIZE {
221            return full_key;
222        }
223
224        let table_id = get_table_id(full_key);
225        self.id_to_filter_key_extractor
226            .get(&table_id)
227            .unwrap()
228            .extract(full_key)
229    }
230}
231
232#[async_trait::async_trait]
233pub trait StateTableAccessor: Send + Sync {
234    async fn get_tables(&self, table_ids: &[TableId]) -> RpcResult<HashMap<TableId, Table>>;
235}
236
237#[derive(Default)]
238pub struct FakeRemoteTableAccessor {}
239
240pub struct RemoteTableAccessor {
241    meta_client: MetaClient,
242}
243
244impl RemoteTableAccessor {
245    pub fn new(meta_client: MetaClient) -> Self {
246        Self { meta_client }
247    }
248}
249
250#[async_trait::async_trait]
251impl StateTableAccessor for RemoteTableAccessor {
252    async fn get_tables(&self, table_ids: &[TableId]) -> RpcResult<HashMap<TableId, Table>> {
253        self.meta_client.get_tables(table_ids.to_vec(), true).await
254    }
255}
256
257#[async_trait::async_trait]
258impl StateTableAccessor for FakeRemoteTableAccessor {
259    async fn get_tables(&self, _table_ids: &[TableId]) -> RpcResult<HashMap<TableId, Table>> {
260        Err(RpcError::Internal(anyhow::anyhow!(
261            "fake accessor does not support fetch remote table"
262        )))
263    }
264}
265
266/// `CompactionCatalogManager` is a manager to manage all `Table` which used in compaction
267pub struct CompactionCatalogManager {
268    // `table_id_to_catalog` is a map to store all `Table` which used in compaction
269    table_id_to_catalog: RwLock<HashMap<StateTableId, Table>>,
270    // `table_accessor` is a accessor to fetch `Table` from meta when the table not found
271    table_accessor: Box<dyn StateTableAccessor>,
272}
273
274impl Default for CompactionCatalogManager {
275    fn default() -> Self {
276        Self::new(Box::<FakeRemoteTableAccessor>::default())
277    }
278}
279
280impl CompactionCatalogManager {
281    pub fn new(table_accessor: Box<dyn StateTableAccessor>) -> Self {
282        Self {
283            table_id_to_catalog: Default::default(),
284            table_accessor,
285        }
286    }
287}
288
289impl CompactionCatalogManager {
290    /// `update` is used to update `Table` in `table_id_to_catalog` from notification
291    pub fn update(&self, table_id: TableId, catalog: Table) {
292        self.table_id_to_catalog.write().insert(table_id, catalog);
293    }
294
295    /// `sync` is used to sync all `Table` in `table_id_to_catalog` from notification whole snapshot
296    pub fn sync(&self, catalog_map: HashMap<TableId, Table>) {
297        let mut guard = self.table_id_to_catalog.write();
298        guard.clear();
299        guard.extend(catalog_map);
300    }
301
302    /// `remove` is used to remove `Table` in `table_id_to_catalog` by `table_id`
303    pub fn remove(&self, table_id: TableId) {
304        self.table_id_to_catalog.write().remove(&table_id);
305    }
306
307    /// `acquire` is used to acquire `CompactionCatalogAgent` by `table_ids`
308    /// if the table not found in `table_id_to_catalog`, it will fetch from meta
309    pub async fn acquire(
310        &self,
311        mut table_ids: Vec<StateTableId>,
312    ) -> HummockResult<CompactionCatalogAgentRef> {
313        if table_ids.is_empty() {
314            // table_id_set is empty
315            // the table in sst has been deleted
316
317            // use full key as default
318            return Err(HummockError::other("table_id_set is empty"));
319        }
320
321        let mut multi_filter_key_extractor = MultiFilterKeyExtractor::default();
322        let mut table_id_to_vnode = HashMap::new();
323        let mut table_id_to_watermark_serde = HashMap::new();
324        let mut table_id_to_value_watermark_serde = HashMap::new();
325
326        {
327            let guard = self.table_id_to_catalog.read();
328            table_ids.retain(|table_id| match guard.get(table_id) {
329                Some(table_catalog) => {
330                    // filter-key-extractor
331                    multi_filter_key_extractor
332                        .register(*table_id, FilterKeyExtractorImpl::from_table(table_catalog));
333
334                    // vnode
335                    table_id_to_vnode.insert(*table_id, table_catalog.vnode_count());
336
337                    // watermark
338                    table_id_to_watermark_serde
339                        .insert(*table_id, build_watermark_col_serde(table_catalog));
340                    table_id_to_value_watermark_serde.insert(
341                        *table_id,
342                        build_value_watermark_col_serde(table_catalog).map(Arc::new),
343                    );
344
345                    false
346                }
347
348                None => true,
349            });
350        }
351
352        if !table_ids.is_empty() {
353            let mut state_tables =
354                self.table_accessor
355                    .get_tables(&table_ids)
356                    .await
357                    .map_err(|e| {
358                        HummockError::other(format!(
359                            "request rpc list_tables for meta failed: {}",
360                            e.as_report()
361                        ))
362                    })?;
363
364            let mut guard = self.table_id_to_catalog.write();
365            for table_id in table_ids {
366                if let Some(table) = state_tables.remove(&table_id) {
367                    let table_id = table.id;
368                    let key_extractor = FilterKeyExtractorImpl::from_table(&table);
369                    let vnode = table.vnode_count();
370                    let watermark_serde = build_watermark_col_serde(&table);
371                    let value_watermark_serde = build_value_watermark_col_serde(&table);
372                    guard.insert(table_id, table);
373                    // filter-key-extractor
374                    multi_filter_key_extractor.register(table_id, key_extractor);
375
376                    // vnode
377                    table_id_to_vnode.insert(table_id, vnode);
378
379                    // watermark
380                    table_id_to_watermark_serde.insert(table_id, watermark_serde);
381                    table_id_to_value_watermark_serde
382                        .insert(table_id, value_watermark_serde.map(Arc::new));
383                }
384            }
385        }
386
387        Ok(Arc::new(CompactionCatalogAgent::new(
388            FilterKeyExtractorImpl::Multi(multi_filter_key_extractor),
389            table_id_to_vnode,
390            table_id_to_watermark_serde,
391            table_id_to_value_watermark_serde,
392        )))
393    }
394
395    /// `build_compaction_catalog_agent` is used to build `CompactionCatalogAgent` by `table_catalogs`
396    pub fn build_compaction_catalog_agent(
397        table_catalogs: HashMap<StateTableId, Table>,
398    ) -> CompactionCatalogAgentRef {
399        let mut multi_filter_key_extractor = MultiFilterKeyExtractor::default();
400        let mut table_id_to_vnode = HashMap::new();
401        let mut table_id_to_watermark_serde = HashMap::new();
402        let mut value_table_id_to_watermark_serde = HashMap::new();
403        for (table_id, table_catalog) in table_catalogs {
404            // filter-key-extractor
405            multi_filter_key_extractor
406                .register(table_id, FilterKeyExtractorImpl::from_table(&table_catalog));
407
408            // vnode
409            table_id_to_vnode.insert(table_id, table_catalog.vnode_count());
410
411            // watermark
412            table_id_to_watermark_serde.insert(table_id, build_watermark_col_serde(&table_catalog));
413            value_table_id_to_watermark_serde.insert(
414                table_id,
415                build_value_watermark_col_serde(&table_catalog).map(Arc::new),
416            );
417        }
418
419        Arc::new(CompactionCatalogAgent::new(
420            FilterKeyExtractorImpl::Multi(multi_filter_key_extractor),
421            table_id_to_vnode,
422            table_id_to_watermark_serde,
423            value_table_id_to_watermark_serde,
424        ))
425    }
426}
427
428/// `CompactionCatalogAgent` is a wrapper of `filter_key_extractor_manager` and `table_id_to_vnode`
429/// The `CompactionCatalogAgent` belongs to a compaction task call, which we will build from the `table_ids` contained in a compact task and use it during the compaction.
430/// The `CompactionCatalogAgent` can act as a agent for the `CompactionCatalogManager`, providing `extract` and `vnode_count` capabilities.
431pub struct CompactionCatalogAgent {
432    filter_key_extractor_manager: FilterKeyExtractorImpl,
433    table_id_to_vnode: HashMap<StateTableId, usize>,
434    // table_id ->(pk_prefix_serde, clean_watermark_col_serde, watermark_col_idx)
435    // cache for reduce serde build
436    table_id_to_watermark_serde:
437        HashMap<StateTableId, Option<(OrderedRowSerde, OrderedRowSerde, usize)>>,
438    value_table_id_to_watermark_serde: HashMap<StateTableId, Option<ValueWatermarkColumnSerdeRef>>,
439}
440
441impl CompactionCatalogAgent {
442    pub fn new(
443        filter_key_extractor_manager: FilterKeyExtractorImpl,
444        table_id_to_vnode: HashMap<StateTableId, usize>,
445        table_id_to_watermark_serde: HashMap<
446            StateTableId,
447            Option<(OrderedRowSerde, OrderedRowSerde, usize)>,
448        >,
449        value_table_id_to_watermark_serde: HashMap<
450            StateTableId,
451            Option<ValueWatermarkColumnSerdeRef>,
452        >,
453    ) -> Self {
454        Self {
455            filter_key_extractor_manager,
456            table_id_to_vnode,
457            table_id_to_watermark_serde,
458            value_table_id_to_watermark_serde,
459        }
460    }
461
462    pub fn dummy() -> Self {
463        Self {
464            filter_key_extractor_manager: FilterKeyExtractorImpl::Dummy(DummyFilterKeyExtractor),
465            table_id_to_vnode: Default::default(),
466            table_id_to_watermark_serde: Default::default(),
467            value_table_id_to_watermark_serde: Default::default(),
468        }
469    }
470
471    pub fn for_test(table_ids: Vec<impl Into<StateTableId>>) -> Arc<Self> {
472        let full_key_filter_key_extractor =
473            FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);
474
475        let table_id_to_vnode: HashMap<TableId, usize> = table_ids
476            .into_iter()
477            .map(|table_id| (table_id.into(), VirtualNode::COUNT_FOR_TEST))
478            .collect();
479
480        let table_id_to_watermark_serde = table_id_to_vnode
481            .keys()
482            .map(|table_id| (*table_id, None))
483            .collect();
484
485        let value_table_id_to_watermark_serde = table_id_to_vnode
486            .keys()
487            .map(|table_id| (*table_id, None))
488            .collect();
489
490        Arc::new(CompactionCatalogAgent::new(
491            full_key_filter_key_extractor,
492            table_id_to_vnode,
493            table_id_to_watermark_serde,
494            value_table_id_to_watermark_serde,
495        ))
496    }
497}
498
499impl CompactionCatalogAgent {
500    pub fn extract<'a>(&self, full_key: &'a [u8]) -> &'a [u8] {
501        self.filter_key_extractor_manager.extract(full_key)
502    }
503
504    pub fn vnode_count(&self, table_id: StateTableId) -> usize {
505        *self.table_id_to_vnode.get(&table_id).unwrap_or_else(|| {
506            panic!(
507                "table_id not found {} all_table_ids {:?}",
508                table_id,
509                self.table_id_to_vnode.keys()
510            )
511        })
512    }
513
514    pub fn watermark_serde(
515        &self,
516        table_id: StateTableId,
517    ) -> Option<(OrderedRowSerde, OrderedRowSerde, usize)> {
518        self.table_id_to_watermark_serde
519            .get(&table_id)
520            .unwrap_or_else(|| {
521                panic!(
522                    "table_id not found {} all_table_ids {:?}",
523                    table_id,
524                    self.table_id_to_watermark_serde.keys()
525                )
526            })
527            .clone()
528    }
529
530    pub fn value_watermark_serde(
531        &self,
532        table_id: StateTableId,
533    ) -> Option<ValueWatermarkColumnSerdeRef> {
534        self.value_table_id_to_watermark_serde
535            .get(&table_id)
536            .unwrap_or_else(|| {
537                panic!(
538                    "table_id not found {} all_table_ids {:?}",
539                    table_id,
540                    self.value_table_id_to_watermark_serde.keys()
541                )
542            })
543            .clone()
544    }
545
546    pub fn table_id_to_vnode_ref(&self) -> &HashMap<StateTableId, usize> {
547        &self.table_id_to_vnode
548    }
549
550    pub fn table_ids(&self) -> impl Iterator<Item = StateTableId> + '_ {
551        self.table_id_to_vnode.keys().cloned()
552    }
553}
554
555pub type CompactionCatalogManagerRef = Arc<CompactionCatalogManager>;
556pub type CompactionCatalogAgentRef = Arc<CompactionCatalogAgent>;
557
558fn build_watermark_col_serde(
559    table_catalog: &Table,
560) -> Option<(OrderedRowSerde, OrderedRowSerde, usize)> {
561    // Get clean watermark PK index using the helper method
562    let clean_watermark_index_in_pk = table_catalog.get_clean_watermark_index_in_pk_compat();
563
564    match clean_watermark_index_in_pk {
565        None => {
566            // non watermark table or watermark column is the first column (pk_prefix_watermark)
567            // TODO(ttl): if the watermark column is in the value, we may also get a `None` here, support it.
568            None
569        }
570
571        Some(clean_watermark_index_in_pk) => {
572            use risingwave_common::types::DataType;
573            let table_columns: Vec<ColumnDesc> = table_catalog
574                .columns
575                .iter()
576                .map(|col| col.column_desc.as_ref().unwrap().into())
577                .collect();
578
579            let pk_data_types: Vec<DataType> = table_catalog
580                .pk
581                .iter()
582                .map(|col_order| {
583                    table_columns[col_order.column_index as usize]
584                        .data_type
585                        .clone()
586                })
587                .collect();
588
589            let pk_order_types = table_catalog
590                .pk
591                .iter()
592                .map(|col_order| OrderType::from_protobuf(col_order.get_order_type().unwrap()))
593                .collect_vec();
594
595            assert_eq!(pk_data_types.len(), pk_order_types.len());
596            let pk_serde = OrderedRowSerde::new(pk_data_types, pk_order_types);
597            let watermark_col_serde = pk_serde.index(clean_watermark_index_in_pk).into_owned();
598            Some((pk_serde, watermark_col_serde, clean_watermark_index_in_pk))
599        }
600    }
601}
602
603fn build_value_watermark_col_serde(table_catalog: &Table) -> Option<ValueWatermarkColumnSerde> {
604    /// Returns the column index of non-pk watermark column.
605    pub fn try_get_non_pk_clean_watermark_column_index(table: &Table) -> Option<usize> {
606        table
607            .get_clean_watermark_column_indices()
608            .iter()
609            .filter_map(|&col_idx| {
610                if table
611                    .pk
612                    .iter()
613                    .any(|col_order| col_order.column_index == col_idx)
614                {
615                    return None;
616                }
617                Some(col_idx as usize)
618            })
619            .at_most_one()
620            .unwrap()
621    }
622
623    let clean_watermark_index = try_get_non_pk_clean_watermark_column_index(table_catalog)?;
624    Some(ValueWatermarkColumnSerde::new(
625        table_catalog,
626        clean_watermark_index,
627    ))
628}
629
630pub struct ValueWatermarkColumnSerde {
631    /// For `ColumnAwareSerde`, only 1 column is deserialized.
632    row_serde: EitherSerde,
633    /// For `ColumnAwareSerde`, index have been rewritten to 0.
634    watermark_index_in_de_row: usize,
635    watermark_column_mem_encoding_order: OrderType,
636}
637
638pub type ValueWatermarkColumnSerdeRef = Arc<ValueWatermarkColumnSerde>;
639
640impl ValueWatermarkColumnSerde {
641    fn new(table_catalog: &Table, clean_watermark_index: usize) -> Self {
642        let table_columns: Vec<ColumnDesc> = table_catalog
643            .columns
644            .iter()
645            .map(|col| col.column_desc.as_ref().unwrap().into())
646            .collect();
647        let pk_order_type = table_catalog
648            .pk
649            .iter()
650            .filter_map(|col_order| {
651                if col_order.column_index as usize == clean_watermark_index {
652                    return Some(OrderType::from_protobuf(
653                        col_order.get_order_type().unwrap(),
654                    ));
655                }
656                None
657            })
658            .at_most_one()
659            .unwrap();
660        // Correctness requires the assumption that a table contains at most one watermark column.
661        let watermark_column_mem_encoding_order = match pk_order_type {
662            Some(o) => o,
663            // Correctness requires the assumption that the order is the same as the one used in StateTable when serializing watermark for value column.
664            None => OrderType::ascending(),
665        };
666        // Correctness requires the assumption that the watermark column is stored in the value. (See comment on Table::value_indices.)
667        let Some(watermark_index_in_value_indices) = table_catalog
668            .value_indices
669            .iter()
670            .position(|p| *p as usize == clean_watermark_index)
671        else {
672            panic!(
673                "Watermark index {} not found in value_indices {:?}.",
674                clean_watermark_index, table_catalog.value_indices
675            );
676        };
677        let (row_serde, watermark_index_in_de_row) = if table_catalog.version.is_none() {
678            let row_serde = BasicSerde::new(
679                Arc::from_iter(table_catalog.value_indices.iter().map(|val| *val as usize)),
680                Arc::from(table_columns.into_boxed_slice()),
681            )
682            .into();
683            (row_serde, watermark_index_in_value_indices)
684        } else {
685            let row_serde = ColumnAwareSerde::new(
686                Arc::from_iter(iter::once(clean_watermark_index)),
687                Arc::from(table_columns.into_boxed_slice()),
688            )
689            .into();
690            // Only one column.
691            (row_serde, 0)
692        };
693        Self {
694            row_serde,
695            watermark_index_in_de_row,
696            watermark_column_mem_encoding_order,
697        }
698    }
699
700    pub fn deserialize(&self, encoded_bytes: &[u8]) -> HummockResult<Option<Vec<u8>>> {
701        let mut row = self
702            .row_serde
703            .deserialize(encoded_bytes)
704            .map_err(HummockError::decode_error)?;
705        if self.watermark_index_in_de_row >= row.len() {
706            // The watermark column has been dropped in column aware row encoding.
707            return Ok(None);
708        }
709        let datum = std::mem::take(&mut row[self.watermark_index_in_de_row]);
710        // Correctness requires on the assumption that the watermark is serialized using memcmp_encoding in StateTable.
711        let bytes = memcmp_encoding::encode_value(datum, self.watermark_column_mem_encoding_order)
712            .map_err(HummockError::encode_error)?;
713        Ok(Some(bytes.into()))
714    }
715}
716
717#[cfg(test)]
718mod tests {
719    use std::mem;
720
721    use bytes::{BufMut, BytesMut};
722    use itertools::Itertools;
723    use risingwave_common::catalog::ColumnDesc;
724    use risingwave_common::hash::VirtualNode;
725    use risingwave_common::row::OwnedRow;
726    use risingwave_common::types::DataType;
727    use risingwave_common::types::ScalarImpl::{self};
728    use risingwave_common::util::row_serde::OrderedRowSerde;
729    use risingwave_common::util::sort_util::OrderType;
730    use risingwave_hummock_sdk::key::TABLE_PREFIX_LEN;
731    use risingwave_pb::catalog::table::{PbEngine, TableType};
732    use risingwave_pb::catalog::{PbCreateType, PbStreamJobStatus, PbTable};
733    use risingwave_pb::common::{PbColumnOrder, PbDirection, PbNullsAre, PbOrderType};
734    use risingwave_pb::plan_common::PbColumnCatalog;
735
736    use super::{DummyFilterKeyExtractor, FilterKeyExtractor, SchemaFilterKeyExtractor};
737    use crate::compaction_catalog_manager::{
738        FilterKeyExtractorImpl, FullKeyFilterKeyExtractor, MultiFilterKeyExtractor,
739    };
740    const fn dummy_vnode() -> [u8; VirtualNode::SIZE] {
741        VirtualNode::from_index(233).to_be_bytes()
742    }
743
744    #[test]
745    fn test_default_filter_key_extractor() {
746        let dummy_filter_key_extractor = DummyFilterKeyExtractor;
747        let full_key = "full_key".as_bytes();
748        let output_key = dummy_filter_key_extractor.extract(full_key);
749
750        assert_eq!("".as_bytes(), output_key);
751
752        let full_key_filter_key_extractor = FullKeyFilterKeyExtractor;
753        let output_key = full_key_filter_key_extractor.extract(full_key);
754
755        assert_eq!(full_key, output_key);
756    }
757
758    fn build_table_with_prefix_column_num(column_count: u32) -> PbTable {
759        PbTable {
760            id: 0.into(),
761            schema_id: 0.into(),
762            database_id: 0.into(),
763            name: "test".to_owned(),
764            table_type: TableType::Table as i32,
765            columns: vec![
766                PbColumnCatalog {
767                    column_desc: Some(
768                        (&ColumnDesc::named("_row_id", 0.into(), DataType::Int64)).into(),
769                    ),
770                    is_hidden: true,
771                },
772                PbColumnCatalog {
773                    column_desc: Some(
774                        (&ColumnDesc::named("col_1", 0.into(), DataType::Int64)).into(),
775                    ),
776                    is_hidden: false,
777                },
778                PbColumnCatalog {
779                    column_desc: Some(
780                        (&ColumnDesc::named("col_2", 0.into(), DataType::Float64)).into(),
781                    ),
782                    is_hidden: false,
783                },
784                PbColumnCatalog {
785                    column_desc: Some(
786                        (&ColumnDesc::named("col_3", 0.into(), DataType::Varchar)).into(),
787                    ),
788                    is_hidden: false,
789                },
790            ],
791            pk: vec![
792                PbColumnOrder {
793                    column_index: 1,
794                    order_type: Some(PbOrderType {
795                        direction: PbDirection::Ascending as _,
796                        nulls_are: PbNullsAre::Largest as _,
797                    }),
798                },
799                PbColumnOrder {
800                    column_index: 3,
801                    order_type: Some(PbOrderType {
802                        direction: PbDirection::Ascending as _,
803                        nulls_are: PbNullsAre::Largest as _,
804                    }),
805                },
806            ],
807            stream_key: vec![0],
808            distribution_key: (0..column_count as i32).collect_vec(),
809            optional_associated_source_id: None,
810            append_only: false,
811            owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
812            retention_seconds: Some(300),
813            fragment_id: 0.into(),
814            dml_fragment_id: None,
815            initialized_at_epoch: None,
816            vnode_col_index: None,
817            row_id_index: Some(0),
818            value_indices: vec![0],
819            definition: "".into(),
820            handle_pk_conflict_behavior: 0,
821            version_column_indices: vec![],
822            read_prefix_len_hint: 1,
823            version: None,
824            watermark_indices: vec![],
825            dist_key_in_pk: vec![],
826            cardinality: None,
827            created_at_epoch: None,
828            #[expect(deprecated)]
829            cleaned_by_watermark: false,
830            stream_job_status: PbStreamJobStatus::Created.into(),
831            create_type: PbCreateType::Foreground.into(),
832            description: None,
833            #[expect(deprecated)]
834            incoming_sinks: Default::default(),
835            initialized_at_cluster_version: None,
836            created_at_cluster_version: None,
837            cdc_table_id: None,
838            maybe_vnode_count: None,
839            webhook_info: None,
840            job_id: None,
841            engine: Some(PbEngine::Hummock as i32),
842            #[expect(deprecated)]
843            clean_watermark_index_in_pk: None,
844            clean_watermark_indices: vec![],
845            refreshable: false,
846            vector_index_info: None,
847            cdc_table_type: None,
848        }
849    }
850
851    #[test]
852    fn test_schema_filter_key_extractor() {
853        let prost_table = build_table_with_prefix_column_num(1);
854        let schema_filter_key_extractor = SchemaFilterKeyExtractor::new(&prost_table);
855
856        let order_types: Vec<OrderType> = vec![OrderType::ascending(), OrderType::ascending()];
857        let schema = vec![DataType::Int64, DataType::Varchar];
858        let serializer = OrderedRowSerde::new(schema, order_types);
859        let row = OwnedRow::new(vec![
860            Some(ScalarImpl::Int64(100)),
861            Some(ScalarImpl::Utf8("abc".into())),
862        ]);
863        let mut row_bytes = vec![];
864        serializer.serialize(&row, &mut row_bytes);
865
866        let table_prefix = {
867            let mut buf = BytesMut::with_capacity(TABLE_PREFIX_LEN);
868            buf.put_u32(1);
869            buf.to_vec()
870        };
871
872        let vnode_prefix = &dummy_vnode()[..];
873
874        let full_key = [&table_prefix, vnode_prefix, &row_bytes].concat();
875        let output_key = schema_filter_key_extractor.extract(&full_key);
876        assert_eq!(1 + mem::size_of::<i64>(), output_key.len());
877    }
878
879    #[test]
880    fn test_multi_filter_key_extractor() {
881        let mut multi_filter_key_extractor = MultiFilterKeyExtractor::default();
882        {
883            // test table_id 1
884            let prost_table = build_table_with_prefix_column_num(1);
885            let schema_filter_key_extractor = SchemaFilterKeyExtractor::new(&prost_table);
886            multi_filter_key_extractor.register(
887                1.into(),
888                FilterKeyExtractorImpl::Schema(schema_filter_key_extractor),
889            );
890            let order_types: Vec<OrderType> = vec![OrderType::ascending(), OrderType::ascending()];
891            let schema = vec![DataType::Int64, DataType::Varchar];
892            let serializer = OrderedRowSerde::new(schema, order_types);
893            let row = OwnedRow::new(vec![
894                Some(ScalarImpl::Int64(100)),
895                Some(ScalarImpl::Utf8("abc".into())),
896            ]);
897            let mut row_bytes = vec![];
898            serializer.serialize(&row, &mut row_bytes);
899
900            let table_prefix = {
901                let mut buf = BytesMut::with_capacity(TABLE_PREFIX_LEN);
902                buf.put_u32(1);
903                buf.to_vec()
904            };
905
906            let vnode_prefix = &dummy_vnode()[..];
907
908            let full_key = [&table_prefix, vnode_prefix, &row_bytes].concat();
909            let output_key = multi_filter_key_extractor.extract(&full_key);
910
911            let data_types = vec![DataType::Int64];
912            let order_types = vec![OrderType::ascending()];
913            let deserializer = OrderedRowSerde::new(data_types, order_types);
914
915            let pk_prefix_len = deserializer.deserialize_prefix_len(&row_bytes, 1).unwrap();
916            assert_eq!(pk_prefix_len, output_key.len());
917        }
918
919        {
920            // test table_id 1
921            let prost_table = build_table_with_prefix_column_num(2);
922            let schema_filter_key_extractor = SchemaFilterKeyExtractor::new(&prost_table);
923            multi_filter_key_extractor.register(
924                2.into(),
925                FilterKeyExtractorImpl::Schema(schema_filter_key_extractor),
926            );
927            let order_types: Vec<OrderType> = vec![OrderType::ascending(), OrderType::ascending()];
928            let schema = vec![DataType::Int64, DataType::Varchar];
929            let serializer = OrderedRowSerde::new(schema, order_types);
930            let row = OwnedRow::new(vec![
931                Some(ScalarImpl::Int64(100)),
932                Some(ScalarImpl::Utf8("abc".into())),
933            ]);
934            let mut row_bytes = vec![];
935            serializer.serialize(&row, &mut row_bytes);
936
937            let table_prefix = {
938                let mut buf = BytesMut::with_capacity(TABLE_PREFIX_LEN);
939                buf.put_u32(2);
940                buf.to_vec()
941            };
942
943            let vnode_prefix = &dummy_vnode()[..];
944
945            let full_key = [&table_prefix, vnode_prefix, &row_bytes].concat();
946            let output_key = multi_filter_key_extractor.extract(&full_key);
947
948            let data_types = vec![DataType::Int64, DataType::Varchar];
949            let order_types = vec![OrderType::ascending(), OrderType::ascending()];
950            let deserializer = OrderedRowSerde::new(data_types, order_types);
951
952            let pk_prefix_len = deserializer.deserialize_prefix_len(&row_bytes, 1).unwrap();
953
954            assert_eq!(pk_prefix_len, output_key.len());
955        }
956    }
957
958    #[tokio::test]
959    async fn test_compaction_catalog_manager_exception() {
960        let compaction_catalog_manager = super::CompactionCatalogManager::default();
961
962        {
963            let ret = compaction_catalog_manager.acquire(vec![]).await;
964            assert!(ret.is_err());
965            if let Err(e) = ret {
966                assert_eq!(e.to_string(), "Other error: table_id_set is empty");
967            }
968        }
969
970        {
971            // network error with FakeRemoteTableAccessor
972            let ret = compaction_catalog_manager.acquire(vec![1.into()]).await;
973            assert!(ret.is_err());
974            if let Err(e) = ret {
975                assert_eq!(
976                    e.to_string(),
977                    "Other error: request rpc list_tables for meta failed: fake accessor does not support fetch remote table"
978                );
979            }
980        }
981    }
982}