risingwave_common/catalog/external_table.rs
1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16
17use risingwave_pb::plan_common::ExternalTableDesc;
18use risingwave_pb::secret::PbSecretRef;
19
20use super::{ColumnDesc, ColumnId, TableId};
21use crate::util::sort_util::ColumnOrder;
22
23/// Necessary information for compute node to access data in the external database.
24/// Compute node will use this information to connect to the external database and scan the table.
25#[derive(Debug, Clone, Default, PartialEq, Eq, Hash)]
26pub struct CdcTableDesc {
27 /// Id of the table in RW
28 pub table_id: TableId,
29
30 /// Id of the upstream source in sharing cdc mode
31 pub source_id: TableId,
32
33 /// The full name of the table in external database, e.g. `database_name.table_name` in MySQL
34 /// and `schema_name.table_name` in the Postgres.
35 pub external_table_name: String,
36 /// The key used to sort in storage.
37 pub pk: Vec<ColumnOrder>,
38 /// All columns in the table, noticed it is NOT sorted by columnId in the vec.
39 pub columns: Vec<ColumnDesc>,
40
41 /// Column indices for primary keys.
42 pub stream_key: Vec<usize>,
43
44 /// properties will be passed into the `StreamScanNode`
45 pub connect_properties: BTreeMap<String, String>,
46 /// Secret refs
47 pub secret_refs: BTreeMap<String, PbSecretRef>,
48}
49
50impl CdcTableDesc {
51 pub fn to_protobuf(&self) -> ExternalTableDesc {
52 ExternalTableDesc {
53 table_id: self.table_id.into(),
54 source_id: self.source_id.into(),
55 columns: self.columns.iter().map(Into::into).collect(),
56 pk: self.pk.iter().map(|v| v.to_protobuf()).collect(),
57 table_name: self.external_table_name.clone(),
58 stream_key: self.stream_key.iter().map(|k| *k as _).collect(),
59 connect_properties: self.connect_properties.clone(),
60 secret_refs: self.secret_refs.clone(),
61 }
62 }
63
64 /// Helper function to create a mapping from `column id` to `column index`
65 pub fn get_id_to_op_idx_mapping(&self) -> HashMap<ColumnId, usize> {
66 ColumnDesc::get_id_to_op_idx_mapping(self.columns.as_slice(), None)
67 }
68}