use std::collections::{BTreeMap, HashMap, HashSet};
use std::rc::Rc;
use std::sync::LazyLock;

use anyhow::{Context, anyhow};
use either::Either;
use external_schema::debezium::extract_debezium_avro_table_pk_columns;
use external_schema::nexmark::check_nexmark_schema;
use itertools::Itertools;
use maplit::{convert_args, hashmap, hashset};
use pgwire::pg_response::{PgResponse, StatementType};
use rand::Rng;
use risingwave_common::array::arrow::{IcebergArrowConvert, arrow_schema_iceberg};
use risingwave_common::bail_not_implemented;
use risingwave_common::catalog::{
    ColumnCatalog, ColumnDesc, ColumnId, INITIAL_SOURCE_VERSION_ID, KAFKA_TIMESTAMP_COLUMN_NAME,
    ROW_ID_COLUMN_NAME, TableId, debug_assert_column_ids_distinct,
};
use risingwave_common::license::Feature;
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::types::DataType;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_connector::WithPropertiesExt;
use risingwave_connector::parser::additional_columns::{
    build_additional_column_desc, get_supported_additional_columns,
    source_add_partition_offset_cols,
};
use risingwave_connector::parser::{
    AvroParserConfig, DEBEZIUM_IGNORE_KEY, DebeziumAvroParserConfig, ProtobufParserConfig,
    SchemaLocation, SpecificParserConfig, TimestamptzHandling,
    fetch_json_schema_and_map_to_columns,
};
use risingwave_connector::schema::AWS_GLUE_SCHEMA_ARN_KEY;
use risingwave_connector::schema::schema_registry::{
    SCHEMA_REGISTRY_PASSWORD, SCHEMA_REGISTRY_USERNAME, SchemaRegistryAuth, name_strategy_from_str,
};
use risingwave_connector::source::cdc::{
    CDC_AUTO_SCHEMA_CHANGE_KEY, CDC_MONGODB_STRONG_SCHEMA_KEY, CDC_SHARING_MODE_KEY,
    CDC_SNAPSHOT_BACKFILL, CDC_SNAPSHOT_MODE_KEY, CDC_TRANSACTIONAL_KEY,
    CDC_WAIT_FOR_STREAMING_START_TIMEOUT, CITUS_CDC_CONNECTOR, MONGODB_CDC_CONNECTOR,
    MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR, SQL_SERVER_CDC_CONNECTOR,
};
use risingwave_connector::source::datagen::DATAGEN_CONNECTOR;
use risingwave_connector::source::iceberg::ICEBERG_CONNECTOR;
use risingwave_connector::source::nexmark::source::{EventType, get_event_data_types_with_names};
use risingwave_connector::source::test_source::TEST_CONNECTOR;
use risingwave_connector::source::{
    AZBLOB_CONNECTOR, ConnectorProperties, GCS_CONNECTOR, GOOGLE_PUBSUB_CONNECTOR, KAFKA_CONNECTOR,
    KINESIS_CONNECTOR, LEGACY_S3_CONNECTOR, MQTT_CONNECTOR, NATS_CONNECTOR, NEXMARK_CONNECTOR,
    OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR, PULSAR_CONNECTOR,
};
pub use risingwave_connector::source::{UPSTREAM_SOURCE_KEY, WEBHOOK_CONNECTOR};
use risingwave_pb::catalog::connection_params::PbConnectionType;
use risingwave_pb::catalog::{PbSchemaRegistryNameStrategy, StreamSourceInfo, WatermarkDesc};
use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
use risingwave_pb::plan_common::{EncodeType, FormatType};
use risingwave_pb::stream_plan::PbStreamFragmentGraph;
use risingwave_pb::telemetry::TelemetryDatabaseObject;
use risingwave_sqlparser::ast::{
    AstString, ColumnDef, ColumnOption, CreateSourceStatement, Encode, Format, FormatEncodeOptions,
    ObjectName, SourceWatermark, SqlOptionValue, TableConstraint, Value, get_delimiter,
};
use risingwave_sqlparser::parser::{IncludeOption, IncludeOptionItem};
use thiserror_ext::AsReport;

use super::RwPgResponse;
use crate::binder::Binder;
use crate::catalog::CatalogError;
use crate::catalog::source_catalog::SourceCatalog;
use crate::error::ErrorCode::{self, Deprecated, InvalidInputSyntax, NotSupported, ProtocolError};
use crate::error::{Result, RwError};
use crate::expr::Expr;
use crate::handler::HandlerArgs;
use crate::handler::create_table::{
    ColumnIdGenerator, bind_pk_and_row_id_on_relation, bind_sql_column_constraints,
    bind_sql_columns, bind_sql_pk_names, bind_table_constraints,
};
use crate::handler::util::{
    SourceSchemaCompatExt, check_connector_match_connection_type, ensure_connection_type_allowed,
};
use crate::optimizer::plan_node::generic::SourceNodeKind;
use crate::optimizer::plan_node::{LogicalSource, ToStream, ToStreamContext};
use crate::session::SessionImpl;
use crate::session::current::notice_to_user;
use crate::utils::{
    OverwriteOptions, resolve_connection_ref_and_secret_ref, resolve_privatelink_in_with_option,
    resolve_secret_ref_in_with_options,
};
use crate::{OptimizerContext, WithOptions, WithOptionsSecResolved, bind_data_type, build_graph};

mod external_schema;
pub use external_schema::{
    bind_columns_from_source, get_schema_location, schema_has_schema_registry,
};
mod validate;
pub use validate::validate_compatibility;
use validate::{SOURCE_ALLOWED_CONNECTION_CONNECTOR, SOURCE_ALLOWED_CONNECTION_SCHEMA_REGISTRY};
mod additional_column;
use additional_column::check_and_add_timestamp_column;
pub use additional_column::handle_addition_columns;

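/// Returns the SQL-defined columns with generated columns filtered out.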
fn non_generated_sql_columns(columns: &[ColumnDef]) -> Vec<ColumnDef> {
    columns
        .iter()
        .filter(|c| !c.is_generated())
        .cloned()
        .collect()
}

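/// Removes `key` from the format-encode options and returns its value, if present.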
fn try_consume_string_from_options(
    format_encode_options: &mut BTreeMap<String, String>,
    key: &str,
) -> Option<AstString> {
    format_encode_options.remove(key).map(AstString)
}

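/// Like [`try_consume_string_from_options`], but errors if `key` is missing.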
fn consume_string_from_options(
    format_encode_options: &mut BTreeMap<String, String>,
    key: &str,
) -> Result<AstString> {
    try_consume_string_from_options(format_encode_options, key).ok_or_else(|| {
        RwError::from(ProtocolError(format!("missing field {} in options", key)))
    })
}

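/// Removes all `aws.`-prefixed keys from the format-encode options.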
fn consume_aws_config_from_options(format_encode_options: &mut BTreeMap<String, String>) {
    format_encode_options.retain(|key, _| !key.starts_with("aws."))
}

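/// How the source being created will be planned. A *shared* source runs as a
/// streaming job of its own that downstream jobs consume from, while a non-shared
/// source is embedded into each downstream streaming job.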
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CreateSourceType {
    /// A shareable CDC source, e.g. `mysql-cdc`.
    SharedCdc,
    /// A shareable non-CDC source, e.g. Kafka.
    SharedNonCdc,
    NonShared,
    /// `CREATE TABLE` with a connector.
    Table,
}

impl CreateSourceType {
    /// Determines the type for a newly created source from the connector and the
    /// session configuration. Should not be used for `ALTER SOURCE`, since session
    /// variables must not affect an existing source; respect its original type instead.
    pub fn for_newly_created(
        session: &SessionImpl,
        with_properties: &impl WithPropertiesExt,
    ) -> Self {
        if with_properties.is_shareable_cdc_connector() {
            CreateSourceType::SharedCdc
        } else if with_properties.is_shareable_non_cdc_connector()
            && session
                .env()
                .streaming_config()
                .developer
                .enable_shared_source
            && session.config().streaming_use_shared_source()
        {
            CreateSourceType::SharedNonCdc
        } else {
            CreateSourceType::NonShared
        }
    }

    pub fn for_replace(catalog: &SourceCatalog) -> Self {
        if !catalog.info.is_shared() {
            CreateSourceType::NonShared
        } else if catalog.with_properties.is_shareable_cdc_connector() {
            CreateSourceType::SharedCdc
        } else {
            CreateSourceType::SharedNonCdc
        }
    }

    pub fn is_shared(&self) -> bool {
        matches!(
            self,
            CreateSourceType::SharedCdc | CreateSourceType::SharedNonCdc
        )
    }
}

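/// Binds the final set of columns from both the external source (e.g. a schema
/// registry) and the user-defined SQL schema, according to `sql_column_strategy`.
/// When both are present, generated columns from SQL are kept and the wildcard
/// (`*`) marks where the source columns are expanded.
///
/// Illustrative example (assuming an external schema that resolves to columns
/// `a INT, b INT`):
///
/// ```sql
/// CREATE TABLE t (g INT AS a + 1, *)
/// WITH (connector = 'kafka', ...)
/// FORMAT PLAIN ENCODE AVRO (schema.registry = '...');
/// -- resulting columns: g, a, b
/// ```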
pub(crate) fn bind_all_columns(
    format_encode: &FormatEncodeOptions,
    cols_from_source: Option<Vec<ColumnCatalog>>,
    cols_from_sql: Vec<ColumnCatalog>,
    col_defs_from_sql: &[ColumnDef],
    wildcard_idx: Option<usize>,
    sql_column_strategy: SqlColumnStrategy,
) -> Result<Vec<ColumnCatalog>> {
    if let Some(cols_from_source) = cols_from_source {
        // Look up `col_defs_from_sql` to see whether a column is generated, since
        // the generated-column info is not bound into `ColumnCatalog` yet.
        let generated_cols_from_sql = cols_from_sql
            .iter()
            .filter(|c| {
                col_defs_from_sql
                    .iter()
                    .find(|d| d.name.real_value() == c.name())
                    .unwrap()
                    .is_generated()
            })
            .cloned()
            .collect_vec();

        #[allow(clippy::collapsible_else_if)]
        match sql_column_strategy {
            // Follow the columns from the SQL definition without checking them
            // against the columns resolved from the source.
            SqlColumnStrategy::FollowUnchecked => {
                assert!(
                    wildcard_idx.is_none(),
                    "wildcard still exists while strategy is `FollowUnchecked`, not correctly purified?"
                );
                return Ok(cols_from_sql);
            }

            // Only generated columns from SQL are kept; the rest are merged from
            // the source below.
            SqlColumnStrategy::Ignore => {}

            SqlColumnStrategy::FollowChecked => {
                let has_regular_cols_from_sql =
                    generated_cols_from_sql.len() != cols_from_sql.len();

                if has_regular_cols_from_sql {
                    if wildcard_idx.is_some() {
                        // Regular columns and the wildcard cannot appear together.
                        return Err(RwError::from(NotSupported(
                            "When there's a wildcard (\"*\"), \
                             only generated columns are allowed in user-defined schema from SQL"
                                .to_owned(),
                            "Remove the non-generated columns".to_owned(),
                        )));
                    } else {
                        // Check each regular column defined in SQL against the source.
                        for col in &cols_from_sql {
                            if generated_cols_from_sql.contains(col) {
                                continue;
                            }
                            let Some(col_from_source) =
                                cols_from_source.iter().find(|c| c.name() == col.name())
                            else {
                                return Err(RwError::from(ProtocolError(format!(
                                    "Column \"{}\" is defined in SQL but not found in the source",
                                    col.name()
                                ))));
                            };

                            if col_from_source.data_type() != col.data_type() {
                                return Err(RwError::from(ProtocolError(format!(
                                    "Data type mismatch for column \"{}\". \
                                     Defined in SQL as \"{}\", but found in the source as \"{}\"",
                                    col.name(),
                                    col.data_type(),
                                    col_from_source.data_type()
                                ))));
                            }
                        }
                        return Ok(cols_from_sql);
                    }
                } else {
                    if wildcard_idx.is_some() {
                        // Only the wildcard and generated columns exist; fall through
                        // and expand the wildcard with the columns from the source.
                    } else {
                        // Neither wildcard nor regular columns; include all source
                        // columns at the beginning for backward compatibility.
                        notice_to_user("\
                            Neither wildcard (\"*\") nor regular (non-generated) columns appear in the user-defined schema from SQL. \
                            For backward compatibility, all columns from the source will be included at the beginning. \
                            For clarity, consider adding a wildcard (\"*\") to indicate where the columns from the source should be included, \
                            or specifying the columns you want to include from the source.
                        ");
                    }
                }
            }
        }

        // The wildcard may be absent here (e.g. under the `Ignore` strategy);
        // default to 0 so that the columns from the source come first.
        let wildcard_idx = wildcard_idx.unwrap_or(0).min(generated_cols_from_sql.len());

        // Expand the wildcard: generated columns before it, then the columns from
        // the source, then the remaining generated columns.
        let mut merged_cols = generated_cols_from_sql;
        let merged_cols_r = merged_cols.split_off(wildcard_idx);
        merged_cols.extend(cols_from_source);
        merged_cols.extend(merged_cols_r);

        Ok(merged_cols)
    } else {
        if wildcard_idx.is_some() {
            return Err(RwError::from(NotSupported(
                "Wildcard in user-defined schema is only allowed when there exists columns from external schema".to_owned(),
                "Remove the wildcard or use a source with external schema".to_owned(),
            )));
        }
        let non_generated_sql_defined_columns = non_generated_sql_columns(col_defs_from_sql);

        match (&format_encode.format, &format_encode.row_encode) {
            (Format::DebeziumMongo, Encode::Json) => {
                let strong_schema = format_encode
                    .row_options
                    .iter()
                    .find(|k| k.name.real_value().to_lowercase() == CDC_MONGODB_STRONG_SCHEMA_KEY)
                    .map(|k| matches!(k.value, SqlOptionValue::Value(Value::Boolean(true))))
                    .unwrap_or(false);

                // For strong schema, the user-defined columns are used directly.
                if strong_schema {
                    let (_, id_column) = non_generated_sql_defined_columns
                        .iter()
                        .enumerate()
                        .find(|(idx, col)| *idx == 0 && col.name.real_value() == "_id")
                        .ok_or_else(|| {
                            RwError::from(ProtocolError(
                                "the `_id` column of the source with row format DebeziumMongoJson must be defined as the first column in SQL".to_owned(),
                            ))
                        })?;

                    let id_data_type = bind_data_type(id_column.data_type.as_ref().unwrap())?;
                    if !matches!(
                        id_data_type,
                        DataType::Varchar | DataType::Int32 | DataType::Int64 | DataType::Jsonb
                    ) {
                        return Err(RwError::from(ProtocolError(
                            "the `_id` column of the source with row format DebeziumMongoJson must be [Jsonb | Varchar | Int32 | Int64]".to_owned(),
                        )));
                    }

                    let mut columns = Vec::with_capacity(non_generated_sql_defined_columns.len());
                    columns.push(
                        // The `_id` column always comes first.
                        ColumnCatalog {
                            column_desc: ColumnDesc::named("_id", 0.into(), id_data_type),
                            is_hidden: false,
                        },
                    );

                    // Bind the rest of the columns, skipping `_id` which is already bound.
                    for (idx, col) in non_generated_sql_defined_columns
                        .into_iter()
                        .skip(1)
                        .enumerate()
                    {
                        columns.push(ColumnCatalog {
                            column_desc: ColumnDesc::named(
                                col.name.real_value(),
                                // Offset by 1 so the placeholder id does not collide
                                // with `_id`'s.
                                (idx as i32 + 1).into(),
                                bind_data_type(col.data_type.as_ref().unwrap())?,
                            ),
                            is_hidden: false,
                        });
                    }

                    return Ok(columns);
                }

                let mut columns = vec![
                    ColumnCatalog {
                        column_desc: ColumnDesc::named("_id", 0.into(), DataType::Varchar),
                        is_hidden: false,
                    },
                    ColumnCatalog {
                        column_desc: ColumnDesc::named("payload", 1.into(), DataType::Jsonb),
                        is_hidden: false,
                    },
                ];

                if non_generated_sql_defined_columns.len() != 2
                    || non_generated_sql_defined_columns[0].name.real_value() != columns[0].name()
                    || non_generated_sql_defined_columns[1].name.real_value() != columns[1].name()
                {
                    return Err(RwError::from(ProtocolError(
                        "the non-generated columns of the source with row format DebeziumMongoJson \
                         must be (_id [Jsonb | Varchar | Int32 | Int64], payload jsonb)."
                            .to_owned(),
                    )));
                }
                // OK to unwrap since the data type was checked in `bind_sql_columns`.
                let key_data_type = bind_data_type(
                    non_generated_sql_defined_columns[0]
                        .data_type
                        .as_ref()
                        .unwrap(),
                )?;
                match key_data_type {
                    DataType::Jsonb | DataType::Varchar | DataType::Int32 | DataType::Int64 => {
                        columns[0].column_desc.data_type = key_data_type.clone();
                    }
                    _ => {
                        return Err(RwError::from(ProtocolError(
                            "the `_id` column of the source with row format DebeziumMongoJson \
                             must be [Jsonb | Varchar | Int32 | Int64]"
                                .to_owned(),
                        )));
                    }
                }

                let value_data_type = bind_data_type(
                    non_generated_sql_defined_columns[1]
                        .data_type
                        .as_ref()
                        .unwrap(),
                )?;
                if !matches!(value_data_type, DataType::Jsonb) {
                    return Err(RwError::from(ProtocolError(
                        "the `payload` column of the source with row format DebeziumMongoJson \
                         must be Jsonb datatype"
                            .to_owned(),
                    )));
                }
                Ok(columns)
            }
            (Format::Plain, Encode::Bytes) => {
                let err = Err(RwError::from(ProtocolError(
                    "ENCODE BYTES only accepts one BYTEA type column".to_owned(),
                )));
                if non_generated_sql_defined_columns.len() == 1 {
                    let col_data_type = bind_data_type(
                        non_generated_sql_defined_columns[0]
                            .data_type
                            .as_ref()
                            .unwrap(),
                    )?;
                    if col_data_type == DataType::Bytea {
                        Ok(cols_from_sql)
                    } else {
                        err
                    }
                } else {
                    err
                }
            }
            (_, _) => Ok(cols_from_sql),
        }
    }
}

461
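/// Formats a hint telling the user that, for the given format/encode, `INCLUDE KEY`
/// must be specified and the key column must be used as the primary key.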
fn hint_format_encode(format_encode: &FormatEncodeOptions) -> String {
    format!(
        r#"Hint: For FORMAT {0} ENCODE {1}, INCLUDE KEY must be specified and the key column must be used as primary key.
example:
    CREATE TABLE <table_name> ( PRIMARY KEY ([rw_key | <key_name>]) )
    INCLUDE KEY [AS <key_name>]
    WITH (...)
    FORMAT {0} ENCODE {1}{2}
"#,
        format_encode.format,
        format_encode.row_encode,
        if format_encode.row_encode == Encode::Json || format_encode.row_encode == Encode::Bytes {
            "".to_owned()
        } else {
            " (...)".to_owned()
        }
    )
}

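/// Resolves the primary key columns for the source according to the format/encode
/// combination, the SQL-defined primary key, and any `INCLUDE KEY` column.
///
/// Illustrative example for an upsert source, where the included key column must
/// be the primary key (mirroring the hint above):
///
/// ```sql
/// CREATE TABLE t (PRIMARY KEY (rw_key))
/// INCLUDE KEY AS rw_key
/// WITH (connector = 'kafka', ...)
/// FORMAT UPSERT ENCODE JSON;
/// ```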
pub(crate) async fn bind_source_pk(
    format_encode: &FormatEncodeOptions,
    source_info: &StreamSourceInfo,
    columns: &mut [ColumnCatalog],
    sql_defined_pk_names: Vec<String>,
    with_properties: &WithOptionsSecResolved,
) -> Result<Vec<String>> {
    let sql_defined_pk = !sql_defined_pk_names.is_empty();
    let include_key_column_name: Option<String> = {
        // Return the name of the `INCLUDE KEY` column, if one exists.
        columns.iter().find_map(|catalog| {
            if matches!(
                catalog.column_desc.additional_column.column_type,
                Some(AdditionalColumnType::Key(_))
            ) {
                Some(catalog.name().to_owned())
            } else {
                None
            }
        })
    };
    let additional_column_names = columns
        .iter()
        .filter_map(|col| {
            if col.column_desc.additional_column.column_type.is_some() {
                Some(col.name().to_owned())
            } else {
                None
            }
        })
        .collect_vec();

    let res = match (&format_encode.format, &format_encode.row_encode) {
        (Format::Native, Encode::Native) | (Format::None, Encode::None) | (Format::Plain, _) => {
            sql_defined_pk_names
        }

        // For upsert sources, the `INCLUDE KEY` column must be used as the primary key.
        (Format::Upsert, Encode::Json | Encode::Avro | Encode::Protobuf) => {
            if let Some(ref key_column_name) = include_key_column_name
                && sql_defined_pk
            {
                // Column names have been normalized to their real values during
                // parsing, so an exact comparison is correct here.
                if sql_defined_pk_names.len() != 1
                    || !key_column_name.eq(sql_defined_pk_names[0].as_str())
                {
                    return Err(RwError::from(ProtocolError(format!(
                        "Only \"{}\" can be used as primary key\n\n{}",
                        key_column_name,
                        hint_format_encode(format_encode)
                    ))));
                }
                sql_defined_pk_names
            } else {
                return if let Some(include_key_column_name) = include_key_column_name {
                    Err(RwError::from(ProtocolError(format!(
                        "Primary key must be specified to {}\n\n{}",
                        include_key_column_name,
                        hint_format_encode(format_encode)
                    ))))
                } else {
                    Err(RwError::from(ProtocolError(format!(
                        "INCLUDE KEY clause not set\n\n{}",
                        hint_format_encode(format_encode)
                    ))))
                };
            }
        }

        (Format::Debezium, Encode::Json) => {
            if !additional_column_names.is_empty() {
                return Err(RwError::from(ProtocolError(format!(
                    "FORMAT DEBEZIUM forbids additional columns, but got {:?}",
                    additional_column_names
                ))));
            }
            if !sql_defined_pk {
                return Err(RwError::from(ProtocolError(
                    "Primary key must be specified when creating source with FORMAT DEBEZIUM."
                        .to_owned(),
                )));
            }
            sql_defined_pk_names
        }
        (Format::Debezium, Encode::Avro) => {
            if !additional_column_names.is_empty() {
                return Err(RwError::from(ProtocolError(format!(
                    "FORMAT DEBEZIUM forbids additional columns, but got {:?}",
                    additional_column_names
                ))));
            }
            if sql_defined_pk {
                sql_defined_pk_names
            } else {
                let pk_names =
                    extract_debezium_avro_table_pk_columns(source_info, with_properties).await?;
                // Each extracted key column must exist in the Avro row schema.
                for pk_name in &pk_names {
                    columns
                        .iter()
                        .find(|c: &&ColumnCatalog| c.name().eq(pk_name))
                        .ok_or_else(|| {
                            RwError::from(ProtocolError(format!(
                                "Avro key column {} does not exist in the Avro row schema",
                                pk_name
                            )))
                        })?;
                }
                pk_names
            }
        }
        (Format::DebeziumMongo, Encode::Json) => {
            if sql_defined_pk {
                sql_defined_pk_names
            } else {
                vec!["_id".to_owned()]
            }
        }

        (Format::Maxwell, Encode::Json) => {
            if !additional_column_names.is_empty() {
                return Err(RwError::from(ProtocolError(format!(
                    "FORMAT MAXWELL forbids additional columns, but got {:?}",
                    additional_column_names
                ))));
            }
            if !sql_defined_pk {
                return Err(RwError::from(ProtocolError(
                    "Primary key must be specified when creating source with FORMAT MAXWELL ENCODE JSON.".to_owned(),
                )));
            }
            sql_defined_pk_names
        }

        (Format::Canal, Encode::Json) => {
            if !additional_column_names.is_empty() {
                return Err(RwError::from(ProtocolError(format!(
                    "FORMAT CANAL forbids additional columns, but got {:?}",
                    additional_column_names
                ))));
            }
            if !sql_defined_pk {
                return Err(RwError::from(ProtocolError(
                    "Primary key must be specified when creating source with FORMAT CANAL ENCODE JSON.".to_owned(),
                )));
            }
            sql_defined_pk_names
        }
        (format, encoding) => {
            return Err(RwError::from(ProtocolError(format!(
                "Unknown combination {:?} {:?}",
                format, encoding
            ))));
        }
    };
    Ok(res)
}

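/// Binds `WATERMARK FOR <column> AS <expr>` clauses against the bound columns,
/// checking that each expression's return type matches its column's data type.
///
/// Illustrative example:
///
/// ```sql
/// CREATE SOURCE s (
///     ts TIMESTAMPTZ,
///     v INT,
///     WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
/// ) WITH (connector = 'kafka', ...) FORMAT PLAIN ENCODE JSON;
/// ```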
pub(super) fn bind_source_watermark(
    session: &SessionImpl,
    name: String,
    source_watermarks: Vec<SourceWatermark>,
    column_catalogs: &[ColumnCatalog],
) -> Result<Vec<WatermarkDesc>> {
    let mut binder = Binder::new_for_ddl(session);
    binder.bind_columns_to_context(name.clone(), column_catalogs)?;

    let watermark_descs = source_watermarks
        .into_iter()
        .map(|source_watermark| {
            let col_name = source_watermark.column.real_value();
            let watermark_idx = binder.get_column_binding_index(name.clone(), &col_name)?;

            let expr = binder.bind_expr(source_watermark.expr)?;
            let watermark_col_type = column_catalogs[watermark_idx].data_type();
            let watermark_expr_type = &expr.return_type();
            if watermark_col_type != watermark_expr_type {
                Err(RwError::from(ErrorCode::BindError(format!(
                    "The return type of the watermark expression must be identical to the watermark column's data type, \
                     but the expression returns `{}` while column `{}` is of type `{}`",
                    watermark_expr_type, col_name, watermark_col_type
                ))))
            } else {
                let expr_proto = expr.to_expr_proto();
                Ok::<_, RwError>(WatermarkDesc {
                    watermark_idx: watermark_idx as u32,
                    expr: Some(expr_proto),
                })
            }
        })
        .try_collect()?;
    Ok(watermark_descs)
}

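/// Performs early-stage schema checking for connectors with a fixed schema;
/// currently only the `nexmark` connector is checked.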
pub(super) fn check_format_encode(
    props: &WithOptionsSecResolved,
    row_id_index: Option<usize>,
    columns: &[ColumnCatalog],
) -> Result<()> {
    let Some(connector) = props.get_connector() else {
        return Ok(());
    };

    if connector == NEXMARK_CONNECTOR {
        check_nexmark_schema(props, row_id_index, columns)
    } else {
        Ok(())
    }
}

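/// Extracts and validates the connector properties from the WITH clause. For shared
/// CDC source jobs this also fills in CDC defaults (snapshot mode, sharing mode,
/// transactional metadata, start timeout), and for MySQL CDC it generates a random
/// `server.id` if one is not specified.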
pub fn bind_connector_props(
    handler_args: &HandlerArgs,
    format_encode: &FormatEncodeOptions,
    is_create_source: bool,
) -> Result<WithOptions> {
    let mut with_properties = handler_args.with_options.clone().into_connector_props();
    validate_compatibility(format_encode, &mut with_properties)?;
    let create_cdc_source_job = with_properties.is_shareable_cdc_connector();

    if !is_create_source && with_properties.is_shareable_only_cdc_connector() {
        return Err(RwError::from(ProtocolError(format!(
            "connector {} does not support `CREATE TABLE`, please use `CREATE SOURCE` instead",
            with_properties.get_connector().unwrap(),
        ))));
    }
    if is_create_source && create_cdc_source_job {
        if let Some(value) = with_properties.get(CDC_AUTO_SCHEMA_CHANGE_KEY)
            && value
                .parse::<bool>()
                .map_err(|_| anyhow!("invalid value of '{}' option", CDC_AUTO_SCHEMA_CHANGE_KEY))?
        {
            Feature::CdcAutoSchemaChange
                .check_available()
                .map_err(|e| anyhow::anyhow!(e))?;
        }

        // Set the CDC source to backfill mode.
        with_properties.insert(CDC_SNAPSHOT_MODE_KEY.into(), CDC_SNAPSHOT_BACKFILL.into());
        // Enable CDC sharing mode, which captures all tables in the upstream database.
        with_properties.insert(CDC_SHARING_MODE_KEY.into(), "true".into());
        // Enable transactional CDC when the connector supports transaction metadata.
        if with_properties.enable_transaction_metadata() {
            with_properties.insert(CDC_TRANSACTIONAL_KEY.into(), "true".into());
        }
        with_properties.insert(
            CDC_WAIT_FOR_STREAMING_START_TIMEOUT.into(),
            handler_args
                .session
                .config()
                .cdc_source_wait_streaming_start_timeout()
                .to_string(),
        );
    }
    if with_properties.is_mysql_cdc_connector() {
        // Generate a random `server.id` (in the range 1..u32::MAX) if not specified;
        // it must be unique across the whole MySQL replication group.
        with_properties
            .entry("server.id".to_owned())
            .or_insert(rand::rng().random_range(1..u32::MAX).to_string());
    }
    Ok(with_properties)
}

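/// Strategy for handling the columns defined in SQL when columns can also be
/// resolved from the external source (see [`bind_all_columns`]).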
pub enum SqlColumnStrategy {
    /// Use the columns from SQL as-is, without checking them against the columns
    /// resolved from the source. A wildcard must not be present.
    FollowUnchecked,

    /// Use the columns from SQL, checking that every regular (non-generated) column
    /// also exists in the source with the same data type.
    FollowChecked,

    /// Keep only the generated columns from SQL and take the remaining columns
    /// from the source.
    Ignore,
}

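/// Binds a `CREATE SOURCE` or `CREATE TABLE ... WITH (connector = ...)` statement
/// into a `SourceCatalog`: resolves columns, additional (`INCLUDE`) columns, the
/// primary key, watermarks, and connection/secret references.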
#[allow(clippy::too_many_arguments)]
pub async fn bind_create_source_or_table_with_connector(
    handler_args: HandlerArgs,
    full_name: ObjectName,
    format_encode: FormatEncodeOptions,
    with_properties: WithOptions,
    sql_columns_defs: &[ColumnDef],
    constraints: Vec<TableConstraint>,
    wildcard_idx: Option<usize>,
    source_watermarks: Vec<SourceWatermark>,
    columns_from_resolve_source: Option<Vec<ColumnCatalog>>,
    source_info: StreamSourceInfo,
    include_column_options: IncludeOption,
    col_id_gen: &mut ColumnIdGenerator,
    create_source_type: CreateSourceType,
    source_rate_limit: Option<u32>,
    sql_column_strategy: SqlColumnStrategy,
) -> Result<SourceCatalog> {
    let session = &handler_args.session;
    let db_name: &str = &session.database();
    let (schema_name, source_name) = Binder::resolve_schema_qualified_name(db_name, full_name)?;
    let (database_id, schema_id) =
        session.get_database_and_schema_id_for_create(schema_name.clone())?;

    let is_create_source = create_source_type != CreateSourceType::Table;
    if !is_create_source && with_properties.is_iceberg_connector() {
        return Err(ErrorCode::BindError(
            "can't CREATE TABLE with iceberg connector\n\nHint: use CREATE SOURCE instead"
                .to_owned(),
        )
        .into());
    }
    if is_create_source {
        match format_encode.format {
            Format::Upsert
            | Format::Debezium
            | Format::DebeziumMongo
            | Format::Maxwell
            | Format::Canal => {
                return Err(ErrorCode::BindError(format!(
                    "can't CREATE SOURCE with FORMAT {}.\n\nHint: use CREATE TABLE instead\n\n{}",
                    format_encode.format,
                    hint_format_encode(&format_encode)
                ))
                .into());
            }
            _ => {}
        }
    }

    let sql_pk_names = bind_sql_pk_names(sql_columns_defs, bind_table_constraints(&constraints)?)?;

    let columns_from_sql = bind_sql_columns(sql_columns_defs, false)?;

    let mut columns = bind_all_columns(
        &format_encode,
        columns_from_resolve_source,
        columns_from_sql,
        sql_columns_defs,
        wildcard_idx,
        sql_column_strategy,
    )?;

    // Add additional columns from the `INCLUDE ...` clause.
    handle_addition_columns(
        Some(&format_encode),
        &with_properties,
        include_column_options,
        &mut columns,
        false,
    )?;

    if columns.is_empty() {
        return Err(RwError::from(ProtocolError(
            "Schema definition is required, either from SQL or schema registry.".to_owned(),
        )));
    }

    if is_create_source {
        // Add the hidden timestamp column if applicable for the connector.
        check_and_add_timestamp_column(&with_properties, &mut columns);

        // For shared non-CDC sources, add hidden partition and offset columns so
        // that downstream jobs can read from the shared source executor.
        if create_source_type == CreateSourceType::SharedNonCdc {
            let (columns_exist, additional_columns) = source_add_partition_offset_cols(
                &columns,
                &with_properties.get_connector().unwrap(),
                true, // `col_id` will be filled by `col_id_gen` below
            );
            for (existed, c) in columns_exist.into_iter().zip_eq_fast(additional_columns) {
                if !existed {
                    columns.push(ColumnCatalog::hidden(c));
                }
            }
        }
    }

    // Resolve privatelink endpoints in the WITH options.
    let mut with_properties = with_properties;
    resolve_privatelink_in_with_option(&mut with_properties)?;

    let (with_properties, connection_type, connector_conn_ref) =
        resolve_connection_ref_and_secret_ref(
            with_properties,
            session,
            TelemetryDatabaseObject::Source,
        )?;
    ensure_connection_type_allowed(connection_type, &SOURCE_ALLOWED_CONNECTION_CONNECTOR)?;

    // If a connection is referenced, check that the connector matches its type.
    if !matches!(connection_type, PbConnectionType::Unspecified) {
        let connector = with_properties.get_connector().unwrap();
        check_connector_match_connection_type(connector.as_str(), &connection_type)?;
    }

    let pk_names = bind_source_pk(
        &format_encode,
        &source_info,
        &mut columns,
        sql_pk_names,
        &with_properties,
    )
    .await?;

    if is_create_source && !pk_names.is_empty() {
        return Err(ErrorCode::InvalidInputSyntax(
            "Source does not support PRIMARY KEY constraint, please use \"CREATE TABLE\" instead"
                .to_owned(),
        )
        .into());
    }

    if let Some(duplicated_name) = columns.iter().map(|c| c.name()).duplicates().next() {
        return Err(ErrorCode::InvalidInputSyntax(format!(
            "column \"{}\" specified more than once",
            duplicated_name
        ))
        .into());
    }

    for c in &mut columns {
        let original_data_type = c.data_type().clone();
        col_id_gen.generate(c)?;
        // `generate` also fills in nested field ids; for `CREATE SOURCE`, restore
        // the original data type so that they stay unset.
        if is_create_source {
            c.column_desc.data_type = original_data_type;
        }
    }
    debug_assert_column_ids_distinct(&columns);

    let (mut columns, pk_col_ids, row_id_index) =
        bind_pk_and_row_id_on_relation(columns, pk_names, true)?;

    let watermark_descs =
        bind_source_watermark(session, source_name.clone(), source_watermarks, &columns)?;
    // At most one watermark is currently allowed per source.
    assert!(watermark_descs.len() <= 1);

    bind_sql_column_constraints(
        session,
        source_name.clone(),
        &mut columns,
        sql_columns_defs.to_vec(),
        &pk_col_ids,
    )?;
    check_format_encode(&with_properties, row_id_index, &columns)?;

    let definition = handler_args.normalized_sql.clone();

    let associated_table_id = if is_create_source {
        None
    } else {
        Some(TableId::placeholder())
    };
    let source = SourceCatalog {
        id: TableId::placeholder().table_id,
        name: source_name,
        schema_id,
        database_id,
        columns,
        pk_col_ids,
        append_only: row_id_index.is_some(),
        owner: session.user_id(),
        info: source_info,
        row_id_index,
        with_properties,
        watermark_descs,
        associated_table_id,
        definition,
        connection_id: connector_conn_ref,
        created_at_epoch: None,
        initialized_at_epoch: None,
        version: INITIAL_SOURCE_VERSION_ID,
        created_at_cluster_version: None,
        initialized_at_cluster_version: None,
        rate_limit: source_rate_limit,
    };
    Ok(source)
}

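/// Handles `CREATE SOURCE` statements: binds the source catalog, then either
/// registers it as a temporary source in the session or persists it through the
/// catalog writer (attaching a stream fragment graph when the source is shared).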
pub async fn handle_create_source(
    mut handler_args: HandlerArgs,
    stmt: CreateSourceStatement,
) -> Result<RwPgResponse> {
    let session = handler_args.session.clone();
    let overwrite_options = OverwriteOptions::new(&mut handler_args);

    if let Either::Right(resp) = session.check_relation_name_duplicated(
        stmt.source_name.clone(),
        StatementType::CREATE_SOURCE,
        stmt.if_not_exists,
    )? {
        return Ok(resp);
    }

    if handler_args.with_options.is_empty() {
        return Err(RwError::from(InvalidInputSyntax(
            "missing WITH clause".to_owned(),
        )));
    }

    let format_encode = stmt.format_encode.into_v2_with_warning();
    let with_properties = bind_connector_props(&handler_args, &format_encode, true)?;

    let create_source_type = CreateSourceType::for_newly_created(&session, &*with_properties);
    let (columns_from_resolve_source, source_info) = bind_columns_from_source(
        &session,
        &format_encode,
        Either::Left(&with_properties),
        create_source_type,
    )
    .await?;
    let mut col_id_gen = ColumnIdGenerator::new_initial();

    if stmt.columns.iter().any(|col| {
        col.options
            .iter()
            .any(|def| matches!(def.option, ColumnOption::NotNull))
    }) {
        return Err(RwError::from(InvalidInputSyntax(
            "NOT NULL constraint is not supported in source schema".to_owned(),
        )));
    }

    let source_catalog = bind_create_source_or_table_with_connector(
        handler_args.clone(),
        stmt.source_name,
        format_encode,
        with_properties,
        &stmt.columns,
        stmt.constraints,
        stmt.wildcard_idx,
        stmt.source_watermarks,
        columns_from_resolve_source,
        source_info,
        stmt.include_column_options,
        &mut col_id_gen,
        create_source_type,
        overwrite_options.source_rate_limit,
        SqlColumnStrategy::FollowChecked,
    )
    .await?;

    // A temporary source lives only in the current session and is not persisted.
    if stmt.temporary {
        if session.get_temporary_source(&source_catalog.name).is_some() {
            return Err(CatalogError::duplicated("source", source_catalog.name.clone()).into());
        }
        session.create_temporary_source(source_catalog);
        return Ok(PgResponse::empty_result(StatementType::CREATE_SOURCE));
    }

    let source = source_catalog.to_prost();

    let catalog_writer = session.catalog_writer()?;

    if create_source_type.is_shared() {
        let graph = generate_stream_graph_for_source(handler_args, source_catalog)?;
        catalog_writer.create_source(source, Some(graph)).await?;
    } else {
        // Non-shared sources do not need a streaming job of their own.
        catalog_writer.create_source(source, None).await?;
    }

    Ok(PgResponse::empty_result(StatementType::CREATE_SOURCE))
}

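/// Builds the stream fragment graph for a shared source by planning a
/// `LogicalSource` node and converting it to a stream plan.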
pub(super) fn generate_stream_graph_for_source(
    handler_args: HandlerArgs,
    source_catalog: SourceCatalog,
) -> Result<PbStreamFragmentGraph> {
    let context = OptimizerContext::from_handler_args(handler_args);
    let source_node = LogicalSource::with_catalog(
        Rc::new(source_catalog),
        SourceNodeKind::CreateSharedSource,
        context.into(),
        None,
    )?;

    let stream_plan = source_node.to_stream(&mut ToStreamContext::new(false))?;
    let graph = build_graph(stream_plan)?;
    Ok(graph)
}

#[cfg(test)]
pub mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use risingwave_common::catalog::{
        DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, ROW_ID_COLUMN_NAME,
    };
    use risingwave_common::types::{DataType, StructType};

    use crate::catalog::root_catalog::SchemaPath;
    use crate::catalog::source_catalog::SourceCatalog;
    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    const GET_COLUMN_FROM_CATALOG: fn(&Arc<SourceCatalog>) -> HashMap<&str, DataType> =
        |catalog: &Arc<SourceCatalog>| -> HashMap<&str, DataType> {
            catalog
                .columns
                .iter()
                .map(|col| (col.name(), col.data_type().clone()))
                .collect::<HashMap<&str, DataType>>()
        };

    #[tokio::test]
    async fn test_create_source_handler() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let sql = format!(
            r#"CREATE SOURCE t
    WITH (connector = 'kinesis')
    FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        let (source, _) = catalog_reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
            .unwrap();
        assert_eq!(source.name, "t");

        let columns = GET_COLUMN_FROM_CATALOG(source);

        let city_type = StructType::new(vec![
            ("address", DataType::Varchar),
            ("zipcode", DataType::Varchar),
        ])
        .into();
        let expected_columns = maplit::hashmap! {
            ROW_ID_COLUMN_NAME => DataType::Serial,
            "id" => DataType::Int32,
            "zipcode" => DataType::Int64,
            "rate" => DataType::Float32,
            "country" => StructType::new(vec![
                ("address", DataType::Varchar),
                ("city", city_type),
                ("zipcode", DataType::Varchar),
            ])
            .into(),
        };
        assert_eq!(columns, expected_columns, "{columns:#?}");
    }

    #[tokio::test]
    async fn test_duplicate_props_options() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let sql = format!(
            r#"CREATE SOURCE t
    WITH (
        connector = 'kinesis',
        aws.region='user_test_topic',
        endpoint='172.10.1.1:9090,172.10.1.2:9090',
        aws.credentials.access_key_id = 'your_access_key_1',
        aws.credentials.secret_access_key = 'your_secret_key_1'
    )
    FORMAT PLAIN ENCODE PROTOBUF (
        message = '.test.TestRecord',
        aws.credentials.access_key_id = 'your_access_key_2',
        aws.credentials.secret_access_key = 'your_secret_key_2',
        schema.location = 'file://{}',
    )"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        let (source, _) = catalog_reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
            .unwrap();
        assert_eq!(source.name, "t");

        assert_eq!(
            source
                .info
                .format_encode_options
                .get("aws.credentials.access_key_id")
                .unwrap(),
            "your_access_key_2"
        );
        assert_eq!(
            source
                .info
                .format_encode_options
                .get("aws.credentials.secret_access_key")
                .unwrap(),
            "your_secret_key_2"
        );

        assert_eq!(
            source
                .with_properties
                .get("aws.credentials.access_key_id")
                .unwrap(),
            "your_access_key_1"
        );
        assert_eq!(
            source
                .with_properties
                .get("aws.credentials.secret_access_key")
                .unwrap(),
            "your_secret_key_1"
        );

        assert!(!source.with_properties.contains_key("schema.location"));
    }

    #[tokio::test]
    async fn test_multi_table_cdc_create_source_handler() {
        let sql =
            "CREATE SOURCE t2 WITH (connector = 'mysql-cdc') FORMAT PLAIN ENCODE JSON".to_owned();
        let frontend = LocalFrontend::new(Default::default()).await;
        let session = frontend.session_ref();

        frontend
            .run_sql_with_session(session.clone(), sql)
            .await
            .unwrap();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        let (source, _) = catalog_reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t2")
            .unwrap();
        assert_eq!(source.name, "t2");

        let columns = source
            .columns
            .iter()
            .map(|col| (col.name(), col.data_type().clone()))
            .collect::<Vec<(&str, DataType)>>();

        expect_test::expect![[r#"
            [
                (
                    "payload",
                    Jsonb,
                ),
                (
                    "_rw_offset",
                    Varchar,
                ),
                (
                    "_rw_table_name",
                    Varchar,
                ),
                (
                    "_row_id",
                    Serial,
                ),
            ]
        "#]]
        .assert_debug_eq(&columns);
    }

    #[tokio::test]
    async fn test_source_addition_columns() {
        let sql =
            "CREATE SOURCE s (v1 int) include key as _rw_kafka_key with (connector = 'kafka') format plain encode json".to_owned();
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();
        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let (source, _) = catalog_reader
            .get_source_by_name(
                DEFAULT_DATABASE_NAME,
                SchemaPath::Name(DEFAULT_SCHEMA_NAME),
                "s",
            )
            .unwrap();
        assert_eq!(source.name, "s");

        let columns = source
            .columns
            .iter()
            .map(|col| (col.name(), col.data_type().clone()))
            .collect::<Vec<(&str, DataType)>>();

        expect_test::expect![[r#"
            [
                (
                    "v1",
                    Int32,
                ),
                (
                    "_rw_kafka_key",
                    Bytea,
                ),
                (
                    "_rw_kafka_timestamp",
                    Timestamptz,
                ),
                (
                    "_rw_kafka_partition",
                    Varchar,
                ),
                (
                    "_rw_kafka_offset",
                    Varchar,
                ),
                (
                    "_row_id",
                    Serial,
                ),
            ]
        "#]]
        .assert_debug_eq(&columns);

        let sql =
            "CREATE SOURCE s3 (v1 int) include timestamp 'header1' as header_col with (connector = 'kafka') format plain encode json".to_owned();
        match frontend.run_sql(sql).await {
            Err(e) => {
                assert_eq!(
                    e.to_string(),
                    "Protocol error: Only header column can have inner field, but got \"timestamp\""
                )
            }
            _ => unreachable!(),
        }
    }
}