risingwave_frontend/handler/create_source/external_schema.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! Binds source columns from an external schema (a schema file, a schema registry, or the upstream system itself).
16
17use risingwave_connector::source::ADBC_SNOWFLAKE_CONNECTOR;
18
19use super::*;
20
21mod json;
22use json::*;
23mod avro;
24use avro::extract_avro_table_schema;
25pub mod debezium;
26pub mod iceberg;
27use iceberg::extract_iceberg_columns;
28mod protobuf;
29use protobuf::extract_protobuf_table_schema;
30pub mod nexmark;
31
32/// Define a feature-gated function from a module.
33///
34/// This macro conditionally includes a module and re-exports a function from it.
35/// When the feature is disabled, it generates a stub function that returns an error.
36macro_rules! feature_gated_function {
37    (
38        mod $mod_name:ident,
39        $feature_name:literal,
40        async fn $func_name:ident ( $( $param_name:ident : $param_type:ty ),* $(,)? ) -> $ret_type:ty
41    ) => {
42        #[cfg(feature = $feature_name)]
43        pub mod $mod_name;
44        #[cfg(feature = $feature_name)]
45        use $mod_name::$func_name;
46
47        #[cfg(not(feature = $feature_name))]
48        #[allow(unused_variables)]
49        pub async fn $func_name( $( $param_name : $param_type ),* ) -> $ret_type {
50            Err(anyhow::anyhow!(
51                "Feature `{}` is not enabled at compile time. \
52                Please enable it in `Cargo.toml` and rebuild.",
53                $feature_name
54            ))
55        }
56    };
57}
58
59feature_gated_function!(
60    mod adbc_snowflake,
61    "source-adbc_snowflake",
62    async fn extract_adbc_snowflake_columns(
63        with_properties: &WithOptionsSecResolved,
64    ) -> anyhow::Result<Vec<ColumnCatalog>>
65);
66
67/// Resolves the schema of the source from external schema file.
68/// See <https://www.risingwave.dev/docs/current/sql-create-source> for more information.
69///
70/// Note: the returned schema strictly corresponds to the schema.
71/// Other special columns like additional columns (`INCLUDE`), and `row_id` column are not included.
72pub async fn bind_columns_from_source(
73    session: &SessionImpl,
74    format_encode: &FormatEncodeOptions,
75    with_properties: Either<&WithOptions, &WithOptionsSecResolved>,
76    create_source_type: CreateSourceType,
77) -> Result<(Option<Vec<ColumnCatalog>>, StreamSourceInfo)> {
78    let (columns_from_resolve_source, mut source_info) =
79        if create_source_type == CreateSourceType::SharedCdc {
80            bind_columns_from_source_for_cdc(session, format_encode)?
81        } else {
82            bind_columns_from_source_for_non_cdc(session, format_encode, with_properties).await?
83        };
84    if create_source_type.is_shared() {
85        // Note: this field should be called is_shared. Check field doc for more details.
86        source_info.cdc_source_job = true;
87        source_info.is_distributed = create_source_type == CreateSourceType::SharedNonCdc;
88    }
89    Ok((columns_from_resolve_source, source_info))
90}
91
92async fn bind_columns_from_source_for_non_cdc(
93    session: &SessionImpl,
94    format_encode: &FormatEncodeOptions,
95    with_properties: Either<&WithOptions, &WithOptionsSecResolved>,
96) -> Result<(Option<Vec<ColumnCatalog>>, StreamSourceInfo)> {
97    const MESSAGE_NAME_KEY: &str = "message";
98    const KEY_MESSAGE_NAME_KEY: &str = "key.message";
99    const NAME_STRATEGY_KEY: &str = "schema.registry.name.strategy";
100
101    let options_with_secret = match with_properties {
102        Either::Left(options) => {
103            let (sec_resolve_props, connection_type, _) = resolve_connection_ref_and_secret_ref(
104                options.clone(),
105                session,
106                Some(TelemetryDatabaseObject::Source),
107            )?;
108            if !SOURCE_ALLOWED_CONNECTION_CONNECTOR.contains(&connection_type) {
109                return Err(RwError::from(ProtocolError(format!(
110                    "connection type {:?} is not allowed, allowed types: {:?}",
111                    connection_type, SOURCE_ALLOWED_CONNECTION_CONNECTOR
112                ))));
113            }
114
115            sec_resolve_props
116        }
117        Either::Right(options_with_secret) => options_with_secret.clone(),
118    };
119
120    let is_kafka: bool = options_with_secret.is_kafka_connector();
121
122    // todo: need to resolve connection ref for schema registry
123    let (sec_resolve_props, connection_type, schema_registry_conn_ref) =
124        resolve_connection_ref_and_secret_ref(
125            WithOptions::try_from(format_encode.row_options())?,
126            session,
127            Some(TelemetryDatabaseObject::Source),
128        )?;
129    ensure_connection_type_allowed(connection_type, &SOURCE_ALLOWED_CONNECTION_SCHEMA_REGISTRY)?;
130
131    let (format_encode_options, format_encode_secret_refs) = sec_resolve_props.into_parts();
132    // Need real secret to access the schema registry
133    let mut format_encode_options_to_consume = LocalSecretManager::global().fill_secrets(
134        format_encode_options.clone(),
135        format_encode_secret_refs.clone(),
136    )?;
137
138    fn get_key_message_name(options: &mut BTreeMap<String, String>) -> Option<String> {
139        consume_string_from_options(options, KEY_MESSAGE_NAME_KEY)
140            .map(|ele| Some(ele.0))
141            .unwrap_or(None)
142    }
143    fn get_sr_name_strategy_check(
144        options: &mut BTreeMap<String, String>,
145        use_sr: bool,
146    ) -> Result<Option<i32>> {
147        let name_strategy = get_name_strategy_or_default(try_consume_string_from_options(
148            options,
149            NAME_STRATEGY_KEY,
150        ))?;
151        if !use_sr && name_strategy.is_some() {
152            return Err(RwError::from(ProtocolError(
153                "schema registry name strategy only works with schema registry enabled".to_owned(),
154            )));
155        }
156        Ok(name_strategy)
157    }
158
159    let mut stream_source_info = StreamSourceInfo {
160        format: format_to_prost(&format_encode.format) as i32,
161        row_encode: row_encode_to_prost(&format_encode.row_encode) as i32,
162        format_encode_options,
163        format_encode_secret_refs,
164        connection_id: schema_registry_conn_ref,
165        ..Default::default()
166    };
167
168    if format_encode.format == Format::Debezium {
169        try_consume_string_from_options(&mut format_encode_options_to_consume, DEBEZIUM_IGNORE_KEY);
170    }
171
172    let columns = match (&format_encode.format, &format_encode.row_encode) {
173        (Format::Native, Encode::Native)
174        | (Format::Plain, Encode::Bytes)
175        | (Format::DebeziumMongo, Encode::Json) => None,
176        (Format::Plain, Encode::Protobuf) | (Format::Upsert, Encode::Protobuf) => {
177            let (row_schema_location, use_schema_registry) =
178                get_schema_location(&mut format_encode_options_to_consume)?;
179            let message_name = consume_string_from_options(
180                &mut format_encode_options_to_consume,
181                MESSAGE_NAME_KEY,
182            )?;
183            let name_strategy = get_sr_name_strategy_check(
184                &mut format_encode_options_to_consume,
185                use_schema_registry,
186            )?;
187
188            stream_source_info.use_schema_registry = use_schema_registry;
189            stream_source_info
190                .row_schema_location
191                .clone_from(&row_schema_location.0);
192            stream_source_info
193                .proto_message_name
194                .clone_from(&message_name.0);
195            stream_source_info.key_message_name =
196                get_key_message_name(&mut format_encode_options_to_consume);
197            stream_source_info.name_strategy =
198                name_strategy.unwrap_or(PbSchemaRegistryNameStrategy::Unspecified as i32);
199
200            Some(
201                extract_protobuf_table_schema(
202                    &stream_source_info,
203                    &options_with_secret,
204                    &mut format_encode_options_to_consume,
205                )
206                .await?,
207            )
208        }
209        (format @ (Format::Plain | Format::Upsert | Format::Debezium), Encode::Avro) => {
210            if format_encode_options_to_consume
211                .remove(AWS_GLUE_SCHEMA_ARN_KEY)
212                .is_none()
213            {
214                // Legacy logic that assumes either `schema.location` or confluent `schema.registry`.
215                // The handling of newly added aws glue is centralized in `connector::parser`.
216                // TODO(xiangjinwu): move these option parsing to `connector::parser` as well.
217
218                let (row_schema_location, use_schema_registry) =
219                    get_schema_location(&mut format_encode_options_to_consume)?;
220
221                if matches!(format, Format::Debezium) && !use_schema_registry {
222                    return Err(RwError::from(ProtocolError(
223                        "schema location for DEBEZIUM_AVRO row format is not supported".to_owned(),
224                    )));
225                }
226
227                let message_name = try_consume_string_from_options(
228                    &mut format_encode_options_to_consume,
229                    MESSAGE_NAME_KEY,
230                );
231                let name_strategy = get_sr_name_strategy_check(
232                    &mut format_encode_options_to_consume,
233                    use_schema_registry,
234                )?;
235
236                stream_source_info.use_schema_registry = use_schema_registry;
237                stream_source_info
238                    .row_schema_location
239                    .clone_from(&row_schema_location.0);
240                stream_source_info.proto_message_name =
241                    message_name.unwrap_or(AstString("".into())).0;
242                stream_source_info.key_message_name =
243                    get_key_message_name(&mut format_encode_options_to_consume);
244                stream_source_info.name_strategy =
245                    name_strategy.unwrap_or(PbSchemaRegistryNameStrategy::Unspecified as i32);
246            }
247
248            Some(
249                extract_avro_table_schema(
250                    &stream_source_info,
251                    &options_with_secret,
252                    &mut format_encode_options_to_consume,
253                    matches!(format, Format::Debezium),
254                )
255                .await?,
256            )
257        }
258        (Format::Plain, Encode::Csv) => {
259            let chars =
260                consume_string_from_options(&mut format_encode_options_to_consume, "delimiter")?.0;
261            let delimiter = get_delimiter(chars.as_str()).context("failed to parse delimiter")?;
262            let has_header = try_consume_string_from_options(
263                &mut format_encode_options_to_consume,
264                "without_header",
265            )
266            .map(|s| s.0 == "false")
267            .unwrap_or(true);
268
269            if is_kafka && has_header {
270                return Err(RwError::from(ProtocolError(
271                    "CSV HEADER is not supported when creating table with Kafka connector"
272                        .to_owned(),
273                )));
274            }
275
276            stream_source_info.csv_delimiter = delimiter as i32;
277            stream_source_info.csv_has_header = has_header;
278
279            None
280        }
281        // For parquet format, this step is implemented in parquet parser.
282        (Format::Plain, Encode::Parquet) => None,
283        (
284            Format::Plain | Format::Upsert | Format::Maxwell | Format::Canal | Format::Debezium,
285            Encode::Json,
286        ) => {
287            if matches!(
288                format_encode.format,
289                Format::Plain | Format::Upsert | Format::Debezium
290            ) {
291                // Parse the value but throw it away.
292                // It would be too late to report error in `SpecificParserConfig::new`,
293                // which leads to recovery loop.
294                // TODO: rely on SpecificParserConfig::new to validate, like Avro
295                TimestamptzHandling::from_options(&format_encode_options_to_consume)
296                    .map_err(|err| InvalidInputSyntax(err.message))?;
297                try_consume_string_from_options(
298                    &mut format_encode_options_to_consume,
299                    TimestamptzHandling::OPTION_KEY,
300                );
301            }
302
303            let schema_config = get_json_schema_location(&mut format_encode_options_to_consume)?;
304            stream_source_info.use_schema_registry =
305                json_schema_infer_use_schema_registry(&schema_config);
306
307            extract_json_table_schema(
308                &schema_config,
309                &options_with_secret,
310                &mut format_encode_options_to_consume,
311            )
312            .await?
313        }
314        (Format::None, Encode::None) => {
315            if options_with_secret.is_iceberg_connector() {
316                Some(
317                    extract_iceberg_columns(&options_with_secret)
318                        .await
319                        .map_err(|err| ProtocolError(err.to_report_string()))?,
320                )
321            } else if options_with_secret
322                .get(UPSTREAM_SOURCE_KEY)
323                .is_some_and(|s| s.eq_ignore_ascii_case(ADBC_SNOWFLAKE_CONNECTOR))
324            {
325                Some(
326                    extract_adbc_snowflake_columns(&options_with_secret)
327                        .await
328                        .map_err(|err| ProtocolError(err.to_report_string()))?,
329                )
330            } else {
331                None
332            }
333        }
334        (format, encoding) => {
335            return Err(RwError::from(ProtocolError(format!(
336                "Unknown combination {:?} {:?}",
337                format, encoding
338            ))));
339        }
340    };
341
342    if !format_encode_options_to_consume.is_empty() {
343        let err_string = format!(
344            "Get unknown format_encode_options for {:?} {:?}: {}",
345            format_encode.format,
346            format_encode.row_encode,
347            format_encode_options_to_consume
348                .keys()
349                .cloned()
350                .collect::<Vec<String>>()
351                .join(","),
352        );
353        session.notice_to_user(err_string);
354    }
355    Ok((columns, stream_source_info))
356}
357
358fn bind_columns_from_source_for_cdc(
359    session: &SessionImpl,
360    format_encode: &FormatEncodeOptions,
361) -> Result<(Option<Vec<ColumnCatalog>>, StreamSourceInfo)> {
362    let with_options = WithOptions::try_from(format_encode.row_options())?;
363    if !with_options.connection_ref().is_empty() {
364        return Err(RwError::from(NotSupported(
365            "CDC connector does not support connection ref yet".to_owned(),
366            "Explicitly specify the connection in WITH clause".to_owned(),
367        )));
368    }
369    let (format_encode_options, format_encode_secret_refs) =
370        resolve_secret_ref_in_with_options(with_options, session)?.into_parts();
371
372    // Need real secret to access the schema registry
373    let mut format_encode_options_to_consume = LocalSecretManager::global().fill_secrets(
374        format_encode_options.clone(),
375        format_encode_secret_refs.clone(),
376    )?;
377
378    match (&format_encode.format, &format_encode.row_encode) {
379        (Format::Plain, Encode::Json) => (),
380        (format, encoding) => {
381            // Note: parser will also check this. Just be extra safe here
382            return Err(RwError::from(ProtocolError(format!(
383                "Row format for CDC connectors should be either omitted or set to `FORMAT PLAIN ENCODE JSON`, got: {:?} {:?}",
384                format, encoding
385            ))));
386        }
387    };
388
389    let columns = ColumnCatalog::debezium_cdc_source_cols().to_vec();
390    let schema_config = get_json_schema_location(&mut format_encode_options_to_consume)?;
391
392    let stream_source_info = StreamSourceInfo {
393        format: format_to_prost(&format_encode.format) as i32,
394        row_encode: row_encode_to_prost(&format_encode.row_encode) as i32,
395        format_encode_options,
396        use_schema_registry: json_schema_infer_use_schema_registry(&schema_config),
397        cdc_source_job: true,
398        is_distributed: false,
399        format_encode_secret_refs,
400        ..Default::default()
401    };
402    if !format_encode_options_to_consume.is_empty() {
403        let err_string = format!(
404            "Get unknown format_encode_options for {:?} {:?}: {}",
405            format_encode.format,
406            format_encode.row_encode,
407            format_encode_options_to_consume
408                .keys()
409                .cloned()
410                .collect::<Vec<String>>()
411                .join(","),
412        );
413        session.notice_to_user(err_string);
414    }
415    Ok((Some(columns), stream_source_info))
416}
417
418fn format_to_prost(format: &Format) -> FormatType {
419    match format {
420        Format::Native => FormatType::Native,
421        Format::Plain => FormatType::Plain,
422        Format::Upsert => FormatType::Upsert,
423        Format::Debezium => FormatType::Debezium,
424        Format::DebeziumMongo => FormatType::DebeziumMongo,
425        Format::Maxwell => FormatType::Maxwell,
426        Format::Canal => FormatType::Canal,
427        Format::None => FormatType::None,
428    }
429}
430fn row_encode_to_prost(row_encode: &Encode) -> EncodeType {
431    match row_encode {
432        Encode::Native => EncodeType::Native,
433        Encode::Json => EncodeType::Json,
434        Encode::Avro => EncodeType::Avro,
435        Encode::Protobuf => EncodeType::Protobuf,
436        Encode::Csv => EncodeType::Csv,
437        Encode::Bytes => EncodeType::Bytes,
438        Encode::Template => EncodeType::Template,
439        Encode::Parquet => EncodeType::Parquet,
440        Encode::None => EncodeType::None,
441        Encode::Text => EncodeType::Text,
442    }
443}
444
445pub fn get_schema_location(
446    format_encode_options: &mut BTreeMap<String, String>,
447) -> Result<(AstString, bool)> {
448    let schema_location = try_consume_string_from_options(format_encode_options, "schema.location");
449    let schema_registry = try_consume_string_from_options(format_encode_options, "schema.registry");
450    match (schema_location, schema_registry) {
451        (None, None) => Err(RwError::from(ProtocolError(
452            "missing either a schema location or a schema registry".to_owned(),
453        ))),
454        (None, Some(schema_registry)) => Ok((schema_registry, true)),
455        (Some(schema_location), None) => Ok((schema_location, false)),
456        (Some(_), Some(_)) => Err(RwError::from(ProtocolError(
457            "only need either the schema location or the schema registry".to_owned(),
458        ))),
459    }
460}
461
462pub fn schema_has_schema_registry(schema: &FormatEncodeOptions) -> bool {
463    match schema.row_encode {
464        Encode::Avro | Encode::Protobuf => true,
465        Encode::Json => {
466            let mut options = WithOptions::try_from(schema.row_options()).unwrap();
467            matches!(get_json_schema_location(options.inner_mut()), Ok(Some(_)))
468        }
469        _ => false,
470    }
471}
472
473#[inline]
474fn get_name_strategy_or_default(name_strategy: Option<AstString>) -> Result<Option<i32>> {
475    match name_strategy {
476        None => Ok(None),
477        Some(name) => Ok(Some(name_strategy_from_str(name.0.as_str())
478            .ok_or_else(|| RwError::from(ProtocolError(format!("\
479            expect strategy name in topic_name_strategy, record_name_strategy and topic_record_name_strategy, but got {}", name))))? as i32)),
480    }
481}