Skip to main content

risingwave_storage/
compaction_catalog_manager.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::fmt::Debug;
17use std::iter;
18use std::sync::Arc;
19
20use itertools::Itertools;
21use parking_lot::RwLock;
22use risingwave_common::catalog::{ColumnDesc, TableId};
23use risingwave_common::hash::{VirtualNode, VnodeCountCompat};
24use risingwave_common::util::memcmp_encoding;
25use risingwave_common::util::row_serde::OrderedRowSerde;
26use risingwave_common::util::sort_util::OrderType;
27use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde;
28use risingwave_common::util::value_encoding::{BasicSerde, EitherSerde, ValueRowDeserializer};
29use risingwave_hummock_sdk::compaction_group::StateTableId;
30use risingwave_hummock_sdk::key::{TABLE_PREFIX_LEN, get_table_id};
31use risingwave_pb::catalog::Table;
32use risingwave_rpc_client::MetaClient;
33use risingwave_rpc_client::error::{Result as RpcResult, RpcError};
34use thiserror_ext::AsReport;
35
36use crate::hummock::{HummockError, HummockResult};
37use crate::row_serde::value_serde::ValueRowSerdeNew;
38
/// `FilterKeyExtractor` maps a user key to the bytes that are stored in SST filters.
pub trait FilterKeyExtractor: Send + Sync {
    /// Returns the filter key for `user_key`. The returned slice is tied to the lifetime of
    /// `user_key`.
    fn extract<'a>(&self, user_key: &'a [u8]) -> &'a [u8];
}
43
44pub enum FilterKeyExtractorImpl {
45    Schema(SchemaFilterKeyExtractor),
46    FullKey(FullKeyFilterKeyExtractor),
47    Dummy(DummyFilterKeyExtractor),
48    Multi(MultiFilterKeyExtractor),
49    FixedLength(FixedLengthFilterKeyExtractor),
50}
51
52impl FilterKeyExtractorImpl {
53    pub fn from_table(table_catalog: &Table) -> Self {
54        let read_prefix_len = table_catalog.get_read_prefix_len_hint() as usize;
55
56        if read_prefix_len == 0 {
57            FilterKeyExtractorImpl::Dummy(DummyFilterKeyExtractor)
58        } else if read_prefix_len > table_catalog.get_pk().len() {
59            // for now frontend had not infer the table_id_to_filter_key_extractor, so we
60            // use FullKeyFilterKeyExtractor
61            FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor)
62        } else {
63            FilterKeyExtractorImpl::Schema(SchemaFilterKeyExtractor::new(table_catalog))
64        }
65    }
66}
67
68macro_rules! impl_filter_key_extractor {
69    ($( { $variant_name:ident } ),*) => {
70        impl FilterKeyExtractorImpl {
71            pub fn extract<'a>(&self, user_key: &'a [u8]) -> &'a [u8]{
72                match self {
73                    $( Self::$variant_name(inner) => inner.extract(user_key), )*
74                }
75            }
76        }
77    }
78
79}
80
81macro_rules! for_all_filter_key_extractor_variants {
82    ($macro:ident) => {
83        $macro! {
84            { Schema },
85            { FullKey },
86            { Dummy },
87            { Multi },
88            { FixedLength }
89        }
90    };
91}
92
93for_all_filter_key_extractor_variants! { impl_filter_key_extractor }
94
95#[derive(Default)]
96pub struct FullKeyFilterKeyExtractor;
97
98impl FilterKeyExtractor for FullKeyFilterKeyExtractor {
99    fn extract<'a>(&self, user_key: &'a [u8]) -> &'a [u8] {
100        user_key
101    }
102}
103
104#[derive(Default)]
105pub struct DummyFilterKeyExtractor;
106impl FilterKeyExtractor for DummyFilterKeyExtractor {
107    fn extract<'a>(&self, _user_key: &'a [u8]) -> &'a [u8] {
108        &[]
109    }
110}
111
112/// [`SchemaFilterKeyExtractor`] build from `table_catalog` and extract a `user_key` to prefix for
113#[derive(Default)]
114pub struct FixedLengthFilterKeyExtractor {
115    fixed_length: usize,
116}
117
118impl FilterKeyExtractor for FixedLengthFilterKeyExtractor {
119    fn extract<'a>(&self, user_key: &'a [u8]) -> &'a [u8] {
120        &user_key[0..self.fixed_length]
121    }
122}
123
124impl FixedLengthFilterKeyExtractor {
125    pub fn new(fixed_length: usize) -> Self {
126        Self { fixed_length }
127    }
128}
129
130/// [`SchemaFilterKeyExtractor`] builds from `table_catalog` and transforms a `user_key` to a prefix
131/// for the SST filter.
132pub struct SchemaFilterKeyExtractor {
133    /// Each stateful operator has its own read pattern, partly using prefix scan.
134    /// Prefix key length can be decoded through its `DataType` and `OrderType` which obtained from
135    /// `TableCatalog`. `read_pattern_prefix_column` means the count of column to decode prefix
136    /// from storage key.
137    read_prefix_len: usize,
138    deserializer: OrderedRowSerde,
139    // TODO:need some bench test for same prefix case like join (if we need a prefix_cache for same
140    // prefix_key)
141}
142
143impl FilterKeyExtractor for SchemaFilterKeyExtractor {
144    fn extract<'a>(&self, user_key: &'a [u8]) -> &'a [u8] {
145        if user_key.len() < TABLE_PREFIX_LEN + VirtualNode::SIZE {
146            return &[];
147        }
148
149        let (_table_prefix, key) = user_key.split_at(TABLE_PREFIX_LEN);
150        let (_vnode_prefix, pk) = key.split_at(VirtualNode::SIZE);
151
152        // if the key with table_id deserializer fail from schema, that should panic here for early
153        // detection.
154
155        let filter_key_len = self
156            .deserializer
157            .deserialize_prefix_len(pk, self.read_prefix_len)
158            .unwrap();
159
160        let end_position = TABLE_PREFIX_LEN + VirtualNode::SIZE + filter_key_len;
161        &user_key[TABLE_PREFIX_LEN + VirtualNode::SIZE..end_position]
162    }
163}
164
165impl SchemaFilterKeyExtractor {
166    pub fn new(table_catalog: &Table) -> Self {
167        let pk_indices: Vec<usize> = table_catalog
168            .pk
169            .iter()
170            .map(|col_order| col_order.column_index as usize)
171            .collect();
172
173        let read_prefix_len = table_catalog.get_read_prefix_len_hint() as usize;
174
175        let data_types = pk_indices
176            .iter()
177            .map(|column_idx| &table_catalog.columns[*column_idx])
178            .map(|col| ColumnDesc::from(col.column_desc.as_ref().unwrap()).data_type)
179            .collect();
180
181        let order_types: Vec<OrderType> = table_catalog
182            .pk
183            .iter()
184            .map(|col_order| OrderType::from_protobuf(col_order.get_order_type().unwrap()))
185            .collect();
186
187        Self {
188            read_prefix_len,
189            deserializer: OrderedRowSerde::new(data_types, order_types),
190        }
191    }
192}
193
194#[derive(Default)]
195pub struct MultiFilterKeyExtractor {
196    id_to_filter_key_extractor: HashMap<TableId, FilterKeyExtractorImpl>,
197}
198
199impl MultiFilterKeyExtractor {
200    pub fn register(&mut self, table_id: TableId, filter_key_extractor: FilterKeyExtractorImpl) {
201        self.id_to_filter_key_extractor
202            .insert(table_id, filter_key_extractor);
203    }
204
205    pub fn size(&self) -> usize {
206        self.id_to_filter_key_extractor.len()
207    }
208
209    pub fn get_existing_table_ids(&self) -> HashSet<TableId> {
210        self.id_to_filter_key_extractor.keys().cloned().collect()
211    }
212}
213
214impl Debug for MultiFilterKeyExtractor {
215    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
216        write!(f, "MultiFilterKeyExtractor size {} ", self.size())
217    }
218}
219
220impl FilterKeyExtractor for MultiFilterKeyExtractor {
221    fn extract<'a>(&self, user_key: &'a [u8]) -> &'a [u8] {
222        if user_key.len() < TABLE_PREFIX_LEN + VirtualNode::SIZE {
223            return user_key;
224        }
225
226        let table_id = get_table_id(user_key);
227        self.id_to_filter_key_extractor
228            .get(&table_id)
229            .unwrap()
230            .extract(user_key)
231    }
232}
233
234#[async_trait::async_trait]
235pub trait StateTableAccessor: Send + Sync {
236    async fn get_tables(&self, table_ids: &[TableId]) -> RpcResult<HashMap<TableId, Table>>;
237}
238
239#[derive(Default)]
240pub struct FakeRemoteTableAccessor {}
241
242#[derive(Default)]
243struct PreloadedOnlyTableAccessor {}
244
245pub struct RemoteTableAccessor {
246    meta_client: MetaClient,
247}
248
249impl RemoteTableAccessor {
250    pub fn new(meta_client: MetaClient) -> Self {
251        Self { meta_client }
252    }
253}
254
255#[async_trait::async_trait]
256impl StateTableAccessor for RemoteTableAccessor {
257    async fn get_tables(&self, table_ids: &[TableId]) -> RpcResult<HashMap<TableId, Table>> {
258        self.meta_client.get_tables(table_ids.to_vec(), true).await
259    }
260}
261
262#[async_trait::async_trait]
263impl StateTableAccessor for FakeRemoteTableAccessor {
264    async fn get_tables(&self, _table_ids: &[TableId]) -> RpcResult<HashMap<TableId, Table>> {
265        Err(RpcError::Internal(anyhow::anyhow!(
266            "fake accessor does not support fetch remote table"
267        )))
268    }
269}
270
271#[async_trait::async_trait]
272impl StateTableAccessor for PreloadedOnlyTableAccessor {
273    async fn get_tables(&self, _table_ids: &[TableId]) -> RpcResult<HashMap<TableId, Table>> {
274        Ok(HashMap::new())
275    }
276}
277
278/// `CompactionCatalogManager` is a manager to manage all `Table` which used in compaction
279pub struct CompactionCatalogManager {
280    // `table_id_to_catalog` is a map to store all `Table` which used in compaction
281    table_id_to_catalog: RwLock<HashMap<StateTableId, Table>>,
282    // `table_accessor` is a accessor to fetch `Table` from meta when the table not found
283    table_accessor: Box<dyn StateTableAccessor>,
284}
285
286impl Default for CompactionCatalogManager {
287    fn default() -> Self {
288        Self::new(Box::<FakeRemoteTableAccessor>::default())
289    }
290}
291
292impl CompactionCatalogManager {
293    pub fn new(table_accessor: Box<dyn StateTableAccessor>) -> Self {
294        Self {
295            table_id_to_catalog: Default::default(),
296            table_accessor,
297        }
298    }
299
300    /// Creates a catalog manager backed only by the given table catalogs.
301    /// Missing tables are not fetched remotely and remain missing, so callers can reject
302    /// partial agents explicitly.
303    pub fn new_preloaded(table_id_to_catalog: HashMap<StateTableId, Table>) -> Self {
304        Self {
305            table_id_to_catalog: RwLock::new(table_id_to_catalog),
306            table_accessor: Box::<PreloadedOnlyTableAccessor>::default(),
307        }
308    }
309}
310
311impl CompactionCatalogManager {
312    /// `update` is used to update `Table` in `table_id_to_catalog` from notification
313    pub fn update(&self, table_id: TableId, catalog: Table) {
314        self.table_id_to_catalog.write().insert(table_id, catalog);
315    }
316
317    /// `sync` is used to sync all `Table` in `table_id_to_catalog` from notification whole snapshot
318    pub fn sync(&self, catalog_map: HashMap<TableId, Table>) {
319        let mut guard = self.table_id_to_catalog.write();
320        guard.clear();
321        guard.extend(catalog_map);
322    }
323
324    /// `remove` is used to remove `Table` in `table_id_to_catalog` by `table_id`
325    pub fn remove(&self, table_id: TableId) {
326        self.table_id_to_catalog.write().remove(&table_id);
327    }
328
329    /// `acquire` is used to acquire `CompactionCatalogAgent` by `table_ids`
330    /// if the table not found in `table_id_to_catalog`, it will fetch from meta
331    pub async fn acquire(
332        &self,
333        mut table_ids: Vec<StateTableId>,
334    ) -> HummockResult<CompactionCatalogAgentRef> {
335        if table_ids.is_empty() {
336            // table_id_set is empty
337            // the table in sst has been deleted
338
339            // use full key as default
340            return Err(HummockError::other("table_id_set is empty"));
341        }
342
343        let mut multi_filter_key_extractor = MultiFilterKeyExtractor::default();
344        let mut table_id_to_vnode = HashMap::new();
345        let mut table_id_to_watermark_serde = HashMap::new();
346        let mut table_id_to_value_watermark_serde = HashMap::new();
347
348        {
349            let guard = self.table_id_to_catalog.read();
350            table_ids.retain(|table_id| match guard.get(table_id) {
351                Some(table_catalog) => {
352                    // filter-key-extractor
353                    multi_filter_key_extractor
354                        .register(*table_id, FilterKeyExtractorImpl::from_table(table_catalog));
355
356                    // vnode
357                    table_id_to_vnode.insert(*table_id, table_catalog.vnode_count());
358
359                    // watermark
360                    table_id_to_watermark_serde
361                        .insert(*table_id, build_watermark_col_serde(table_catalog));
362                    table_id_to_value_watermark_serde.insert(
363                        *table_id,
364                        build_value_watermark_col_serde(table_catalog).map(Arc::new),
365                    );
366
367                    false
368                }
369
370                None => true,
371            });
372        }
373
374        if !table_ids.is_empty() {
375            let mut state_tables =
376                self.table_accessor
377                    .get_tables(&table_ids)
378                    .await
379                    .map_err(|e| {
380                        HummockError::other(format!(
381                            "request rpc list_tables for meta failed: {}",
382                            e.as_report()
383                        ))
384                    })?;
385
386            let mut guard = self.table_id_to_catalog.write();
387            for table_id in table_ids {
388                if let Some(table) = state_tables.remove(&table_id) {
389                    let table_id = table.id;
390                    let key_extractor = FilterKeyExtractorImpl::from_table(&table);
391                    let vnode = table.vnode_count();
392                    let watermark_serde = build_watermark_col_serde(&table);
393                    let value_watermark_serde = build_value_watermark_col_serde(&table);
394                    guard.insert(table_id, table);
395                    // filter-key-extractor
396                    multi_filter_key_extractor.register(table_id, key_extractor);
397
398                    // vnode
399                    table_id_to_vnode.insert(table_id, vnode);
400
401                    // watermark
402                    table_id_to_watermark_serde.insert(table_id, watermark_serde);
403                    table_id_to_value_watermark_serde
404                        .insert(table_id, value_watermark_serde.map(Arc::new));
405                }
406            }
407        }
408
409        Ok(Arc::new(CompactionCatalogAgent::new(
410            FilterKeyExtractorImpl::Multi(multi_filter_key_extractor),
411            table_id_to_vnode,
412            table_id_to_watermark_serde,
413            table_id_to_value_watermark_serde,
414        )))
415    }
416}
417
418/// `CompactionCatalogAgent` is a wrapper of `filter_key_extractor_manager` and `table_id_to_vnode`
419/// The `CompactionCatalogAgent` belongs to a compaction task call, which we will build from the `table_ids` contained in a compact task and use it during the compaction.
420/// The `CompactionCatalogAgent` can act as a agent for the `CompactionCatalogManager`, providing `extract` and `vnode_count` capabilities.
421pub struct CompactionCatalogAgent {
422    filter_key_extractor_manager: FilterKeyExtractorImpl,
423    table_id_to_vnode: HashMap<StateTableId, usize>,
424    // table_id ->(pk_prefix_serde, clean_watermark_col_serde, watermark_col_idx)
425    // cache for reduce serde build
426    table_id_to_watermark_serde:
427        HashMap<StateTableId, Option<(OrderedRowSerde, OrderedRowSerde, usize)>>,
428    value_table_id_to_watermark_serde: HashMap<StateTableId, Option<ValueWatermarkColumnSerdeRef>>,
429}
430
431impl CompactionCatalogAgent {
432    pub fn new(
433        filter_key_extractor_manager: FilterKeyExtractorImpl,
434        table_id_to_vnode: HashMap<StateTableId, usize>,
435        table_id_to_watermark_serde: HashMap<
436            StateTableId,
437            Option<(OrderedRowSerde, OrderedRowSerde, usize)>,
438        >,
439        value_table_id_to_watermark_serde: HashMap<
440            StateTableId,
441            Option<ValueWatermarkColumnSerdeRef>,
442        >,
443    ) -> Self {
444        Self {
445            filter_key_extractor_manager,
446            table_id_to_vnode,
447            table_id_to_watermark_serde,
448            value_table_id_to_watermark_serde,
449        }
450    }
451
452    pub fn dummy() -> Self {
453        Self {
454            filter_key_extractor_manager: FilterKeyExtractorImpl::Dummy(DummyFilterKeyExtractor),
455            table_id_to_vnode: Default::default(),
456            table_id_to_watermark_serde: Default::default(),
457            value_table_id_to_watermark_serde: Default::default(),
458        }
459    }
460
461    pub fn for_test(table_ids: Vec<impl Into<StateTableId>>) -> Arc<Self> {
462        let full_key_filter_key_extractor =
463            FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor);
464
465        let table_id_to_vnode: HashMap<TableId, usize> = table_ids
466            .into_iter()
467            .map(|table_id| (table_id.into(), VirtualNode::COUNT_FOR_TEST))
468            .collect();
469
470        let table_id_to_watermark_serde = table_id_to_vnode
471            .keys()
472            .map(|table_id| (*table_id, None))
473            .collect();
474
475        let value_table_id_to_watermark_serde = table_id_to_vnode
476            .keys()
477            .map(|table_id| (*table_id, None))
478            .collect();
479
480        Arc::new(CompactionCatalogAgent::new(
481            full_key_filter_key_extractor,
482            table_id_to_vnode,
483            table_id_to_watermark_serde,
484            value_table_id_to_watermark_serde,
485        ))
486    }
487}
488
489impl CompactionCatalogAgent {
490    pub fn extract<'a>(&self, user_key: &'a [u8]) -> &'a [u8] {
491        self.filter_key_extractor_manager.extract(user_key)
492    }
493
494    pub fn vnode_count(&self, table_id: StateTableId) -> usize {
495        *self.table_id_to_vnode.get(&table_id).unwrap_or_else(|| {
496            panic!(
497                "table_id not found {} all_table_ids {:?}",
498                table_id,
499                self.table_id_to_vnode.keys()
500            )
501        })
502    }
503
504    pub fn watermark_serde(
505        &self,
506        table_id: StateTableId,
507    ) -> Option<(OrderedRowSerde, OrderedRowSerde, usize)> {
508        self.table_id_to_watermark_serde
509            .get(&table_id)
510            .unwrap_or_else(|| {
511                panic!(
512                    "table_id not found {} all_table_ids {:?}",
513                    table_id,
514                    self.table_id_to_watermark_serde.keys()
515                )
516            })
517            .clone()
518    }
519
520    pub fn value_watermark_serde(
521        &self,
522        table_id: StateTableId,
523    ) -> Option<ValueWatermarkColumnSerdeRef> {
524        self.value_table_id_to_watermark_serde
525            .get(&table_id)
526            .unwrap_or_else(|| {
527                panic!(
528                    "table_id not found {} all_table_ids {:?}",
529                    table_id,
530                    self.value_table_id_to_watermark_serde.keys()
531                )
532            })
533            .clone()
534    }
535
536    pub fn table_id_to_vnode_ref(&self) -> &HashMap<StateTableId, usize> {
537        &self.table_id_to_vnode
538    }
539
540    pub fn table_ids(&self) -> impl Iterator<Item = StateTableId> + '_ {
541        self.table_id_to_vnode.keys().cloned()
542    }
543}
544
545pub type CompactionCatalogManagerRef = Arc<CompactionCatalogManager>;
546pub type CompactionCatalogAgentRef = Arc<CompactionCatalogAgent>;
547
548fn build_watermark_col_serde(
549    table_catalog: &Table,
550) -> Option<(OrderedRowSerde, OrderedRowSerde, usize)> {
551    // Get clean watermark PK index using the helper method
552    let clean_watermark_index_in_pk = table_catalog.get_clean_watermark_index_in_pk_compat();
553
554    match clean_watermark_index_in_pk {
555        None => {
556            // non watermark table or watermark column is the first column (pk_prefix_watermark)
557            // TODO(ttl): if the watermark column is in the value, we may also get a `None` here, support it.
558            None
559        }
560
561        Some(clean_watermark_index_in_pk) => {
562            use risingwave_common::types::DataType;
563            let table_columns: Vec<ColumnDesc> = table_catalog
564                .columns
565                .iter()
566                .map(|col| col.column_desc.as_ref().unwrap().into())
567                .collect();
568
569            let pk_data_types: Vec<DataType> = table_catalog
570                .pk
571                .iter()
572                .map(|col_order| {
573                    table_columns[col_order.column_index as usize]
574                        .data_type
575                        .clone()
576                })
577                .collect();
578
579            let pk_order_types = table_catalog
580                .pk
581                .iter()
582                .map(|col_order| OrderType::from_protobuf(col_order.get_order_type().unwrap()))
583                .collect_vec();
584
585            assert_eq!(pk_data_types.len(), pk_order_types.len());
586            let pk_serde = OrderedRowSerde::new(pk_data_types, pk_order_types);
587            let watermark_col_serde = pk_serde.index(clean_watermark_index_in_pk).into_owned();
588            Some((pk_serde, watermark_col_serde, clean_watermark_index_in_pk))
589        }
590    }
591}
592
593fn build_value_watermark_col_serde(table_catalog: &Table) -> Option<ValueWatermarkColumnSerde> {
594    /// Returns the column index of non-pk watermark column.
595    pub fn try_get_non_pk_clean_watermark_column_index(table: &Table) -> Option<usize> {
596        table
597            .get_clean_watermark_column_indices()
598            .iter()
599            .filter_map(|&col_idx| {
600                if table
601                    .pk
602                    .iter()
603                    .any(|col_order| col_order.column_index == col_idx)
604                {
605                    return None;
606                }
607                Some(col_idx as usize)
608            })
609            .at_most_one()
610            .unwrap()
611    }
612
613    let clean_watermark_index = try_get_non_pk_clean_watermark_column_index(table_catalog)?;
614    Some(ValueWatermarkColumnSerde::new(
615        table_catalog,
616        clean_watermark_index,
617    ))
618}
619
620pub struct ValueWatermarkColumnSerde {
621    /// For `ColumnAwareSerde`, only 1 column is deserialized.
622    row_serde: EitherSerde,
623    /// For `ColumnAwareSerde`, index have been rewritten to 0.
624    watermark_index_in_de_row: usize,
625    watermark_column_mem_encoding_order: OrderType,
626}
627
628pub type ValueWatermarkColumnSerdeRef = Arc<ValueWatermarkColumnSerde>;
629
630impl ValueWatermarkColumnSerde {
631    fn new(table_catalog: &Table, clean_watermark_index: usize) -> Self {
632        let table_columns: Vec<ColumnDesc> = table_catalog
633            .columns
634            .iter()
635            .map(|col| col.column_desc.as_ref().unwrap().into())
636            .collect();
637        let pk_order_type = table_catalog
638            .pk
639            .iter()
640            .filter_map(|col_order| {
641                if col_order.column_index as usize == clean_watermark_index {
642                    return Some(OrderType::from_protobuf(
643                        col_order.get_order_type().unwrap(),
644                    ));
645                }
646                None
647            })
648            .at_most_one()
649            .unwrap();
650        // Correctness requires the assumption that a table contains at most one watermark column.
651        let watermark_column_mem_encoding_order = match pk_order_type {
652            Some(o) => o,
653            // Correctness requires the assumption that the order is the same as the one used in StateTable when serializing watermark for value column.
654            None => OrderType::ascending(),
655        };
656        // Correctness requires the assumption that the watermark column is stored in the value. (See comment on Table::value_indices.)
657        let Some(watermark_index_in_value_indices) = table_catalog
658            .value_indices
659            .iter()
660            .position(|p| *p as usize == clean_watermark_index)
661        else {
662            panic!(
663                "Watermark index {} not found in value_indices {:?}.",
664                clean_watermark_index, table_catalog.value_indices
665            );
666        };
667        let (row_serde, watermark_index_in_de_row) = if table_catalog.version.is_none() {
668            let row_serde = BasicSerde::new(
669                Arc::from_iter(table_catalog.value_indices.iter().map(|val| *val as usize)),
670                Arc::from(table_columns.into_boxed_slice()),
671            )
672            .into();
673            (row_serde, watermark_index_in_value_indices)
674        } else {
675            let row_serde = ColumnAwareSerde::new(
676                Arc::from_iter(iter::once(clean_watermark_index)),
677                Arc::from(table_columns.into_boxed_slice()),
678            )
679            .into();
680            // Only one column.
681            (row_serde, 0)
682        };
683        Self {
684            row_serde,
685            watermark_index_in_de_row,
686            watermark_column_mem_encoding_order,
687        }
688    }
689
690    pub fn deserialize(&self, encoded_bytes: &[u8]) -> HummockResult<Option<Vec<u8>>> {
691        let mut row = self
692            .row_serde
693            .deserialize(encoded_bytes)
694            .map_err(HummockError::decode_error)?;
695        if self.watermark_index_in_de_row >= row.len() {
696            // The watermark column has been dropped in column aware row encoding.
697            return Ok(None);
698        }
699        let datum = std::mem::take(&mut row[self.watermark_index_in_de_row]);
700        // Correctness requires on the assumption that the watermark is serialized using memcmp_encoding in StateTable.
701        let bytes = memcmp_encoding::encode_value(datum, self.watermark_column_mem_encoding_order)
702            .map_err(HummockError::encode_error)?;
703        Ok(Some(bytes.into()))
704    }
705}
706
707#[cfg(test)]
708mod tests {
709    use std::collections::{HashMap, HashSet};
710    use std::mem;
711    use std::sync::Arc;
712
713    use bytes::{BufMut, BytesMut};
714    use itertools::Itertools;
715    use risingwave_common::catalog::ColumnDesc;
716    use risingwave_common::hash::VirtualNode;
717    use risingwave_common::row::OwnedRow;
718    use risingwave_common::types::DataType;
719    use risingwave_common::types::ScalarImpl::{self};
720    use risingwave_common::util::row_serde::OrderedRowSerde;
721    use risingwave_common::util::sort_util::OrderType;
722    use risingwave_hummock_sdk::key::TABLE_PREFIX_LEN;
723    use risingwave_pb::catalog::table::{PbEngine, TableType};
724    use risingwave_pb::catalog::{PbCreateType, PbStreamJobStatus, PbTable};
725    use risingwave_pb::common::{PbColumnOrder, PbDirection, PbNullsAre, PbOrderType};
726    use risingwave_pb::plan_common::PbColumnCatalog;
727    use thiserror_ext::AsReport;
728
729    use super::{DummyFilterKeyExtractor, FilterKeyExtractor, SchemaFilterKeyExtractor};
730    use crate::compaction_catalog_manager::{
731        FilterKeyExtractorImpl, FullKeyFilterKeyExtractor, MultiFilterKeyExtractor,
732    };
733    const fn dummy_vnode() -> [u8; VirtualNode::SIZE] {
734        VirtualNode::from_index(233).to_be_bytes()
735    }
736
737    #[test]
738    fn test_default_filter_key_extractor() {
739        let dummy_filter_key_extractor = DummyFilterKeyExtractor;
740        let full_key = "full_key".as_bytes();
741        let output_key = dummy_filter_key_extractor.extract(full_key);
742
743        assert_eq!("".as_bytes(), output_key);
744
745        let full_key_filter_key_extractor = FullKeyFilterKeyExtractor;
746        let output_key = full_key_filter_key_extractor.extract(full_key);
747
748        assert_eq!(full_key, output_key);
749    }
750
751    fn build_table_with_prefix_column_num(column_count: u32) -> PbTable {
752        PbTable {
753            id: 0.into(),
754            schema_id: 0.into(),
755            database_id: 0.into(),
756            name: "test".to_owned(),
757            table_type: TableType::Table as i32,
758            columns: vec![
759                PbColumnCatalog {
760                    column_desc: Some(
761                        (&ColumnDesc::named("_row_id", 0.into(), DataType::Int64)).into(),
762                    ),
763                    is_hidden: true,
764                },
765                PbColumnCatalog {
766                    column_desc: Some(
767                        (&ColumnDesc::named("col_1", 0.into(), DataType::Int64)).into(),
768                    ),
769                    is_hidden: false,
770                },
771                PbColumnCatalog {
772                    column_desc: Some(
773                        (&ColumnDesc::named("col_2", 0.into(), DataType::Float64)).into(),
774                    ),
775                    is_hidden: false,
776                },
777                PbColumnCatalog {
778                    column_desc: Some(
779                        (&ColumnDesc::named("col_3", 0.into(), DataType::Varchar)).into(),
780                    ),
781                    is_hidden: false,
782                },
783            ],
784            pk: vec![
785                PbColumnOrder {
786                    column_index: 1,
787                    order_type: Some(PbOrderType {
788                        direction: PbDirection::Ascending as _,
789                        nulls_are: PbNullsAre::Largest as _,
790                    }),
791                },
792                PbColumnOrder {
793                    column_index: 3,
794                    order_type: Some(PbOrderType {
795                        direction: PbDirection::Ascending as _,
796                        nulls_are: PbNullsAre::Largest as _,
797                    }),
798                },
799            ],
800            stream_key: vec![0],
801            distribution_key: (0..column_count as i32).collect_vec(),
802            optional_associated_source_id: None,
803            append_only: false,
804            owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
805            retention_seconds: Some(300),
806            fragment_id: 0.into(),
807            dml_fragment_id: None,
808            initialized_at_epoch: None,
809            vnode_col_index: None,
810            row_id_index: Some(0),
811            value_indices: vec![0],
812            definition: "".into(),
813            handle_pk_conflict_behavior: 0,
814            version_column_indices: vec![],
815            read_prefix_len_hint: 1,
816            version: None,
817            watermark_indices: vec![],
818            dist_key_in_pk: vec![],
819            cardinality: None,
820            created_at_epoch: None,
821            #[expect(deprecated)]
822            cleaned_by_watermark: false,
823            stream_job_status: PbStreamJobStatus::Created.into(),
824            create_type: PbCreateType::Foreground.into(),
825            description: None,
826            #[expect(deprecated)]
827            incoming_sinks: Default::default(),
828            initialized_at_cluster_version: None,
829            created_at_cluster_version: None,
830            cdc_table_id: None,
831            maybe_vnode_count: None,
832            webhook_info: None,
833            job_id: None,
834            engine: Some(PbEngine::Hummock as i32),
835            #[expect(deprecated)]
836            clean_watermark_index_in_pk: None,
837            clean_watermark_indices: vec![],
838            refreshable: false,
839            vector_index_info: None,
840            cdc_table_type: None,
841        }
842    }
843
844    #[test]
845    fn test_zero_read_prefix_len_uses_dummy_extractor() {
846        let mut prost_table = build_table_with_prefix_column_num(1);
847        prost_table.read_prefix_len_hint = 0;
848
849        let extractor = FilterKeyExtractorImpl::from_table(&prost_table);
850        let order_types: Vec<OrderType> = vec![OrderType::ascending(), OrderType::ascending()];
851        let schema = vec![DataType::Int64, DataType::Varchar];
852        let serializer = OrderedRowSerde::new(schema, order_types);
853        let row = OwnedRow::new(vec![
854            Some(ScalarImpl::Int64(100)),
855            Some(ScalarImpl::Utf8("abc".into())),
856        ]);
857        let mut row_bytes = vec![];
858        serializer.serialize(&row, &mut row_bytes);
859
860        let table_prefix = {
861            let mut buf = BytesMut::with_capacity(TABLE_PREFIX_LEN);
862            buf.put_u32(1);
863            buf.to_vec()
864        };
865        let vnode_prefix = &dummy_vnode()[..];
866        let full_key = [&table_prefix, vnode_prefix, &row_bytes].concat();
867
868        assert!(extractor.extract(&full_key).is_empty());
869    }
870
871    #[test]
872    fn test_schema_filter_key_extractor() {
873        let prost_table = build_table_with_prefix_column_num(1);
874        let schema_filter_key_extractor = SchemaFilterKeyExtractor::new(&prost_table);
875
876        let order_types: Vec<OrderType> = vec![OrderType::ascending(), OrderType::ascending()];
877        let schema = vec![DataType::Int64, DataType::Varchar];
878        let serializer = OrderedRowSerde::new(schema, order_types);
879        let row = OwnedRow::new(vec![
880            Some(ScalarImpl::Int64(100)),
881            Some(ScalarImpl::Utf8("abc".into())),
882        ]);
883        let mut row_bytes = vec![];
884        serializer.serialize(&row, &mut row_bytes);
885
886        let table_prefix = {
887            let mut buf = BytesMut::with_capacity(TABLE_PREFIX_LEN);
888            buf.put_u32(1);
889            buf.to_vec()
890        };
891
892        let vnode_prefix = &dummy_vnode()[..];
893
894        let full_key = [&table_prefix, vnode_prefix, &row_bytes].concat();
895        let output_key = schema_filter_key_extractor.extract(&full_key);
896        assert_eq!(1 + mem::size_of::<i64>(), output_key.len());
897    }
898
899    #[test]
900    fn test_multi_filter_key_extractor() {
901        let mut multi_filter_key_extractor = MultiFilterKeyExtractor::default();
902        {
903            // test table_id 1
904            let prost_table = build_table_with_prefix_column_num(1);
905            let schema_filter_key_extractor = SchemaFilterKeyExtractor::new(&prost_table);
906            multi_filter_key_extractor.register(
907                1.into(),
908                FilterKeyExtractorImpl::Schema(schema_filter_key_extractor),
909            );
910            let order_types: Vec<OrderType> = vec![OrderType::ascending(), OrderType::ascending()];
911            let schema = vec![DataType::Int64, DataType::Varchar];
912            let serializer = OrderedRowSerde::new(schema, order_types);
913            let row = OwnedRow::new(vec![
914                Some(ScalarImpl::Int64(100)),
915                Some(ScalarImpl::Utf8("abc".into())),
916            ]);
917            let mut row_bytes = vec![];
918            serializer.serialize(&row, &mut row_bytes);
919
920            let table_prefix = {
921                let mut buf = BytesMut::with_capacity(TABLE_PREFIX_LEN);
922                buf.put_u32(1);
923                buf.to_vec()
924            };
925
926            let vnode_prefix = &dummy_vnode()[..];
927
928            let full_key = [&table_prefix, vnode_prefix, &row_bytes].concat();
929            let output_key = multi_filter_key_extractor.extract(&full_key);
930
931            let data_types = vec![DataType::Int64];
932            let order_types = vec![OrderType::ascending()];
933            let deserializer = OrderedRowSerde::new(data_types, order_types);
934
935            let pk_prefix_len = deserializer.deserialize_prefix_len(&row_bytes, 1).unwrap();
936            assert_eq!(pk_prefix_len, output_key.len());
937        }
938
939        {
940            // test table_id 1
941            let prost_table = build_table_with_prefix_column_num(2);
942            let schema_filter_key_extractor = SchemaFilterKeyExtractor::new(&prost_table);
943            multi_filter_key_extractor.register(
944                2.into(),
945                FilterKeyExtractorImpl::Schema(schema_filter_key_extractor),
946            );
947            let order_types: Vec<OrderType> = vec![OrderType::ascending(), OrderType::ascending()];
948            let schema = vec![DataType::Int64, DataType::Varchar];
949            let serializer = OrderedRowSerde::new(schema, order_types);
950            let row = OwnedRow::new(vec![
951                Some(ScalarImpl::Int64(100)),
952                Some(ScalarImpl::Utf8("abc".into())),
953            ]);
954            let mut row_bytes = vec![];
955            serializer.serialize(&row, &mut row_bytes);
956
957            let table_prefix = {
958                let mut buf = BytesMut::with_capacity(TABLE_PREFIX_LEN);
959                buf.put_u32(2);
960                buf.to_vec()
961            };
962
963            let vnode_prefix = &dummy_vnode()[..];
964
965            let full_key = [&table_prefix, vnode_prefix, &row_bytes].concat();
966            let output_key = multi_filter_key_extractor.extract(&full_key);
967
968            let data_types = vec![DataType::Int64, DataType::Varchar];
969            let order_types = vec![OrderType::ascending(), OrderType::ascending()];
970            let deserializer = OrderedRowSerde::new(data_types, order_types);
971
972            let pk_prefix_len = deserializer.deserialize_prefix_len(&row_bytes, 1).unwrap();
973
974            assert_eq!(pk_prefix_len, output_key.len());
975        }
976    }
977
978    #[tokio::test]
979    async fn test_compaction_catalog_manager_exception() {
980        let compaction_catalog_manager = super::CompactionCatalogManager::default();
981
982        {
983            let ret = compaction_catalog_manager.acquire(vec![]).await;
984            assert!(ret.is_err());
985            if let Err(e) = ret {
986                assert_eq!(e.to_string(), "Other error: table_id_set is empty");
987            }
988        }
989
990        {
991            // network error with FakeRemoteTableAccessor
992            let ret = compaction_catalog_manager.acquire(vec![1.into()]).await;
993            assert!(ret.is_err());
994            if let Err(e) = ret {
995                assert_eq!(
996                    e.to_string(),
997                    "Other error: request rpc list_tables for meta failed: fake accessor does not support fetch remote table"
998                );
999            }
1000        }
1001    }
1002
1003    #[tokio::test]
1004    async fn test_preloaded_compaction_catalog_manager() {
1005        let mut table = build_table_with_prefix_column_num(1);
1006        table.id = 1.into();
1007        let compaction_catalog_manager = Arc::new(super::CompactionCatalogManager::new_preloaded(
1008            HashMap::from([(1.into(), table)]),
1009        ));
1010
1011        let agent = compaction_catalog_manager
1012            .acquire(vec![1.into(), 2.into()])
1013            .await
1014            .unwrap();
1015        let table_ids = agent.table_ids().collect::<HashSet<_>>();
1016
1017        assert_eq!(table_ids, HashSet::from([1.into()]));
1018
1019        let err = match crate::hummock::compactor::compactor_runner::acquire_complete_catalog_agent(
1020            &compaction_catalog_manager,
1021            vec![1.into(), 2.into()],
1022        )
1023        .await
1024        {
1025            Ok(_) => panic!("partial preloaded catalog should fail strict acquire"),
1026            Err(err) => err,
1027        };
1028
1029        assert!(
1030            err.to_report_string()
1031                .contains("some table ids are not acquired")
1032        );
1033    }
1034}