// risingwave_common/catalog/external_table.rs

// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap};

use risingwave_pb::plan_common::ExternalTableDesc;
use risingwave_pb::secret::PbSecretRef;

use super::{ColumnDesc, ColumnId, TableId};
use crate::util::sort_util::ColumnOrder;

/// The information a compute node needs to access data in an external database.
/// The compute node uses this information to connect to the external database and scan the
/// table.
#[derive(Debug, Clone, Default, PartialEq, Eq, Hash)]
pub struct CdcTableDesc {
    /// Id of the table in RisingWave.
    pub table_id: TableId,

    /// Id of the upstream source in sharing CDC mode.
    pub source_id: TableId,

    /// The full name of the table in the external database, e.g. `database_name.table_name` in
    /// MySQL and `schema_name.table_name` in Postgres.
    pub external_table_name: String,
    /// The key used to sort in storage. Each entry's `column_index` is used as an index into
    /// `columns`.
    pub pk: Vec<ColumnOrder>,
    /// All columns in the table. Note that this vec is NOT sorted by column id.
    pub columns: Vec<ColumnDesc>,

    /// Column indices for primary keys.
    pub stream_key: Vec<usize>,

    /// Connection properties; these will be passed into the `StreamScanNode`.
    pub connect_properties: BTreeMap<String, String>,
    /// Secret references (`PbSecretRef`), keyed by string.
    pub secret_refs: BTreeMap<String, PbSecretRef>,
}

impl CdcTableDesc {
    /// Returns the `column_index` of every entry in `pk`, in `pk` order.
    pub fn order_column_indices(&self) -> Vec<usize> {
        let indices = self.pk.iter().map(|order| order.column_index);
        indices.collect()
    }

    /// Returns the column id of every entry in `pk`, in `pk` order.
    ///
    /// # Panics
    ///
    /// Panics if a `pk` entry's `column_index` is out of bounds for `columns`.
    pub fn order_column_ids(&self) -> Vec<ColumnId> {
        let columns = &self.columns;
        self.pk
            .iter()
            .map(|order| columns[order.column_index].column_id)
            .collect()
    }

    /// Builds the protobuf `ExternalTableDesc` corresponding to this descriptor.
    ///
    /// `external_table_name` is emitted as the protobuf `table_name` field.
    pub fn to_protobuf(&self) -> ExternalTableDesc {
        // Convert the repeated fields up front so the struct literal below stays flat.
        let columns = self.columns.iter().map(Into::into).collect();
        let pk = self.pk.iter().map(|order| order.to_protobuf()).collect();
        // Each index is cast to whatever integer type the protobuf field uses.
        let stream_key = self.stream_key.iter().map(|&idx| idx as _).collect();

        ExternalTableDesc {
            table_id: self.table_id.into(),
            source_id: self.source_id.into(),
            columns,
            pk,
            table_name: self.external_table_name.clone(),
            stream_key,
            connect_properties: self.connect_properties.clone(),
            secret_refs: self.secret_refs.clone(),
        }
    }

    /// Helper function to create a mapping from `column id` to `column index`, where the index
    /// is the column's position in `columns`.
    ///
    /// If several columns share a column id, the position of the last one is kept.
    pub fn get_id_to_op_idx_mapping(&self) -> HashMap<ColumnId, usize> {
        self.columns
            .iter()
            .enumerate()
            .map(|(position, column)| (column.column_id, position))
            .collect()
    }
}