risingwave_frontend/handler/create_source/
external_schema.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! bind columns from external schema
16
17use risingwave_connector::source::ADBC_SNOWFLAKE_CONNECTOR;
18
19use super::*;
20
21mod json;
22use json::*;
23mod avro;
24use avro::extract_avro_table_schema;
25pub mod debezium;
26pub mod iceberg;
27use iceberg::extract_iceberg_columns;
28mod protobuf;
29use protobuf::extract_protobuf_table_schema;
30pub mod nexmark;
31
/// Define a feature-gated function from a module.
///
/// The macro takes three comma-separated pieces: `mod <name>`, a feature name string literal,
/// and an `async fn` signature (name, parameter list, return type).
///
/// When the feature is enabled, it declares `pub mod <name>;` and brings the function of the
/// given name from that module into scope with a (private) `use`.
/// When the feature is disabled, it generates a `pub async fn` stub with the same signature
/// that always returns an error telling the user to enable the feature.
///
/// Because the stub body is `Err(anyhow::anyhow!(..))`, the return type passed to the macro
/// must be a `Result` whose error type is `anyhow::Error`.
macro_rules! feature_gated_function {
    (
        mod $mod_name:ident,
        $feature_name:literal,
        async fn $func_name:ident ( $( $param_name:ident : $param_type:ty ),* $(,)? ) -> $ret_type:ty
    ) => {
        // Feature enabled: use the real implementation from the gated module.
        #[cfg(feature = $feature_name)]
        pub mod $mod_name;
        #[cfg(feature = $feature_name)]
        use $mod_name::$func_name;

        // Feature disabled: same name and signature, but the parameters are never read,
        // hence the `unused_variables` allowance.
        #[cfg(not(feature = $feature_name))]
        #[allow(unused_variables)]
        pub async fn $func_name( $( $param_name : $param_type ),* ) -> $ret_type {
            Err(anyhow::anyhow!(
                "Feature `{}` is not enabled at compile time. \
                Please enable it in `Cargo.toml` and rebuild.",
                $feature_name
            ))
        }
    };
}
58
// With the `source-adbc_snowflake` feature enabled, this declares the `adbc_snowflake` module
// and brings `extract_adbc_snowflake_columns` from it into scope. Without the feature, it
// defines a stub of the same name and signature that always returns an error.
feature_gated_function!(
    mod adbc_snowflake,
    "source-adbc_snowflake",
    async fn extract_adbc_snowflake_columns(
        with_properties: &WithOptionsSecResolved,
    ) -> anyhow::Result<Vec<ColumnCatalog>>
);
66
67/// Resolves the schema of the source from external schema file.
68/// See <https://docs.risingwave.com/sql/commands/sql-create-source> for more information.
69///
70/// Note: the returned schema strictly corresponds to the schema.
71/// Other special columns like additional columns (`INCLUDE`), and `row_id` column are not included.
72pub async fn bind_columns_from_source(
73    session: &SessionImpl,
74    format_encode: &FormatEncodeOptions,
75    with_properties: Either<&WithOptions, &WithOptionsSecResolved>,
76    create_source_type: CreateSourceType,
77) -> Result<(Option<Vec<ColumnCatalog>>, StreamSourceInfo)> {
78    let (columns_from_resolve_source, mut source_info) =
79        if create_source_type == CreateSourceType::SharedCdc {
80            bind_columns_from_source_for_cdc(session, format_encode)?
81        } else {
82            bind_columns_from_source_for_non_cdc(session, format_encode, with_properties).await?
83        };
84    if create_source_type.is_shared() {
85        // Note: this field should be called is_shared. Check field doc for more details.
86        source_info.cdc_source_job = true;
87        source_info.is_distributed = create_source_type == CreateSourceType::SharedNonCdc;
88    }
89    Ok((columns_from_resolve_source, source_info))
90}
91
/// Binds the columns and builds the [`StreamSourceInfo`] for a non-CDC source.
///
/// Steps, in order:
/// 1. Obtain the secret-resolved `WITH` properties: for `Either::Left` they are resolved here
///    (and the connection type must be in `SOURCE_ALLOWED_CONNECTION_CONNECTOR`); for
///    `Either::Right` the already-resolved properties are cloned.
/// 2. Resolve connection/secret refs in `format_encode.row_options()` and fill the real secret
///    values into a working copy (`format_encode_options_to_consume`). The branches below
///    remove the options they understand from this working copy.
/// 3. Dispatch on `(format, row_encode)` to record the relevant settings in the
///    `StreamSourceInfo` and, where applicable, extract the columns from the external schema.
/// 4. Options still left in the working copy are reported to the user as a notice; they do
///    not cause an error.
///
/// The returned columns are `None` for combinations that do not produce columns here
/// (e.g. native, bytes, CSV and parquet).
///
/// # Errors
///
/// Returns an error when a connection type is not allowed, when a required option is missing
/// or invalid, when schema extraction fails, or when the `(format, row_encode)` combination
/// is not handled.
async fn bind_columns_from_source_for_non_cdc(
    session: &SessionImpl,
    format_encode: &FormatEncodeOptions,
    with_properties: Either<&WithOptions, &WithOptionsSecResolved>,
) -> Result<(Option<Vec<ColumnCatalog>>, StreamSourceInfo)> {
    const MESSAGE_NAME_KEY: &str = "message";
    const KEY_MESSAGE_NAME_KEY: &str = "key.message";
    const NAME_STRATEGY_KEY: &str = "schema.registry.name.strategy";

    let options_with_secret = match with_properties {
        Either::Left(options) => {
            let (sec_resolve_props, connection_type, _) = resolve_connection_ref_and_secret_ref(
                options.clone(),
                session,
                Some(TelemetryDatabaseObject::Source),
            )?;
            if !SOURCE_ALLOWED_CONNECTION_CONNECTOR.contains(&connection_type) {
                return Err(RwError::from(ProtocolError(format!(
                    "connection type {:?} is not allowed, allowed types: {:?}",
                    connection_type, SOURCE_ALLOWED_CONNECTION_CONNECTOR
                ))));
            }

            sec_resolve_props
        }
        Either::Right(options_with_secret) => options_with_secret.clone(),
    };

    // Only consulted by the CSV branch below.
    let is_kafka: bool = options_with_secret.is_kafka_connector();

    // todo: need to resolve connection ref for schema registry
    let (sec_resolve_props, connection_type, schema_registry_conn_ref) =
        resolve_connection_ref_and_secret_ref(
            WithOptions::try_from(format_encode.row_options())?,
            session,
            Some(TelemetryDatabaseObject::Source),
        )?;
    ensure_connection_type_allowed(connection_type, &SOURCE_ALLOWED_CONNECTION_SCHEMA_REGISTRY)?;

    let (format_encode_options, format_encode_secret_refs) = sec_resolve_props.into_parts();
    // Need real secret to access the schema registry
    let mut format_encode_options_to_consume = LocalSecretManager::global().fill_secrets(
        format_encode_options.clone(),
        format_encode_secret_refs.clone(),
    )?;

    // Consumes the `key.message` option, yielding `None` when consuming it fails.
    fn get_key_message_name(options: &mut BTreeMap<String, String>) -> Option<String> {
        consume_string_from_options(options, KEY_MESSAGE_NAME_KEY)
            .map(|ele| Some(ele.0))
            .unwrap_or(None)
    }
    // Consumes the `schema.registry.name.strategy` option and rejects it when the schema
    // registry is not in use.
    fn get_sr_name_strategy_check(
        options: &mut BTreeMap<String, String>,
        use_sr: bool,
    ) -> Result<Option<i32>> {
        let name_strategy = get_name_strategy_or_default(try_consume_string_from_options(
            options,
            NAME_STRATEGY_KEY,
        ))?;
        if !use_sr && name_strategy.is_some() {
            return Err(RwError::from(ProtocolError(
                "schema registry name strategy only works with schema registry enabled".to_owned(),
            )));
        }
        Ok(name_strategy)
    }

    // The options stored here are the ones before secrets were filled in; the secret refs are
    // stored alongside them.
    let mut stream_source_info = StreamSourceInfo {
        format: format_to_prost(&format_encode.format) as i32,
        row_encode: row_encode_to_prost(&format_encode.row_encode) as i32,
        format_encode_options,
        format_encode_secret_refs,
        connection_id: schema_registry_conn_ref,
        ..Default::default()
    };

    // Remove the key from the working copy so that it is not reported as unknown below;
    // its value is not used here.
    if format_encode.format == Format::Debezium {
        try_consume_string_from_options(&mut format_encode_options_to_consume, DEBEZIUM_IGNORE_KEY);
    }

    let columns = match (&format_encode.format, &format_encode.row_encode) {
        (Format::Native, Encode::Native)
        | (Format::Plain, Encode::Bytes)
        | (Format::DebeziumMongo, Encode::Json) => None,
        (Format::Plain, Encode::Protobuf) | (Format::Upsert, Encode::Protobuf) => {
            let (row_schema_location, use_schema_registry) =
                get_schema_location(&mut format_encode_options_to_consume)?;
            // The message name is mandatory for protobuf (unlike the Avro branch below).
            let message_name = consume_string_from_options(
                &mut format_encode_options_to_consume,
                MESSAGE_NAME_KEY,
            )?;
            let name_strategy = get_sr_name_strategy_check(
                &mut format_encode_options_to_consume,
                use_schema_registry,
            )?;

            stream_source_info.use_schema_registry = use_schema_registry;
            stream_source_info
                .row_schema_location
                .clone_from(&row_schema_location.0);
            stream_source_info
                .proto_message_name
                .clone_from(&message_name.0);
            stream_source_info.key_message_name =
                get_key_message_name(&mut format_encode_options_to_consume);
            stream_source_info.name_strategy =
                name_strategy.unwrap_or(PbSchemaRegistryNameStrategy::Unspecified as i32);

            Some(
                extract_protobuf_table_schema(
                    &stream_source_info,
                    &options_with_secret,
                    &mut format_encode_options_to_consume,
                )
                .await?,
            )
        }
        (format @ (Format::Plain | Format::Upsert | Format::Debezium), Encode::Avro) => {
            // The legacy option parsing below only runs when no AWS Glue schema ARN is given.
            // The ARN key is removed from the working copy either way.
            if format_encode_options_to_consume
                .remove(AWS_GLUE_SCHEMA_ARN_KEY)
                .is_none()
            {
                // Legacy logic that assumes either `schema.location` or confluent `schema.registry`.
                // The handling of newly added aws glue is centralized in `connector::parser`.
                // TODO(xiangjinwu): move these option parsing to `connector::parser` as well.

                let (row_schema_location, use_schema_registry) =
                    get_schema_location(&mut format_encode_options_to_consume)?;

                if matches!(format, Format::Debezium) && !use_schema_registry {
                    return Err(RwError::from(ProtocolError(
                        "schema location for DEBEZIUM_AVRO row format is not supported".to_owned(),
                    )));
                }

                // Optional here; an absent message name is stored as an empty string.
                let message_name = try_consume_string_from_options(
                    &mut format_encode_options_to_consume,
                    MESSAGE_NAME_KEY,
                );
                let name_strategy = get_sr_name_strategy_check(
                    &mut format_encode_options_to_consume,
                    use_schema_registry,
                )?;

                stream_source_info.use_schema_registry = use_schema_registry;
                stream_source_info
                    .row_schema_location
                    .clone_from(&row_schema_location.0);
                stream_source_info.proto_message_name =
                    message_name.unwrap_or(AstString("".into())).0;
                stream_source_info.key_message_name =
                    get_key_message_name(&mut format_encode_options_to_consume);
                stream_source_info.name_strategy =
                    name_strategy.unwrap_or(PbSchemaRegistryNameStrategy::Unspecified as i32);
            }

            Some(
                extract_avro_table_schema(
                    &stream_source_info,
                    &options_with_secret,
                    &mut format_encode_options_to_consume,
                    matches!(format, Format::Debezium),
                )
                .await?,
            )
        }
        (Format::Plain, Encode::Csv) => {
            let chars =
                consume_string_from_options(&mut format_encode_options_to_consume, "delimiter")?.0;
            let delimiter = get_delimiter(chars.as_str()).context("failed to parse delimiter")?;
            // A header is assumed when `without_header` is absent or exactly "false";
            // any other value means no header.
            let has_header = try_consume_string_from_options(
                &mut format_encode_options_to_consume,
                "without_header",
            )
            .map(|s| s.0 == "false")
            .unwrap_or(true);

            if is_kafka && has_header {
                return Err(RwError::from(ProtocolError(
                    "CSV HEADER is not supported when creating table with Kafka connector"
                        .to_owned(),
                )));
            }

            stream_source_info.csv_delimiter = delimiter as i32;
            stream_source_info.csv_has_header = has_header;

            None
        }
        // For parquet format, this step is implemented in parquet parser.
        (Format::Plain, Encode::Parquet) => None,
        (
            Format::Plain | Format::Upsert | Format::Maxwell | Format::Canal | Format::Debezium,
            Encode::Json,
        ) => {
            if matches!(
                format_encode.format,
                Format::Plain | Format::Upsert | Format::Debezium
            ) {
                // Parse the value but throw it away.
                // It would be too late to report error in `SpecificParserConfig::new`,
                // which leads to recovery loop.
                // TODO: rely on SpecificParserConfig::new to validate, like Avro
                if let Some(value) =
                    format_encode_options_to_consume.get(TimestamptzHandling::OPTION_KEY)
                {
                    TimestamptzHandling::from_options(value)
                        .map_err(|err| InvalidInputSyntax(err.message))?;
                }
                // Validation passed (or the key is absent): now remove the key from the
                // working copy.
                try_consume_string_from_options(
                    &mut format_encode_options_to_consume,
                    TimestamptzHandling::OPTION_KEY,
                );
            }

            let schema_config = get_json_schema_location(&mut format_encode_options_to_consume)?;
            stream_source_info.use_schema_registry =
                json_schema_infer_use_schema_registry(&schema_config);

            // Unlike the other branches, the result is used as the `columns` value directly
            // rather than being wrapped in `Some`.
            extract_json_table_schema(
                &schema_config,
                &options_with_secret,
                &mut format_encode_options_to_consume,
            )
            .await?
        }
        (Format::None, Encode::None) => {
            // Without a format/encode, columns are only extracted for the iceberg and
            // ADBC Snowflake connectors.
            if options_with_secret.is_iceberg_connector() {
                Some(
                    extract_iceberg_columns(&options_with_secret)
                        .await
                        .map_err(|err| ProtocolError(err.to_report_string()))?,
                )
            } else if options_with_secret
                .get(UPSTREAM_SOURCE_KEY)
                .is_some_and(|s| s.eq_ignore_ascii_case(ADBC_SNOWFLAKE_CONNECTOR))
            {
                Some(
                    extract_adbc_snowflake_columns(&options_with_secret)
                        .await
                        .map_err(|err| ProtocolError(err.to_report_string()))?,
                )
            } else {
                None
            }
        }
        (format, encoding) => {
            return Err(RwError::from(ProtocolError(format!(
                "Unknown combination {:?} {:?}",
                format, encoding
            ))));
        }
    };

    // Whatever is left was not consumed by any branch above. This is only reported as a
    // notice to the user; binding still succeeds.
    if !format_encode_options_to_consume.is_empty() {
        let err_string = format!(
            "Get unknown format_encode_options for {:?} {:?}: {}",
            format_encode.format,
            format_encode.row_encode,
            format_encode_options_to_consume
                .keys()
                .cloned()
                .collect::<Vec<String>>()
                .join(","),
        );
        session.notice_to_user(err_string);
    }
    Ok((columns, stream_source_info))
}
361
362fn bind_columns_from_source_for_cdc(
363    session: &SessionImpl,
364    format_encode: &FormatEncodeOptions,
365) -> Result<(Option<Vec<ColumnCatalog>>, StreamSourceInfo)> {
366    let with_options = WithOptions::try_from(format_encode.row_options())?;
367    if !with_options.connection_ref().is_empty() {
368        return Err(RwError::from(NotSupported(
369            "CDC connector does not support connection ref yet".to_owned(),
370            "Explicitly specify the connection in WITH clause".to_owned(),
371        )));
372    }
373    let (format_encode_options, format_encode_secret_refs) =
374        resolve_secret_ref_in_with_options(with_options, session)?.into_parts();
375
376    // Need real secret to access the schema registry
377    let mut format_encode_options_to_consume = LocalSecretManager::global().fill_secrets(
378        format_encode_options.clone(),
379        format_encode_secret_refs.clone(),
380    )?;
381
382    match (&format_encode.format, &format_encode.row_encode) {
383        (Format::Plain, Encode::Json) => (),
384        (format, encoding) => {
385            // Note: parser will also check this. Just be extra safe here
386            return Err(RwError::from(ProtocolError(format!(
387                "Row format for CDC connectors should be either omitted or set to `FORMAT PLAIN ENCODE JSON`, got: {:?} {:?}",
388                format, encoding
389            ))));
390        }
391    };
392
393    let columns = ColumnCatalog::debezium_cdc_source_cols().to_vec();
394    let schema_config = get_json_schema_location(&mut format_encode_options_to_consume)?;
395
396    let stream_source_info = StreamSourceInfo {
397        format: format_to_prost(&format_encode.format) as i32,
398        row_encode: row_encode_to_prost(&format_encode.row_encode) as i32,
399        format_encode_options,
400        use_schema_registry: json_schema_infer_use_schema_registry(&schema_config),
401        cdc_source_job: true,
402        is_distributed: false,
403        format_encode_secret_refs,
404        ..Default::default()
405    };
406    if !format_encode_options_to_consume.is_empty() {
407        let err_string = format!(
408            "Get unknown format_encode_options for {:?} {:?}: {}",
409            format_encode.format,
410            format_encode.row_encode,
411            format_encode_options_to_consume
412                .keys()
413                .cloned()
414                .collect::<Vec<String>>()
415                .join(","),
416        );
417        session.notice_to_user(err_string);
418    }
419    Ok((Some(columns), stream_source_info))
420}
421
422fn format_to_prost(format: &Format) -> FormatType {
423    match format {
424        Format::Native => FormatType::Native,
425        Format::Plain => FormatType::Plain,
426        Format::Upsert => FormatType::Upsert,
427        Format::Debezium => FormatType::Debezium,
428        Format::DebeziumMongo => FormatType::DebeziumMongo,
429        Format::Maxwell => FormatType::Maxwell,
430        Format::Canal => FormatType::Canal,
431        Format::None => FormatType::None,
432    }
433}
434fn row_encode_to_prost(row_encode: &Encode) -> EncodeType {
435    match row_encode {
436        Encode::Native => EncodeType::Native,
437        Encode::Json => EncodeType::Json,
438        Encode::Avro => EncodeType::Avro,
439        Encode::Protobuf => EncodeType::Protobuf,
440        Encode::Csv => EncodeType::Csv,
441        Encode::Bytes => EncodeType::Bytes,
442        Encode::Template => EncodeType::Template,
443        Encode::Parquet => EncodeType::Parquet,
444        Encode::None => EncodeType::None,
445        Encode::Text => EncodeType::Text,
446    }
447}
448
449pub fn get_schema_location(
450    format_encode_options: &mut BTreeMap<String, String>,
451) -> Result<(AstString, bool)> {
452    let schema_location = try_consume_string_from_options(format_encode_options, "schema.location");
453    let schema_registry = try_consume_string_from_options(format_encode_options, "schema.registry");
454    match (schema_location, schema_registry) {
455        (None, None) => Err(RwError::from(ProtocolError(
456            "missing either a schema location or a schema registry".to_owned(),
457        ))),
458        (None, Some(schema_registry)) => Ok((schema_registry, true)),
459        (Some(schema_location), None) => Ok((schema_location, false)),
460        (Some(_), Some(_)) => Err(RwError::from(ProtocolError(
461            "only need either the schema location or the schema registry".to_owned(),
462        ))),
463    }
464}
465
466pub fn schema_has_schema_registry(schema: &FormatEncodeOptions) -> bool {
467    match schema.row_encode {
468        Encode::Avro | Encode::Protobuf => true,
469        Encode::Json => {
470            let mut options = WithOptions::try_from(schema.row_options()).unwrap();
471            matches!(get_json_schema_location(options.inner_mut()), Ok(Some(_)))
472        }
473        _ => false,
474    }
475}
476
477#[inline]
478fn get_name_strategy_or_default(name_strategy: Option<AstString>) -> Result<Option<i32>> {
479    match name_strategy {
480        None => Ok(None),
481        Some(name) => Ok(Some(name_strategy_from_str(name.0.as_str())
482            .ok_or_else(|| RwError::from(ProtocolError(format!("\
483            expect strategy name in topic_name_strategy, record_name_strategy and topic_record_name_strategy, but got {}", name))))? as i32)),
484    }
485}