1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::rc::Rc;
17use std::sync::LazyLock;
18
19use anyhow::{Context, anyhow};
20use either::Either;
21use external_schema::debezium::extract_debezium_avro_table_pk_columns;
22use external_schema::nexmark::check_nexmark_schema;
23use itertools::Itertools;
24use maplit::{convert_args, hashmap, hashset};
25use pgwire::pg_response::{PgResponse, StatementType};
26use rand::Rng;
27use risingwave_common::array::arrow::{IcebergArrowConvert, arrow_schema_iceberg};
28use risingwave_common::bail_not_implemented;
29use risingwave_common::catalog::{
30 ColumnCatalog, ColumnDesc, ColumnId, INITIAL_SOURCE_VERSION_ID, KAFKA_TIMESTAMP_COLUMN_NAME,
31 ROW_ID_COLUMN_NAME, TableId, debug_assert_column_ids_distinct,
32};
33use risingwave_common::license::Feature;
34use risingwave_common::secret::LocalSecretManager;
35use risingwave_common::system_param::reader::SystemParamsRead;
36use risingwave_common::types::DataType;
37use risingwave_common::util::iter_util::ZipEqFast;
38use risingwave_connector::parser::additional_columns::{
39 build_additional_column_desc, get_supported_additional_columns,
40 source_add_partition_offset_cols,
41};
42use risingwave_connector::parser::{
43 AvroParserConfig, DEBEZIUM_IGNORE_KEY, DebeziumAvroParserConfig, ProtobufParserConfig,
44 SchemaLocation, SpecificParserConfig, TimestamptzHandling,
45 fetch_json_schema_and_map_to_columns,
46};
47use risingwave_connector::schema::AWS_GLUE_SCHEMA_ARN_KEY;
48use risingwave_connector::schema::schema_registry::{
49 SCHEMA_REGISTRY_BACKOFF_DURATION_KEY, SCHEMA_REGISTRY_BACKOFF_FACTOR_KEY,
50 SCHEMA_REGISTRY_CA_PEM_PATH, SCHEMA_REGISTRY_MAX_DELAY_KEY, SCHEMA_REGISTRY_PASSWORD,
51 SCHEMA_REGISTRY_RETRIES_MAX_KEY, SCHEMA_REGISTRY_USERNAME, SchemaRegistryConfig,
52 name_strategy_from_str,
53};
54use risingwave_connector::source::cdc::{
55 CDC_MONGODB_STRONG_SCHEMA_KEY, CDC_SHARING_MODE_KEY, CDC_SNAPSHOT_BACKFILL,
56 CDC_SNAPSHOT_MODE_KEY, CDC_TRANSACTIONAL_KEY, CDC_WAIT_FOR_STREAMING_START_TIMEOUT,
57 CITUS_CDC_CONNECTOR, MONGODB_CDC_CONNECTOR, MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR,
58 SQL_SERVER_CDC_CONNECTOR,
59};
60use risingwave_connector::source::datagen::DATAGEN_CONNECTOR;
61use risingwave_connector::source::iceberg::ICEBERG_CONNECTOR;
62use risingwave_connector::source::nexmark::source::{EventType, get_event_data_types_with_names};
63use risingwave_connector::source::test_source::TEST_CONNECTOR;
64pub use risingwave_connector::source::{
65 ADBC_SNOWFLAKE_CONNECTOR, UPSTREAM_SOURCE_KEY, WEBHOOK_CONNECTOR,
66};
67use risingwave_connector::source::{
68 AZBLOB_CONNECTOR, ConnectorProperties, GCS_CONNECTOR, GOOGLE_PUBSUB_CONNECTOR, KAFKA_CONNECTOR,
69 KINESIS_CONNECTOR, LEGACY_S3_CONNECTOR, MQTT_CONNECTOR, NATS_CONNECTOR, NEXMARK_CONNECTOR,
70 OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR, PULSAR_CONNECTOR,
71};
72use risingwave_connector::{AUTO_SCHEMA_CHANGE_KEY, WithPropertiesExt};
73use risingwave_pb::catalog::connection_params::PbConnectionType;
74use risingwave_pb::catalog::{PbSchemaRegistryNameStrategy, StreamSourceInfo, WatermarkDesc};
75use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
76use risingwave_pb::plan_common::source_refresh_mode::{RefreshMode, SourceRefreshModeStreaming};
77use risingwave_pb::plan_common::{EncodeType, FormatType, SourceRefreshMode};
78use risingwave_pb::stream_plan::PbStreamFragmentGraph;
79use risingwave_pb::telemetry::TelemetryDatabaseObject;
80use risingwave_sqlparser::ast::{
81 AstString, ColumnDef, ColumnOption, CreateSourceStatement, Encode, Format, FormatEncodeOptions,
82 ObjectName, SourceWatermark, SqlOptionValue, TableConstraint, Value, get_delimiter,
83};
84use risingwave_sqlparser::parser::{IncludeOption, IncludeOptionItem};
85use thiserror_ext::AsReport;
86
87use super::RwPgResponse;
88use crate::binder::Binder;
89use crate::catalog::CatalogError;
90use crate::catalog::source_catalog::SourceCatalog;
91use crate::error::ErrorCode::{self, Deprecated, InvalidInputSyntax, NotSupported, ProtocolError};
92use crate::error::{Result, RwError};
93use crate::expr::{Expr, ExprRewriter, SessionTimezone};
94use crate::handler::HandlerArgs;
95use crate::handler::create_table::{
96 ColumnIdGenerator, bind_pk_and_row_id_on_relation, bind_sql_column_constraints,
97 bind_sql_columns, bind_sql_pk_names, bind_table_constraints,
98};
99use crate::handler::util::{
100 SourceSchemaCompatExt, check_connector_match_connection_type, ensure_connection_type_allowed,
101 ensure_local_fs_connector_allowed,
102};
103use crate::optimizer::plan_node::generic::SourceNodeKind;
104use crate::optimizer::plan_node::{LogicalSource, ToStream, ToStreamContext};
105use crate::session::SessionImpl;
106use crate::session::current::notice_to_user;
107use crate::utils::{
108 OverwriteOptions, resolve_connection_ref_and_secret_ref, resolve_privatelink_in_with_option,
109 resolve_secret_ref_in_with_options, resolve_source_refresh_mode_in_with_option,
110};
111use crate::{OptimizerContext, WithOptions, WithOptionsSecResolved, bind_data_type, build_graph};
112
113mod external_schema;
114pub use external_schema::{
115 bind_columns_from_source, get_schema_location, schema_has_schema_registry,
116};
117mod validate;
118pub use validate::validate_compatibility;
119use validate::{SOURCE_ALLOWED_CONNECTION_CONNECTOR, SOURCE_ALLOWED_CONNECTION_SCHEMA_REGISTRY};
120mod additional_column;
121use additional_column::check_and_add_timestamp_column;
122pub use additional_column::handle_addition_columns;
123use risingwave_common::catalog::ICEBERG_SOURCE_PREFIX;
124use risingwave_common::id::SourceId;
125
126use crate::stream_fragmenter::GraphJobType;
127
128fn non_generated_sql_columns(columns: &[ColumnDef]) -> Vec<ColumnDef> {
129 columns
130 .iter()
131 .filter(|c| !c.is_generated())
132 .cloned()
133 .collect()
134}
135
136fn try_consume_string_from_options(
137 format_encode_options: &mut BTreeMap<String, String>,
138 key: &str,
139) -> Option<AstString> {
140 format_encode_options.remove(key).map(AstString)
141}
142
143fn try_consume_schema_registry_config_from_options(
144 format_encode_options: &mut BTreeMap<String, String>,
145) {
146 [
147 SCHEMA_REGISTRY_USERNAME,
148 SCHEMA_REGISTRY_PASSWORD,
149 SCHEMA_REGISTRY_CA_PEM_PATH,
150 SCHEMA_REGISTRY_MAX_DELAY_KEY,
151 SCHEMA_REGISTRY_BACKOFF_DURATION_KEY,
152 SCHEMA_REGISTRY_BACKOFF_FACTOR_KEY,
153 SCHEMA_REGISTRY_RETRIES_MAX_KEY,
154 ]
155 .iter()
156 .for_each(|key| {
157 try_consume_string_from_options(format_encode_options, key);
158 });
159}
160
161fn consume_string_from_options(
162 format_encode_options: &mut BTreeMap<String, String>,
163 key: &str,
164) -> Result<AstString> {
165 try_consume_string_from_options(format_encode_options, key).ok_or(RwError::from(ProtocolError(
166 format!("missing field {} in options", key),
167 )))
168}
169
/// Drops every option whose key starts with `aws.` from `format_encode_options`.
///
/// The match is a case-sensitive prefix test; all other entries are kept.
fn consume_aws_config_from_options(format_encode_options: &mut BTreeMap<String, String>) {
    format_encode_options.retain(|option_name, _value| {
        let is_aws_option = option_name.starts_with("aws.");
        !is_aws_option
    });
}
173
/// The kind of source being bound. See [`CreateSourceType::for_newly_created`]
/// and [`CreateSourceType::for_replace`] for how a variant is selected.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CreateSourceType {
    /// A shared source whose connector reports `is_shareable_cdc_connector()`.
    SharedCdc,
    /// A shared source whose connector is not a shareable CDC connector.
    /// For new sources this additionally requires the shared-source switches
    /// in the streaming config and the session config to be enabled.
    SharedNonCdc,
    /// A source that is not shared.
    NonShared,
    /// Not a `CREATE SOURCE`: `bind_create_source_or_table_with_connector`
    /// treats every variant except this one as "create source".
    Table,
}
183
184impl CreateSourceType {
185 pub fn for_newly_created(
187 session: &SessionImpl,
188 with_properties: &impl WithPropertiesExt,
189 ) -> Self {
190 if with_properties.is_shareable_cdc_connector() {
191 CreateSourceType::SharedCdc
192 } else if with_properties.is_shareable_non_cdc_connector()
193 && session
194 .env()
195 .streaming_config()
196 .developer
197 .enable_shared_source
198 && session.config().streaming_use_shared_source()
199 {
200 CreateSourceType::SharedNonCdc
201 } else {
202 CreateSourceType::NonShared
203 }
204 }
205
206 pub fn for_replace(catalog: &SourceCatalog) -> Self {
207 if !catalog.info.is_shared() {
208 CreateSourceType::NonShared
209 } else if catalog.with_properties.is_shareable_cdc_connector() {
210 CreateSourceType::SharedCdc
211 } else {
212 CreateSourceType::SharedNonCdc
213 }
214 }
215
216 pub fn is_shared(&self) -> bool {
217 matches!(
218 self,
219 CreateSourceType::SharedCdc | CreateSourceType::SharedNonCdc
220 )
221 }
222}
223
/// Combines the columns resolved from the external schema (`cols_from_source`)
/// with the columns declared in SQL (`cols_from_sql`) into the final column list.
///
/// * When `cols_from_source` is `Some`, the outcome depends on
///   `sql_column_strategy`:
///   - `FollowUnchecked`: the SQL columns are returned unchanged.
///   - `FollowChecked`: if SQL declares regular (non-generated) columns, each
///     one must exist in the source with the same data type and the SQL
///     columns are returned; a wildcard is rejected in that case. Otherwise
///     the merge described below is performed.
///   - `Ignore`: the merge described below is performed without any check.
///
///   The merge inserts all source columns into the list of generated SQL
///   columns at `wildcard_idx` (clamped to the number of generated columns,
///   `0` when there is no wildcard).
/// * When `cols_from_source` is `None`, a wildcard is rejected, and the SQL
///   columns are validated for `FORMAT DEBEZIUM_MONGO ENCODE JSON` and
///   `FORMAT PLAIN ENCODE BYTES`; every other combination returns
///   `cols_from_sql` unchanged.
///
/// # Panics
///
/// Panics if a column in `cols_from_sql` has no definition with the same name
/// in `col_defs_from_sql`, if a wildcard is present under `FollowUnchecked`,
/// or if an inspected non-generated column definition has no data type.
pub(crate) fn bind_all_columns(
    format_encode: &FormatEncodeOptions,
    cols_from_source: Option<Vec<ColumnCatalog>>,
    cols_from_sql: Vec<ColumnCatalog>,
    col_defs_from_sql: &[ColumnDef],
    wildcard_idx: Option<usize>,
    sql_column_strategy: SqlColumnStrategy,
) -> Result<Vec<ColumnCatalog>> {
    if let Some(cols_from_source) = cols_from_source {
        // Keep only the SQL columns whose definition (looked up by name) is a
        // generated column.
        let generated_cols_from_sql = cols_from_sql
            .iter()
            .filter(|c| {
                col_defs_from_sql
                    .iter()
                    .find(|d| d.name.real_value() == c.name())
                    .unwrap()
                    .is_generated()
            })
            .cloned()
            .collect_vec();

        match sql_column_strategy {
            // Trust the SQL definition completely; a wildcard must not be present.
            SqlColumnStrategy::FollowUnchecked => {
                assert!(
                    wildcard_idx.is_none(),
                    "wildcard still exists while strategy is Follows, not correctly purified?"
                );
                return Ok(cols_from_sql);
            }

            // No validation: fall through to the merge below.
            SqlColumnStrategy::Ignore => {}

            SqlColumnStrategy::FollowChecked => {
                // True when at least one SQL column is not a generated column.
                let has_regular_cols_from_sql =
                    generated_cols_from_sql.len() != cols_from_sql.len();

                if has_regular_cols_from_sql {
                    if wildcard_idx.is_some() {
                        // Regular columns and a wildcard cannot be combined.
                        return Err(RwError::from(NotSupported(
                            "When there's a wildcard (\"*\"), \
 only generated columns are allowed in user-defined schema from SQL"
                                .to_owned(),
                            "Remove the non-generated columns".to_owned(),
                        )));
                    } else {
                        // Every regular SQL column must match a source column
                        // by name and by data type.
                        for col in &cols_from_sql {
                            if generated_cols_from_sql.contains(col) {
                                continue;
                            }
                            let Some(col_from_source) =
                                cols_from_source.iter().find(|c| c.name() == col.name())
                            else {
                                return Err(RwError::from(ProtocolError(format!(
                                    "Column \"{}\" is defined in SQL but not found in the source",
                                    col.name()
                                ))));
                            };

                            if col_from_source.data_type() != col.data_type() {
                                return Err(RwError::from(ProtocolError(format!(
                                    "Data type mismatch for column \"{}\". \
 Defined in SQL as \"{}\", but found in the source as \"{}\"",
                                    col.name(),
                                    col.data_type(),
                                    col_from_source.data_type()
                                ))));
                            }
                        }
                        return Ok(cols_from_sql);
                    }
                } else {
                    if wildcard_idx.is_some() {
                        // Only generated columns plus a wildcard: nothing to
                        // check, the merge below handles it.
                    } else {
                        // Only generated columns and no wildcard: tell the
                        // user that source columns are placed at the beginning.
                        notice_to_user("\
 Neither wildcard (\"*\") nor regular (non-generated) columns appear in the user-defined schema from SQL. \
 For backward compatibility, all columns from the source will be included at the beginning. \
 For clarity, consider adding a wildcard (\"*\") to indicate where the columns from the source should be included, \
 or specifying the columns you want to include from the source.
 ");
                    }
                }
            }
        }

        // Insertion position for the source columns: the wildcard position,
        // or 0 without a wildcard, never past the end of the generated columns.
        let wildcard_idx = wildcard_idx.unwrap_or(0).min(generated_cols_from_sql.len());

        // Result is generated[..idx] ++ source columns ++ generated[idx..].
        let mut merged_cols = generated_cols_from_sql;
        let merged_cols_r = merged_cols.split_off(wildcard_idx);
        merged_cols.extend(cols_from_source);
        merged_cols.extend(merged_cols_r);

        Ok(merged_cols)
    } else {
        // Without source columns there is nothing a wildcard could expand to.
        if wildcard_idx.is_some() {
            return Err(RwError::from(NotSupported(
                "Wildcard in user-defined schema is only allowed when there exists columns from external schema".to_owned(),
                "Remove the wildcard or use a source with external schema".to_owned(),
            )));
        }
        let non_generated_sql_defined_columns = non_generated_sql_columns(col_defs_from_sql);

        match (&format_encode.format, &format_encode.row_encode) {
            (Format::DebeziumMongo, Encode::Json) => {
                // Strong-schema mode is on only when the row option named
                // `CDC_MONGODB_STRONG_SCHEMA_KEY` (compared after lowercasing
                // the option name) is the boolean literal `true`.
                let strong_schema = format_encode
                    .row_options
                    .iter()
                    .find(|k| k.name.real_value().to_lowercase() == CDC_MONGODB_STRONG_SCHEMA_KEY)
                    .map(|k| matches!(k.value, SqlOptionValue::Value(Value::Boolean(true))))
                    .unwrap_or(false);

                if strong_schema {
                    // `_id` must be the very first non-generated column.
                    let (_, id_column) = non_generated_sql_defined_columns
                        .iter()
                        .enumerate()
                        .find(|(idx, col)| *idx == 0 && col.name.real_value() == "_id")
                        .ok_or_else(|| {
                            RwError::from(ProtocolError(
                                "The `_id` column of the source with row format DebeziumMongoJson must be defined as the first column in SQL".to_owned(),
                            ))
                        })?;

                    let id_data_type = bind_data_type(id_column.data_type.as_ref().unwrap())?;
                    if !matches!(
                        id_data_type,
                        DataType::Varchar | DataType::Int32 | DataType::Int64 | DataType::Jsonb
                    ) {
                        return Err(RwError::from(ProtocolError(
                            "the `_id` column of the source with row format DebeziumMongoJson must be [Jsonb | Varchar | Int32 | Int64]".to_owned(),
                        )));
                    }

                    let mut columns = Vec::with_capacity(non_generated_sql_defined_columns.len());
                    columns.push(
                        ColumnCatalog {
                            column_desc: ColumnDesc::named("_id", 0.into(), id_data_type),
                            is_hidden: false,
                        },
                    );

                    // NOTE(review): `idx` restarts at 0 after `skip(1)`, so the
                    // first extra column gets the same column id (0) as `_id`.
                    // Presumably these ids are placeholders that a later step
                    // reassigns — confirm against the caller.
                    for (idx, col) in non_generated_sql_defined_columns
                        .into_iter()
                        .skip(1)
                        .enumerate()
                    {
                        columns.push(ColumnCatalog {
                            column_desc: ColumnDesc::named(
                                col.name.real_value(),
                                (idx as i32).into(),
                                bind_data_type(col.data_type.as_ref().unwrap())?,
                            ),
                            is_hidden: false,
                        });
                    }

                    return Ok(columns);
                }

                // Default mode: the schema is fixed to `(_id, payload)`.
                let mut columns = vec![
                    ColumnCatalog {
                        column_desc: ColumnDesc::named("_id", 0.into(), DataType::Varchar),
                        is_hidden: false,
                    },
                    ColumnCatalog {
                        column_desc: ColumnDesc::named("payload", 0.into(), DataType::Jsonb),
                        is_hidden: false,
                    },
                ];

                // The SQL definition must name exactly these two columns, in order.
                if non_generated_sql_defined_columns.len() != 2
                    || non_generated_sql_defined_columns[0].name.real_value() != columns[0].name()
                    || non_generated_sql_defined_columns[1].name.real_value() != columns[1].name()
                {
                    return Err(RwError::from(ProtocolError(
                        "the not generated columns of the source with row format DebeziumMongoJson
 must be (_id [Jsonb | Varchar | Int32 | Int64], payload jsonb)."
                            .to_owned(),
                    )));
                }
                let key_data_type = bind_data_type(
                    non_generated_sql_defined_columns[0]
                        .data_type
                        .as_ref()
                        .unwrap(),
                )?;
                match key_data_type {
                    // The user-declared `_id` type replaces the default Varchar.
                    DataType::Jsonb | DataType::Varchar | DataType::Int32 | DataType::Int64 => {
                        columns[0].column_desc.data_type = key_data_type;
                    }
                    _ => {
                        return Err(RwError::from(ProtocolError(
                            "the `_id` column of the source with row format DebeziumMongoJson
 must be [Jsonb | Varchar | Int32 | Int64]"
                                .to_owned(),
                        )));
                    }
                }

                let value_data_type = bind_data_type(
                    non_generated_sql_defined_columns[1]
                        .data_type
                        .as_ref()
                        .unwrap(),
                )?;
                if !matches!(value_data_type, DataType::Jsonb) {
                    return Err(RwError::from(ProtocolError(
                        "the `payload` column of the source with row format DebeziumMongoJson
 must be Jsonb datatype"
                            .to_owned(),
                    )));
                }
                Ok(columns)
            }
            (Format::Plain, Encode::Bytes) => {
                // Exactly one non-generated column of type BYTEA is accepted.
                let err = Err(RwError::from(ProtocolError(
                    "ENCODE BYTES only accepts one BYTEA type column".to_owned(),
                )));
                if non_generated_sql_defined_columns.len() == 1 {
                    let col_data_type = bind_data_type(
                        non_generated_sql_defined_columns[0]
                            .data_type
                            .as_ref()
                            .unwrap(),
                    )?;
                    if col_data_type == DataType::Bytea {
                        Ok(cols_from_sql)
                    } else {
                        err
                    }
                } else {
                    err
                }
            }
            // All other format/encode pairs take the SQL columns as they are.
            (_, _) => Ok(cols_from_sql),
        }
    }
}
490
491fn hint_format_encode(format_encode: &FormatEncodeOptions) -> String {
493 format!(
494 r#"Hint: For FORMAT {0} ENCODE {1}, INCLUDE KEY must be specified and the key column must be used as primary key.
495example:
496 CREATE TABLE <table_name> ( PRIMARY KEY ([rw_key | <key_name>]) )
497 INCLUDE KEY [AS <key_name>]
498 WITH (...)
499 FORMAT {0} ENCODE {1}{2}
500"#,
501 format_encode.format,
502 format_encode.row_encode,
503 if format_encode.row_encode == Encode::Json || format_encode.row_encode == Encode::Bytes {
504 "".to_owned()
505 } else {
506 " (...)".to_owned()
507 }
508 )
509}
510
511pub(crate) async fn bind_source_pk(
514 format_encode: &FormatEncodeOptions,
515 source_info: &StreamSourceInfo,
516 columns: &mut [ColumnCatalog],
517 sql_defined_pk_names: Vec<String>,
518 with_properties: &WithOptionsSecResolved,
519) -> Result<Vec<String>> {
520 let sql_defined_pk = !sql_defined_pk_names.is_empty();
521 let include_key_column_name: Option<String> = {
522 columns.iter().find_map(|catalog| {
525 if matches!(
526 catalog.column_desc.additional_column.column_type,
527 Some(AdditionalColumnType::Key(_))
528 ) {
529 Some(catalog.name().to_owned())
530 } else {
531 None
532 }
533 })
534 };
535 let additional_column_names = columns
536 .iter()
537 .filter_map(|col| {
538 if col.column_desc.additional_column.column_type.is_some() {
539 Some(col.name().to_owned())
540 } else {
541 None
542 }
543 })
544 .collect_vec();
545
546 let res = match (&format_encode.format, &format_encode.row_encode) {
547 (Format::Native, Encode::Native) | (Format::None, Encode::None) | (Format::Plain, _) => {
548 sql_defined_pk_names
549 }
550
551 (Format::Upsert, Encode::Json | Encode::Avro | Encode::Protobuf) => {
554 if let Some(ref key_column_name) = include_key_column_name
555 && sql_defined_pk
556 {
557 if sql_defined_pk_names.len() != 1
562 || !key_column_name.eq(sql_defined_pk_names[0].as_str())
563 {
564 return Err(RwError::from(ProtocolError(format!(
565 "Only \"{}\" can be used as primary key\n\n{}",
566 key_column_name,
567 hint_format_encode(format_encode)
568 ))));
569 }
570 sql_defined_pk_names
571 } else {
572 return if let Some(include_key_column_name) = include_key_column_name {
574 Err(RwError::from(ProtocolError(format!(
575 "Primary key must be specified to {}\n\n{}",
576 include_key_column_name,
577 hint_format_encode(format_encode)
578 ))))
579 } else {
580 Err(RwError::from(ProtocolError(format!(
581 "INCLUDE KEY clause not set\n\n{}",
582 hint_format_encode(format_encode)
583 ))))
584 };
585 }
586 }
587
588 (Format::Debezium, Encode::Json) => {
589 if !additional_column_names.is_empty() {
590 return Err(RwError::from(ProtocolError(format!(
591 "FORMAT DEBEZIUM forbids additional columns, but got {:?}",
592 additional_column_names
593 ))));
594 }
595 if !sql_defined_pk {
596 return Err(RwError::from(ProtocolError(
597 "Primary key must be specified when creating source with FORMAT DEBEZIUM."
598 .to_owned(),
599 )));
600 }
601 sql_defined_pk_names
602 }
603 (Format::Debezium, Encode::Avro) => {
604 if !additional_column_names.is_empty() {
605 return Err(RwError::from(ProtocolError(format!(
606 "FORMAT DEBEZIUM forbids additional columns, but got {:?}",
607 additional_column_names
608 ))));
609 }
610 if sql_defined_pk {
611 sql_defined_pk_names
612 } else {
613 let pk_names =
614 extract_debezium_avro_table_pk_columns(source_info, with_properties).await?;
615 for pk_name in &pk_names {
617 columns
618 .iter()
619 .find(|c: &&ColumnCatalog| c.name().eq(pk_name))
620 .ok_or_else(|| {
621 RwError::from(ProtocolError(format!(
622 "avro's key column {} not exists in avro's row schema",
623 pk_name
624 )))
625 })?;
626 }
627 pk_names
628 }
629 }
630 (Format::DebeziumMongo, Encode::Json) => {
631 if sql_defined_pk {
632 sql_defined_pk_names
633 } else {
634 vec!["_id".to_owned()]
635 }
636 }
637
638 (Format::Maxwell, Encode::Json) => {
639 if !additional_column_names.is_empty() {
640 return Err(RwError::from(ProtocolError(format!(
641 "FORMAT MAXWELL forbids additional columns, but got {:?}",
642 additional_column_names
643 ))));
644 }
645 if !sql_defined_pk {
646 return Err(RwError::from(ProtocolError(
647 "Primary key must be specified when creating source with FORMAT MAXWELL ENCODE JSON.".to_owned(),
648 )));
649 }
650 sql_defined_pk_names
651 }
652
653 (Format::Canal, Encode::Json) => {
654 if !additional_column_names.is_empty() {
655 return Err(RwError::from(ProtocolError(format!(
656 "FORMAT CANAL forbids additional columns, but got {:?}",
657 additional_column_names
658 ))));
659 }
660 if !sql_defined_pk {
661 return Err(RwError::from(ProtocolError(
662 "Primary key must be specified when creating source with FORMAT CANAL ENCODE JSON.".to_owned(),
663 )));
664 }
665 sql_defined_pk_names
666 }
667 (format, encoding) => {
668 return Err(RwError::from(ProtocolError(format!(
669 "Unknown combination {:?} {:?}",
670 format, encoding
671 ))));
672 }
673 };
674 Ok(res)
675}
676
677pub(super) fn bind_source_watermark(
678 session: &SessionImpl,
679 name: String,
680 source_watermarks: Vec<SourceWatermark>,
681 column_catalogs: &[ColumnCatalog],
682) -> Result<Vec<WatermarkDesc>> {
683 let mut binder = Binder::new_for_ddl(session);
684 binder.bind_columns_to_context(name.clone(), column_catalogs)?;
685
686 let mut session_tz = SessionTimezone::new(session.config().timezone());
687
688 let watermark_descs = source_watermarks
689 .into_iter()
690 .map(|source_watermark| {
691 let col_name = source_watermark.column.real_value();
692 let watermark_idx = binder.get_column_binding_index(name.clone(), &col_name)?;
693
694 let expr = binder.bind_expr(&source_watermark.expr)?;
695 let expr = session_tz.rewrite_expr(expr);
699 let watermark_col_type = column_catalogs[watermark_idx].data_type();
700 let watermark_expr_type = &expr.return_type();
701 if watermark_col_type != watermark_expr_type {
702 Err(RwError::from(ErrorCode::BindError(
703 format!("The return value type of the watermark expression must be identical to the watermark column data type. Current data type of watermark return value: `{}`, column `{}`",watermark_expr_type, watermark_col_type),
704 )))
705 } else {
706 let expr_proto = expr.to_expr_proto();
707 Ok::<_, RwError>(WatermarkDesc {
708 watermark_idx: watermark_idx as u32,
709 expr: Some(expr_proto),
710 with_ttl: source_watermark.with_ttl,
711 })
712 }
713 })
714 .try_collect()?;
715 Ok(watermark_descs)
716}
717
718pub(super) fn check_format_encode(
724 props: &WithOptionsSecResolved,
725 row_id_index: Option<usize>,
726 columns: &[ColumnCatalog],
727) -> Result<()> {
728 let Some(connector) = props.get_connector() else {
729 return Ok(());
730 };
731
732 if connector == NEXMARK_CONNECTOR {
733 check_nexmark_schema(props, row_id_index, columns)
734 } else {
735 Ok(())
736 }
737}
738
739pub fn bind_connector_props(
740 handler_args: &HandlerArgs,
741 format_encode: &FormatEncodeOptions,
742 is_create_source: bool,
743) -> Result<(WithOptions, SourceRefreshMode)> {
744 let mut with_properties = handler_args.with_options.clone().into_connector_props();
745 validate_compatibility(format_encode, &mut with_properties)?;
746 let refresh_mode = {
747 let refresh_mode = resolve_source_refresh_mode_in_with_option(&mut with_properties)?;
748 if is_create_source && refresh_mode.is_some() {
749 return Err(RwError::from(ProtocolError(
750 "`refresh_mode` only supported for CREATE TABLE".to_owned(),
751 )));
752 }
753
754 refresh_mode.unwrap_or(SourceRefreshMode {
755 refresh_mode: Some(RefreshMode::Streaming(SourceRefreshModeStreaming {})),
756 })
757 };
758
759 let create_cdc_source_job = with_properties.is_shareable_cdc_connector();
760
761 if !is_create_source && with_properties.is_shareable_cdc_connector() {
762 return Err(RwError::from(ProtocolError(format!(
763 "directly creating a CDC table for connector {} is no longer supported; \
764 please `CREATE SOURCE` to create a shared CDC source first, \
765 then `CREATE TABLE ... FROM <source> TABLE '<database>.<table>'`",
766 with_properties.get_connector().unwrap(),
767 ))));
768 }
769 if is_create_source && create_cdc_source_job {
770 if let Some(value) = with_properties.get(AUTO_SCHEMA_CHANGE_KEY)
771 && value.parse::<bool>().map_err(|_| {
772 ErrorCode::InvalidInputSyntax(format!(
773 "invalid value of '{}' option",
774 AUTO_SCHEMA_CHANGE_KEY
775 ))
776 })?
777 {
778 Feature::CdcAutoSchemaChange.check_available()?;
779 }
780
781 with_properties.insert(CDC_SNAPSHOT_MODE_KEY.into(), CDC_SNAPSHOT_BACKFILL.into());
783 with_properties.insert(CDC_SHARING_MODE_KEY.into(), "true".into());
785 if with_properties.enable_transaction_metadata() {
787 with_properties.insert(CDC_TRANSACTIONAL_KEY.into(), "true".into());
788 }
789 if !with_properties.contains_key(CDC_WAIT_FOR_STREAMING_START_TIMEOUT) {
791 with_properties.insert(
792 CDC_WAIT_FOR_STREAMING_START_TIMEOUT.into(),
793 handler_args
794 .session
795 .config()
796 .cdc_source_wait_streaming_start_timeout()
797 .to_string(),
798 );
799 }
800 }
801 if with_properties.is_mysql_cdc_connector() {
802 with_properties
806 .entry("server.id".to_owned())
807 .or_insert(rand::rng().random_range(1..u32::MAX).to_string());
808 }
809 Ok((with_properties, refresh_mode))
810}
811
/// Tells `bind_all_columns` how to treat the columns declared in SQL when
/// columns resolved from the external schema are available as well.
pub enum SqlColumnStrategy {
    /// Use the SQL-declared columns as they are, without comparing them to
    /// the columns from the source. `bind_all_columns` asserts that no
    /// wildcard is present under this strategy.
    FollowUnchecked,

    /// Use the SQL-declared columns, but verify each regular (non-generated)
    /// one against the source columns by name and data type. Regular columns
    /// may not be combined with a wildcard. When only generated columns are
    /// declared, the source columns are merged in at the wildcard position
    /// (or at the beginning, with a notice, when there is no wildcard).
    FollowChecked,

    /// Perform no check: the source columns are merged with the generated
    /// SQL columns only, so regular SQL-declared columns do not appear in
    /// the result.
    Ignore,
}
835
/// Binds a `CREATE SOURCE`, or the source part of a `CREATE TABLE` with a
/// connector, into a [`SourceCatalog`].
///
/// `create_source_type` decides between the two: every value except
/// `CreateSourceType::Table` is handled as `CREATE SOURCE`.
///
/// The steps run in this order: statement-level restrictions, primary key and
/// column binding, additional (`INCLUDE`) columns, hidden columns for sources,
/// connection/secret resolution, primary key binding per format, column id
/// assignment, row id / watermark / constraint binding, connector-specific
/// schema checks, and finally catalog construction.
///
/// # Errors
///
/// Returns an error as soon as any of the steps above fails.
///
/// # Panics
///
/// Panics if more than one watermark is bound, or if the connector name is
/// missing when the partition/offset columns are added for a shared non-CDC
/// source.
#[expect(clippy::too_many_arguments)]
pub async fn bind_create_source_or_table_with_connector(
    handler_args: HandlerArgs,
    full_name: ObjectName,
    format_encode: FormatEncodeOptions,
    with_properties: WithOptions,
    sql_columns_defs: &[ColumnDef],
    constraints: Vec<TableConstraint>,
    wildcard_idx: Option<usize>,
    source_watermarks: Vec<SourceWatermark>,
    columns_from_resolve_source: Option<Vec<ColumnCatalog>>,
    source_info: StreamSourceInfo,
    include_column_options: IncludeOption,
    col_id_gen: &mut ColumnIdGenerator,
    create_source_type: CreateSourceType,
    source_rate_limit: Option<u32>,
    sql_column_strategy: SqlColumnStrategy,
    refresh_mode: SourceRefreshMode,
) -> Result<SourceCatalog> {
    let session = &handler_args.session;
    let db_name: &str = &session.database();
    let (schema_name, source_name) = Binder::resolve_schema_qualified_name(db_name, &full_name)?;
    let (database_id, schema_id) =
        session.get_database_and_schema_id_for_create(schema_name.clone())?;

    let is_create_source = create_source_type != CreateSourceType::Table;

    if is_create_source {
        // Batch connectors are only usable through CREATE TABLE.
        if with_properties.is_batch_connector() {
            return Err(ErrorCode::BindError(
                "can't CREATE SOURCE with refreshable batch connector\n\nHint: use CREATE TABLE instead"
                    .to_owned(),
            )
            .into());
        }

        match format_encode.format {
            // These formats are rejected for CREATE SOURCE.
            Format::Debezium | Format::DebeziumMongo | Format::Maxwell | Format::Canal => {
                return Err(ErrorCode::BindError(format!(
                    "can't CREATE SOURCE with FORMAT {}.\n\nHint: use CREATE TABLE instead\n\n{}",
                    format_encode.format,
                    hint_format_encode(&format_encode)
                ))
                .into());
            }
            // UPSERT is allowed for sources, but the user gets a notice.
            Format::Upsert => {
                notice_to_user(format!(
                    "Streaming queries on sources with `FORMAT {}` may have limitations. If your query isn't supported, consider using `CREATE TABLE` instead.",
                    format_encode.format
                ));
            }
            _ => {}
        }
    }

    let sql_pk_names = bind_sql_pk_names(sql_columns_defs, bind_table_constraints(&constraints)?)?;

    if with_properties.is_iceberg_connector() {
        // Iceberg sources may not declare a primary key ...
        if is_create_source && !sql_pk_names.is_empty() {
            return Err(ErrorCode::NotSupported(
                "PRIMARY KEY is not supported for Iceberg CREATE SOURCE in continuous ingestion mode."
                    .to_owned(),
                "Iceberg streaming ingestion only supports append-only sources. Remove the PRIMARY KEY clause."
                    .to_owned(),
            )
            .into());
        }

        // ... and neither sources nor tables may declare columns.
        if !sql_columns_defs.is_empty() {
            return Err(RwError::from(InvalidInputSyntax(
                r#"Schema is automatically inferred for iceberg source and should not be specified

HINT: use `CREATE SOURCE <name> WITH (...)` instead of `CREATE SOURCE <name> (<columns>) WITH (...)`."#.to_owned(),
            )));
        }
    }

    // Same rule for a batch connector whose upstream source is ADBC Snowflake.
    if with_properties.is_batch_connector()
        && with_properties
            .get(UPSTREAM_SOURCE_KEY)
            .is_some_and(|s| s.eq_ignore_ascii_case(ADBC_SNOWFLAKE_CONNECTOR))
        && !sql_columns_defs.is_empty()
    {
        return Err(RwError::from(InvalidInputSyntax(
            r#"Schema is automatically inferred for ADBC Snowflake source and should not be specified

HINT: use `CREATE TABLE <name> WITH (...)` instead of `CREATE TABLE <name> (<columns>) WITH (...)`."#.to_owned(),
        )));
    }
    let columns_from_sql = bind_sql_columns(sql_columns_defs, false)?;

    // Merge SQL-declared columns with the ones resolved from the external schema.
    let mut columns = bind_all_columns(
        &format_encode,
        columns_from_resolve_source,
        columns_from_sql,
        sql_columns_defs,
        wildcard_idx,
        sql_column_strategy,
    )?;

    // Add the columns requested through `INCLUDE ...`.
    handle_addition_columns(
        Some(&format_encode),
        &with_properties,
        include_column_options,
        &mut columns,
        false,
    )?;

    if columns.is_empty() {
        return Err(RwError::from(ProtocolError(
            "Schema definition is required, either from SQL or schema registry.".to_owned(),
        )));
    }

    if is_create_source {
        check_and_add_timestamp_column(&with_properties, &mut columns);

        // Shared non-CDC sources get the partition and offset columns as
        // hidden columns, unless such a column already exists.
        if create_source_type == CreateSourceType::SharedNonCdc {
            let (columns_exist, additional_columns) = source_add_partition_offset_cols(
                &columns,
                &with_properties.get_connector().unwrap(),
                true,
            );
            for (existed, c) in columns_exist.into_iter().zip_eq_fast(additional_columns) {
                if !existed {
                    columns.push(ColumnCatalog::hidden(c));
                }
            }
        }
    }

    let mut with_properties = with_properties;
    resolve_privatelink_in_with_option(&mut with_properties)?;

    // Secret enforcement applies only when the system parameter is on and the
    // secret management feature is available.
    if session
        .env()
        .system_params_manager()
        .get_params()
        .load()
        .enforce_secret()
        && Feature::SecretManagement.check_available().is_ok()
    {
        ConnectorProperties::enforce_secret_source(&with_properties)?;
    }

    let (with_properties, connection_type, connector_conn_ref) =
        resolve_connection_ref_and_secret_ref(
            with_properties,
            session,
            Some(TelemetryDatabaseObject::Source),
        )?;
    ensure_connection_type_allowed(connection_type, &SOURCE_ALLOWED_CONNECTION_CONNECTOR)?;

    // When a connection is referenced, the connector must be given and must
    // match the connection's type.
    if !matches!(connection_type, PbConnectionType::Unspecified) {
        let Some(connector) = with_properties.get_connector() else {
            return Err(RwError::from(ProtocolError(format!(
                "missing field '{}' in WITH clause",
                UPSTREAM_SOURCE_KEY
            ))));
        };
        check_connector_match_connection_type(connector.as_str(), &connection_type)?;
    }

    let pk_names = bind_source_pk(
        &format_encode,
        &source_info,
        &mut columns,
        sql_pk_names,
        &with_properties,
    )
    .await?;

    // Column names must be unique across SQL, source and additional columns.
    if let Some(duplicated_name) = columns.iter().map(|c| c.name()).duplicates().next() {
        return Err(ErrorCode::InvalidInputSyntax(format!(
            "column \"{}\" specified more than once",
            duplicated_name
        ))
        .into());
    }

    for c in &mut columns {
        let original_data_type = c.data_type().clone();
        col_id_gen.generate(c)?;
        // For CREATE SOURCE the data type captured before `generate` is
        // written back, so any change `generate` made to it is discarded.
        if is_create_source {
            c.column_desc.data_type = original_data_type;
        }
    }
    debug_assert_column_ids_distinct(&columns);

    let (mut columns, pk_col_ids, row_id_index) =
        bind_pk_and_row_id_on_relation(columns, pk_names, true)?;

    let watermark_descs =
        bind_source_watermark(session, source_name.clone(), source_watermarks, &columns)?;
    // At most one watermark is expected at this point.
    assert!(watermark_descs.len() <= 1);
    if is_create_source && watermark_descs.iter().any(|d| d.with_ttl) {
        return Err(ErrorCode::NotSupported(
            "WITH TTL is not supported in WATERMARK clause for CREATE SOURCE.".to_owned(),
            "Use `CREATE TABLE ... WATERMARK ... WITH TTL` instead.".to_owned(),
        )
        .into());
    }

    // The relation counts as append-only exactly when it has a row id column.
    let append_only = row_id_index.is_some();
    if is_create_source && !append_only && !watermark_descs.is_empty() {
        return Err(ErrorCode::NotSupported(
            "Defining watermarks on source requires the source connector to be append only."
                .to_owned(),
            "Use the key words `FORMAT PLAIN`".to_owned(),
        )
        .into());
    }

    bind_sql_column_constraints(
        session,
        source_name.clone(),
        &mut columns,
        sql_columns_defs,
        &pk_col_ids,
    )?;
    check_format_encode(&with_properties, row_id_index, &columns)?;

    let definition = handler_args.normalized_sql.clone();

    // A table with connector records a placeholder id of its associated
    // table; a plain source has none.
    let associated_table_id = if is_create_source {
        None
    } else {
        Some(TableId::placeholder())
    };
    let source = SourceCatalog {
        id: SourceId::placeholder(),
        name: source_name,
        schema_id,
        database_id,
        columns,
        pk_col_ids,
        append_only,
        owner: session.user_id(),
        info: source_info,
        row_id_index,
        with_properties,
        watermark_descs,
        associated_table_id,
        definition,
        connection_id: connector_conn_ref,
        created_at_epoch: None,
        initialized_at_epoch: None,
        version: INITIAL_SOURCE_VERSION_ID,
        created_at_cluster_version: None,
        initialized_at_cluster_version: None,
        rate_limit: source_rate_limit,
        refresh_mode: Some(refresh_mode),
    };
    Ok(source)
}
1117
1118pub async fn handle_create_source(
1119 mut handler_args: HandlerArgs,
1120 stmt: CreateSourceStatement,
1121) -> Result<RwPgResponse> {
1122 let session = handler_args.session.clone();
1123 let overwrite_options = OverwriteOptions::new(&mut handler_args);
1124
1125 if let Either::Right(resp) = session.check_relation_name_duplicated(
1126 stmt.source_name.clone(),
1127 StatementType::CREATE_SOURCE,
1128 stmt.if_not_exists,
1129 )? {
1130 return Ok(resp);
1131 }
1132
1133 if stmt
1134 .source_name
1135 .base_name()
1136 .starts_with(ICEBERG_SOURCE_PREFIX)
1137 {
1138 return Err(RwError::from(InvalidInputSyntax(format!(
1139 "Source name cannot start with reserved prefix '{}'",
1140 ICEBERG_SOURCE_PREFIX
1141 ))));
1142 }
1143
1144 if handler_args.with_options.is_empty() {
1145 return Err(RwError::from(InvalidInputSyntax(
1146 "missing WITH clause".to_owned(),
1147 )));
1148 }
1149
1150 let format_encode = stmt.format_encode.into_v2_with_warning();
1151 let (with_properties, refresh_mode) =
1152 bind_connector_props(&handler_args, &format_encode, true)?;
1153 if let Some(connector) = with_properties.get_connector() {
1154 ensure_local_fs_connector_allowed(&session, &connector)?;
1155 }
1156
1157 let create_source_type = CreateSourceType::for_newly_created(&session, &*with_properties);
1158 let (columns_from_resolve_source, source_info) = bind_columns_from_source(
1159 &session,
1160 &format_encode,
1161 Either::Left(&with_properties),
1162 create_source_type,
1163 )
1164 .await?;
1165 let mut col_id_gen = ColumnIdGenerator::new_initial();
1166
1167 if stmt.columns.iter().any(|col| {
1168 col.options
1169 .iter()
1170 .any(|def| matches!(def.option, ColumnOption::NotNull))
1171 }) {
1172 return Err(RwError::from(InvalidInputSyntax(
1173 "NOT NULL constraint is not supported in source schema".to_owned(),
1174 )));
1175 }
1176
1177 let source_catalog = bind_create_source_or_table_with_connector(
1178 handler_args.clone(),
1179 stmt.source_name,
1180 format_encode,
1181 with_properties,
1182 &stmt.columns,
1183 stmt.constraints,
1184 stmt.wildcard_idx,
1185 stmt.source_watermarks,
1186 columns_from_resolve_source,
1187 source_info,
1188 stmt.include_column_options,
1189 &mut col_id_gen,
1190 create_source_type,
1191 overwrite_options.source_rate_limit,
1192 SqlColumnStrategy::FollowChecked,
1193 refresh_mode,
1194 )
1195 .await?;
1196
1197 if stmt.temporary {
1199 if session.get_temporary_source(&source_catalog.name).is_some() {
1200 return Err(CatalogError::duplicated("source", source_catalog.name.clone()).into());
1201 }
1202 session.create_temporary_source(source_catalog);
1203 return Ok(PgResponse::empty_result(StatementType::CREATE_SOURCE));
1204 }
1205
1206 let source = source_catalog.to_prost();
1207
1208 let catalog_writer = session.catalog_writer()?;
1209
1210 if create_source_type.is_shared() {
1211 let graph = generate_stream_graph_for_source(handler_args, source_catalog)?;
1212 catalog_writer
1213 .create_source(source, Some(graph), stmt.if_not_exists)
1214 .await?;
1215 } else {
1216 catalog_writer
1218 .create_source(source, None, stmt.if_not_exists)
1219 .await?;
1220 }
1221
1222 Ok(PgResponse::empty_result(StatementType::CREATE_SOURCE))
1223}
1224
1225pub(super) fn generate_stream_graph_for_source(
1226 handler_args: HandlerArgs,
1227 source_catalog: SourceCatalog,
1228) -> Result<PbStreamFragmentGraph> {
1229 let context = OptimizerContext::from_handler_args(handler_args);
1230 let source_node = LogicalSource::with_catalog(
1231 Rc::new(source_catalog),
1232 SourceNodeKind::CreateSharedSource,
1233 context.into(),
1234 None,
1235 )?;
1236
1237 let stream_plan = source_node.to_stream(&mut ToStreamContext::new(false))?;
1238 let graph = build_graph(stream_plan, Some(GraphJobType::Source))?;
1239 Ok(graph)
1240}
1241
/// Tests that run `CREATE SOURCE` statements against a [`LocalFrontend`] and inspect the
/// resulting source catalog entries.
#[cfg(test)]
pub mod tests {
    use std::collections::HashMap;

    use risingwave_common::catalog::{
        DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, ROW_ID_COLUMN_NAME,
    };
    use risingwave_common::config::FrontendConfig;
    use risingwave_common::types::{DataType, StructType};
    use risingwave_pb::plan_common::EncodeType;

    use crate::catalog::root_catalog::SchemaPath;
    use crate::catalog::source_catalog::SourceCatalog;
    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    /// Lists every column of `catalog` as a `(name, data type)` pair, in catalog order.
    fn column_type_list(catalog: &SourceCatalog) -> Vec<(&str, DataType)> {
        catalog
            .columns
            .iter()
            .map(|col| (col.name(), col.data_type().clone()))
            .collect()
    }

    /// Maps every column name of `catalog` to its data type.
    fn column_type_map(catalog: &SourceCatalog) -> HashMap<&str, DataType> {
        column_type_list(catalog).into_iter().collect()
    }

    /// Creates a Kinesis source with Protobuf encoding and checks the column names and types
    /// recorded in the catalog.
    #[tokio::test]
    async fn test_create_source_handler() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let sql = format!(
            r#"CREATE SOURCE t
    WITH (connector = 'kinesis')
    FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let (source, _) = catalog_reader
            .get_source_by_name(
                DEFAULT_DATABASE_NAME,
                SchemaPath::Name(DEFAULT_SCHEMA_NAME),
                "t",
            )
            .unwrap();
        assert_eq!(source.name, "t");

        let columns = column_type_map(source);

        let city_type: DataType = StructType::new(vec![
            ("address", DataType::Varchar),
            ("zipcode", DataType::Varchar),
        ])
        .into();
        let country_type: DataType = StructType::new(vec![
            ("address", DataType::Varchar),
            ("city", city_type),
            ("zipcode", DataType::Varchar),
        ])
        .into();
        let expected_columns = HashMap::from([
            (ROW_ID_COLUMN_NAME, DataType::Serial),
            ("id", DataType::Int32),
            ("zipcode", DataType::Int64),
            ("rate", DataType::Float32),
            ("country", country_type),
        ]);
        assert_eq!(columns, expected_columns, "{columns:#?}");
    }

    /// Creates an MQTT source with Protobuf encoding and checks the recorded name and row
    /// encoding.
    #[tokio::test]
    async fn test_create_mqtt_source_with_protobuf() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let sql = format!(
            r#"CREATE SOURCE t_mqtt
            WITH (
                connector = 'mqtt',
                url = 'mqtt://localhost:1883',
                topic = 'test_topic'
            )
            FORMAT PLAIN ENCODE PROTOBUF (
                message = '.test.TestRecord',
                schema.location = 'file://{}'
            )"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let (source, _) = catalog_reader
            .get_source_by_name(
                DEFAULT_DATABASE_NAME,
                SchemaPath::Name(DEFAULT_SCHEMA_NAME),
                "t_mqtt",
            )
            .unwrap();

        assert_eq!(source.name, "t_mqtt");
        assert_eq!(source.info.row_encode, EncodeType::Protobuf as i32);
    }

    /// With `unsafe_enable_local_fs_connector` set to `false`, creating a `posix_fs` source
    /// must fail with an error that mentions the configuration switch.
    #[tokio::test]
    async fn test_create_posix_fs_source_requires_frontend_config() {
        let frontend_config = FrontendConfig {
            unsafe_enable_local_fs_connector: false,
            ..Default::default()
        };
        let frontend =
            LocalFrontend::with_frontend_config(Default::default(), frontend_config).await;

        let sql = r#"CREATE SOURCE local_files (
                    line VARCHAR
                ) WITH (
                    connector = 'posix_fs',
                    posix_fs.root = '/tmp',
                    match_pattern = '*.csv'
                ) FORMAT PLAIN ENCODE CSV (without_header = 'true')"#
            .to_owned();
        let err = frontend.run_sql(sql).await.unwrap_err();

        let message = err.to_string();
        assert!(
            message.contains("frontend.unsafe_enable_local_fs_connector = true"),
            "{err:?}"
        );
    }

    /// When the same option keys appear in both the `WITH` clause and the `FORMAT ... ENCODE`
    /// clause, each clause keeps its own values: the encode options land in
    /// `info.format_encode_options`, the `WITH` options stay in `with_properties`, and
    /// `schema.location` does not show up in `with_properties`.
    #[tokio::test]
    async fn test_duplicate_props_options() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let sql = format!(
            r#"CREATE SOURCE t
            WITH (
                connector = 'kinesis',
                aws.region='user_test_topic',
                endpoint='172.10.1.1:9090,172.10.1.2:9090',
                aws.credentials.access_key_id = 'your_access_key_1',
                aws.credentials.secret_access_key = 'your_secret_key_1'
            )
            FORMAT PLAIN ENCODE PROTOBUF (
                message = '.test.TestRecord',
                aws.credentials.access_key_id = 'your_access_key_2',
                aws.credentials.secret_access_key = 'your_secret_key_2',
                schema.location = 'file://{}',
            )"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let (source, _) = catalog_reader
            .get_source_by_name(
                DEFAULT_DATABASE_NAME,
                SchemaPath::Name(DEFAULT_SCHEMA_NAME),
                "t",
            )
            .unwrap();
        assert_eq!(source.name, "t");

        // Values written in the FORMAT ... ENCODE clause.
        let expected_encode_options = [
            ("aws.credentials.access_key_id", "your_access_key_2"),
            ("aws.credentials.secret_access_key", "your_secret_key_2"),
        ];
        for (key, expected) in expected_encode_options {
            let actual = source.info.format_encode_options.get(key).unwrap();
            assert_eq!(actual, expected, "format encode option `{key}`");
        }

        // Values written in the WITH clause.
        let expected_with_properties = [
            ("aws.credentials.access_key_id", "your_access_key_1"),
            ("aws.credentials.secret_access_key", "your_secret_key_1"),
        ];
        for (key, expected) in expected_with_properties {
            let actual = source.with_properties.get(key).unwrap();
            assert_eq!(actual, expected, "with property `{key}`");
        }

        assert!(!source.with_properties.contains_key("schema.location"));
    }

    /// Creates a `mysql-cdc` source without declared columns and snapshots the columns that
    /// end up in the catalog.
    #[tokio::test]
    async fn test_multi_table_cdc_create_source_handler() {
        let frontend = LocalFrontend::new(Default::default()).await;
        let session = frontend.session_ref();
        let sql =
            "CREATE SOURCE t2 WITH (connector = 'mysql-cdc') FORMAT PLAIN ENCODE JSON".to_owned();

        frontend
            .run_sql_with_session(session.clone(), sql)
            .await
            .unwrap();

        let catalog_reader = session.env().catalog_reader().read_guard();
        let (source, _) = catalog_reader
            .get_source_by_name(
                DEFAULT_DATABASE_NAME,
                SchemaPath::Name(DEFAULT_SCHEMA_NAME),
                "t2",
            )
            .unwrap();
        assert_eq!(source.name, "t2");

        let columns = column_type_list(source);

        expect_test::expect![[r#"
            [
                (
                    "payload",
                    Jsonb,
                ),
                (
                    "_rw_offset",
                    Varchar,
                ),
                (
                    "_rw_table_name",
                    Varchar,
                ),
                (
                    "_row_id",
                    Serial,
                ),
            ]
        "#]]
        .assert_debug_eq(&columns);
    }

    /// Exercises `INCLUDE ...` clauses: a Kafka key column, a Pulsar header column with an
    /// inner field, and the error raised when a non-header column is given an inner field.
    #[tokio::test]
    async fn test_source_addition_columns() {
        let frontend = LocalFrontend::new(Default::default()).await;

        // Kafka source including the message key.
        let sql =
            "CREATE SOURCE s (v1 int) include key as _rw_kafka_key with (connector = 'kafka') format plain encode json".to_owned();
        frontend.run_sql(sql).await.unwrap();
        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let (source, _) = catalog_reader
            .get_source_by_name(
                DEFAULT_DATABASE_NAME,
                SchemaPath::Name(DEFAULT_SCHEMA_NAME),
                "s",
            )
            .unwrap();
        assert_eq!(source.name, "s");

        let columns = column_type_list(source);

        expect_test::expect![[r#"
            [
                (
                    "v1",
                    Int32,
                ),
                (
                    "_rw_kafka_key",
                    Bytea,
                ),
                (
                    "_rw_kafka_timestamp",
                    Timestamptz,
                ),
                (
                    "_rw_kafka_partition",
                    Varchar,
                ),
                (
                    "_rw_kafka_offset",
                    Varchar,
                ),
                (
                    "_row_id",
                    Serial,
                ),
            ]
        "#]]
        .assert_debug_eq(&columns);
        // Release the catalog read guard before issuing the next statement.
        drop(catalog_reader);

        // Pulsar source including a single header field.
        let sql =
            "CREATE SOURCE s_pulsar (v1 int) include header 'tenant' as pulsar_header with (connector = 'pulsar') format plain encode json".to_owned();
        frontend.run_sql(sql).await.unwrap();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let (source, _) = catalog_reader
            .get_source_by_name(
                DEFAULT_DATABASE_NAME,
                SchemaPath::Name(DEFAULT_SCHEMA_NAME),
                "s_pulsar",
            )
            .unwrap();
        assert_eq!(source.name, "s_pulsar");

        let columns = column_type_list(source);

        expect_test::expect![[r#"
            [
                (
                    "v1",
                    Int32,
                ),
                (
                    "pulsar_header",
                    Bytea,
                ),
                (
                    "_row_id",
                    Serial,
                ),
            ]
        "#]]
        .assert_debug_eq(&columns);
        // Release the catalog read guard before issuing the next statement.
        drop(catalog_reader);

        // An inner field on a non-header column must be rejected.
        let sql =
            "CREATE SOURCE s3 (v1 int) include timestamp 'header1' as header_col with (connector = 'kafka') format plain encode json".to_owned();
        let Err(err) = frontend.run_sql(sql).await else {
            unreachable!()
        };
        assert_eq!(
            err.to_string(),
            "Protocol error: Only header column can have inner field, but got \"timestamp\""
        );
    }
}