risingwave_common/catalog/
external_table.rs1use std::collections::{BTreeMap, HashMap};
16
17use risingwave_pb::plan_common::ExternalTableDesc;
18use risingwave_pb::secret::PbSecretRef;
19
20use super::{ColumnDesc, ColumnId, TableId};
21use crate::util::sort_util::ColumnOrder;
22
23#[derive(Debug, Clone, Default, PartialEq, Eq, Hash)]
26pub struct CdcTableDesc {
27 pub table_id: TableId,
29
30 pub source_id: TableId,
32
33 pub external_table_name: String,
36 pub pk: Vec<ColumnOrder>,
38 pub columns: Vec<ColumnDesc>,
40
41 pub stream_key: Vec<usize>,
43
44 pub connect_properties: BTreeMap<String, String>,
46 pub secret_refs: BTreeMap<String, PbSecretRef>,
48}
49
50impl CdcTableDesc {
51 pub fn order_column_indices(&self) -> Vec<usize> {
52 self.pk.iter().map(|col| (col.column_index)).collect()
53 }
54
55 pub fn order_column_ids(&self) -> Vec<ColumnId> {
56 self.pk
57 .iter()
58 .map(|col| self.columns[col.column_index].column_id)
59 .collect()
60 }
61
62 pub fn to_protobuf(&self) -> ExternalTableDesc {
63 ExternalTableDesc {
64 table_id: self.table_id.into(),
65 source_id: self.source_id.into(),
66 columns: self.columns.iter().map(Into::into).collect(),
67 pk: self.pk.iter().map(|v| v.to_protobuf()).collect(),
68 table_name: self.external_table_name.clone(),
69 stream_key: self.stream_key.iter().map(|k| *k as _).collect(),
70 connect_properties: self.connect_properties.clone(),
71 secret_refs: self.secret_refs.clone(),
72 }
73 }
74
75 pub fn get_id_to_op_idx_mapping(&self) -> HashMap<ColumnId, usize> {
77 let mut id_to_idx = HashMap::new();
78 self.columns.iter().enumerate().for_each(|(idx, c)| {
79 id_to_idx.insert(c.column_id, idx);
80 });
81 id_to_idx
82 }
83}