risingwave_connector/parser/additional_columns.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::sync::LazyLock;

use risingwave_common::bail;
use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId, max_column_id};
use risingwave_common::types::{DataType, StructType};
use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
use risingwave_pb::plan_common::{
    AdditionalCollectionName, AdditionalColumn, AdditionalColumnFilename, AdditionalColumnHeader,
    AdditionalColumnHeaders, AdditionalColumnKey, AdditionalColumnOffset,
    AdditionalColumnPartition, AdditionalColumnPayload, AdditionalColumnPulsarMessageIdData,
    AdditionalColumnTimestamp, AdditionalDatabaseName, AdditionalSchemaName, AdditionalSubject,
    AdditionalTableName,
};

use crate::error::ConnectorResult;
use crate::source::cdc::MONGODB_CDC_CONNECTOR;
use crate::source::{
    AZBLOB_CONNECTOR, GCS_CONNECTOR, KAFKA_CONNECTOR, KINESIS_CONNECTOR, MQTT_CONNECTOR,
    NATS_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR, PULSAR_CONNECTOR,
};

// Hidden additional columns for connectors that do not support the `INCLUDE` syntax.
pub static COMMON_COMPATIBLE_ADDITIONAL_COLUMNS: LazyLock<HashSet<&'static str>> =
    LazyLock::new(|| HashSet::from(["partition", "offset"]));

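/// Maps each connector to the set of additional column types it accepts in
/// `INCLUDE <column_type>` clauses.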
pub static COMPATIBLE_ADDITIONAL_COLUMNS: LazyLock<HashMap<&'static str, HashSet<&'static str>>> =
    LazyLock::new(|| {
        HashMap::from([
            (
                KAFKA_CONNECTOR,
                HashSet::from([
                    "key",
                    "timestamp",
                    "partition",
                    "offset",
                    "header",
                    "payload",
                ]),
            ),
            (
                PULSAR_CONNECTOR,
                HashSet::from(["key", "partition", "offset", "payload", "message_id_data"]),
            ),
            (
                KINESIS_CONNECTOR,
                HashSet::from(["key", "partition", "offset", "timestamp", "payload"]),
            ),
            (
                NATS_CONNECTOR,
                HashSet::from(["partition", "offset", "payload", "subject"]),
            ),
            (
                OPENDAL_S3_CONNECTOR,
                HashSet::from(["file", "offset", "payload"]),
            ),
            (GCS_CONNECTOR, HashSet::from(["file", "offset", "payload"])),
            (
                AZBLOB_CONNECTOR,
                HashSet::from(["file", "offset", "payload"]),
            ),
            (
                POSIX_FS_CONNECTOR,
                HashSet::from(["file", "offset", "payload"]),
            ),
            // mongodb-cdc doesn't support cdc backfill table
            (
                MONGODB_CDC_CONNECTOR,
                HashSet::from([
                    "timestamp",
                    "partition",
                    "offset",
                    "database_name",
                    "collection_name",
                ]),
            ),
            (MQTT_CONNECTOR, HashSet::from(["offset", "partition"])),
        ])
    });

// For CDC backfill tables, the additional columns are added to the schema of `StreamCdcScan`.
pub static CDC_BACKFILL_TABLE_ADDITIONAL_COLUMNS: LazyLock<Option<HashSet<&'static str>>> =
    LazyLock::new(|| {
        Some(HashSet::from([
            "timestamp",
            "database_name",
            "schema_name",
            "table_name",
        ]))
    });

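/// Returns the additional column types supported by `connector_name`, or
/// `None` for an unknown connector. For CDC backfill tables, the CDC-specific
/// set is returned regardless of the connector.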
pub fn get_supported_additional_columns(
    connector_name: &str,
    is_cdc_backfill: bool,
) -> Option<&HashSet<&'static str>> {
    if is_cdc_backfill {
        CDC_BACKFILL_TABLE_ADDITIONAL_COLUMNS.as_ref()
    } else {
        COMPATIBLE_ADDITIONAL_COLUMNS.get(connector_name)
    }
}

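/// Generates the default name for an additional column:
/// `_rw_{connector}_{col_type}[_{inner_field}][_{data_type}]`, e.g.
/// `_rw_kafka_header_inner_varchar` for Kafka header field `inner` read as
/// `varchar`.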
pub fn gen_default_addition_col_name(
    connector_name: &str,
    additional_col_type: &str,
    inner_field_name: Option<&str>,
    data_type: Option<&DataType>,
) -> String {
    let legacy_dt_name = data_type.map(|dt| format!("{:?}", dt).to_lowercase());
    let col_name = [
        Some(connector_name),
        Some(additional_col_type),
        inner_field_name,
        legacy_dt_name.as_deref(),
    ];
    col_name.iter().fold("_rw".to_owned(), |name, ele| {
        if let Some(ele) = ele {
            format!("{}_{}", name, ele)
        } else {
            name
        }
    })
}

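/// Builds a `ColumnDesc` for the requested additional column type after
/// validating it against the connector's supported set.
/// `reject_unknown_connector` decides whether an unrecognized connector bails
/// or falls back to the common `partition`/`offset` set.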
pub fn build_additional_column_desc(
    column_id: ColumnId,
    connector_name: &str,
    additional_col_type: &str,
    column_alias: Option<String>,
    inner_field_name: Option<&str>,
    data_type: Option<&DataType>,
    reject_unknown_connector: bool,
    is_cdc_backfill_table: bool,
) -> ConnectorResult<ColumnDesc> {
    let compatible_columns = match (
        get_supported_additional_columns(connector_name, is_cdc_backfill_table),
        reject_unknown_connector,
    ) {
        (Some(compat_cols), _) => compat_cols,
        (None, false) => &COMMON_COMPATIBLE_ADDITIONAL_COLUMNS,
        (None, true) => {
            bail!(
                "additional column is not supported for connector {}, acceptable connectors: {:?}",
                connector_name,
                COMPATIBLE_ADDITIONAL_COLUMNS.keys(),
            );
        }
    };
    if !compatible_columns.contains(additional_col_type) {
        bail!(
            "additional column type {} is not supported for connector {}, acceptable column types: {:?}",
            additional_col_type,
            connector_name,
            compatible_columns
        );
    }

    let column_name = column_alias.unwrap_or_else(|| {
        gen_default_addition_col_name(
            connector_name,
            additional_col_type,
            inner_field_name,
            data_type,
        )
    });

    let col_desc = match additional_col_type {
        "key" => ColumnDesc::named_with_additional_column(
            column_name,
            column_id,
            DataType::Bytea,
            AdditionalColumn {
                column_type: Some(AdditionalColumnType::Key(AdditionalColumnKey {})),
            },
        ),

        "timestamp" => ColumnDesc::named_with_additional_column(
            column_name,
            column_id,
            DataType::Timestamptz,
            AdditionalColumn {
                column_type: Some(AdditionalColumnType::Timestamp(
                    AdditionalColumnTimestamp {},
                )),
            },
        ),
        "partition" => ColumnDesc::named_with_additional_column(
            column_name,
            column_id,
            DataType::Varchar,
            AdditionalColumn {
                column_type: Some(AdditionalColumnType::Partition(
                    AdditionalColumnPartition {},
                )),
            },
        ),
        "payload" => ColumnDesc::named_with_additional_column(
            column_name,
            column_id,
            DataType::Jsonb,
            AdditionalColumn {
                column_type: Some(AdditionalColumnType::Payload(AdditionalColumnPayload {})),
            },
        ),
        "offset" => ColumnDesc::named_with_additional_column(
            column_name,
            column_id,
            DataType::Varchar,
            AdditionalColumn {
                column_type: Some(AdditionalColumnType::Offset(AdditionalColumnOffset {})),
            },
        ),

        "file" => ColumnDesc::named_with_additional_column(
            column_name,
            column_id,
            DataType::Varchar,
            AdditionalColumn {
                column_type: Some(AdditionalColumnType::Filename(AdditionalColumnFilename {})),
            },
        ),
        "header" => build_header_catalog(column_id, &column_name, inner_field_name, data_type),
        "database_name" => ColumnDesc::named_with_additional_column(
            column_name,
            column_id,
            DataType::Varchar,
            AdditionalColumn {
                column_type: Some(AdditionalColumnType::DatabaseName(
                    AdditionalDatabaseName {},
                )),
            },
        ),
        "schema_name" => ColumnDesc::named_with_additional_column(
            column_name,
            column_id,
            DataType::Varchar,
            AdditionalColumn {
                column_type: Some(AdditionalColumnType::SchemaName(AdditionalSchemaName {})),
            },
        ),
        "table_name" => ColumnDesc::named_with_additional_column(
            column_name,
            column_id,
            DataType::Varchar,
            AdditionalColumn {
                column_type: Some(AdditionalColumnType::TableName(AdditionalTableName {})),
            },
        ),
        "collection_name" => ColumnDesc::named_with_additional_column(
            column_name,
            column_id,
            DataType::Varchar,
            AdditionalColumn {
                column_type: Some(AdditionalColumnType::CollectionName(
                    AdditionalCollectionName {},
                )),
            },
        ),
        "subject" => ColumnDesc::named_with_additional_column(
            column_name,
            column_id,
            DataType::Varchar, // Assuming subject is a string
            AdditionalColumn {
                column_type: Some(AdditionalColumnType::Subject(AdditionalSubject {})),
            },
        ),
        "message_id_data" => ColumnDesc::named_with_additional_column(
            column_name,
            column_id,
            DataType::Bytea,
            AdditionalColumn {
                column_type: Some(AdditionalColumnType::PulsarMessageIdData(
                    AdditionalColumnPulsarMessageIdData {},
                )),
            },
        ),
        // Guarded by the `compatible_columns` check above.
        _ => unreachable!(),
    };

    Ok(col_desc)
}

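/// Unconditionally appends a hidden `message_id_data` column for Pulsar,
/// assuming the user has not included one themselves.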
pub fn derive_pulsar_message_id_data_column(
    connector_name: &str,
    column_exist: &mut Vec<bool>,
    additional_columns: &mut Vec<ColumnDesc>,
) {
    // The additional columns were already assigned ids against the existing max
    // column id, so the max id among them is the max id across all columns.
    let max_column_id = additional_columns
        .iter()
        .fold(ColumnId::first_user_column(), |a, b| a.max(b.column_id));

    // Assume the user did not include a `message_id_data` column themselves.
    column_exist.push(false);
    additional_columns.push(
        build_additional_column_desc(
            max_column_id.next(),
            connector_name,
            "message_id_data",
            None,
            None,
            None,
            false,
            false,
        )
        .unwrap(),
    );
}

/// Utility function for adding `partition`/`file` and `offset` columns to `columns`,
/// if they are not specified by the user.
///
/// ## Returns
/// - `columns_exist`: whether 1. `partition`/`file` and 2. `offset` columns are included in `columns`.
/// - `additional_columns`: the `ColumnDesc`s for the `partition`/`file` and `offset` columns.
pub fn source_add_partition_offset_cols(
    columns: &[ColumnCatalog],
    connector_name: &str,
    skip_col_id: bool,
) -> (Vec<bool>, Vec<ColumnDesc>) {
    let mut columns_exist = vec![false; 2];

    let mut last_column_id = max_column_id(columns);
    let mut assign_col_id = || {
        if skip_col_id {
            // The column id will be filled in by the caller later; use a placeholder here.
            ColumnId::placeholder()
        } else {
            last_column_id = last_column_id.next();
            last_column_id
        }
    };

    let additional_columns: Vec<_> = {
        let compat_col_types = COMPATIBLE_ADDITIONAL_COLUMNS
            .get(connector_name)
            .unwrap_or(&COMMON_COMPATIBLE_ADDITIONAL_COLUMNS);
        ["partition", "file", "offset"]
            .iter()
            .filter_map(|col_type| {
                if compat_col_types.contains(col_type) {
                    Some(
                        build_additional_column_desc(
                            assign_col_id(),
                            connector_name,
                            col_type,
                            None,
                            None,
                            None,
                            false,
                            false,
                        )
                        .unwrap(),
                    )
                } else {
                    None
                }
            })
            .collect()
    };
    // Every compatible set contains exactly one of `partition`/`file`, plus `offset`.
    assert_eq!(additional_columns.len(), 2);
    use risingwave_pb::plan_common::additional_column::ColumnType;
    assert_matches::assert_matches!(
        additional_columns[0].additional_column,
        AdditionalColumn {
            column_type: Some(ColumnType::Partition(_) | ColumnType::Filename(_)),
        }
    );
    assert_matches::assert_matches!(
        additional_columns[1].additional_column,
        AdditionalColumn {
            column_type: Some(ColumnType::Offset(_)),
        }
    );

    // Check whether partition/file/offset columns are already included explicitly.
    for col in columns {
        match col.column_desc.additional_column {
            AdditionalColumn {
                column_type: Some(ColumnType::Partition(_) | ColumnType::Filename(_)),
            } => {
                columns_exist[0] = true;
            }
            AdditionalColumn {
                column_type: Some(ColumnType::Offset(_)),
            } => {
                columns_exist[1] = true;
            }
            _ => (),
        }
    }

    (columns_exist, additional_columns)
}

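/// Builds the catalog for a header column: a single header value (defaulting
/// to `bytea`) when `inner_field_name` is given, otherwise a list of
/// `(key, value)` structs containing all headers.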
fn build_header_catalog(
    column_id: ColumnId,
    col_name: &str,
    inner_field_name: Option<&str>,
    data_type: Option<&DataType>,
) -> ColumnDesc {
    if let Some(inner) = inner_field_name {
        let data_type = data_type.unwrap_or(&DataType::Bytea);
        let pb_data_type = data_type.to_protobuf();
        ColumnDesc::named_with_additional_column(
            col_name,
            column_id,
            data_type.clone(),
            AdditionalColumn {
                column_type: Some(AdditionalColumnType::HeaderInner(AdditionalColumnHeader {
                    inner_field: inner.to_owned(),
                    data_type: Some(pb_data_type),
                })),
            },
        )
    } else {
        ColumnDesc::named_with_additional_column(
            col_name,
            column_id,
            DataType::list(get_kafka_header_item_datatype()),
            AdditionalColumn {
                column_type: Some(AdditionalColumnType::Headers(AdditionalColumnHeaders {})),
            },
        )
    }
}

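/// The element type of the full-headers list: `struct<key varchar, value bytea>`.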
pub fn get_kafka_header_item_datatype() -> DataType {
    let struct_inner = vec![("key", DataType::Varchar), ("value", DataType::Bytea)];
    DataType::Struct(StructType::new(struct_inner))
}

#[cfg(test)]
mod test {
    use super::*;

    #[test]
    fn test_gen_default_addition_col_name() {
        assert_eq!(
            gen_default_addition_col_name("kafka", "key", None, None),
            "_rw_kafka_key"
        );
        assert_eq!(
            gen_default_addition_col_name("kafka", "header", Some("inner"), None),
            "_rw_kafka_header_inner"
        );
        assert_eq!(
            gen_default_addition_col_name(
                "kafka",
                "header",
                Some("inner"),
                Some(&DataType::Varchar)
            ),
            "_rw_kafka_header_inner_varchar"
        );
    }
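    // A few extra sanity checks, sketched against the behavior above. They
    // assume `ColumnDesc` exposes public `name`/`data_type` fields and that
    // `max_column_id` tolerates an empty column list, as used elsewhere here.
    #[test]
    fn test_build_additional_column_desc_for_kafka() {
        let desc = build_additional_column_desc(
            ColumnId::new(1),
            KAFKA_CONNECTOR,
            "key",
            None,
            None,
            None,
            true,
            false,
        )
        .unwrap();
        assert_eq!(desc.data_type, DataType::Bytea);
        assert_eq!(
            desc.name,
            gen_default_addition_col_name(KAFKA_CONNECTOR, "key", None, None)
        );

        // `file` is not a Kafka column type, so a strict build must fail.
        assert!(
            build_additional_column_desc(
                ColumnId::new(2),
                KAFKA_CONNECTOR,
                "file",
                None,
                None,
                None,
                true,
                false,
            )
            .is_err()
        );
    }

    #[test]
    fn test_source_add_partition_offset_cols_for_kafka() {
        // With no user columns, both hidden columns are reported as missing and
        // exactly `partition` + `offset` descriptors are generated for Kafka.
        let (columns_exist, additional_columns) =
            source_add_partition_offset_cols(&[], KAFKA_CONNECTOR, false);
        assert_eq!(columns_exist, vec![false, false]);
        assert_eq!(additional_columns.len(), 2);
    }

    #[test]
    fn test_build_header_catalog_shapes() {
        // Without an inner field, all headers come back as a list of
        // `struct<key varchar, value bytea>`.
        let all_headers = build_header_catalog(ColumnId::new(1), "_rw_kafka_header", None, None);
        assert_eq!(
            all_headers.data_type,
            DataType::list(get_kafka_header_item_datatype())
        );
        // A single header value defaults to `bytea` when no type is specified.
        let one_header =
            build_header_catalog(ColumnId::new(2), "_rw_kafka_header_k", Some("k"), None);
        assert_eq!(one_header.data_type, DataType::Bytea);
    }
}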
471}