// risingwave_connector/source/manager.rs
1use std::fmt::Debug;
16
17use risingwave_common::catalog::{
18 CDC_OFFSET_COLUMN_NAME, CDC_TABLE_NAME_COLUMN_NAME, ColumnDesc, ColumnId,
19 KAFKA_TIMESTAMP_COLUMN_NAME, ROW_ID_COLUMN_ID, ROW_ID_COLUMN_NAME,
20};
21use risingwave_common::types::DataType;
22use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
23use risingwave_pb::plan_common::{AdditionalColumn, ColumnDescVersion};
24
25#[derive(Clone, Debug)]
29pub struct SourceColumnDesc {
30 pub name: String,
31 pub data_type: DataType,
32 pub column_id: ColumnId,
33 pub additional_column: AdditionalColumn,
37 pub column_type: SourceColumnType,
42 pub is_pk: bool,
44 pub is_hidden_addition_col: bool,
46}
47
/// Role of a source column, as classified by [`SourceColumnType::from_name`].
///
/// `Eq` is derived alongside `PartialEq`: the enum has no payload, so equality is total,
/// and the extra marker lets the type be used wherever `Eq` is required.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SourceColumnType {
    /// Any column whose name matches none of the special names checked by `from_name`.
    Normal,

    /// The column whose name equals `ROW_ID_COLUMN_NAME`.
    RowId,
    /// A column whose name starts with `KAFKA_TIMESTAMP_COLUMN_NAME` or
    /// `CDC_TABLE_NAME_COLUMN_NAME`.
    Meta,
    /// The column whose name equals `CDC_OFFSET_COLUMN_NAME`.
    Offset,
}
63
64impl SourceColumnType {
65 pub fn from_name(name: &str) -> Self {
66 if name.starts_with(KAFKA_TIMESTAMP_COLUMN_NAME)
67 || name.starts_with(CDC_TABLE_NAME_COLUMN_NAME)
68 {
69 Self::Meta
70 } else if name == ROW_ID_COLUMN_NAME {
71 Self::RowId
72 } else if name == CDC_OFFSET_COLUMN_NAME {
73 Self::Offset
74 } else {
75 Self::Normal
76 }
77 }
78}
79
80impl SourceColumnDesc {
81 pub fn simple(name: impl Into<String>, data_type: DataType, column_id: ColumnId) -> Self {
84 let name = name.into();
85 Self {
86 name,
87 data_type,
88 column_id,
89 column_type: SourceColumnType::Normal,
90 is_pk: false,
91 is_hidden_addition_col: false,
92 additional_column: AdditionalColumn { column_type: None },
93 }
94 }
95
96 pub fn hidden_addition_col_from_column_desc(c: &ColumnDesc) -> Self {
97 Self {
98 is_hidden_addition_col: true,
99 ..c.into()
100 }
101 }
102
103 pub fn is_row_id(&self) -> bool {
104 self.column_type == SourceColumnType::RowId
105 }
106
107 pub fn is_meta(&self) -> bool {
108 self.column_type == SourceColumnType::Meta
109 }
110
111 pub fn is_offset(&self) -> bool {
112 self.column_type == SourceColumnType::Offset
113 }
114
115 #[inline]
116 pub fn is_visible(&self) -> bool {
117 !self.is_hidden_addition_col && self.column_type == SourceColumnType::Normal
118 }
119}
120
121impl From<&ColumnDesc> for SourceColumnDesc {
122 fn from(
123 ColumnDesc {
124 data_type,
125 column_id,
126 name,
127 additional_column,
128 generated_or_default_column,
130 description: _,
131 version: _,
132 system_column: _,
133 nullable: _,
134 }: &ColumnDesc,
135 ) -> Self {
136 if let Some(option) = generated_or_default_column {
137 debug_assert!(
138 matches!(option, GeneratedOrDefaultColumn::DefaultColumn(_)),
139 "source column should not be generated: {:?}",
140 generated_or_default_column.as_ref().unwrap()
141 )
142 }
143
144 let column_type = SourceColumnType::from_name(name);
145 if column_type == SourceColumnType::RowId {
146 debug_assert_eq!(name, ROW_ID_COLUMN_NAME);
147 debug_assert_eq!(*column_id, ROW_ID_COLUMN_ID);
148 }
149
150 Self {
151 name: name.clone(),
152 data_type: data_type.clone(),
153 column_id: *column_id,
154 additional_column: additional_column.clone(),
155 column_type,
157 is_pk: false,
158 is_hidden_addition_col: false,
159 }
160 }
161}
162
163impl From<&SourceColumnDesc> for ColumnDesc {
164 fn from(
165 SourceColumnDesc {
166 name,
167 data_type,
168 column_id,
169 additional_column,
170 column_type: _,
172 is_pk: _,
173 is_hidden_addition_col: _,
174 }: &SourceColumnDesc,
175 ) -> Self {
176 ColumnDesc {
177 data_type: data_type.clone(),
178 column_id: *column_id,
179 name: name.clone(),
180 additional_column: additional_column.clone(),
181 generated_or_default_column: None,
183 description: None,
184 version: ColumnDescVersion::LATEST,
185 system_column: None,
186 nullable: true,
187 }
188 }
189}
190
#[cfg(test)]
mod tests {
    use super::*;

    /// A column is visible only when it is `Normal` and not a hidden additional column.
    #[test]
    fn test_is_visible() {
        let mut c = SourceColumnDesc::simple("a", DataType::Int32, ColumnId::new(0));
        assert!(c.is_visible());

        // Every non-`Normal` column type hides the column, including `Offset`.
        for column_type in [
            SourceColumnType::RowId,
            SourceColumnType::Meta,
            SourceColumnType::Offset,
        ] {
            c.column_type = column_type;
            assert!(!c.is_visible());
        }

        // Going back to `Normal` restores visibility...
        c.column_type = SourceColumnType::Normal;
        assert!(c.is_visible());

        // ...until the column is flagged as a hidden additional column.
        c.is_hidden_addition_col = true;
        assert!(!c.is_visible());
    }
}