risingwave_common/catalog/
external_table.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16
17use risingwave_pb::plan_common::ExternalTableDesc;
18use risingwave_pb::secret::PbSecretRef;
19
20use super::{ColumnDesc, ColumnId, TableId};
21use crate::id::SourceId;
22use crate::util::sort_util::ColumnOrder;
23
24/// Necessary information for compute node to access data in the external database.
25/// Compute node will use this information to connect to the external database and scan the table.
26#[derive(Debug, Clone, Default, PartialEq, Eq, Hash)]
27pub struct CdcTableDesc {
28    /// Id of the table in RW
29    pub table_id: TableId,
30
31    /// Id of the upstream source in sharing cdc mode
32    pub source_id: SourceId,
33
34    /// The full name of the table in external database, e.g. `database_name.table_name` in MySQL
35    /// and `schema_name.table_name` in the Postgres.
36    pub external_table_name: String,
37    /// The key used to sort in storage.
38    pub pk: Vec<ColumnOrder>,
39    /// All columns in the table, noticed it is NOT sorted by columnId in the vec.
40    pub columns: Vec<ColumnDesc>,
41
42    /// Column indices for primary keys.
43    pub stream_key: Vec<usize>,
44
45    /// properties will be passed into the `StreamScanNode`
46    pub connect_properties: BTreeMap<String, String>,
47    /// Secret refs
48    pub secret_refs: BTreeMap<String, PbSecretRef>,
49}
50
51impl CdcTableDesc {
52    pub fn to_protobuf(&self) -> ExternalTableDesc {
53        ExternalTableDesc {
54            table_id: self.table_id,
55            source_id: self.source_id,
56            columns: self.columns.iter().map(Into::into).collect(),
57            pk: self.pk.iter().map(|v| v.to_protobuf()).collect(),
58            table_name: self.external_table_name.clone(),
59            stream_key: self.stream_key.iter().map(|k| *k as _).collect(),
60            connect_properties: self.connect_properties.clone(),
61            secret_refs: self.secret_refs.clone(),
62        }
63    }
64
65    /// Helper function to create a mapping from `column id` to `column index`
66    pub fn get_id_to_op_idx_mapping(&self) -> HashMap<ColumnId, usize> {
67        ColumnDesc::get_id_to_op_idx_mapping(self.columns.as_slice(), None)
68    }
69}