risingwave_frontend/handler/create_source/
additional_column.rs1use super::*;
16
17fn check_additional_column_compatibility(
19 column_def: &IncludeOptionItem,
20 format_encode: Option<&FormatEncodeOptions>,
21) -> Result<()> {
22 if column_def.inner_field.is_some()
24 && !column_def
25 .column_type
26 .real_value()
27 .eq_ignore_ascii_case("header")
28 {
29 return Err(RwError::from(ProtocolError(format!(
30 "Only header column can have inner field, but got {:?}",
31 column_def.column_type.real_value(),
32 ))));
33 }
34
35 if let Some(schema) = format_encode
37 && column_def
38 .column_type
39 .real_value()
40 .eq_ignore_ascii_case("payload")
41 && !matches!(schema.row_encode, Encode::Json)
42 {
43 return Err(RwError::from(ProtocolError(format!(
44 "INCLUDE payload is only allowed when using ENCODE JSON, but got ENCODE {:?}",
45 schema.row_encode
46 ))));
47 }
48 Ok(())
49}
50
51pub fn handle_addition_columns(
53 format_encode: Option<&FormatEncodeOptions>,
54 with_properties: &BTreeMap<String, String>,
55 mut additional_columns: IncludeOption,
56 columns: &mut Vec<ColumnCatalog>,
57 is_cdc_backfill_table: bool,
58) -> Result<()> {
59 let connector_name = with_properties.get_connector().unwrap(); if get_supported_additional_columns(connector_name.as_str(), is_cdc_backfill_table).is_none()
62 && !additional_columns.is_empty()
63 {
64 return Err(RwError::from(ProtocolError(format!(
65 "Connector {} accepts no additional column but got {:?}",
66 connector_name, additional_columns
67 ))));
68 }
69
70 while let Some(item) = additional_columns.pop() {
71 check_additional_column_compatibility(&item, format_encode)?;
72
73 let data_type = item
74 .header_inner_expect_type
75 .map(|dt| bind_data_type(&dt))
76 .transpose()?;
77 if let Some(dt) = &data_type
78 && !matches!(dt, DataType::Bytea | DataType::Varchar)
79 {
80 return Err(
81 ErrorCode::BindError(format!("invalid additional column data type: {dt}")).into(),
82 );
83 }
84 let col = build_additional_column_desc(
85 ColumnId::placeholder(),
86 connector_name.as_str(),
87 item.column_type.real_value().as_str(),
88 item.column_alias.map(|alias| alias.real_value()),
89 item.inner_field.as_deref(),
90 data_type.as_ref(),
91 true,
92 is_cdc_backfill_table,
93 )?;
94 columns.push(ColumnCatalog::visible(col));
95 }
96
97 Ok(())
98}
99
100pub fn check_and_add_timestamp_column(
102 with_properties: &WithOptions,
103 columns: &mut Vec<ColumnCatalog>,
104) {
105 if with_properties.is_kafka_connector() {
106 if columns.iter().any(|col| {
107 matches!(
108 col.column_desc.additional_column.column_type,
109 Some(AdditionalColumnType::Timestamp(_))
110 )
111 }) {
112 return;
114 }
115
116 let col = build_additional_column_desc(
118 ColumnId::placeholder(),
119 KAFKA_CONNECTOR,
120 "timestamp",
121 Some(KAFKA_TIMESTAMP_COLUMN_NAME.to_owned()),
122 None,
123 None,
124 true,
125 false,
126 )
127 .unwrap();
128 columns.push(ColumnCatalog::hidden(col));
129 }
130}