1use risingwave_connector::source::ADBC_SNOWFLAKE_CONNECTOR;
18
19use super::*;
20
21mod json;
22use json::*;
23mod avro;
24use avro::extract_avro_table_schema;
25pub mod debezium;
26pub mod iceberg;
27use iceberg::extract_iceberg_columns;
28mod protobuf;
29use protobuf::extract_protobuf_table_schema;
30pub mod nexmark;
31
/// Declares a feature-gated module plus an async function taken from it, with a fallback when
/// the feature is off.
///
/// Invocation shape: `mod <module>, "<feature>", async fn <name>(<params>) -> <return type>`.
///
/// * With the cargo feature enabled, this expands to `pub mod <module>;` and
///   `use <module>::<name>;`.
/// * With the feature disabled, this expands to a `pub async fn <name>` with the given signature
///   that ignores its arguments and returns an `anyhow` error asking the user to enable the
///   feature and rebuild.
macro_rules! feature_gated_function {
    (
        mod $module:ident,
        $feature:literal,
        async fn $function:ident ( $( $arg:ident : $arg_ty:ty ),* $(,)? ) -> $output:ty
    ) => {
        #[cfg(feature = $feature)]
        pub mod $module;
        #[cfg(feature = $feature)]
        use $module::$function;

        // The fallback never reads its parameters, hence the lint allowance.
        #[cfg(not(feature = $feature))]
        #[allow(unused_variables)]
        pub async fn $function( $( $arg : $arg_ty ),* ) -> $output {
            Err(anyhow::anyhow!(
                "Feature `{}` is not enabled at compile time. \
                 Please enable it in `Cargo.toml` and rebuild.",
                $feature
            ))
        }
    };
}
58
// Column extraction for the ADBC Snowflake connector.
//
// With the `source-adbc_snowflake` feature enabled, this declares the `adbc_snowflake` module and
// imports `extract_adbc_snowflake_columns` from it. Without the feature, a fallback with the same
// signature is generated that always returns an error.
feature_gated_function!(
    mod adbc_snowflake,
    "source-adbc_snowflake",
    async fn extract_adbc_snowflake_columns(
        with_properties: &WithOptionsSecResolved,
    ) -> anyhow::Result<Vec<ColumnCatalog>>
);
66
67pub async fn bind_columns_from_source(
73 session: &SessionImpl,
74 format_encode: &FormatEncodeOptions,
75 with_properties: Either<&WithOptions, &WithOptionsSecResolved>,
76 create_source_type: CreateSourceType,
77) -> Result<(Option<Vec<ColumnCatalog>>, StreamSourceInfo)> {
78 let (columns_from_resolve_source, mut source_info) =
79 if create_source_type == CreateSourceType::SharedCdc {
80 bind_columns_from_source_for_cdc(session, format_encode)?
81 } else {
82 bind_columns_from_source_for_non_cdc(session, format_encode, with_properties).await?
83 };
84 if create_source_type.is_shared() {
85 source_info.cdc_source_job = true;
87 source_info.is_distributed = create_source_type == CreateSourceType::SharedNonCdc;
88 }
89 Ok((columns_from_resolve_source, source_info))
90}
91
92async fn bind_columns_from_source_for_non_cdc(
93 session: &SessionImpl,
94 format_encode: &FormatEncodeOptions,
95 with_properties: Either<&WithOptions, &WithOptionsSecResolved>,
96) -> Result<(Option<Vec<ColumnCatalog>>, StreamSourceInfo)> {
97 const MESSAGE_NAME_KEY: &str = "message";
98 const KEY_MESSAGE_NAME_KEY: &str = "key.message";
99 const NAME_STRATEGY_KEY: &str = "schema.registry.name.strategy";
100
101 let options_with_secret = match with_properties {
102 Either::Left(options) => {
103 let (sec_resolve_props, connection_type, _) = resolve_connection_ref_and_secret_ref(
104 options.clone(),
105 session,
106 Some(TelemetryDatabaseObject::Source),
107 )?;
108 if !SOURCE_ALLOWED_CONNECTION_CONNECTOR.contains(&connection_type) {
109 return Err(RwError::from(ProtocolError(format!(
110 "connection type {:?} is not allowed, allowed types: {:?}",
111 connection_type, SOURCE_ALLOWED_CONNECTION_CONNECTOR
112 ))));
113 }
114
115 sec_resolve_props
116 }
117 Either::Right(options_with_secret) => options_with_secret.clone(),
118 };
119
120 let is_kafka: bool = options_with_secret.is_kafka_connector();
121
122 let (sec_resolve_props, connection_type, schema_registry_conn_ref) =
124 resolve_connection_ref_and_secret_ref(
125 WithOptions::try_from(format_encode.row_options())?,
126 session,
127 Some(TelemetryDatabaseObject::Source),
128 )?;
129 ensure_connection_type_allowed(connection_type, &SOURCE_ALLOWED_CONNECTION_SCHEMA_REGISTRY)?;
130
131 let (format_encode_options, format_encode_secret_refs) = sec_resolve_props.into_parts();
132 let mut format_encode_options_to_consume = LocalSecretManager::global().fill_secrets(
134 format_encode_options.clone(),
135 format_encode_secret_refs.clone(),
136 )?;
137
138 fn get_key_message_name(options: &mut BTreeMap<String, String>) -> Option<String> {
139 consume_string_from_options(options, KEY_MESSAGE_NAME_KEY)
140 .map(|ele| Some(ele.0))
141 .unwrap_or(None)
142 }
143 fn get_sr_name_strategy_check(
144 options: &mut BTreeMap<String, String>,
145 use_sr: bool,
146 ) -> Result<Option<i32>> {
147 let name_strategy = get_name_strategy_or_default(try_consume_string_from_options(
148 options,
149 NAME_STRATEGY_KEY,
150 ))?;
151 if !use_sr && name_strategy.is_some() {
152 return Err(RwError::from(ProtocolError(
153 "schema registry name strategy only works with schema registry enabled".to_owned(),
154 )));
155 }
156 Ok(name_strategy)
157 }
158
159 let mut stream_source_info = StreamSourceInfo {
160 format: format_to_prost(&format_encode.format) as i32,
161 row_encode: row_encode_to_prost(&format_encode.row_encode) as i32,
162 format_encode_options,
163 format_encode_secret_refs,
164 connection_id: schema_registry_conn_ref,
165 ..Default::default()
166 };
167
168 if format_encode.format == Format::Debezium {
169 try_consume_string_from_options(&mut format_encode_options_to_consume, DEBEZIUM_IGNORE_KEY);
170 }
171
172 let columns = match (&format_encode.format, &format_encode.row_encode) {
173 (Format::Native, Encode::Native)
174 | (Format::Plain, Encode::Bytes)
175 | (Format::DebeziumMongo, Encode::Json) => None,
176 (Format::Plain, Encode::Protobuf) | (Format::Upsert, Encode::Protobuf) => {
177 let (row_schema_location, use_schema_registry) =
178 get_schema_location(&mut format_encode_options_to_consume)?;
179 let message_name = consume_string_from_options(
180 &mut format_encode_options_to_consume,
181 MESSAGE_NAME_KEY,
182 )?;
183 let name_strategy = get_sr_name_strategy_check(
184 &mut format_encode_options_to_consume,
185 use_schema_registry,
186 )?;
187
188 stream_source_info.use_schema_registry = use_schema_registry;
189 stream_source_info
190 .row_schema_location
191 .clone_from(&row_schema_location.0);
192 stream_source_info
193 .proto_message_name
194 .clone_from(&message_name.0);
195 stream_source_info.key_message_name =
196 get_key_message_name(&mut format_encode_options_to_consume);
197 stream_source_info.name_strategy =
198 name_strategy.unwrap_or(PbSchemaRegistryNameStrategy::Unspecified as i32);
199
200 Some(
201 extract_protobuf_table_schema(
202 &stream_source_info,
203 &options_with_secret,
204 &mut format_encode_options_to_consume,
205 )
206 .await?,
207 )
208 }
209 (format @ (Format::Plain | Format::Upsert | Format::Debezium), Encode::Avro) => {
210 if format_encode_options_to_consume
211 .remove(AWS_GLUE_SCHEMA_ARN_KEY)
212 .is_none()
213 {
214 let (row_schema_location, use_schema_registry) =
219 get_schema_location(&mut format_encode_options_to_consume)?;
220
221 if matches!(format, Format::Debezium) && !use_schema_registry {
222 return Err(RwError::from(ProtocolError(
223 "schema location for DEBEZIUM_AVRO row format is not supported".to_owned(),
224 )));
225 }
226
227 let message_name = try_consume_string_from_options(
228 &mut format_encode_options_to_consume,
229 MESSAGE_NAME_KEY,
230 );
231 let name_strategy = get_sr_name_strategy_check(
232 &mut format_encode_options_to_consume,
233 use_schema_registry,
234 )?;
235
236 stream_source_info.use_schema_registry = use_schema_registry;
237 stream_source_info
238 .row_schema_location
239 .clone_from(&row_schema_location.0);
240 stream_source_info.proto_message_name =
241 message_name.unwrap_or(AstString("".into())).0;
242 stream_source_info.key_message_name =
243 get_key_message_name(&mut format_encode_options_to_consume);
244 stream_source_info.name_strategy =
245 name_strategy.unwrap_or(PbSchemaRegistryNameStrategy::Unspecified as i32);
246 }
247
248 Some(
249 extract_avro_table_schema(
250 &stream_source_info,
251 &options_with_secret,
252 &mut format_encode_options_to_consume,
253 matches!(format, Format::Debezium),
254 )
255 .await?,
256 )
257 }
258 (Format::Plain, Encode::Csv) => {
259 let chars =
260 consume_string_from_options(&mut format_encode_options_to_consume, "delimiter")?.0;
261 let delimiter = get_delimiter(chars.as_str()).context("failed to parse delimiter")?;
262 let has_header = try_consume_string_from_options(
263 &mut format_encode_options_to_consume,
264 "without_header",
265 )
266 .map(|s| s.0 == "false")
267 .unwrap_or(true);
268
269 if is_kafka && has_header {
270 return Err(RwError::from(ProtocolError(
271 "CSV HEADER is not supported when creating table with Kafka connector"
272 .to_owned(),
273 )));
274 }
275
276 stream_source_info.csv_delimiter = delimiter as i32;
277 stream_source_info.csv_has_header = has_header;
278
279 None
280 }
281 (Format::Plain, Encode::Parquet) => None,
283 (
284 Format::Plain | Format::Upsert | Format::Maxwell | Format::Canal | Format::Debezium,
285 Encode::Json,
286 ) => {
287 if matches!(
288 format_encode.format,
289 Format::Plain | Format::Upsert | Format::Debezium
290 ) {
291 TimestamptzHandling::from_options(&format_encode_options_to_consume)
296 .map_err(|err| InvalidInputSyntax(err.message))?;
297 try_consume_string_from_options(
298 &mut format_encode_options_to_consume,
299 TimestamptzHandling::OPTION_KEY,
300 );
301 }
302
303 let schema_config = get_json_schema_location(&mut format_encode_options_to_consume)?;
304 stream_source_info.use_schema_registry =
305 json_schema_infer_use_schema_registry(&schema_config);
306
307 extract_json_table_schema(
308 &schema_config,
309 &options_with_secret,
310 &mut format_encode_options_to_consume,
311 )
312 .await?
313 }
314 (Format::None, Encode::None) => {
315 if options_with_secret.is_iceberg_connector() {
316 Some(
317 extract_iceberg_columns(&options_with_secret)
318 .await
319 .map_err(|err| ProtocolError(err.to_report_string()))?,
320 )
321 } else if options_with_secret
322 .get(UPSTREAM_SOURCE_KEY)
323 .is_some_and(|s| s.eq_ignore_ascii_case(ADBC_SNOWFLAKE_CONNECTOR))
324 {
325 Some(
326 extract_adbc_snowflake_columns(&options_with_secret)
327 .await
328 .map_err(|err| ProtocolError(err.to_report_string()))?,
329 )
330 } else {
331 None
332 }
333 }
334 (format, encoding) => {
335 return Err(RwError::from(ProtocolError(format!(
336 "Unknown combination {:?} {:?}",
337 format, encoding
338 ))));
339 }
340 };
341
342 if !format_encode_options_to_consume.is_empty() {
343 let err_string = format!(
344 "Get unknown format_encode_options for {:?} {:?}: {}",
345 format_encode.format,
346 format_encode.row_encode,
347 format_encode_options_to_consume
348 .keys()
349 .cloned()
350 .collect::<Vec<String>>()
351 .join(","),
352 );
353 session.notice_to_user(err_string);
354 }
355 Ok((columns, stream_source_info))
356}
357
358fn bind_columns_from_source_for_cdc(
359 session: &SessionImpl,
360 format_encode: &FormatEncodeOptions,
361) -> Result<(Option<Vec<ColumnCatalog>>, StreamSourceInfo)> {
362 let with_options = WithOptions::try_from(format_encode.row_options())?;
363 if !with_options.connection_ref().is_empty() {
364 return Err(RwError::from(NotSupported(
365 "CDC connector does not support connection ref yet".to_owned(),
366 "Explicitly specify the connection in WITH clause".to_owned(),
367 )));
368 }
369 let (format_encode_options, format_encode_secret_refs) =
370 resolve_secret_ref_in_with_options(with_options, session)?.into_parts();
371
372 let mut format_encode_options_to_consume = LocalSecretManager::global().fill_secrets(
374 format_encode_options.clone(),
375 format_encode_secret_refs.clone(),
376 )?;
377
378 match (&format_encode.format, &format_encode.row_encode) {
379 (Format::Plain, Encode::Json) => (),
380 (format, encoding) => {
381 return Err(RwError::from(ProtocolError(format!(
383 "Row format for CDC connectors should be either omitted or set to `FORMAT PLAIN ENCODE JSON`, got: {:?} {:?}",
384 format, encoding
385 ))));
386 }
387 };
388
389 let columns = ColumnCatalog::debezium_cdc_source_cols().to_vec();
390 let schema_config = get_json_schema_location(&mut format_encode_options_to_consume)?;
391
392 let stream_source_info = StreamSourceInfo {
393 format: format_to_prost(&format_encode.format) as i32,
394 row_encode: row_encode_to_prost(&format_encode.row_encode) as i32,
395 format_encode_options,
396 use_schema_registry: json_schema_infer_use_schema_registry(&schema_config),
397 cdc_source_job: true,
398 is_distributed: false,
399 format_encode_secret_refs,
400 ..Default::default()
401 };
402 if !format_encode_options_to_consume.is_empty() {
403 let err_string = format!(
404 "Get unknown format_encode_options for {:?} {:?}: {}",
405 format_encode.format,
406 format_encode.row_encode,
407 format_encode_options_to_consume
408 .keys()
409 .cloned()
410 .collect::<Vec<String>>()
411 .join(","),
412 );
413 session.notice_to_user(err_string);
414 }
415 Ok((Some(columns), stream_source_info))
416}
417
418fn format_to_prost(format: &Format) -> FormatType {
419 match format {
420 Format::Native => FormatType::Native,
421 Format::Plain => FormatType::Plain,
422 Format::Upsert => FormatType::Upsert,
423 Format::Debezium => FormatType::Debezium,
424 Format::DebeziumMongo => FormatType::DebeziumMongo,
425 Format::Maxwell => FormatType::Maxwell,
426 Format::Canal => FormatType::Canal,
427 Format::None => FormatType::None,
428 }
429}
430fn row_encode_to_prost(row_encode: &Encode) -> EncodeType {
431 match row_encode {
432 Encode::Native => EncodeType::Native,
433 Encode::Json => EncodeType::Json,
434 Encode::Avro => EncodeType::Avro,
435 Encode::Protobuf => EncodeType::Protobuf,
436 Encode::Csv => EncodeType::Csv,
437 Encode::Bytes => EncodeType::Bytes,
438 Encode::Template => EncodeType::Template,
439 Encode::Parquet => EncodeType::Parquet,
440 Encode::None => EncodeType::None,
441 Encode::Text => EncodeType::Text,
442 }
443}
444
445pub fn get_schema_location(
446 format_encode_options: &mut BTreeMap<String, String>,
447) -> Result<(AstString, bool)> {
448 let schema_location = try_consume_string_from_options(format_encode_options, "schema.location");
449 let schema_registry = try_consume_string_from_options(format_encode_options, "schema.registry");
450 match (schema_location, schema_registry) {
451 (None, None) => Err(RwError::from(ProtocolError(
452 "missing either a schema location or a schema registry".to_owned(),
453 ))),
454 (None, Some(schema_registry)) => Ok((schema_registry, true)),
455 (Some(schema_location), None) => Ok((schema_location, false)),
456 (Some(_), Some(_)) => Err(RwError::from(ProtocolError(
457 "only need either the schema location or the schema registry".to_owned(),
458 ))),
459 }
460}
461
462pub fn schema_has_schema_registry(schema: &FormatEncodeOptions) -> bool {
463 match schema.row_encode {
464 Encode::Avro | Encode::Protobuf => true,
465 Encode::Json => {
466 let mut options = WithOptions::try_from(schema.row_options()).unwrap();
467 matches!(get_json_schema_location(options.inner_mut()), Ok(Some(_)))
468 }
469 _ => false,
470 }
471}
472
473#[inline]
474fn get_name_strategy_or_default(name_strategy: Option<AstString>) -> Result<Option<i32>> {
475 match name_strategy {
476 None => Ok(None),
477 Some(name) => Ok(Some(name_strategy_from_str(name.0.as_str())
478 .ok_or_else(|| RwError::from(ProtocolError(format!("\
479 expect strategy name in topic_name_strategy, record_name_strategy and topic_record_name_strategy, but got {}", name))))? as i32)),
480 }
481}