1use risingwave_connector::source::ADBC_SNOWFLAKE_CONNECTOR;
18
19use super::*;
20
21mod json;
22use json::*;
23mod avro;
24use avro::extract_avro_table_schema;
25pub mod debezium;
26pub mod iceberg;
27use iceberg::extract_iceberg_columns;
28mod protobuf;
29use protobuf::extract_protobuf_table_schema;
30pub mod nexmark;
31
/// Declares a function whose real implementation lives in a feature-gated module.
///
/// When the Cargo feature `$feature_name` is enabled, this expands to `pub mod $mod_name;` and
/// imports `$func_name` from that module. When the feature is disabled, it instead defines a
/// `pub async fn $func_name` stub with the same parameters. The stub ignores its arguments and
/// always returns an error saying the feature is not enabled at compile time.
///
/// The stub returns `Err(anyhow::anyhow!(..))` directly, so `$ret_type` must be a `Result` whose
/// error type is `anyhow::Error` (e.g. `anyhow::Result<_>`).
macro_rules! feature_gated_function {
    (
        mod $mod_name:ident,
        $feature_name:literal,
        async fn $func_name:ident ( $( $param_name:ident : $param_type:ty ),* $(,)? ) -> $ret_type:ty
    ) => {
        // Feature enabled: use the real implementation from the gated module.
        #[cfg(feature = $feature_name)]
        pub mod $mod_name;
        #[cfg(feature = $feature_name)]
        use $mod_name::$func_name;

        // Feature disabled: a stub with the same signature that always fails at runtime.
        #[cfg(not(feature = $feature_name))]
        #[allow(unused_variables)]
        pub async fn $func_name( $( $param_name : $param_type ),* ) -> $ret_type {
            Err(anyhow::anyhow!(
                "Feature `{}` is not enabled at compile time. \
                Please enable it in `Cargo.toml` and rebuild.",
                $feature_name
            ))
        }
    };
}
58
// `extract_adbc_snowflake_columns` comes from the `adbc_snowflake` module only when the crate is
// built with the `source-adbc_snowflake` feature. Without that feature, it is a stub that returns
// an error at runtime.
feature_gated_function!(
    mod adbc_snowflake,
    "source-adbc_snowflake",
    async fn extract_adbc_snowflake_columns(
        with_properties: &WithOptionsSecResolved,
    ) -> anyhow::Result<Vec<ColumnCatalog>>
);
66
67pub async fn bind_columns_from_source(
73 session: &SessionImpl,
74 format_encode: &FormatEncodeOptions,
75 with_properties: Either<&WithOptions, &WithOptionsSecResolved>,
76 create_source_type: CreateSourceType,
77) -> Result<(Option<Vec<ColumnCatalog>>, StreamSourceInfo)> {
78 let (columns_from_resolve_source, mut source_info) =
79 if create_source_type == CreateSourceType::SharedCdc {
80 bind_columns_from_source_for_cdc(session, format_encode)?
81 } else {
82 bind_columns_from_source_for_non_cdc(session, format_encode, with_properties).await?
83 };
84 if create_source_type.is_shared() {
85 source_info.cdc_source_job = true;
87 source_info.is_distributed = create_source_type == CreateSourceType::SharedNonCdc;
88 }
89 Ok((columns_from_resolve_source, source_info))
90}
91
92async fn bind_columns_from_source_for_non_cdc(
93 session: &SessionImpl,
94 format_encode: &FormatEncodeOptions,
95 with_properties: Either<&WithOptions, &WithOptionsSecResolved>,
96) -> Result<(Option<Vec<ColumnCatalog>>, StreamSourceInfo)> {
97 const MESSAGE_NAME_KEY: &str = "message";
98 const KEY_MESSAGE_NAME_KEY: &str = "key.message";
99 const NAME_STRATEGY_KEY: &str = "schema.registry.name.strategy";
100
101 let options_with_secret = match with_properties {
102 Either::Left(options) => {
103 let (sec_resolve_props, connection_type, _) = resolve_connection_ref_and_secret_ref(
104 options.clone(),
105 session,
106 Some(TelemetryDatabaseObject::Source),
107 )?;
108 if !SOURCE_ALLOWED_CONNECTION_CONNECTOR.contains(&connection_type) {
109 return Err(RwError::from(ProtocolError(format!(
110 "connection type {:?} is not allowed, allowed types: {:?}",
111 connection_type, SOURCE_ALLOWED_CONNECTION_CONNECTOR
112 ))));
113 }
114
115 sec_resolve_props
116 }
117 Either::Right(options_with_secret) => options_with_secret.clone(),
118 };
119
120 let is_kafka: bool = options_with_secret.is_kafka_connector();
121
122 let (sec_resolve_props, connection_type, schema_registry_conn_ref) =
124 resolve_connection_ref_and_secret_ref(
125 WithOptions::try_from(format_encode.row_options())?,
126 session,
127 Some(TelemetryDatabaseObject::Source),
128 )?;
129 ensure_connection_type_allowed(connection_type, &SOURCE_ALLOWED_CONNECTION_SCHEMA_REGISTRY)?;
130
131 let (format_encode_options, format_encode_secret_refs) = sec_resolve_props.into_parts();
132 let mut format_encode_options_to_consume = LocalSecretManager::global().fill_secrets(
134 format_encode_options.clone(),
135 format_encode_secret_refs.clone(),
136 )?;
137
138 fn get_key_message_name(options: &mut BTreeMap<String, String>) -> Option<String> {
139 consume_string_from_options(options, KEY_MESSAGE_NAME_KEY)
140 .map(|ele| Some(ele.0))
141 .unwrap_or(None)
142 }
143 fn get_sr_name_strategy_check(
144 options: &mut BTreeMap<String, String>,
145 use_sr: bool,
146 ) -> Result<Option<i32>> {
147 let name_strategy = get_name_strategy_or_default(try_consume_string_from_options(
148 options,
149 NAME_STRATEGY_KEY,
150 ))?;
151 if !use_sr && name_strategy.is_some() {
152 return Err(RwError::from(ProtocolError(
153 "schema registry name strategy only works with schema registry enabled".to_owned(),
154 )));
155 }
156 Ok(name_strategy)
157 }
158
159 let mut stream_source_info = StreamSourceInfo {
160 format: format_to_prost(&format_encode.format) as i32,
161 row_encode: row_encode_to_prost(&format_encode.row_encode) as i32,
162 format_encode_options,
163 format_encode_secret_refs,
164 connection_id: schema_registry_conn_ref,
165 ..Default::default()
166 };
167
168 if format_encode.format == Format::Debezium {
169 try_consume_string_from_options(&mut format_encode_options_to_consume, DEBEZIUM_IGNORE_KEY);
170 }
171
172 let columns = match (&format_encode.format, &format_encode.row_encode) {
173 (Format::Native, Encode::Native)
174 | (Format::Plain, Encode::Bytes)
175 | (Format::DebeziumMongo, Encode::Json) => None,
176 (Format::Plain, Encode::Protobuf) | (Format::Upsert, Encode::Protobuf) => {
177 let (row_schema_location, use_schema_registry) =
178 get_schema_location(&mut format_encode_options_to_consume)?;
179 let message_name = consume_string_from_options(
180 &mut format_encode_options_to_consume,
181 MESSAGE_NAME_KEY,
182 )?;
183 let name_strategy = get_sr_name_strategy_check(
184 &mut format_encode_options_to_consume,
185 use_schema_registry,
186 )?;
187
188 stream_source_info.use_schema_registry = use_schema_registry;
189 stream_source_info
190 .row_schema_location
191 .clone_from(&row_schema_location.0);
192 stream_source_info
193 .proto_message_name
194 .clone_from(&message_name.0);
195 stream_source_info.key_message_name =
196 get_key_message_name(&mut format_encode_options_to_consume);
197 stream_source_info.name_strategy =
198 name_strategy.unwrap_or(PbSchemaRegistryNameStrategy::Unspecified as i32);
199
200 Some(
201 extract_protobuf_table_schema(
202 &stream_source_info,
203 &options_with_secret,
204 &mut format_encode_options_to_consume,
205 )
206 .await?,
207 )
208 }
209 (format @ (Format::Plain | Format::Upsert | Format::Debezium), Encode::Avro) => {
210 if format_encode_options_to_consume
211 .remove(AWS_GLUE_SCHEMA_ARN_KEY)
212 .is_none()
213 {
214 let (row_schema_location, use_schema_registry) =
219 get_schema_location(&mut format_encode_options_to_consume)?;
220
221 if matches!(format, Format::Debezium) && !use_schema_registry {
222 return Err(RwError::from(ProtocolError(
223 "schema location for DEBEZIUM_AVRO row format is not supported".to_owned(),
224 )));
225 }
226
227 let message_name = try_consume_string_from_options(
228 &mut format_encode_options_to_consume,
229 MESSAGE_NAME_KEY,
230 );
231 let name_strategy = get_sr_name_strategy_check(
232 &mut format_encode_options_to_consume,
233 use_schema_registry,
234 )?;
235
236 stream_source_info.use_schema_registry = use_schema_registry;
237 stream_source_info
238 .row_schema_location
239 .clone_from(&row_schema_location.0);
240 stream_source_info.proto_message_name =
241 message_name.unwrap_or(AstString("".into())).0;
242 stream_source_info.key_message_name =
243 get_key_message_name(&mut format_encode_options_to_consume);
244 stream_source_info.name_strategy =
245 name_strategy.unwrap_or(PbSchemaRegistryNameStrategy::Unspecified as i32);
246 }
247
248 Some(
249 extract_avro_table_schema(
250 &stream_source_info,
251 &options_with_secret,
252 &mut format_encode_options_to_consume,
253 matches!(format, Format::Debezium),
254 )
255 .await?,
256 )
257 }
258 (Format::Plain, Encode::Csv) => {
259 let chars =
260 consume_string_from_options(&mut format_encode_options_to_consume, "delimiter")?.0;
261 let delimiter = get_delimiter(chars.as_str()).context("failed to parse delimiter")?;
262 let has_header = try_consume_string_from_options(
263 &mut format_encode_options_to_consume,
264 "without_header",
265 )
266 .map(|s| s.0 == "false")
267 .unwrap_or(true);
268
269 if is_kafka && has_header {
270 return Err(RwError::from(ProtocolError(
271 "CSV HEADER is not supported when creating table with Kafka connector"
272 .to_owned(),
273 )));
274 }
275
276 stream_source_info.csv_delimiter = delimiter as i32;
277 stream_source_info.csv_has_header = has_header;
278
279 None
280 }
281 (Format::Plain, Encode::Parquet) => None,
283 (
284 Format::Plain | Format::Upsert | Format::Maxwell | Format::Canal | Format::Debezium,
285 Encode::Json,
286 ) => {
287 if matches!(
288 format_encode.format,
289 Format::Plain | Format::Upsert | Format::Debezium
290 ) {
291 if let Some(value) =
296 format_encode_options_to_consume.get(TimestamptzHandling::OPTION_KEY)
297 {
298 TimestamptzHandling::from_options(value)
299 .map_err(|err| InvalidInputSyntax(err.message))?;
300 }
301 try_consume_string_from_options(
302 &mut format_encode_options_to_consume,
303 TimestamptzHandling::OPTION_KEY,
304 );
305 }
306
307 let schema_config = get_json_schema_location(&mut format_encode_options_to_consume)?;
308 stream_source_info.use_schema_registry =
309 json_schema_infer_use_schema_registry(&schema_config);
310
311 extract_json_table_schema(
312 &schema_config,
313 &options_with_secret,
314 &mut format_encode_options_to_consume,
315 )
316 .await?
317 }
318 (Format::None, Encode::None) => {
319 if options_with_secret.is_iceberg_connector() {
320 Some(
321 extract_iceberg_columns(&options_with_secret)
322 .await
323 .map_err(|err| ProtocolError(err.to_report_string()))?,
324 )
325 } else if options_with_secret
326 .get(UPSTREAM_SOURCE_KEY)
327 .is_some_and(|s| s.eq_ignore_ascii_case(ADBC_SNOWFLAKE_CONNECTOR))
328 {
329 Some(
330 extract_adbc_snowflake_columns(&options_with_secret)
331 .await
332 .map_err(|err| ProtocolError(err.to_report_string()))?,
333 )
334 } else {
335 None
336 }
337 }
338 (format, encoding) => {
339 return Err(RwError::from(ProtocolError(format!(
340 "Unknown combination {:?} {:?}",
341 format, encoding
342 ))));
343 }
344 };
345
346 if !format_encode_options_to_consume.is_empty() {
347 let err_string = format!(
348 "Get unknown format_encode_options for {:?} {:?}: {}",
349 format_encode.format,
350 format_encode.row_encode,
351 format_encode_options_to_consume
352 .keys()
353 .cloned()
354 .collect::<Vec<String>>()
355 .join(","),
356 );
357 session.notice_to_user(err_string);
358 }
359 Ok((columns, stream_source_info))
360}
361
362fn bind_columns_from_source_for_cdc(
363 session: &SessionImpl,
364 format_encode: &FormatEncodeOptions,
365) -> Result<(Option<Vec<ColumnCatalog>>, StreamSourceInfo)> {
366 let with_options = WithOptions::try_from(format_encode.row_options())?;
367 if !with_options.connection_ref().is_empty() {
368 return Err(RwError::from(NotSupported(
369 "CDC connector does not support connection ref yet".to_owned(),
370 "Explicitly specify the connection in WITH clause".to_owned(),
371 )));
372 }
373 let (format_encode_options, format_encode_secret_refs) =
374 resolve_secret_ref_in_with_options(with_options, session)?.into_parts();
375
376 let mut format_encode_options_to_consume = LocalSecretManager::global().fill_secrets(
378 format_encode_options.clone(),
379 format_encode_secret_refs.clone(),
380 )?;
381
382 match (&format_encode.format, &format_encode.row_encode) {
383 (Format::Plain, Encode::Json) => (),
384 (format, encoding) => {
385 return Err(RwError::from(ProtocolError(format!(
387 "Row format for CDC connectors should be either omitted or set to `FORMAT PLAIN ENCODE JSON`, got: {:?} {:?}",
388 format, encoding
389 ))));
390 }
391 };
392
393 let columns = ColumnCatalog::debezium_cdc_source_cols().to_vec();
394 let schema_config = get_json_schema_location(&mut format_encode_options_to_consume)?;
395
396 let stream_source_info = StreamSourceInfo {
397 format: format_to_prost(&format_encode.format) as i32,
398 row_encode: row_encode_to_prost(&format_encode.row_encode) as i32,
399 format_encode_options,
400 use_schema_registry: json_schema_infer_use_schema_registry(&schema_config),
401 cdc_source_job: true,
402 is_distributed: false,
403 format_encode_secret_refs,
404 ..Default::default()
405 };
406 if !format_encode_options_to_consume.is_empty() {
407 let err_string = format!(
408 "Get unknown format_encode_options for {:?} {:?}: {}",
409 format_encode.format,
410 format_encode.row_encode,
411 format_encode_options_to_consume
412 .keys()
413 .cloned()
414 .collect::<Vec<String>>()
415 .join(","),
416 );
417 session.notice_to_user(err_string);
418 }
419 Ok((Some(columns), stream_source_info))
420}
421
422fn format_to_prost(format: &Format) -> FormatType {
423 match format {
424 Format::Native => FormatType::Native,
425 Format::Plain => FormatType::Plain,
426 Format::Upsert => FormatType::Upsert,
427 Format::Debezium => FormatType::Debezium,
428 Format::DebeziumMongo => FormatType::DebeziumMongo,
429 Format::Maxwell => FormatType::Maxwell,
430 Format::Canal => FormatType::Canal,
431 Format::None => FormatType::None,
432 }
433}
434fn row_encode_to_prost(row_encode: &Encode) -> EncodeType {
435 match row_encode {
436 Encode::Native => EncodeType::Native,
437 Encode::Json => EncodeType::Json,
438 Encode::Avro => EncodeType::Avro,
439 Encode::Protobuf => EncodeType::Protobuf,
440 Encode::Csv => EncodeType::Csv,
441 Encode::Bytes => EncodeType::Bytes,
442 Encode::Template => EncodeType::Template,
443 Encode::Parquet => EncodeType::Parquet,
444 Encode::None => EncodeType::None,
445 Encode::Text => EncodeType::Text,
446 }
447}
448
449pub fn get_schema_location(
450 format_encode_options: &mut BTreeMap<String, String>,
451) -> Result<(AstString, bool)> {
452 let schema_location = try_consume_string_from_options(format_encode_options, "schema.location");
453 let schema_registry = try_consume_string_from_options(format_encode_options, "schema.registry");
454 match (schema_location, schema_registry) {
455 (None, None) => Err(RwError::from(ProtocolError(
456 "missing either a schema location or a schema registry".to_owned(),
457 ))),
458 (None, Some(schema_registry)) => Ok((schema_registry, true)),
459 (Some(schema_location), None) => Ok((schema_location, false)),
460 (Some(_), Some(_)) => Err(RwError::from(ProtocolError(
461 "only need either the schema location or the schema registry".to_owned(),
462 ))),
463 }
464}
465
466pub fn schema_has_schema_registry(schema: &FormatEncodeOptions) -> bool {
467 match schema.row_encode {
468 Encode::Avro | Encode::Protobuf => true,
469 Encode::Json => {
470 let mut options = WithOptions::try_from(schema.row_options()).unwrap();
471 matches!(get_json_schema_location(options.inner_mut()), Ok(Some(_)))
472 }
473 _ => false,
474 }
475}
476
477#[inline]
478fn get_name_strategy_or_default(name_strategy: Option<AstString>) -> Result<Option<i32>> {
479 match name_strategy {
480 None => Ok(None),
481 Some(name) => Ok(Some(name_strategy_from_str(name.0.as_str())
482 .ok_or_else(|| RwError::from(ProtocolError(format!("\
483 expect strategy name in topic_name_strategy, record_name_strategy and topic_record_name_strategy, but got {}", name))))? as i32)),
484 }
485}