Skip to main content

risingwave_connector/parser/
additional_columns.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::LazyLock;
17
18use risingwave_common::bail;
19use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId, max_column_id};
20use risingwave_common::types::{DataType, StructType};
21use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
22use risingwave_pb::plan_common::{
23    AdditionalCollectionName, AdditionalColumn, AdditionalColumnFilename, AdditionalColumnHeader,
24    AdditionalColumnHeaders, AdditionalColumnKey, AdditionalColumnOffset,
25    AdditionalColumnPartition, AdditionalColumnPayload, AdditionalColumnPulsarMessageIdData,
26    AdditionalColumnTimestamp, AdditionalDatabaseName, AdditionalSchemaName, AdditionalSubject,
27    AdditionalTableName,
28};
29
30use crate::error::ConnectorResult;
31use crate::source::cdc::MONGODB_CDC_CONNECTOR;
32use crate::source::{
33    AZBLOB_CONNECTOR, GCS_CONNECTOR, KAFKA_CONNECTOR, KINESIS_CONNECTOR, MQTT_CONNECTOR,
34    NATS_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR, PULSAR_CONNECTOR,
35};
36
/// Fallback set of (hidden) additional column types, used for connectors that do not
/// support the `include` syntax and therefore have no dedicated list of their own.
pub static COMMON_COMPATIBLE_ADDITIONAL_COLUMNS: LazyLock<HashSet<&'static str>> =
    LazyLock::new(|| ["partition", "offset"].iter().copied().collect());
40
41pub static COMPATIBLE_ADDITIONAL_COLUMNS: LazyLock<HashMap<&'static str, HashSet<&'static str>>> =
42    LazyLock::new(|| {
43        HashMap::from([
44            (
45                KAFKA_CONNECTOR,
46                HashSet::from([
47                    "key",
48                    "timestamp",
49                    "partition",
50                    "offset",
51                    "header",
52                    "payload",
53                ]),
54            ),
55            (
56                PULSAR_CONNECTOR,
57                HashSet::from([
58                    "key",
59                    "partition",
60                    "offset",
61                    "payload",
62                    "message_id_data",
63                    "header",
64                ]),
65            ),
66            (
67                KINESIS_CONNECTOR,
68                HashSet::from(["key", "partition", "offset", "timestamp", "payload"]),
69            ),
70            (
71                NATS_CONNECTOR,
72                HashSet::from(["partition", "offset", "payload", "subject"]),
73            ),
74            (
75                OPENDAL_S3_CONNECTOR,
76                HashSet::from(["file", "offset", "payload"]),
77            ),
78            (GCS_CONNECTOR, HashSet::from(["file", "offset", "payload"])),
79            (
80                AZBLOB_CONNECTOR,
81                HashSet::from(["file", "offset", "payload"]),
82            ),
83            (
84                POSIX_FS_CONNECTOR,
85                HashSet::from(["file", "offset", "payload"]),
86            ),
87            // mongodb-cdc doesn't support cdc backfill table
88            (
89                MONGODB_CDC_CONNECTOR,
90                HashSet::from([
91                    "timestamp",
92                    "partition",
93                    "offset",
94                    "database_name",
95                    "collection_name",
96                ]),
97            ),
98            (MQTT_CONNECTOR, HashSet::from(["offset", "partition"])),
99        ])
100    });
101
/// Additional column types accepted by a CDC backfill table.
///
/// For CDC backfill table, the additional columns are added to the schema of `StreamCdcScan`.
/// The value is always `Some`; the `Option` wrapper is part of the static's type.
pub static CDC_BACKFILL_TABLE_ADDITIONAL_COLUMNS: LazyLock<Option<HashSet<&'static str>>> =
    LazyLock::new(|| {
        let column_types: HashSet<&'static str> =
            ["timestamp", "database_name", "schema_name", "table_name"]
                .iter()
                .copied()
                .collect();
        Some(column_types)
    });
112
113pub fn get_supported_additional_columns(
114    connector_name: &str,
115    is_cdc_backfill: bool,
116) -> Option<&HashSet<&'static str>> {
117    if is_cdc_backfill {
118        CDC_BACKFILL_TABLE_ADDITIONAL_COLUMNS.as_ref()
119    } else {
120        COMPATIBLE_ADDITIONAL_COLUMNS.get(connector_name)
121    }
122}
123
124pub fn gen_default_addition_col_name(
125    connector_name: &str,
126    additional_col_type: &str,
127    inner_field_name: Option<&str>,
128    data_type: Option<&DataType>,
129) -> String {
130    let legacy_dt_name = data_type.map(|dt| format!("{:?}", dt).to_lowercase());
131    let col_name = [
132        Some(connector_name),
133        Some(additional_col_type),
134        inner_field_name,
135        legacy_dt_name.as_deref(),
136    ];
137    col_name.iter().fold("_rw".to_owned(), |name, ele| {
138        if let Some(ele) = ele {
139            format!("{}_{}", name, ele)
140        } else {
141            name
142        }
143    })
144}
145
146pub fn build_additional_column_desc(
147    column_id: ColumnId,
148    connector_name: &str,
149    additional_col_type: &str,
150    column_alias: Option<String>,
151    inner_field_name: Option<&str>,
152    data_type: Option<&DataType>,
153    reject_unknown_connector: bool,
154    is_cdc_backfill_table: bool,
155) -> ConnectorResult<ColumnDesc> {
156    let compatible_columns = match (
157        get_supported_additional_columns(connector_name, is_cdc_backfill_table),
158        reject_unknown_connector,
159    ) {
160        (Some(compat_cols), _) => compat_cols,
161        (None, false) => &COMMON_COMPATIBLE_ADDITIONAL_COLUMNS,
162        (None, true) => {
163            bail!(
164                "additional column is not supported for connector {}, acceptable connectors: {:?}",
165                connector_name,
166                COMPATIBLE_ADDITIONAL_COLUMNS.keys(),
167            );
168        }
169    };
170    if !compatible_columns.contains(additional_col_type) {
171        bail!(
172            "additional column type {} is not supported for connector {}, acceptable column types: {:?}",
173            additional_col_type,
174            connector_name,
175            compatible_columns
176        );
177    }
178
179    let column_name = column_alias.unwrap_or_else(|| {
180        gen_default_addition_col_name(
181            connector_name,
182            additional_col_type,
183            inner_field_name,
184            data_type,
185        )
186    });
187
188    let col_desc = match additional_col_type {
189        "key" => ColumnDesc::named_with_additional_column(
190            column_name,
191            column_id,
192            DataType::Bytea,
193            AdditionalColumn {
194                column_type: Some(AdditionalColumnType::Key(AdditionalColumnKey {})),
195            },
196        ),
197
198        "timestamp" => ColumnDesc::named_with_additional_column(
199            column_name,
200            column_id,
201            DataType::Timestamptz,
202            AdditionalColumn {
203                column_type: Some(AdditionalColumnType::Timestamp(
204                    AdditionalColumnTimestamp {},
205                )),
206            },
207        ),
208        "partition" => ColumnDesc::named_with_additional_column(
209            column_name,
210            column_id,
211            DataType::Varchar,
212            AdditionalColumn {
213                column_type: Some(AdditionalColumnType::Partition(
214                    AdditionalColumnPartition {},
215                )),
216            },
217        ),
218        "payload" => ColumnDesc::named_with_additional_column(
219            column_name,
220            column_id,
221            DataType::Jsonb,
222            AdditionalColumn {
223                column_type: Some(AdditionalColumnType::Payload(AdditionalColumnPayload {})),
224            },
225        ),
226        "offset" => ColumnDesc::named_with_additional_column(
227            column_name,
228            column_id,
229            DataType::Varchar,
230            AdditionalColumn {
231                column_type: Some(AdditionalColumnType::Offset(AdditionalColumnOffset {})),
232            },
233        ),
234
235        "file" => ColumnDesc::named_with_additional_column(
236            column_name,
237            column_id,
238            DataType::Varchar,
239            AdditionalColumn {
240                column_type: Some(AdditionalColumnType::Filename(AdditionalColumnFilename {})),
241            },
242        ),
243        "header" => build_header_catalog(column_id, &column_name, inner_field_name, data_type),
244        "database_name" => ColumnDesc::named_with_additional_column(
245            column_name,
246            column_id,
247            DataType::Varchar,
248            AdditionalColumn {
249                column_type: Some(AdditionalColumnType::DatabaseName(
250                    AdditionalDatabaseName {},
251                )),
252            },
253        ),
254        "schema_name" => ColumnDesc::named_with_additional_column(
255            column_name,
256            column_id,
257            DataType::Varchar,
258            AdditionalColumn {
259                column_type: Some(AdditionalColumnType::SchemaName(AdditionalSchemaName {})),
260            },
261        ),
262        "table_name" => ColumnDesc::named_with_additional_column(
263            column_name,
264            column_id,
265            DataType::Varchar,
266            AdditionalColumn {
267                column_type: Some(AdditionalColumnType::TableName(AdditionalTableName {})),
268            },
269        ),
270        "collection_name" => ColumnDesc::named_with_additional_column(
271            column_name,
272            column_id,
273            DataType::Varchar,
274            AdditionalColumn {
275                column_type: Some(AdditionalColumnType::CollectionName(
276                    AdditionalCollectionName {},
277                )),
278            },
279        ),
280        "subject" => ColumnDesc::named_with_additional_column(
281            column_name,
282            column_id,
283            DataType::Varchar, // Assuming subject is a string
284            AdditionalColumn {
285                column_type: Some(AdditionalColumnType::Subject(AdditionalSubject {})),
286            },
287        ),
288        "message_id_data" => ColumnDesc::named_with_additional_column(
289            column_name,
290            column_id,
291            DataType::Bytea,
292            AdditionalColumn {
293                column_type: Some(AdditionalColumnType::PulsarMessageIdData(
294                    AdditionalColumnPulsarMessageIdData {},
295                )),
296            },
297        ),
298        _ => unreachable!(),
299    };
300
301    Ok(col_desc)
302}
303
304pub fn derive_pulsar_message_id_data_column(
305    connector_name: &str,
306    column_exist: &mut Vec<bool>,
307    additional_columns: &mut Vec<ColumnDesc>,
308) {
309    // additional columns already check the max_column_id
310    // so we can take the max column id of additional columns as the max column id of all columns
311    let max_column_id = additional_columns
312        .iter()
313        .fold(ColumnId::first_user_column(), |a, b| a.max(b.column_id));
314
315    // assume user does not include `message_id_data` column
316    column_exist.push(false);
317    additional_columns.push(
318        build_additional_column_desc(
319            max_column_id.next(),
320            connector_name,
321            "message_id_data",
322            None,
323            None,
324            None,
325            false,
326            false,
327        )
328        .unwrap(),
329    );
330}
331
332/// Utility function for adding partition and offset columns to the columns, if not specified by the user.
333///
334/// ## Returns
335/// - `columns_exist`: whether 1. `partition`/`file` and 2. `offset` columns are included in `columns`.
336/// - `additional_columns`: The `ColumnCatalog` for `partition`/`file` and `offset` columns.
337pub fn source_add_partition_offset_cols(
338    columns: &[ColumnCatalog],
339    connector_name: &str,
340    skip_col_id: bool,
341) -> (Vec<bool>, Vec<ColumnDesc>) {
342    let mut columns_exist = vec![false; 2];
343
344    let mut last_column_id = max_column_id(columns);
345    let mut assign_col_id = || {
346        if skip_col_id {
347            // col id will be filled outside later. Here just use a placeholder.
348            ColumnId::placeholder()
349        } else {
350            last_column_id = last_column_id.next();
351            last_column_id
352        }
353    };
354
355    let additional_columns: Vec<_> = {
356        let compat_col_types = COMPATIBLE_ADDITIONAL_COLUMNS
357            .get(connector_name)
358            .unwrap_or(&COMMON_COMPATIBLE_ADDITIONAL_COLUMNS);
359        ["partition", "file", "offset"]
360            .iter()
361            .filter_map(|col_type| {
362                if compat_col_types.contains(col_type) {
363                    Some(
364                        build_additional_column_desc(
365                            assign_col_id(),
366                            connector_name,
367                            col_type,
368                            None,
369                            None,
370                            None,
371                            false,
372                            false,
373                        )
374                        .unwrap(),
375                    )
376                } else {
377                    None
378                }
379            })
380            .collect()
381    };
382    assert_eq!(additional_columns.len(), 2);
383    use risingwave_pb::plan_common::additional_column::ColumnType;
384    assert_matches::assert_matches!(
385        additional_columns[0].additional_column,
386        AdditionalColumn {
387            column_type: Some(ColumnType::Partition(_) | ColumnType::Filename(_)),
388        }
389    );
390    assert_matches::assert_matches!(
391        additional_columns[1].additional_column,
392        AdditionalColumn {
393            column_type: Some(ColumnType::Offset(_)),
394        }
395    );
396
397    // Check if partition/file/offset columns are included explicitly.
398    for col in columns {
399        match col.column_desc.additional_column {
400            AdditionalColumn {
401                column_type: Some(ColumnType::Partition(_) | ColumnType::Filename(_)),
402            } => {
403                columns_exist[0] = true;
404            }
405            AdditionalColumn {
406                column_type: Some(ColumnType::Offset(_)),
407            } => {
408                columns_exist[1] = true;
409            }
410            _ => (),
411        }
412    }
413
414    (columns_exist, additional_columns)
415}
416
417fn build_header_catalog(
418    column_id: ColumnId,
419    col_name: &str,
420    inner_field_name: Option<&str>,
421    data_type: Option<&DataType>,
422) -> ColumnDesc {
423    if let Some(inner) = inner_field_name {
424        let data_type = data_type.unwrap_or(&DataType::Bytea);
425        let pb_data_type = data_type.to_protobuf();
426        ColumnDesc::named_with_additional_column(
427            col_name,
428            column_id,
429            data_type.clone(),
430            AdditionalColumn {
431                column_type: Some(AdditionalColumnType::HeaderInner(AdditionalColumnHeader {
432                    inner_field: inner.to_owned(),
433                    data_type: Some(pb_data_type),
434                })),
435            },
436        )
437    } else {
438        ColumnDesc::named_with_additional_column(
439            col_name,
440            column_id,
441            DataType::list(get_kafka_header_item_datatype()),
442            AdditionalColumn {
443                column_type: Some(AdditionalColumnType::Headers(AdditionalColumnHeaders {})),
444            },
445        )
446    }
447}
448
449pub fn get_kafka_header_item_datatype() -> DataType {
450    let struct_inner = vec![("key", DataType::Varchar), ("value", DataType::Bytea)];
451    DataType::Struct(StructType::new(struct_inner))
452}
453
#[cfg(test)]
mod test {
    use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;

    use super::*;

    /// The default name is `_rw` plus every present component, joined by `_`.
    #[test]
    fn test_gen_default_addition_col_name() {
        // (column type, inner field, data type, expected name)
        let cases = [
            ("key", None, None, "_rw_kafka_key"),
            ("header", Some("inner"), None, "_rw_kafka_header_inner"),
            (
                "header",
                Some("inner"),
                Some(DataType::Varchar),
                "_rw_kafka_header_inner_varchar",
            ),
        ];

        for (col_type, inner_field, data_type, expected) in &cases {
            assert_eq!(
                gen_default_addition_col_name("kafka", col_type, *inner_field, data_type.as_ref()),
                *expected
            );
        }
    }

    /// A Pulsar `header` column with an inner field keeps the alias, the requested data
    /// type and the inner field name.
    #[test]
    fn test_build_pulsar_header_additional_column() {
        let requested_type = DataType::Varchar;
        let col = build_additional_column_desc(
            ColumnId::new(1),
            PULSAR_CONNECTOR,
            "header",
            Some("pulsar_header".to_owned()),
            Some("tenant"),
            Some(&requested_type),
            true,
            false,
        )
        .unwrap();

        assert_eq!(col.name, "pulsar_header");
        assert_eq!(col.data_type, DataType::Varchar);

        let Some(AdditionalColumnType::HeaderInner(header)) = col.additional_column.column_type
        else {
            panic!("expected a `HeaderInner` additional column");
        };
        assert_eq!(header.inner_field, "tenant");
    }
}