risingwave_connector/source/
manager.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::Debug;
16
17use risingwave_common::catalog::{
18    CDC_OFFSET_COLUMN_NAME, CDC_TABLE_NAME_COLUMN_NAME, ColumnDesc, ColumnId,
19    KAFKA_TIMESTAMP_COLUMN_NAME, ROW_ID_COLUMN_ID, ROW_ID_COLUMN_NAME,
20};
21use risingwave_common::types::DataType;
22use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
23use risingwave_pb::plan_common::{AdditionalColumn, ColumnDescVersion};
24
25/// `SourceColumnDesc` is used to describe a column in the Source.
26///
27/// See the implementation of `From<&ColumnDesc>` for the difference between `SourceColumnDesc` and [`ColumnDesc`].
28#[derive(Clone, Debug)]
29pub struct SourceColumnDesc {
30    pub name: String,
31    pub data_type: DataType,
32    pub column_id: ColumnId,
33    /// `additional_column` and `column_type` are orthogonal
34    /// `additional_column` is used to indicate the column is from which part of the message
35    /// `column_type` is used to indicate the type of the column, only used in cdc scenario
36    pub additional_column: AdditionalColumn,
37    // ------
38    // Fields above are the same in `ColumnDesc`.
39    // Fields below are specific to `SourceColumnDesc`.
40    // ------
41    pub column_type: SourceColumnType,
42    /// `is_pk` is used to indicate whether the column is part of the primary key columns.
43    pub is_pk: bool,
44    /// `is_hidden_addition_col` is used to indicate whether the column is a hidden addition column.
45    pub is_hidden_addition_col: bool,
46}
47
/// `SourceColumnType` is used to indicate the type of a column emitted by the Source.
/// There are 4 types of columns:
/// - `Normal`: a visible column
/// - `RowId`: internal column to uniquely identify a row
/// - `Meta`: internal column to store source related metadata
/// - `Offset`: internal column to store upstream offset for a row, used in CDC source
///
/// The enum carries no data, so its equality is total; `Eq` is derived alongside
/// `PartialEq` so the type can be used wherever an `Eq` bound is required.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SourceColumnType {
    /// A visible column.
    Normal,

    // internal columns
    /// Internal column to uniquely identify a row.
    RowId,
    /// Internal column to store source related metadata.
    Meta,
    /// Internal column to store the upstream offset for a row, used in CDC source.
    Offset,
}
63
64impl SourceColumnType {
65    pub fn from_name(name: &str) -> Self {
66        if name.starts_with(KAFKA_TIMESTAMP_COLUMN_NAME)
67            || name.starts_with(CDC_TABLE_NAME_COLUMN_NAME)
68        {
69            Self::Meta
70        } else if name == ROW_ID_COLUMN_NAME {
71            Self::RowId
72        } else if name == CDC_OFFSET_COLUMN_NAME {
73            Self::Offset
74        } else {
75            Self::Normal
76        }
77    }
78}
79
80impl SourceColumnDesc {
81    /// Create a [`SourceColumnDesc`].
82    // TODO(struct): rename to `new`?
83    pub fn simple(name: impl Into<String>, data_type: DataType, column_id: ColumnId) -> Self {
84        let name = name.into();
85        Self {
86            name,
87            data_type,
88            column_id,
89            column_type: SourceColumnType::Normal,
90            is_pk: false,
91            is_hidden_addition_col: false,
92            additional_column: AdditionalColumn { column_type: None },
93        }
94    }
95
96    pub fn hidden_addition_col_from_column_desc(c: &ColumnDesc) -> Self {
97        Self {
98            is_hidden_addition_col: true,
99            ..c.into()
100        }
101    }
102
103    pub fn is_row_id(&self) -> bool {
104        self.column_type == SourceColumnType::RowId
105    }
106
107    pub fn is_meta(&self) -> bool {
108        self.column_type == SourceColumnType::Meta
109    }
110
111    pub fn is_offset(&self) -> bool {
112        self.column_type == SourceColumnType::Offset
113    }
114
115    #[inline]
116    pub fn is_visible(&self) -> bool {
117        !self.is_hidden_addition_col && self.column_type == SourceColumnType::Normal
118    }
119}
120
121impl From<&ColumnDesc> for SourceColumnDesc {
122    fn from(
123        ColumnDesc {
124            data_type,
125            column_id,
126            name,
127            additional_column,
128            // ignored fields below
129            generated_or_default_column,
130            description: _,
131            version: _,
132            system_column: _,
133            nullable: _,
134        }: &ColumnDesc,
135    ) -> Self {
136        if let Some(option) = generated_or_default_column {
137            debug_assert!(
138                matches!(option, GeneratedOrDefaultColumn::DefaultColumn(_)),
139                "source column should not be generated: {:?}",
140                generated_or_default_column.as_ref().unwrap()
141            )
142        }
143
144        let column_type = SourceColumnType::from_name(name);
145        if column_type == SourceColumnType::RowId {
146            debug_assert_eq!(name, ROW_ID_COLUMN_NAME);
147            debug_assert_eq!(*column_id, ROW_ID_COLUMN_ID);
148        }
149
150        Self {
151            name: name.clone(),
152            data_type: data_type.clone(),
153            column_id: *column_id,
154            additional_column: additional_column.clone(),
155            // additional fields below
156            column_type,
157            is_pk: false,
158            is_hidden_addition_col: false,
159        }
160    }
161}
162
163impl From<&SourceColumnDesc> for ColumnDesc {
164    fn from(
165        SourceColumnDesc {
166            name,
167            data_type,
168            column_id,
169            additional_column,
170            // ignored fields below
171            column_type: _,
172            is_pk: _,
173            is_hidden_addition_col: _,
174        }: &SourceColumnDesc,
175    ) -> Self {
176        ColumnDesc {
177            data_type: data_type.clone(),
178            column_id: *column_id,
179            name: name.clone(),
180            additional_column: additional_column.clone(),
181            // additional fields below
182            generated_or_default_column: None,
183            description: None,
184            version: ColumnDescVersion::LATEST,
185            system_column: None,
186            nullable: true,
187        }
188    }
189}
190
#[cfg(test)]
mod tests {
    use super::*;

    /// `is_visible` must hold only for `Normal` columns that are not hidden addition
    /// columns. Every internal column type and the hidden flag are checked.
    #[test]
    fn test_is_visible() {
        let mut c = SourceColumnDesc::simple("a", DataType::Int32, ColumnId::new(0));
        assert!(c.is_visible());

        // Every internal column type hides the column.
        for internal in [
            SourceColumnType::RowId,
            SourceColumnType::Meta,
            SourceColumnType::Offset,
        ] {
            c.column_type = internal;
            assert!(!c.is_visible());
        }

        // A normal column is visible again, unless it is a hidden addition column.
        c.column_type = SourceColumnType::Normal;
        assert!(c.is_visible());
        c.is_hidden_addition_col = true;
        assert!(!c.is_visible());
    }
}