1use super::*;
18
19mod json;
20use json::*;
21mod avro;
22use avro::extract_avro_table_schema;
23pub mod debezium;
24pub mod iceberg;
25use iceberg::extract_iceberg_columns;
26mod protobuf;
27use protobuf::extract_protobuf_table_schema;
28pub mod nexmark;
29
30pub async fn bind_columns_from_source(
36 session: &SessionImpl,
37 format_encode: &FormatEncodeOptions,
38 with_properties: Either<&WithOptions, &WithOptionsSecResolved>,
39 create_source_type: CreateSourceType,
40) -> Result<(Option<Vec<ColumnCatalog>>, StreamSourceInfo)> {
41 let (columns_from_resolve_source, mut source_info) =
42 if create_source_type == CreateSourceType::SharedCdc {
43 bind_columns_from_source_for_cdc(session, format_encode)?
44 } else {
45 bind_columns_from_source_for_non_cdc(session, format_encode, with_properties).await?
46 };
47 if create_source_type.is_shared() {
48 source_info.cdc_source_job = true;
50 source_info.is_distributed = create_source_type == CreateSourceType::SharedNonCdc;
51 }
52 Ok((columns_from_resolve_source, source_info))
53}
54
55async fn bind_columns_from_source_for_non_cdc(
56 session: &SessionImpl,
57 format_encode: &FormatEncodeOptions,
58 with_properties: Either<&WithOptions, &WithOptionsSecResolved>,
59) -> Result<(Option<Vec<ColumnCatalog>>, StreamSourceInfo)> {
60 const MESSAGE_NAME_KEY: &str = "message";
61 const KEY_MESSAGE_NAME_KEY: &str = "key.message";
62 const NAME_STRATEGY_KEY: &str = "schema.registry.name.strategy";
63
64 let options_with_secret = match with_properties {
65 Either::Left(options) => {
66 let (sec_resolve_props, connection_type, _) = resolve_connection_ref_and_secret_ref(
67 options.clone(),
68 session,
69 TelemetryDatabaseObject::Source,
70 )?;
71 if !SOURCE_ALLOWED_CONNECTION_CONNECTOR.contains(&connection_type) {
72 return Err(RwError::from(ProtocolError(format!(
73 "connection type {:?} is not allowed, allowed types: {:?}",
74 connection_type, SOURCE_ALLOWED_CONNECTION_CONNECTOR
75 ))));
76 }
77
78 sec_resolve_props
79 }
80 Either::Right(options_with_secret) => options_with_secret.clone(),
81 };
82
83 let is_kafka: bool = options_with_secret.is_kafka_connector();
84
85 let (sec_resolve_props, connection_type, schema_registry_conn_ref) =
87 resolve_connection_ref_and_secret_ref(
88 WithOptions::try_from(format_encode.row_options())?,
89 session,
90 TelemetryDatabaseObject::Source,
91 )?;
92 ensure_connection_type_allowed(connection_type, &SOURCE_ALLOWED_CONNECTION_SCHEMA_REGISTRY)?;
93
94 let (format_encode_options, format_encode_secret_refs) = sec_resolve_props.into_parts();
95 let mut format_encode_options_to_consume = LocalSecretManager::global().fill_secrets(
97 format_encode_options.clone(),
98 format_encode_secret_refs.clone(),
99 )?;
100
101 fn get_key_message_name(options: &mut BTreeMap<String, String>) -> Option<String> {
102 consume_string_from_options(options, KEY_MESSAGE_NAME_KEY)
103 .map(|ele| Some(ele.0))
104 .unwrap_or(None)
105 }
106 fn get_sr_name_strategy_check(
107 options: &mut BTreeMap<String, String>,
108 use_sr: bool,
109 ) -> Result<Option<i32>> {
110 let name_strategy = get_name_strategy_or_default(try_consume_string_from_options(
111 options,
112 NAME_STRATEGY_KEY,
113 ))?;
114 if !use_sr && name_strategy.is_some() {
115 return Err(RwError::from(ProtocolError(
116 "schema registry name strategy only works with schema registry enabled".to_owned(),
117 )));
118 }
119 Ok(name_strategy)
120 }
121
122 let mut stream_source_info = StreamSourceInfo {
123 format: format_to_prost(&format_encode.format) as i32,
124 row_encode: row_encode_to_prost(&format_encode.row_encode) as i32,
125 format_encode_options,
126 format_encode_secret_refs,
127 connection_id: schema_registry_conn_ref,
128 ..Default::default()
129 };
130
131 if format_encode.format == Format::Debezium {
132 try_consume_string_from_options(&mut format_encode_options_to_consume, DEBEZIUM_IGNORE_KEY);
133 }
134
135 let columns = match (&format_encode.format, &format_encode.row_encode) {
136 (Format::Native, Encode::Native)
137 | (Format::Plain, Encode::Bytes)
138 | (Format::DebeziumMongo, Encode::Json) => None,
139 (Format::Plain, Encode::Protobuf) | (Format::Upsert, Encode::Protobuf) => {
140 let (row_schema_location, use_schema_registry) =
141 get_schema_location(&mut format_encode_options_to_consume)?;
142 let message_name = consume_string_from_options(
143 &mut format_encode_options_to_consume,
144 MESSAGE_NAME_KEY,
145 )?;
146 let name_strategy = get_sr_name_strategy_check(
147 &mut format_encode_options_to_consume,
148 use_schema_registry,
149 )?;
150
151 stream_source_info.use_schema_registry = use_schema_registry;
152 stream_source_info
153 .row_schema_location
154 .clone_from(&row_schema_location.0);
155 stream_source_info
156 .proto_message_name
157 .clone_from(&message_name.0);
158 stream_source_info.key_message_name =
159 get_key_message_name(&mut format_encode_options_to_consume);
160 stream_source_info.name_strategy =
161 name_strategy.unwrap_or(PbSchemaRegistryNameStrategy::Unspecified as i32);
162
163 Some(
164 extract_protobuf_table_schema(
165 &stream_source_info,
166 &options_with_secret,
167 &mut format_encode_options_to_consume,
168 )
169 .await?,
170 )
171 }
172 (format @ (Format::Plain | Format::Upsert | Format::Debezium), Encode::Avro) => {
173 if format_encode_options_to_consume
174 .remove(AWS_GLUE_SCHEMA_ARN_KEY)
175 .is_none()
176 {
177 let (row_schema_location, use_schema_registry) =
182 get_schema_location(&mut format_encode_options_to_consume)?;
183
184 if matches!(format, Format::Debezium) && !use_schema_registry {
185 return Err(RwError::from(ProtocolError(
186 "schema location for DEBEZIUM_AVRO row format is not supported".to_owned(),
187 )));
188 }
189
190 let message_name = try_consume_string_from_options(
191 &mut format_encode_options_to_consume,
192 MESSAGE_NAME_KEY,
193 );
194 let name_strategy = get_sr_name_strategy_check(
195 &mut format_encode_options_to_consume,
196 use_schema_registry,
197 )?;
198
199 stream_source_info.use_schema_registry = use_schema_registry;
200 stream_source_info
201 .row_schema_location
202 .clone_from(&row_schema_location.0);
203 stream_source_info.proto_message_name =
204 message_name.unwrap_or(AstString("".into())).0;
205 stream_source_info.key_message_name =
206 get_key_message_name(&mut format_encode_options_to_consume);
207 stream_source_info.name_strategy =
208 name_strategy.unwrap_or(PbSchemaRegistryNameStrategy::Unspecified as i32);
209 }
210
211 Some(
212 extract_avro_table_schema(
213 &stream_source_info,
214 &options_with_secret,
215 &mut format_encode_options_to_consume,
216 matches!(format, Format::Debezium),
217 )
218 .await?,
219 )
220 }
221 (Format::Plain, Encode::Csv) => {
222 let chars =
223 consume_string_from_options(&mut format_encode_options_to_consume, "delimiter")?.0;
224 let delimiter = get_delimiter(chars.as_str()).context("failed to parse delimiter")?;
225 let has_header = try_consume_string_from_options(
226 &mut format_encode_options_to_consume,
227 "without_header",
228 )
229 .map(|s| s.0 == "false")
230 .unwrap_or(true);
231
232 if is_kafka && has_header {
233 return Err(RwError::from(ProtocolError(
234 "CSV HEADER is not supported when creating table with Kafka connector"
235 .to_owned(),
236 )));
237 }
238
239 stream_source_info.csv_delimiter = delimiter as i32;
240 stream_source_info.csv_has_header = has_header;
241
242 None
243 }
244 (Format::Plain, Encode::Parquet) => None,
246 (
247 Format::Plain | Format::Upsert | Format::Maxwell | Format::Canal | Format::Debezium,
248 Encode::Json,
249 ) => {
250 if matches!(
251 format_encode.format,
252 Format::Plain | Format::Upsert | Format::Debezium
253 ) {
254 TimestamptzHandling::from_options(&format_encode_options_to_consume)
259 .map_err(|err| InvalidInputSyntax(err.message))?;
260 try_consume_string_from_options(
261 &mut format_encode_options_to_consume,
262 TimestamptzHandling::OPTION_KEY,
263 );
264 }
265
266 let schema_config = get_json_schema_location(&mut format_encode_options_to_consume)?;
267 stream_source_info.use_schema_registry =
268 json_schema_infer_use_schema_registry(&schema_config);
269
270 extract_json_table_schema(
271 &schema_config,
272 &options_with_secret,
273 &mut format_encode_options_to_consume,
274 )
275 .await?
276 }
277 (Format::None, Encode::None) => {
278 if options_with_secret.is_iceberg_connector() {
279 Some(
280 extract_iceberg_columns(&options_with_secret)
281 .await
282 .map_err(|err| ProtocolError(err.to_report_string()))?,
283 )
284 } else {
285 None
286 }
287 }
288 (format, encoding) => {
289 return Err(RwError::from(ProtocolError(format!(
290 "Unknown combination {:?} {:?}",
291 format, encoding
292 ))));
293 }
294 };
295
296 if !format_encode_options_to_consume.is_empty() {
297 let err_string = format!(
298 "Get unknown format_encode_options for {:?} {:?}: {}",
299 format_encode.format,
300 format_encode.row_encode,
301 format_encode_options_to_consume
302 .keys()
303 .map(|k| k.to_string())
304 .collect::<Vec<String>>()
305 .join(","),
306 );
307 session.notice_to_user(err_string);
308 }
309 Ok((columns, stream_source_info))
310}
311
312fn bind_columns_from_source_for_cdc(
313 session: &SessionImpl,
314 format_encode: &FormatEncodeOptions,
315) -> Result<(Option<Vec<ColumnCatalog>>, StreamSourceInfo)> {
316 let with_options = WithOptions::try_from(format_encode.row_options())?;
317 if !with_options.connection_ref().is_empty() {
318 return Err(RwError::from(NotSupported(
319 "CDC connector does not support connection ref yet".to_owned(),
320 "Explicitly specify the connection in WITH clause".to_owned(),
321 )));
322 }
323 let (format_encode_options, format_encode_secret_refs) =
324 resolve_secret_ref_in_with_options(with_options, session)?.into_parts();
325
326 let mut format_encode_options_to_consume = LocalSecretManager::global().fill_secrets(
328 format_encode_options.clone(),
329 format_encode_secret_refs.clone(),
330 )?;
331
332 match (&format_encode.format, &format_encode.row_encode) {
333 (Format::Plain, Encode::Json) => (),
334 (format, encoding) => {
335 return Err(RwError::from(ProtocolError(format!(
337 "Row format for CDC connectors should be either omitted or set to `FORMAT PLAIN ENCODE JSON`, got: {:?} {:?}",
338 format, encoding
339 ))));
340 }
341 };
342
343 let columns = ColumnCatalog::debezium_cdc_source_cols().to_vec();
344 let schema_config = get_json_schema_location(&mut format_encode_options_to_consume)?;
345
346 let stream_source_info = StreamSourceInfo {
347 format: format_to_prost(&format_encode.format) as i32,
348 row_encode: row_encode_to_prost(&format_encode.row_encode) as i32,
349 format_encode_options,
350 use_schema_registry: json_schema_infer_use_schema_registry(&schema_config),
351 cdc_source_job: true,
352 is_distributed: false,
353 format_encode_secret_refs,
354 ..Default::default()
355 };
356 if !format_encode_options_to_consume.is_empty() {
357 let err_string = format!(
358 "Get unknown format_encode_options for {:?} {:?}: {}",
359 format_encode.format,
360 format_encode.row_encode,
361 format_encode_options_to_consume
362 .keys()
363 .map(|k| k.to_string())
364 .collect::<Vec<String>>()
365 .join(","),
366 );
367 session.notice_to_user(err_string);
368 }
369 Ok((Some(columns), stream_source_info))
370}
371
372fn format_to_prost(format: &Format) -> FormatType {
373 match format {
374 Format::Native => FormatType::Native,
375 Format::Plain => FormatType::Plain,
376 Format::Upsert => FormatType::Upsert,
377 Format::Debezium => FormatType::Debezium,
378 Format::DebeziumMongo => FormatType::DebeziumMongo,
379 Format::Maxwell => FormatType::Maxwell,
380 Format::Canal => FormatType::Canal,
381 Format::None => FormatType::None,
382 }
383}
384fn row_encode_to_prost(row_encode: &Encode) -> EncodeType {
385 match row_encode {
386 Encode::Native => EncodeType::Native,
387 Encode::Json => EncodeType::Json,
388 Encode::Avro => EncodeType::Avro,
389 Encode::Protobuf => EncodeType::Protobuf,
390 Encode::Csv => EncodeType::Csv,
391 Encode::Bytes => EncodeType::Bytes,
392 Encode::Template => EncodeType::Template,
393 Encode::Parquet => EncodeType::Parquet,
394 Encode::None => EncodeType::None,
395 Encode::Text => EncodeType::Text,
396 }
397}
398
399pub fn get_schema_location(
400 format_encode_options: &mut BTreeMap<String, String>,
401) -> Result<(AstString, bool)> {
402 let schema_location = try_consume_string_from_options(format_encode_options, "schema.location");
403 let schema_registry = try_consume_string_from_options(format_encode_options, "schema.registry");
404 match (schema_location, schema_registry) {
405 (None, None) => Err(RwError::from(ProtocolError(
406 "missing either a schema location or a schema registry".to_owned(),
407 ))),
408 (None, Some(schema_registry)) => Ok((schema_registry, true)),
409 (Some(schema_location), None) => Ok((schema_location, false)),
410 (Some(_), Some(_)) => Err(RwError::from(ProtocolError(
411 "only need either the schema location or the schema registry".to_owned(),
412 ))),
413 }
414}
415
416pub fn schema_has_schema_registry(schema: &FormatEncodeOptions) -> bool {
417 match schema.row_encode {
418 Encode::Avro | Encode::Protobuf => true,
419 Encode::Json => {
420 let mut options = WithOptions::try_from(schema.row_options()).unwrap();
421 matches!(get_json_schema_location(options.inner_mut()), Ok(Some(_)))
422 }
423 _ => false,
424 }
425}
426
427#[inline]
428fn get_name_strategy_or_default(name_strategy: Option<AstString>) -> Result<Option<i32>> {
429 match name_strategy {
430 None => Ok(None),
431 Some(name) => Ok(Some(name_strategy_from_str(name.0.as_str())
432 .ok_or_else(|| RwError::from(ProtocolError(format!("\
433 expect strategy name in topic_name_strategy, record_name_strategy and topic_record_name_strategy, but got {}", name))))? as i32)),
434 }
435}