risingwave_frontend/handler/create_source/
external_schema.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! Binds source columns from an external schema definition.
16
17use super::*;
18
19mod json;
20use json::*;
21mod avro;
22use avro::extract_avro_table_schema;
23pub mod debezium;
24pub mod iceberg;
25use iceberg::extract_iceberg_columns;
26mod protobuf;
27use protobuf::extract_protobuf_table_schema;
28pub mod nexmark;
29
30/// Resolves the schema of the source from external schema file.
31/// See <https://www.risingwave.dev/docs/current/sql-create-source> for more information.
32///
33/// Note: the returned schema strictly corresponds to the schema.
34/// Other special columns like additional columns (`INCLUDE`), and `row_id` column are not included.
35pub async fn bind_columns_from_source(
36    session: &SessionImpl,
37    format_encode: &FormatEncodeOptions,
38    with_properties: Either<&WithOptions, &WithOptionsSecResolved>,
39    create_source_type: CreateSourceType,
40) -> Result<(Option<Vec<ColumnCatalog>>, StreamSourceInfo)> {
41    let (columns_from_resolve_source, mut source_info) =
42        if create_source_type == CreateSourceType::SharedCdc {
43            bind_columns_from_source_for_cdc(session, format_encode)?
44        } else {
45            bind_columns_from_source_for_non_cdc(session, format_encode, with_properties).await?
46        };
47    if create_source_type.is_shared() {
48        // Note: this field should be called is_shared. Check field doc for more details.
49        source_info.cdc_source_job = true;
50        source_info.is_distributed = create_source_type == CreateSourceType::SharedNonCdc;
51    }
52    Ok((columns_from_resolve_source, source_info))
53}
54
55async fn bind_columns_from_source_for_non_cdc(
56    session: &SessionImpl,
57    format_encode: &FormatEncodeOptions,
58    with_properties: Either<&WithOptions, &WithOptionsSecResolved>,
59) -> Result<(Option<Vec<ColumnCatalog>>, StreamSourceInfo)> {
60    const MESSAGE_NAME_KEY: &str = "message";
61    const KEY_MESSAGE_NAME_KEY: &str = "key.message";
62    const NAME_STRATEGY_KEY: &str = "schema.registry.name.strategy";
63
64    let options_with_secret = match with_properties {
65        Either::Left(options) => {
66            let (sec_resolve_props, connection_type, _) = resolve_connection_ref_and_secret_ref(
67                options.clone(),
68                session,
69                TelemetryDatabaseObject::Source,
70            )?;
71            if !SOURCE_ALLOWED_CONNECTION_CONNECTOR.contains(&connection_type) {
72                return Err(RwError::from(ProtocolError(format!(
73                    "connection type {:?} is not allowed, allowed types: {:?}",
74                    connection_type, SOURCE_ALLOWED_CONNECTION_CONNECTOR
75                ))));
76            }
77
78            sec_resolve_props
79        }
80        Either::Right(options_with_secret) => options_with_secret.clone(),
81    };
82
83    let is_kafka: bool = options_with_secret.is_kafka_connector();
84
85    // todo: need to resolve connection ref for schema registry
86    let (sec_resolve_props, connection_type, schema_registry_conn_ref) =
87        resolve_connection_ref_and_secret_ref(
88            WithOptions::try_from(format_encode.row_options())?,
89            session,
90            TelemetryDatabaseObject::Source,
91        )?;
92    ensure_connection_type_allowed(connection_type, &SOURCE_ALLOWED_CONNECTION_SCHEMA_REGISTRY)?;
93
94    let (format_encode_options, format_encode_secret_refs) = sec_resolve_props.into_parts();
95    // Need real secret to access the schema registry
96    let mut format_encode_options_to_consume = LocalSecretManager::global().fill_secrets(
97        format_encode_options.clone(),
98        format_encode_secret_refs.clone(),
99    )?;
100
101    fn get_key_message_name(options: &mut BTreeMap<String, String>) -> Option<String> {
102        consume_string_from_options(options, KEY_MESSAGE_NAME_KEY)
103            .map(|ele| Some(ele.0))
104            .unwrap_or(None)
105    }
106    fn get_sr_name_strategy_check(
107        options: &mut BTreeMap<String, String>,
108        use_sr: bool,
109    ) -> Result<Option<i32>> {
110        let name_strategy = get_name_strategy_or_default(try_consume_string_from_options(
111            options,
112            NAME_STRATEGY_KEY,
113        ))?;
114        if !use_sr && name_strategy.is_some() {
115            return Err(RwError::from(ProtocolError(
116                "schema registry name strategy only works with schema registry enabled".to_owned(),
117            )));
118        }
119        Ok(name_strategy)
120    }
121
122    let mut stream_source_info = StreamSourceInfo {
123        format: format_to_prost(&format_encode.format) as i32,
124        row_encode: row_encode_to_prost(&format_encode.row_encode) as i32,
125        format_encode_options,
126        format_encode_secret_refs,
127        connection_id: schema_registry_conn_ref,
128        ..Default::default()
129    };
130
131    if format_encode.format == Format::Debezium {
132        try_consume_string_from_options(&mut format_encode_options_to_consume, DEBEZIUM_IGNORE_KEY);
133    }
134
135    let columns = match (&format_encode.format, &format_encode.row_encode) {
136        (Format::Native, Encode::Native)
137        | (Format::Plain, Encode::Bytes)
138        | (Format::DebeziumMongo, Encode::Json) => None,
139        (Format::Plain, Encode::Protobuf) | (Format::Upsert, Encode::Protobuf) => {
140            let (row_schema_location, use_schema_registry) =
141                get_schema_location(&mut format_encode_options_to_consume)?;
142            let message_name = consume_string_from_options(
143                &mut format_encode_options_to_consume,
144                MESSAGE_NAME_KEY,
145            )?;
146            let name_strategy = get_sr_name_strategy_check(
147                &mut format_encode_options_to_consume,
148                use_schema_registry,
149            )?;
150
151            stream_source_info.use_schema_registry = use_schema_registry;
152            stream_source_info
153                .row_schema_location
154                .clone_from(&row_schema_location.0);
155            stream_source_info
156                .proto_message_name
157                .clone_from(&message_name.0);
158            stream_source_info.key_message_name =
159                get_key_message_name(&mut format_encode_options_to_consume);
160            stream_source_info.name_strategy =
161                name_strategy.unwrap_or(PbSchemaRegistryNameStrategy::Unspecified as i32);
162
163            Some(
164                extract_protobuf_table_schema(
165                    &stream_source_info,
166                    &options_with_secret,
167                    &mut format_encode_options_to_consume,
168                )
169                .await?,
170            )
171        }
172        (format @ (Format::Plain | Format::Upsert | Format::Debezium), Encode::Avro) => {
173            if format_encode_options_to_consume
174                .remove(AWS_GLUE_SCHEMA_ARN_KEY)
175                .is_none()
176            {
177                // Legacy logic that assumes either `schema.location` or confluent `schema.registry`.
178                // The handling of newly added aws glue is centralized in `connector::parser`.
179                // TODO(xiangjinwu): move these option parsing to `connector::parser` as well.
180
181                let (row_schema_location, use_schema_registry) =
182                    get_schema_location(&mut format_encode_options_to_consume)?;
183
184                if matches!(format, Format::Debezium) && !use_schema_registry {
185                    return Err(RwError::from(ProtocolError(
186                        "schema location for DEBEZIUM_AVRO row format is not supported".to_owned(),
187                    )));
188                }
189
190                let message_name = try_consume_string_from_options(
191                    &mut format_encode_options_to_consume,
192                    MESSAGE_NAME_KEY,
193                );
194                let name_strategy = get_sr_name_strategy_check(
195                    &mut format_encode_options_to_consume,
196                    use_schema_registry,
197                )?;
198
199                stream_source_info.use_schema_registry = use_schema_registry;
200                stream_source_info
201                    .row_schema_location
202                    .clone_from(&row_schema_location.0);
203                stream_source_info.proto_message_name =
204                    message_name.unwrap_or(AstString("".into())).0;
205                stream_source_info.key_message_name =
206                    get_key_message_name(&mut format_encode_options_to_consume);
207                stream_source_info.name_strategy =
208                    name_strategy.unwrap_or(PbSchemaRegistryNameStrategy::Unspecified as i32);
209            }
210
211            Some(
212                extract_avro_table_schema(
213                    &stream_source_info,
214                    &options_with_secret,
215                    &mut format_encode_options_to_consume,
216                    matches!(format, Format::Debezium),
217                )
218                .await?,
219            )
220        }
221        (Format::Plain, Encode::Csv) => {
222            let chars =
223                consume_string_from_options(&mut format_encode_options_to_consume, "delimiter")?.0;
224            let delimiter = get_delimiter(chars.as_str()).context("failed to parse delimiter")?;
225            let has_header = try_consume_string_from_options(
226                &mut format_encode_options_to_consume,
227                "without_header",
228            )
229            .map(|s| s.0 == "false")
230            .unwrap_or(true);
231
232            if is_kafka && has_header {
233                return Err(RwError::from(ProtocolError(
234                    "CSV HEADER is not supported when creating table with Kafka connector"
235                        .to_owned(),
236                )));
237            }
238
239            stream_source_info.csv_delimiter = delimiter as i32;
240            stream_source_info.csv_has_header = has_header;
241
242            None
243        }
244        // For parquet format, this step is implemented in parquet parser.
245        (Format::Plain, Encode::Parquet) => None,
246        (
247            Format::Plain | Format::Upsert | Format::Maxwell | Format::Canal | Format::Debezium,
248            Encode::Json,
249        ) => {
250            if matches!(
251                format_encode.format,
252                Format::Plain | Format::Upsert | Format::Debezium
253            ) {
254                // Parse the value but throw it away.
255                // It would be too late to report error in `SpecificParserConfig::new`,
256                // which leads to recovery loop.
257                // TODO: rely on SpecificParserConfig::new to validate, like Avro
258                TimestamptzHandling::from_options(&format_encode_options_to_consume)
259                    .map_err(|err| InvalidInputSyntax(err.message))?;
260                try_consume_string_from_options(
261                    &mut format_encode_options_to_consume,
262                    TimestamptzHandling::OPTION_KEY,
263                );
264            }
265
266            let schema_config = get_json_schema_location(&mut format_encode_options_to_consume)?;
267            stream_source_info.use_schema_registry =
268                json_schema_infer_use_schema_registry(&schema_config);
269
270            extract_json_table_schema(
271                &schema_config,
272                &options_with_secret,
273                &mut format_encode_options_to_consume,
274            )
275            .await?
276        }
277        (Format::None, Encode::None) => {
278            if options_with_secret.is_iceberg_connector() {
279                Some(
280                    extract_iceberg_columns(&options_with_secret)
281                        .await
282                        .map_err(|err| ProtocolError(err.to_report_string()))?,
283                )
284            } else {
285                None
286            }
287        }
288        (format, encoding) => {
289            return Err(RwError::from(ProtocolError(format!(
290                "Unknown combination {:?} {:?}",
291                format, encoding
292            ))));
293        }
294    };
295
296    if !format_encode_options_to_consume.is_empty() {
297        let err_string = format!(
298            "Get unknown format_encode_options for {:?} {:?}: {}",
299            format_encode.format,
300            format_encode.row_encode,
301            format_encode_options_to_consume
302                .keys()
303                .map(|k| k.to_string())
304                .collect::<Vec<String>>()
305                .join(","),
306        );
307        session.notice_to_user(err_string);
308    }
309    Ok((columns, stream_source_info))
310}
311
312fn bind_columns_from_source_for_cdc(
313    session: &SessionImpl,
314    format_encode: &FormatEncodeOptions,
315) -> Result<(Option<Vec<ColumnCatalog>>, StreamSourceInfo)> {
316    let with_options = WithOptions::try_from(format_encode.row_options())?;
317    if !with_options.connection_ref().is_empty() {
318        return Err(RwError::from(NotSupported(
319            "CDC connector does not support connection ref yet".to_owned(),
320            "Explicitly specify the connection in WITH clause".to_owned(),
321        )));
322    }
323    let (format_encode_options, format_encode_secret_refs) =
324        resolve_secret_ref_in_with_options(with_options, session)?.into_parts();
325
326    // Need real secret to access the schema registry
327    let mut format_encode_options_to_consume = LocalSecretManager::global().fill_secrets(
328        format_encode_options.clone(),
329        format_encode_secret_refs.clone(),
330    )?;
331
332    match (&format_encode.format, &format_encode.row_encode) {
333        (Format::Plain, Encode::Json) => (),
334        (format, encoding) => {
335            // Note: parser will also check this. Just be extra safe here
336            return Err(RwError::from(ProtocolError(format!(
337                "Row format for CDC connectors should be either omitted or set to `FORMAT PLAIN ENCODE JSON`, got: {:?} {:?}",
338                format, encoding
339            ))));
340        }
341    };
342
343    let columns = ColumnCatalog::debezium_cdc_source_cols().to_vec();
344    let schema_config = get_json_schema_location(&mut format_encode_options_to_consume)?;
345
346    let stream_source_info = StreamSourceInfo {
347        format: format_to_prost(&format_encode.format) as i32,
348        row_encode: row_encode_to_prost(&format_encode.row_encode) as i32,
349        format_encode_options,
350        use_schema_registry: json_schema_infer_use_schema_registry(&schema_config),
351        cdc_source_job: true,
352        is_distributed: false,
353        format_encode_secret_refs,
354        ..Default::default()
355    };
356    if !format_encode_options_to_consume.is_empty() {
357        let err_string = format!(
358            "Get unknown format_encode_options for {:?} {:?}: {}",
359            format_encode.format,
360            format_encode.row_encode,
361            format_encode_options_to_consume
362                .keys()
363                .map(|k| k.to_string())
364                .collect::<Vec<String>>()
365                .join(","),
366        );
367        session.notice_to_user(err_string);
368    }
369    Ok((Some(columns), stream_source_info))
370}
371
372fn format_to_prost(format: &Format) -> FormatType {
373    match format {
374        Format::Native => FormatType::Native,
375        Format::Plain => FormatType::Plain,
376        Format::Upsert => FormatType::Upsert,
377        Format::Debezium => FormatType::Debezium,
378        Format::DebeziumMongo => FormatType::DebeziumMongo,
379        Format::Maxwell => FormatType::Maxwell,
380        Format::Canal => FormatType::Canal,
381        Format::None => FormatType::None,
382    }
383}
384fn row_encode_to_prost(row_encode: &Encode) -> EncodeType {
385    match row_encode {
386        Encode::Native => EncodeType::Native,
387        Encode::Json => EncodeType::Json,
388        Encode::Avro => EncodeType::Avro,
389        Encode::Protobuf => EncodeType::Protobuf,
390        Encode::Csv => EncodeType::Csv,
391        Encode::Bytes => EncodeType::Bytes,
392        Encode::Template => EncodeType::Template,
393        Encode::Parquet => EncodeType::Parquet,
394        Encode::None => EncodeType::None,
395        Encode::Text => EncodeType::Text,
396    }
397}
398
399pub fn get_schema_location(
400    format_encode_options: &mut BTreeMap<String, String>,
401) -> Result<(AstString, bool)> {
402    let schema_location = try_consume_string_from_options(format_encode_options, "schema.location");
403    let schema_registry = try_consume_string_from_options(format_encode_options, "schema.registry");
404    match (schema_location, schema_registry) {
405        (None, None) => Err(RwError::from(ProtocolError(
406            "missing either a schema location or a schema registry".to_owned(),
407        ))),
408        (None, Some(schema_registry)) => Ok((schema_registry, true)),
409        (Some(schema_location), None) => Ok((schema_location, false)),
410        (Some(_), Some(_)) => Err(RwError::from(ProtocolError(
411            "only need either the schema location or the schema registry".to_owned(),
412        ))),
413    }
414}
415
416pub fn schema_has_schema_registry(schema: &FormatEncodeOptions) -> bool {
417    match schema.row_encode {
418        Encode::Avro | Encode::Protobuf => true,
419        Encode::Json => {
420            let mut options = WithOptions::try_from(schema.row_options()).unwrap();
421            matches!(get_json_schema_location(options.inner_mut()), Ok(Some(_)))
422        }
423        _ => false,
424    }
425}
426
427#[inline]
428fn get_name_strategy_or_default(name_strategy: Option<AstString>) -> Result<Option<i32>> {
429    match name_strategy {
430        None => Ok(None),
431        Some(name) => Ok(Some(name_strategy_from_str(name.0.as_str())
432            .ok_or_else(|| RwError::from(ProtocolError(format!("\
433            expect strategy name in topic_name_strategy, record_name_strategy and topic_record_name_strategy, but got {}", name))))? as i32)),
434    }
435}