risingwave_common/catalog/external_table.rs
1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16
17use risingwave_pb::plan_common::ExternalTableDesc;
18use risingwave_pb::secret::PbSecretRef;
19
20use super::{ColumnDesc, ColumnId, TableId};
21use crate::id::SourceId;
22use crate::util::sort_util::ColumnOrder;
23
24/// Necessary information for compute node to access data in the external database.
25/// Compute node will use this information to connect to the external database and scan the table.
26#[derive(Debug, Clone, Default, PartialEq, Eq, Hash)]
27pub struct CdcTableDesc {
28 /// Id of the table in RW
29 pub table_id: TableId,
30
31 /// Id of the upstream source in sharing cdc mode
32 pub source_id: SourceId,
33
34 /// The full name of the table in external database, e.g. `database_name.table_name` in MySQL
35 /// and `schema_name.table_name` in the Postgres.
36 pub external_table_name: String,
37 /// The key used to sort in storage.
38 pub pk: Vec<ColumnOrder>,
39 /// All columns in the table, noticed it is NOT sorted by columnId in the vec.
40 pub columns: Vec<ColumnDesc>,
41
42 /// Column indices for primary keys.
43 pub stream_key: Vec<usize>,
44
45 /// properties will be passed into the `StreamScanNode`
46 pub connect_properties: BTreeMap<String, String>,
47 /// Secret refs
48 pub secret_refs: BTreeMap<String, PbSecretRef>,
49}
50
51impl CdcTableDesc {
52 pub fn to_protobuf(&self) -> ExternalTableDesc {
53 ExternalTableDesc {
54 table_id: self.table_id,
55 source_id: self.source_id,
56 columns: self.columns.iter().map(Into::into).collect(),
57 pk: self.pk.iter().map(|v| v.to_protobuf()).collect(),
58 table_name: self.external_table_name.clone(),
59 stream_key: self.stream_key.iter().map(|k| *k as _).collect(),
60 connect_properties: self.connect_properties.clone(),
61 secret_refs: self.secret_refs.clone(),
62 }
63 }
64
65 /// Helper function to create a mapping from `column id` to `column index`
66 pub fn get_id_to_op_idx_mapping(&self) -> HashMap<ColumnId, usize> {
67 ColumnDesc::get_id_to_op_idx_mapping(self.columns.as_slice(), None)
68 }
69}