// risingwave_frontend/handler/create_source/additional_column.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use super::*;
16
17// check the additional column compatibility with the format and encode
18fn check_additional_column_compatibility(
19    column_def: &IncludeOptionItem,
20    format_encode: Option<&FormatEncodeOptions>,
21) -> Result<()> {
22    // only allow header column have inner field
23    if column_def.inner_field.is_some()
24        && !column_def
25            .column_type
26            .real_value()
27            .eq_ignore_ascii_case("header")
28    {
29        return Err(RwError::from(ProtocolError(format!(
30            "Only header column can have inner field, but got {:?}",
31            column_def.column_type.real_value(),
32        ))));
33    }
34
35    // Payload column only allowed when encode is JSON
36    if let Some(schema) = format_encode
37        && column_def
38            .column_type
39            .real_value()
40            .eq_ignore_ascii_case("payload")
41        && !matches!(schema.row_encode, Encode::Json)
42    {
43        return Err(RwError::from(ProtocolError(format!(
44            "INCLUDE payload is only allowed when using ENCODE JSON, but got ENCODE {:?}",
45            schema.row_encode
46        ))));
47    }
48    Ok(())
49}
50
51/// add connector-spec columns to the end of column catalog
52pub fn handle_addition_columns(
53    format_encode: Option<&FormatEncodeOptions>,
54    with_properties: &BTreeMap<String, String>,
55    mut additional_columns: IncludeOption,
56    columns: &mut Vec<ColumnCatalog>,
57    is_cdc_backfill_table: bool,
58) -> Result<()> {
59    let connector_name = with_properties.get_connector().unwrap(); // there must be a connector in source
60
61    if get_supported_additional_columns(connector_name.as_str(), is_cdc_backfill_table).is_none()
62        && !additional_columns.is_empty()
63    {
64        return Err(RwError::from(ProtocolError(format!(
65            "Connector {} accepts no additional column but got {:?}",
66            connector_name, additional_columns
67        ))));
68    }
69
70    while let Some(item) = additional_columns.pop() {
71        check_additional_column_compatibility(&item, format_encode)?;
72
73        let data_type = item
74            .header_inner_expect_type
75            .map(|dt| bind_data_type(&dt))
76            .transpose()?;
77        if let Some(dt) = &data_type
78            && !matches!(dt, DataType::Bytea | DataType::Varchar)
79        {
80            return Err(
81                ErrorCode::BindError(format!("invalid additional column data type: {dt}")).into(),
82            );
83        }
84        let col = build_additional_column_desc(
85            ColumnId::placeholder(),
86            connector_name.as_str(),
87            item.column_type.real_value().as_str(),
88            item.column_alias.map(|alias| alias.real_value()),
89            item.inner_field.as_deref(),
90            data_type.as_ref(),
91            true,
92            is_cdc_backfill_table,
93        )?;
94        columns.push(ColumnCatalog::visible(col));
95    }
96
97    Ok(())
98}
99
100// Add a hidden column `_rw_kafka_timestamp` to each message from Kafka source.
101pub fn check_and_add_timestamp_column(
102    with_properties: &WithOptions,
103    columns: &mut Vec<ColumnCatalog>,
104) {
105    if with_properties.is_kafka_connector() {
106        if columns.iter().any(|col| {
107            matches!(
108                col.column_desc.additional_column.column_type,
109                Some(AdditionalColumnType::Timestamp(_))
110            )
111        }) {
112            // already has timestamp column, no need to add a new one
113            return;
114        }
115
116        // add a hidden column `_rw_kafka_timestamp` to each message from Kafka source
117        let col = build_additional_column_desc(
118            ColumnId::placeholder(),
119            KAFKA_CONNECTOR,
120            "timestamp",
121            Some(KAFKA_TIMESTAMP_COLUMN_NAME.to_owned()),
122            None,
123            None,
124            true,
125            false,
126        )
127        .unwrap();
128        columns.push(ColumnCatalog::hidden(col));
129    }
130}