risingwave_connector/source/
manager.rs1use std::fmt::Debug;
16
17use risingwave_common::catalog::{
18    CDC_OFFSET_COLUMN_NAME, CDC_TABLE_NAME_COLUMN_NAME, ColumnDesc, ColumnId,
19    KAFKA_TIMESTAMP_COLUMN_NAME, ROW_ID_COLUMN_ID, ROW_ID_COLUMN_NAME,
20};
21use risingwave_common::types::DataType;
22use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
23use risingwave_pb::plan_common::{AdditionalColumn, ColumnDescVersion};
24
25#[derive(Clone, Debug)]
29pub struct SourceColumnDesc {
30    pub name: String,
31    pub data_type: DataType,
32    pub column_id: ColumnId,
33    pub additional_column: AdditionalColumn,
37    pub column_type: SourceColumnType,
42    pub is_pk: bool,
44    pub is_hidden_addition_col: bool,
46}
47
48#[derive(Clone, Debug, PartialEq)]
55pub enum SourceColumnType {
56    Normal,
57
58    RowId,
60    Meta,
61    Offset,
62}
63
64impl SourceColumnType {
65    pub fn from_name(name: &str) -> Self {
66        if name.starts_with(KAFKA_TIMESTAMP_COLUMN_NAME)
67            || name.starts_with(CDC_TABLE_NAME_COLUMN_NAME)
68        {
69            Self::Meta
70        } else if name == ROW_ID_COLUMN_NAME {
71            Self::RowId
72        } else if name == CDC_OFFSET_COLUMN_NAME {
73            Self::Offset
74        } else {
75            Self::Normal
76        }
77    }
78}
79
80impl SourceColumnDesc {
81    pub fn simple(name: impl Into<String>, data_type: DataType, column_id: ColumnId) -> Self {
84        let name = name.into();
85        Self {
86            name,
87            data_type,
88            column_id,
89            column_type: SourceColumnType::Normal,
90            is_pk: false,
91            is_hidden_addition_col: false,
92            additional_column: AdditionalColumn { column_type: None },
93        }
94    }
95
96    pub fn hidden_addition_col_from_column_desc(c: &ColumnDesc) -> Self {
97        Self {
98            is_hidden_addition_col: true,
99            ..c.into()
100        }
101    }
102
103    pub fn is_row_id(&self) -> bool {
104        self.column_type == SourceColumnType::RowId
105    }
106
107    pub fn is_meta(&self) -> bool {
108        self.column_type == SourceColumnType::Meta
109    }
110
111    pub fn is_offset(&self) -> bool {
112        self.column_type == SourceColumnType::Offset
113    }
114
115    #[inline]
116    pub fn is_visible(&self) -> bool {
117        !self.is_hidden_addition_col && self.column_type == SourceColumnType::Normal
118    }
119}
120
121impl From<&ColumnDesc> for SourceColumnDesc {
122    fn from(
123        ColumnDesc {
124            data_type,
125            column_id,
126            name,
127            additional_column,
128            generated_or_default_column,
130            description: _,
131            version: _,
132            system_column: _,
133            nullable: _,
134        }: &ColumnDesc,
135    ) -> Self {
136        if let Some(option) = generated_or_default_column {
137            debug_assert!(
138                matches!(option, GeneratedOrDefaultColumn::DefaultColumn(_)),
139                "source column should not be generated: {:?}",
140                generated_or_default_column.as_ref().unwrap()
141            )
142        }
143
144        let column_type = SourceColumnType::from_name(name);
145        if column_type == SourceColumnType::RowId {
146            debug_assert_eq!(name, ROW_ID_COLUMN_NAME);
147            debug_assert_eq!(*column_id, ROW_ID_COLUMN_ID);
148        }
149
150        Self {
151            name: name.clone(),
152            data_type: data_type.clone(),
153            column_id: *column_id,
154            additional_column: additional_column.clone(),
155            column_type,
157            is_pk: false,
158            is_hidden_addition_col: false,
159        }
160    }
161}
162
163impl From<&SourceColumnDesc> for ColumnDesc {
164    fn from(
165        SourceColumnDesc {
166            name,
167            data_type,
168            column_id,
169            additional_column,
170            column_type: _,
172            is_pk: _,
173            is_hidden_addition_col: _,
174        }: &SourceColumnDesc,
175    ) -> Self {
176        ColumnDesc {
177            data_type: data_type.clone(),
178            column_id: *column_id,
179            name: name.clone(),
180            additional_column: additional_column.clone(),
181            generated_or_default_column: None,
183            description: None,
184            version: ColumnDescVersion::LATEST,
185            system_column: None,
186            nullable: true,
187        }
188    }
189}
190
191#[cfg(test)]
192mod tests {
193    use super::*;
194
195    #[test]
196    fn test_is_visible() {
197        let mut c = SourceColumnDesc::simple("a", DataType::Int32, ColumnId::new(0));
198        assert!(c.is_visible());
199        c.column_type = SourceColumnType::RowId;
200        assert!(!c.is_visible());
201        c.column_type = SourceColumnType::Meta;
202        assert!(!c.is_visible());
203    }
204}