risingwave_connector/parser/
additional_columns.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::LazyLock;
17
18use risingwave_common::bail;
19use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId, max_column_id};
20use risingwave_common::types::{DataType, StructType};
21use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
22use risingwave_pb::plan_common::{
23    AdditionalCollectionName, AdditionalColumn, AdditionalColumnFilename, AdditionalColumnHeader,
24    AdditionalColumnHeaders, AdditionalColumnKey, AdditionalColumnOffset,
25    AdditionalColumnPartition, AdditionalColumnPayload, AdditionalColumnTimestamp,
26    AdditionalDatabaseName, AdditionalSchemaName, AdditionalSubject, AdditionalTableName,
27};
28
29use crate::error::ConnectorResult;
30use crate::source::cdc::MONGODB_CDC_CONNECTOR;
31use crate::source::{
32    AZBLOB_CONNECTOR, GCS_CONNECTOR, KAFKA_CONNECTOR, KINESIS_CONNECTOR, MQTT_CONNECTOR,
33    NATS_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR, PULSAR_CONNECTOR,
34};
35
// Hidden additional columns connectors which do not support `include` syntax.
pub static COMMON_COMPATIBLE_ADDITIONAL_COLUMNS: LazyLock<HashSet<&'static str>> =
    LazyLock::new(|| ["partition", "offset"].into_iter().collect());
39
40pub static COMPATIBLE_ADDITIONAL_COLUMNS: LazyLock<HashMap<&'static str, HashSet<&'static str>>> =
41    LazyLock::new(|| {
42        HashMap::from([
43            (
44                KAFKA_CONNECTOR,
45                HashSet::from([
46                    "key",
47                    "timestamp",
48                    "partition",
49                    "offset",
50                    "header",
51                    "payload",
52                ]),
53            ),
54            (
55                PULSAR_CONNECTOR,
56                HashSet::from(["key", "partition", "offset", "payload"]),
57            ),
58            (
59                KINESIS_CONNECTOR,
60                HashSet::from(["key", "partition", "offset", "timestamp", "payload"]),
61            ),
62            (
63                NATS_CONNECTOR,
64                HashSet::from(["partition", "offset", "payload", "subject"]),
65            ),
66            (
67                OPENDAL_S3_CONNECTOR,
68                HashSet::from(["file", "offset", "payload"]),
69            ),
70            (GCS_CONNECTOR, HashSet::from(["file", "offset", "payload"])),
71            (
72                AZBLOB_CONNECTOR,
73                HashSet::from(["file", "offset", "payload"]),
74            ),
75            (
76                POSIX_FS_CONNECTOR,
77                HashSet::from(["file", "offset", "payload"]),
78            ),
79            // mongodb-cdc doesn't support cdc backfill table
80            (
81                MONGODB_CDC_CONNECTOR,
82                HashSet::from([
83                    "timestamp",
84                    "partition",
85                    "offset",
86                    "database_name",
87                    "collection_name",
88                ]),
89            ),
90            (MQTT_CONNECTOR, HashSet::from(["offset", "partition"])),
91        ])
92    });
93
// For CDC backfill table, the additional columns are added to the schema of `StreamCdcScan`
pub static CDC_BACKFILL_TABLE_ADDITIONAL_COLUMNS: LazyLock<Option<HashSet<&'static str>>> =
    LazyLock::new(|| {
        let cols = ["timestamp", "database_name", "schema_name", "table_name"];
        Some(cols.into_iter().collect())
    });
104
105pub fn get_supported_additional_columns(
106    connector_name: &str,
107    is_cdc_backfill: bool,
108) -> Option<&HashSet<&'static str>> {
109    if is_cdc_backfill {
110        CDC_BACKFILL_TABLE_ADDITIONAL_COLUMNS.as_ref()
111    } else {
112        COMPATIBLE_ADDITIONAL_COLUMNS.get(connector_name)
113    }
114}
115
116pub fn gen_default_addition_col_name(
117    connector_name: &str,
118    additional_col_type: &str,
119    inner_field_name: Option<&str>,
120    data_type: Option<&DataType>,
121) -> String {
122    let legacy_dt_name = data_type.map(|dt| format!("{:?}", dt).to_lowercase());
123    let col_name = [
124        Some(connector_name),
125        Some(additional_col_type),
126        inner_field_name,
127        legacy_dt_name.as_deref(),
128    ];
129    col_name.iter().fold("_rw".to_owned(), |name, ele| {
130        if let Some(ele) = ele {
131            format!("{}_{}", name, ele)
132        } else {
133            name
134        }
135    })
136}
137
138pub fn build_additional_column_desc(
139    column_id: ColumnId,
140    connector_name: &str,
141    additional_col_type: &str,
142    column_alias: Option<String>,
143    inner_field_name: Option<&str>,
144    data_type: Option<&DataType>,
145    reject_unknown_connector: bool,
146    is_cdc_backfill_table: bool,
147) -> ConnectorResult<ColumnDesc> {
148    let compatible_columns = match (
149        get_supported_additional_columns(connector_name, is_cdc_backfill_table),
150        reject_unknown_connector,
151    ) {
152        (Some(compat_cols), _) => compat_cols,
153        (None, false) => &COMMON_COMPATIBLE_ADDITIONAL_COLUMNS,
154        (None, true) => {
155            bail!(
156                "additional column is not supported for connector {}, acceptable connectors: {:?}",
157                connector_name,
158                COMPATIBLE_ADDITIONAL_COLUMNS.keys(),
159            );
160        }
161    };
162    if !compatible_columns.contains(additional_col_type) {
163        bail!(
164            "additional column type {} is not supported for connector {}, acceptable column types: {:?}",
165            additional_col_type,
166            connector_name,
167            compatible_columns
168        );
169    }
170
171    let column_name = column_alias.unwrap_or_else(|| {
172        gen_default_addition_col_name(
173            connector_name,
174            additional_col_type,
175            inner_field_name,
176            data_type,
177        )
178    });
179
180    let col_desc = match additional_col_type {
181        "key" => ColumnDesc::named_with_additional_column(
182            column_name,
183            column_id,
184            DataType::Bytea,
185            AdditionalColumn {
186                column_type: Some(AdditionalColumnType::Key(AdditionalColumnKey {})),
187            },
188        ),
189
190        "timestamp" => ColumnDesc::named_with_additional_column(
191            column_name,
192            column_id,
193            DataType::Timestamptz,
194            AdditionalColumn {
195                column_type: Some(AdditionalColumnType::Timestamp(
196                    AdditionalColumnTimestamp {},
197                )),
198            },
199        ),
200        "partition" => ColumnDesc::named_with_additional_column(
201            column_name,
202            column_id,
203            DataType::Varchar,
204            AdditionalColumn {
205                column_type: Some(AdditionalColumnType::Partition(
206                    AdditionalColumnPartition {},
207                )),
208            },
209        ),
210        "payload" => ColumnDesc::named_with_additional_column(
211            column_name,
212            column_id,
213            DataType::Jsonb,
214            AdditionalColumn {
215                column_type: Some(AdditionalColumnType::Payload(AdditionalColumnPayload {})),
216            },
217        ),
218        "offset" => ColumnDesc::named_with_additional_column(
219            column_name,
220            column_id,
221            DataType::Varchar,
222            AdditionalColumn {
223                column_type: Some(AdditionalColumnType::Offset(AdditionalColumnOffset {})),
224            },
225        ),
226
227        "file" => ColumnDesc::named_with_additional_column(
228            column_name,
229            column_id,
230            DataType::Varchar,
231            AdditionalColumn {
232                column_type: Some(AdditionalColumnType::Filename(AdditionalColumnFilename {})),
233            },
234        ),
235        "header" => build_header_catalog(column_id, &column_name, inner_field_name, data_type),
236        "database_name" => ColumnDesc::named_with_additional_column(
237            column_name,
238            column_id,
239            DataType::Varchar,
240            AdditionalColumn {
241                column_type: Some(AdditionalColumnType::DatabaseName(
242                    AdditionalDatabaseName {},
243                )),
244            },
245        ),
246        "schema_name" => ColumnDesc::named_with_additional_column(
247            column_name,
248            column_id,
249            DataType::Varchar,
250            AdditionalColumn {
251                column_type: Some(AdditionalColumnType::SchemaName(AdditionalSchemaName {})),
252            },
253        ),
254        "table_name" => ColumnDesc::named_with_additional_column(
255            column_name,
256            column_id,
257            DataType::Varchar,
258            AdditionalColumn {
259                column_type: Some(AdditionalColumnType::TableName(AdditionalTableName {})),
260            },
261        ),
262        "collection_name" => ColumnDesc::named_with_additional_column(
263            column_name,
264            column_id,
265            DataType::Varchar,
266            AdditionalColumn {
267                column_type: Some(AdditionalColumnType::CollectionName(
268                    AdditionalCollectionName {},
269                )),
270            },
271        ),
272        "subject" => ColumnDesc::named_with_additional_column(
273            column_name,
274            column_id,
275            DataType::Varchar, // Assuming subject is a string
276            AdditionalColumn {
277                column_type: Some(AdditionalColumnType::Subject(AdditionalSubject {})),
278            },
279        ),
280        _ => unreachable!(),
281    };
282
283    Ok(col_desc)
284}
285
/// Utility function for adding partition and offset columns to the columns, if not specified by the user.
///
/// ## Returns
/// - `columns_exist`: whether 1. `partition`/`file` and 2. `offset` columns are included in `columns`.
/// - `additional_columns`: The `ColumnCatalog` for `partition`/`file` and `offset` columns.
pub fn source_add_partition_offset_cols(
    columns: &[ColumnCatalog],
    connector_name: &str,
    skip_col_id: bool,
) -> ([bool; 2], [ColumnDesc; 2]) {
    let mut columns_exist = [false; 2];

    // New ids are assigned sequentially after the current maximum, unless the
    // caller asked for placeholders (`skip_col_id`) to be filled in later.
    // NOTE: `assign_col_id` mutates `last_column_id`, so the id order follows
    // the iteration order of the `filter_map` below.
    let mut last_column_id = max_column_id(columns);
    let mut assign_col_id = || {
        if skip_col_id {
            // col id will be filled outside later. Here just use a placeholder.
            ColumnId::placeholder()
        } else {
            last_column_id = last_column_id.next();
            last_column_id
        }
    };

    let additional_columns: Vec<_> = {
        let compat_col_types = COMPATIBLE_ADDITIONAL_COLUMNS
            .get(connector_name)
            .unwrap_or(&COMMON_COMPATIBLE_ADDITIONAL_COLUMNS);
        // Exactly two of the three candidates are expected to survive the
        // filter (`partition` or `file`, plus `offset`) — enforced by the
        // asserts below.
        ["partition", "file", "offset"]
            .iter()
            .filter_map(|col_type| {
                if compat_col_types.contains(col_type) {
                    Some(
                        build_additional_column_desc(
                            assign_col_id(),
                            connector_name,
                            col_type,
                            None,
                            None,
                            None,
                            false,
                            false,
                        )
                        .unwrap(),
                    )
                } else {
                    None
                }
            })
            .collect()
    };
    // Sanity-check the invariant: index 0 is partition/file, index 1 is offset.
    assert_eq!(additional_columns.len(), 2);
    use risingwave_pb::plan_common::additional_column::ColumnType;
    assert_matches::assert_matches!(
        additional_columns[0].additional_column,
        AdditionalColumn {
            column_type: Some(ColumnType::Partition(_) | ColumnType::Filename(_)),
        }
    );
    assert_matches::assert_matches!(
        additional_columns[1].additional_column,
        AdditionalColumn {
            column_type: Some(ColumnType::Offset(_)),
        }
    );

    // Check if partition/file/offset columns are included explicitly.
    for col in columns {
        match col.column_desc.additional_column {
            AdditionalColumn {
                column_type: Some(ColumnType::Partition(_) | ColumnType::Filename(_)),
            } => {
                columns_exist[0] = true;
            }
            AdditionalColumn {
                column_type: Some(ColumnType::Offset(_)),
            } => {
                columns_exist[1] = true;
            }
            _ => (),
        }
    }

    // `try_into` cannot fail: the length-2 assert above guarantees the array size.
    (columns_exist, additional_columns.try_into().unwrap())
}
370
371fn build_header_catalog(
372    column_id: ColumnId,
373    col_name: &str,
374    inner_field_name: Option<&str>,
375    data_type: Option<&DataType>,
376) -> ColumnDesc {
377    if let Some(inner) = inner_field_name {
378        let data_type = data_type.unwrap_or(&DataType::Bytea);
379        let pb_data_type = data_type.to_protobuf();
380        ColumnDesc::named_with_additional_column(
381            col_name,
382            column_id,
383            data_type.clone(),
384            AdditionalColumn {
385                column_type: Some(AdditionalColumnType::HeaderInner(AdditionalColumnHeader {
386                    inner_field: inner.to_owned(),
387                    data_type: Some(pb_data_type),
388                })),
389            },
390        )
391    } else {
392        ColumnDesc::named_with_additional_column(
393            col_name,
394            column_id,
395            DataType::List(get_kafka_header_item_datatype().into()),
396            AdditionalColumn {
397                column_type: Some(AdditionalColumnType::Headers(AdditionalColumnHeaders {})),
398            },
399        )
400    }
401}
402
403pub fn get_kafka_header_item_datatype() -> DataType {
404    let struct_inner = vec![("key", DataType::Varchar), ("value", DataType::Bytea)];
405    DataType::Struct(StructType::new(struct_inner))
406}
407
#[cfg(test)]
mod test {
    use super::*;

    /// Table-driven check of the generated `_rw_...` default column names.
    #[test]
    fn test_gen_default_addition_col_name() {
        let cases: [(&str, &str, Option<&str>, Option<&DataType>, &str); 3] = [
            ("kafka", "key", None, None, "_rw_kafka_key"),
            ("kafka", "header", Some("inner"), None, "_rw_kafka_header_inner"),
            (
                "kafka",
                "header",
                Some("inner"),
                Some(&DataType::Varchar),
                "_rw_kafka_header_inner_varchar",
            ),
        ];
        for (connector, col_type, inner, dt, expected) in cases {
            assert_eq!(
                gen_default_addition_col_name(connector, col_type, inner, dt),
                expected
            );
        }
    }
}