risingwave_common/catalog/
external_table.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16
17use risingwave_pb::plan_common::ExternalTableDesc;
18use risingwave_pb::secret::PbSecretRef;
19
20use super::{ColumnDesc, ColumnId, TableId};
21use crate::util::sort_util::ColumnOrder;
22
23/// Necessary information for compute node to access data in the external database.
24/// Compute node will use this information to connect to the external database and scan the table.
25#[derive(Debug, Clone, Default, PartialEq, Eq, Hash)]
26pub struct CdcTableDesc {
27    /// Id of the table in RW
28    pub table_id: TableId,
29
30    /// Id of the upstream source in sharing cdc mode
31    pub source_id: TableId,
32
33    /// The full name of the table in external database, e.g. `database_name.table_name` in MySQL
34    /// and `schema_name.table_name` in the Postgres.
35    pub external_table_name: String,
36    /// The key used to sort in storage.
37    pub pk: Vec<ColumnOrder>,
38    /// All columns in the table, noticed it is NOT sorted by columnId in the vec.
39    pub columns: Vec<ColumnDesc>,
40
41    /// Column indices for primary keys.
42    pub stream_key: Vec<usize>,
43
44    /// properties will be passed into the `StreamScanNode`
45    pub connect_properties: BTreeMap<String, String>,
46    /// Secret refs
47    pub secret_refs: BTreeMap<String, PbSecretRef>,
48}
49
50impl CdcTableDesc {
51    pub fn order_column_indices(&self) -> Vec<usize> {
52        self.pk.iter().map(|col| (col.column_index)).collect()
53    }
54
55    pub fn order_column_ids(&self) -> Vec<ColumnId> {
56        self.pk
57            .iter()
58            .map(|col| self.columns[col.column_index].column_id)
59            .collect()
60    }
61
62    pub fn to_protobuf(&self) -> ExternalTableDesc {
63        ExternalTableDesc {
64            table_id: self.table_id.into(),
65            source_id: self.source_id.into(),
66            columns: self.columns.iter().map(Into::into).collect(),
67            pk: self.pk.iter().map(|v| v.to_protobuf()).collect(),
68            table_name: self.external_table_name.clone(),
69            stream_key: self.stream_key.iter().map(|k| *k as _).collect(),
70            connect_properties: self.connect_properties.clone(),
71            secret_refs: self.secret_refs.clone(),
72        }
73    }
74
75    /// Helper function to create a mapping from `column id` to `column index`
76    pub fn get_id_to_op_idx_mapping(&self) -> HashMap<ColumnId, usize> {
77        let mut id_to_idx = HashMap::new();
78        self.columns.iter().enumerate().for_each(|(idx, c)| {
79            id_to_idx.insert(c.column_id, idx);
80        });
81        id_to_idx
82    }
83}