use std::collections::{BTreeMap, HashMap, HashSet};
use std::rc::Rc;
use std::sync::LazyLock;

use anyhow::{Context, anyhow};
use either::Either;
use external_schema::debezium::extract_debezium_avro_table_pk_columns;
use external_schema::nexmark::check_nexmark_schema;
use itertools::Itertools;
use maplit::{convert_args, hashmap, hashset};
use pgwire::pg_response::{PgResponse, StatementType};
use rand::Rng;
use risingwave_common::array::arrow::{IcebergArrowConvert, arrow_schema_iceberg};
use risingwave_common::bail_not_implemented;
use risingwave_common::catalog::{
    ColumnCatalog, ColumnDesc, ColumnId, INITIAL_SOURCE_VERSION_ID, KAFKA_TIMESTAMP_COLUMN_NAME,
    ROW_ID_COLUMN_NAME, TableId, debug_assert_column_ids_distinct,
};
use risingwave_common::license::Feature;
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::types::DataType;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_connector::parser::additional_columns::{
    build_additional_column_desc, get_supported_additional_columns,
    source_add_partition_offset_cols,
};
use risingwave_connector::parser::{
    AvroParserConfig, DEBEZIUM_IGNORE_KEY, DebeziumAvroParserConfig, ProtobufParserConfig,
    SchemaLocation, SpecificParserConfig, TimestamptzHandling,
    fetch_json_schema_and_map_to_columns,
};
use risingwave_connector::schema::AWS_GLUE_SCHEMA_ARN_KEY;
use risingwave_connector::schema::schema_registry::{
    SCHEMA_REGISTRY_BACKOFF_DURATION_KEY, SCHEMA_REGISTRY_BACKOFF_FACTOR_KEY,
    SCHEMA_REGISTRY_MAX_DELAY_KEY, SCHEMA_REGISTRY_PASSWORD, SCHEMA_REGISTRY_RETRIES_MAX_KEY,
    SCHEMA_REGISTRY_USERNAME, SchemaRegistryConfig, name_strategy_from_str,
};
use risingwave_connector::source::cdc::{
    CDC_MONGODB_STRONG_SCHEMA_KEY, CDC_SHARING_MODE_KEY, CDC_SNAPSHOT_BACKFILL,
    CDC_SNAPSHOT_MODE_KEY, CDC_TRANSACTIONAL_KEY, CDC_WAIT_FOR_STREAMING_START_TIMEOUT,
    CITUS_CDC_CONNECTOR, MONGODB_CDC_CONNECTOR, MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR,
    SQL_SERVER_CDC_CONNECTOR,
};
use risingwave_connector::source::datagen::DATAGEN_CONNECTOR;
use risingwave_connector::source::iceberg::ICEBERG_CONNECTOR;
use risingwave_connector::source::nexmark::source::{EventType, get_event_data_types_with_names};
use risingwave_connector::source::test_source::TEST_CONNECTOR;
use risingwave_connector::source::{
    AZBLOB_CONNECTOR, ConnectorProperties, GCS_CONNECTOR, GOOGLE_PUBSUB_CONNECTOR, KAFKA_CONNECTOR,
    KINESIS_CONNECTOR, LEGACY_S3_CONNECTOR, MQTT_CONNECTOR, NATS_CONNECTOR, NEXMARK_CONNECTOR,
    OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR, PULSAR_CONNECTOR,
};
pub use risingwave_connector::source::{UPSTREAM_SOURCE_KEY, WEBHOOK_CONNECTOR};
use risingwave_connector::{AUTO_SCHEMA_CHANGE_KEY, WithPropertiesExt};
use risingwave_pb::catalog::connection_params::PbConnectionType;
use risingwave_pb::catalog::{PbSchemaRegistryNameStrategy, StreamSourceInfo, WatermarkDesc};
use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
use risingwave_pb::plan_common::{EncodeType, FormatType};
use risingwave_pb::stream_plan::PbStreamFragmentGraph;
use risingwave_pb::telemetry::TelemetryDatabaseObject;
use risingwave_sqlparser::ast::{
    AstString, ColumnDef, ColumnOption, CreateSourceStatement, Encode, Format, FormatEncodeOptions,
    ObjectName, SourceWatermark, SqlOptionValue, TableConstraint, Value, get_delimiter,
};
use risingwave_sqlparser::parser::{IncludeOption, IncludeOptionItem};
use thiserror_ext::AsReport;

use super::RwPgResponse;
use crate::binder::Binder;
use crate::catalog::CatalogError;
use crate::catalog::source_catalog::SourceCatalog;
use crate::error::ErrorCode::{self, Deprecated, InvalidInputSyntax, NotSupported, ProtocolError};
use crate::error::{Result, RwError};
use crate::expr::Expr;
use crate::handler::HandlerArgs;
use crate::handler::create_table::{
    ColumnIdGenerator, bind_pk_and_row_id_on_relation, bind_sql_column_constraints,
    bind_sql_columns, bind_sql_pk_names, bind_table_constraints,
};
use crate::handler::util::{
    SourceSchemaCompatExt, check_connector_match_connection_type, ensure_connection_type_allowed,
};
use crate::optimizer::plan_node::generic::SourceNodeKind;
use crate::optimizer::plan_node::{LogicalSource, ToStream, ToStreamContext};
use crate::session::SessionImpl;
use crate::session::current::notice_to_user;
use crate::utils::{
    OverwriteOptions, resolve_connection_ref_and_secret_ref, resolve_privatelink_in_with_option,
    resolve_secret_ref_in_with_options,
};
use crate::{OptimizerContext, WithOptions, WithOptionsSecResolved, bind_data_type, build_graph};

mod external_schema;
pub use external_schema::{
    bind_columns_from_source, get_schema_location, schema_has_schema_registry,
};
mod validate;
pub use validate::validate_compatibility;
use validate::{SOURCE_ALLOWED_CONNECTION_CONNECTOR, SOURCE_ALLOWED_CONNECTION_SCHEMA_REGISTRY};
mod additional_column;
use additional_column::check_and_add_timestamp_column;
pub use additional_column::handle_addition_columns;

use crate::stream_fragmenter::GraphJobType;

/// Returns the non-generated column definitions from SQL.
fn non_generated_sql_columns(columns: &[ColumnDef]) -> Vec<ColumnDef> {
    columns
        .iter()
        .filter(|c| !c.is_generated())
        .cloned()
        .collect()
}

/// Removes `key` from the options map and returns its value, if present.
fn try_consume_string_from_options(
    format_encode_options: &mut BTreeMap<String, String>,
    key: &str,
) -> Option<AstString> {
    format_encode_options.remove(key).map(AstString)
}

/// Consumes all schema-registry related keys from the options map, ignoring whether they exist.
fn try_consume_schema_registry_config_from_options(
    format_encode_options: &mut BTreeMap<String, String>,
) {
    [
        SCHEMA_REGISTRY_USERNAME,
        SCHEMA_REGISTRY_PASSWORD,
        SCHEMA_REGISTRY_MAX_DELAY_KEY,
        SCHEMA_REGISTRY_BACKOFF_DURATION_KEY,
        SCHEMA_REGISTRY_BACKOFF_FACTOR_KEY,
        SCHEMA_REGISTRY_RETRIES_MAX_KEY,
    ]
    .iter()
    .for_each(|key| {
        try_consume_string_from_options(format_encode_options, key);
    });
}

/// Removes `key` from the options map and returns its value, erroring if the key is missing.
fn consume_string_from_options(
    format_encode_options: &mut BTreeMap<String, String>,
    key: &str,
) -> Result<AstString> {
    try_consume_string_from_options(format_encode_options, key).ok_or(RwError::from(ProtocolError(
        format!("missing field {} in options", key),
    )))
}

/// Removes all `aws.*` keys from the options map.
fn consume_aws_config_from_options(format_encode_options: &mut BTreeMap<String, String>) {
    format_encode_options.retain(|key, _| !key.starts_with("aws."))
}
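
// Illustrative usage sketch (added by the editor, not part of the original code): the helpers
// above consume recognized keys out of the user-provided `FORMAT ... ENCODE ... ( ... )` option
// map, so that leftover keys can later be forwarded or rejected.
//
//     let mut opts = BTreeMap::from([("message".to_owned(), ".test.TestRecord".to_owned())]);
//     let message = consume_string_from_options(&mut opts, "message")?; // -> AstString(".test.TestRecord")
//     assert!(opts.is_empty());
//     // An absent key is a protocol error:
//     assert!(consume_string_from_options(&mut opts, "message").is_err());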

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CreateSourceType {
    /// A shareable CDC source (e.g. `mysql-cdc`), which runs as a shared source job.
    SharedCdc,
    /// A shareable non-CDC source (e.g. Kafka) with shared source enabled.
    SharedNonCdc,
    /// A source that is not shared; each downstream job reads from it independently.
    NonShared,
    /// The source part of `CREATE TABLE ... WITH (connector = ...)`.
    Table,
}

impl CreateSourceType {
    /// Determines the type of a newly created source from the connector properties and the
    /// session configuration.
    pub fn for_newly_created(
        session: &SessionImpl,
        with_properties: &impl WithPropertiesExt,
    ) -> Self {
        if with_properties.is_shareable_cdc_connector() {
            CreateSourceType::SharedCdc
        } else if with_properties.is_shareable_non_cdc_connector()
            && session
                .env()
                .streaming_config()
                .developer
                .enable_shared_source
            && session.config().streaming_use_shared_source()
        {
            CreateSourceType::SharedNonCdc
        } else {
            CreateSourceType::NonShared
        }
    }

    /// Determines the type from the catalog of an existing source, for replacement.
    pub fn for_replace(catalog: &SourceCatalog) -> Self {
        if !catalog.info.is_shared() {
            CreateSourceType::NonShared
        } else if catalog.with_properties.is_shareable_cdc_connector() {
            CreateSourceType::SharedCdc
        } else {
            CreateSourceType::SharedNonCdc
        }
    }

    pub fn is_shared(&self) -> bool {
        matches!(
            self,
            CreateSourceType::SharedCdc | CreateSourceType::SharedNonCdc
        )
    }
}

/// Binds the full column set of the source, combining the columns resolved from the external
/// schema (if any) with the columns defined in SQL, according to `sql_column_strategy`.
pub(crate) fn bind_all_columns(
    format_encode: &FormatEncodeOptions,
    cols_from_source: Option<Vec<ColumnCatalog>>,
    cols_from_sql: Vec<ColumnCatalog>,
    col_defs_from_sql: &[ColumnDef],
    wildcard_idx: Option<usize>,
    sql_column_strategy: SqlColumnStrategy,
) -> Result<Vec<ColumnCatalog>> {
    if let Some(cols_from_source) = cols_from_source {
        // Generated columns can only be defined in SQL, so collect them first.
        let generated_cols_from_sql = cols_from_sql
            .iter()
            .filter(|c| {
                col_defs_from_sql
                    .iter()
                    .find(|d| d.name.real_value() == c.name())
                    .unwrap()
                    .is_generated()
            })
            .cloned()
            .collect_vec();

        #[allow(clippy::collapsible_else_if)]
        match sql_column_strategy {
            // Take the columns defined in SQL as-is.
            SqlColumnStrategy::FollowUnchecked => {
                assert!(
                    wildcard_idx.is_none(),
                    "wildcard still exists while strategy is Follows, not correctly purified?"
                );
                return Ok(cols_from_sql);
            }

            // Fall through to the merge of generated columns and source columns below.
            SqlColumnStrategy::Ignore => {}

            SqlColumnStrategy::FollowChecked => {
                let has_regular_cols_from_sql =
                    generated_cols_from_sql.len() != cols_from_sql.len();

                if has_regular_cols_from_sql {
                    if wildcard_idx.is_some() {
                        // e.g. `(*, normal_column INT)`
                        return Err(RwError::from(NotSupported(
                            "When there's a wildcard (\"*\"), \
                             only generated columns are allowed in user-defined schema from SQL"
                                .to_owned(),
                            "Remove the non-generated columns".to_owned(),
                        )));
                    } else {
                        // e.g. `(normal_column INT)`: check every regular column against the
                        // columns from the source, then take the SQL definitions as-is.
                        for col in &cols_from_sql {
                            if generated_cols_from_sql.contains(col) {
                                continue;
                            }
                            let Some(col_from_source) =
                                cols_from_source.iter().find(|c| c.name() == col.name())
                            else {
                                return Err(RwError::from(ProtocolError(format!(
                                    "Column \"{}\" is defined in SQL but not found in the source",
                                    col.name()
                                ))));
                            };

                            if col_from_source.data_type() != col.data_type() {
                                return Err(RwError::from(ProtocolError(format!(
                                    "Data type mismatch for column \"{}\". \
                                     Defined in SQL as \"{}\", but found in the source as \"{}\"",
                                    col.name(),
                                    col.data_type(),
                                    col_from_source.data_type()
                                ))));
                            }
                        }
                        return Ok(cols_from_sql);
                    }
                } else {
                    if wildcard_idx.is_some() {
                        // e.g. `(*)` or `(*, generated_column INT)`: fall through to the merge below.
                    } else {
                        // e.g. `()` or `(generated_column INT)`: for backward compatibility, still
                        // merge with all columns from the source, but notice the user.
                        notice_to_user("\
                            Neither wildcard (\"*\") nor regular (non-generated) columns appear in the user-defined schema from SQL. \
                            For backward compatibility, all columns from the source will be included at the beginning. \
                            For clarity, consider adding a wildcard (\"*\") to indicate where the columns from the source should be included, \
                            or specifying the columns you want to include from the source.
                        ");
                    }
                }
            }
        }

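        // Illustrative note (added by the editor): the merge below interleaves the source
        // columns at the wildcard position among the generated SQL columns. For example,
        // SQL columns `(gen1 INT AS ..., *, gen2 INT AS ...)` with source columns `[a, b]`
        // merge to `[gen1, a, b, gen2]`; without a wildcard, the source columns are placed
        // at the beginning.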
        let wildcard_idx = wildcard_idx.unwrap_or(0).min(generated_cols_from_sql.len());

        let mut merged_cols = generated_cols_from_sql;
        let merged_cols_r = merged_cols.split_off(wildcard_idx);
        merged_cols.extend(cols_from_source);
        merged_cols.extend(merged_cols_r);

        Ok(merged_cols)
    } else {
        if wildcard_idx.is_some() {
            return Err(RwError::from(NotSupported(
                "Wildcard in user-defined schema is only allowed when there exists columns from external schema".to_owned(),
                "Remove the wildcard or use a source with external schema".to_owned(),
            )));
        }
        let non_generated_sql_defined_columns = non_generated_sql_columns(col_defs_from_sql);

        match (&format_encode.format, &format_encode.row_encode) {
            (Format::DebeziumMongo, Encode::Json) => {
                let strong_schema = format_encode
                    .row_options
                    .iter()
                    .find(|k| k.name.real_value().to_lowercase() == CDC_MONGODB_STRONG_SCHEMA_KEY)
                    .map(|k| matches!(k.value, SqlOptionValue::Value(Value::Boolean(true))))
                    .unwrap_or(false);

                // Strongly-typed schema: the user defines the columns explicitly, with `_id` first.
                if strong_schema {
                    let (_, id_column) = non_generated_sql_defined_columns
                        .iter()
                        .enumerate()
                        .find(|(idx, col)| *idx == 0 && col.name.real_value() == "_id")
                        .ok_or_else(|| {
                            RwError::from(ProtocolError(
                                "The `_id` column of the source with row format DebeziumMongoJson must be defined as the first column in SQL".to_owned(),
                            ))
                        })?;

                    let id_data_type = bind_data_type(id_column.data_type.as_ref().unwrap())?;
                    if !matches!(
                        id_data_type,
                        DataType::Varchar | DataType::Int32 | DataType::Int64 | DataType::Jsonb
                    ) {
                        return Err(RwError::from(ProtocolError(
                            "the `_id` column of the source with row format DebeziumMongoJson must be [Jsonb | Varchar | Int32 | Int64]".to_owned(),
                        )));
                    }

                    let mut columns = Vec::with_capacity(non_generated_sql_defined_columns.len());
                    // The `_id` column, which may be a primitive type or jsonb.
                    columns.push(
                        ColumnCatalog {
                            column_desc: ColumnDesc::named("_id", 0.into(), id_data_type),
                            is_hidden: false,
                        },
                    );

                    // Bind the rest of the columns, skipping the `_id` column.
                    for (idx, col) in non_generated_sql_defined_columns
                        .into_iter()
                        .skip(1)
                        .enumerate()
                    {
                        columns.push(ColumnCatalog {
                            column_desc: ColumnDesc::named(
                                col.name.real_value(),
                                (idx as i32).into(),
                                bind_data_type(col.data_type.as_ref().unwrap())?,
                            ),
                            is_hidden: false,
                        });
                    }

                    return Ok(columns);
                }

                // Default (weak) schema: exactly an `_id` column and a jsonb `payload` column.
                let mut columns = vec![
                    ColumnCatalog {
                        column_desc: ColumnDesc::named("_id", 0.into(), DataType::Varchar),
                        is_hidden: false,
                    },
                    ColumnCatalog {
                        column_desc: ColumnDesc::named("payload", 0.into(), DataType::Jsonb),
                        is_hidden: false,
                    },
                ];

                if non_generated_sql_defined_columns.len() != 2
                    || non_generated_sql_defined_columns[0].name.real_value() != columns[0].name()
                    || non_generated_sql_defined_columns[1].name.real_value() != columns[1].name()
                {
                    return Err(RwError::from(ProtocolError(
                        "the non-generated columns of the source with row format DebeziumMongoJson
                         must be (_id [Jsonb | Varchar | Int32 | Int64], payload jsonb)."
                            .to_owned(),
                    )));
                }

                // ok to unwrap: the data type is checked when binding SQL columns
                let key_data_type = bind_data_type(
                    non_generated_sql_defined_columns[0]
                        .data_type
                        .as_ref()
                        .unwrap(),
                )?;
                match key_data_type {
                    DataType::Jsonb | DataType::Varchar | DataType::Int32 | DataType::Int64 => {
                        columns[0].column_desc.data_type = key_data_type.clone();
                    }
                    _ => {
                        return Err(RwError::from(ProtocolError(
                            "the `_id` column of the source with row format DebeziumMongoJson
                             must be [Jsonb | Varchar | Int32 | Int64]"
                                .to_owned(),
                        )));
                    }
                }

                // ok to unwrap: the data type is checked when binding SQL columns
                let value_data_type = bind_data_type(
                    non_generated_sql_defined_columns[1]
                        .data_type
                        .as_ref()
                        .unwrap(),
                )?;
                if !matches!(value_data_type, DataType::Jsonb) {
                    return Err(RwError::from(ProtocolError(
                        "the `payload` column of the source with row format DebeziumMongoJson
                         must be Jsonb datatype"
                            .to_owned(),
                    )));
                }
                Ok(columns)
            }
            (Format::Plain, Encode::Bytes) => {
                let err = Err(RwError::from(ProtocolError(
                    "ENCODE BYTES only accepts one BYTEA type column".to_owned(),
                )));
                if non_generated_sql_defined_columns.len() == 1 {
                    let col_data_type = bind_data_type(
                        non_generated_sql_defined_columns[0]
                            .data_type
                            .as_ref()
                            .unwrap(),
                    )?;
                    if col_data_type == DataType::Bytea {
                        Ok(cols_from_sql)
                    } else {
                        err
                    }
                } else {
                    err
                }
            }
            (_, _) => Ok(cols_from_sql),
        }
    }
}

/// Returns a hint showing how to specify `INCLUDE KEY` and use it as the primary key for the
/// given format/encode combination.
fn hint_format_encode(format_encode: &FormatEncodeOptions) -> String {
    format!(
        r#"Hint: For FORMAT {0} ENCODE {1}, INCLUDE KEY must be specified and the key column must be used as primary key.
example:
    CREATE TABLE <table_name> ( PRIMARY KEY ([rw_key | <key_name>]) )
    INCLUDE KEY [AS <key_name>]
    WITH (...)
    FORMAT {0} ENCODE {1}{2}
"#,
        format_encode.format,
        format_encode.row_encode,
        if format_encode.row_encode == Encode::Json || format_encode.row_encode == Encode::Bytes {
            "".to_owned()
        } else {
            " (...)".to_owned()
        }
    )
}
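
// For example (illustrative, added by the editor), `FORMAT UPSERT ENCODE JSON` roughly produces:
//
//     Hint: For FORMAT <Upsert> ENCODE <Json>, INCLUDE KEY must be specified and the key column must be used as primary key.
//     example:
//         CREATE TABLE <table_name> ( PRIMARY KEY ([rw_key | <key_name>]) )
//         INCLUDE KEY [AS <key_name>]
//         WITH (...)
//         FORMAT <Upsert> ENCODE <Json>
//
// where the exact casing of the format/encode names follows their `Display` implementations.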

/// Binds the primary key columns of the source and returns their names. The rules depend on
/// the format/encode combination.
pub(crate) async fn bind_source_pk(
    format_encode: &FormatEncodeOptions,
    source_info: &StreamSourceInfo,
    columns: &mut [ColumnCatalog],
    sql_defined_pk_names: Vec<String>,
    with_properties: &WithOptionsSecResolved,
) -> Result<Vec<String>> {
    let sql_defined_pk = !sql_defined_pk_names.is_empty();
    let include_key_column_name: Option<String> = {
        // The key column included via `INCLUDE KEY`, if any.
        columns.iter().find_map(|catalog| {
            if matches!(
                catalog.column_desc.additional_column.column_type,
                Some(AdditionalColumnType::Key(_))
            ) {
                Some(catalog.name().to_owned())
            } else {
                None
            }
        })
    };
    let additional_column_names = columns
        .iter()
        .filter_map(|col| {
            if col.column_desc.additional_column.column_type.is_some() {
                Some(col.name().to_owned())
            } else {
                None
            }
        })
        .collect_vec();

    let res = match (&format_encode.format, &format_encode.row_encode) {
        (Format::Native, Encode::Native) | (Format::None, Encode::None) | (Format::Plain, _) => {
            sql_defined_pk_names
        }

        // For upsert formats, the message key must be included and used as the primary key.
        (Format::Upsert, Encode::Json | Encode::Avro | Encode::Protobuf) => {
            if let Some(ref key_column_name) = include_key_column_name
                && sql_defined_pk
            {
                // Composite primary keys are not supported here; the primary key must be
                // exactly the included key column.
                if sql_defined_pk_names.len() != 1
                    || !key_column_name.eq(sql_defined_pk_names[0].as_str())
                {
                    return Err(RwError::from(ProtocolError(format!(
                        "Only \"{}\" can be used as primary key\n\n{}",
                        key_column_name,
                        hint_format_encode(format_encode)
                    ))));
                }
                sql_defined_pk_names
            } else {
                return if let Some(include_key_column_name) = include_key_column_name {
                    Err(RwError::from(ProtocolError(format!(
                        "Primary key must be specified to {}\n\n{}",
                        include_key_column_name,
                        hint_format_encode(format_encode)
                    ))))
                } else {
                    Err(RwError::from(ProtocolError(format!(
                        "INCLUDE KEY clause not set\n\n{}",
                        hint_format_encode(format_encode)
                    ))))
                };
            }
        }

        (Format::Debezium, Encode::Json) => {
            if !additional_column_names.is_empty() {
                return Err(RwError::from(ProtocolError(format!(
                    "FORMAT DEBEZIUM forbids additional columns, but got {:?}",
                    additional_column_names
                ))));
            }
            if !sql_defined_pk {
                return Err(RwError::from(ProtocolError(
                    "Primary key must be specified when creating source with FORMAT DEBEZIUM."
                        .to_owned(),
                )));
            }
            sql_defined_pk_names
        }
        (Format::Debezium, Encode::Avro) => {
            if !additional_column_names.is_empty() {
                return Err(RwError::from(ProtocolError(format!(
                    "FORMAT DEBEZIUM forbids additional columns, but got {:?}",
                    additional_column_names
                ))));
            }
            if sql_defined_pk {
                sql_defined_pk_names
            } else {
                let pk_names =
                    extract_debezium_avro_table_pk_columns(source_info, with_properties).await?;
                // Each primary key column from the Avro key schema must exist in the row schema.
                for pk_name in &pk_names {
                    columns
                        .iter()
                        .find(|c: &&ColumnCatalog| c.name().eq(pk_name))
                        .ok_or_else(|| {
                            RwError::from(ProtocolError(format!(
                                "avro's key column {} does not exist in avro's row schema",
                                pk_name
                            )))
                        })?;
                }
                pk_names
            }
        }
        (Format::DebeziumMongo, Encode::Json) => {
            if sql_defined_pk {
                sql_defined_pk_names
            } else {
                vec!["_id".to_owned()]
            }
        }

        (Format::Maxwell, Encode::Json) => {
            if !additional_column_names.is_empty() {
                return Err(RwError::from(ProtocolError(format!(
                    "FORMAT MAXWELL forbids additional columns, but got {:?}",
                    additional_column_names
                ))));
            }
            if !sql_defined_pk {
                return Err(RwError::from(ProtocolError(
                    "Primary key must be specified when creating source with FORMAT MAXWELL ENCODE JSON.".to_owned(),
                )));
            }
            sql_defined_pk_names
        }

        (Format::Canal, Encode::Json) => {
            if !additional_column_names.is_empty() {
                return Err(RwError::from(ProtocolError(format!(
                    "FORMAT CANAL forbids additional columns, but got {:?}",
                    additional_column_names
                ))));
            }
            if !sql_defined_pk {
                return Err(RwError::from(ProtocolError(
                    "Primary key must be specified when creating source with FORMAT CANAL ENCODE JSON.".to_owned(),
                )));
            }
            sql_defined_pk_names
        }
        (format, encoding) => {
            return Err(RwError::from(ProtocolError(format!(
                "Unknown combination {:?} {:?}",
                format, encoding
            ))));
        }
    };
    Ok(res)
}
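
// Illustrative summary (added by the editor) of the primary-key rules implemented above:
//   PLAIN / NATIVE / NONE            -> whatever PRIMARY KEY the SQL declares (possibly none)
//   UPSERT (JSON/AVRO/PROTOBUF)      -> must be exactly the `INCLUDE KEY` column
//   DEBEZIUM / MAXWELL / CANAL JSON  -> PRIMARY KEY must be declared in SQL; additional
//                                       (INCLUDE) columns are forbidden
//   DEBEZIUM AVRO                    -> PRIMARY KEY from SQL, or derived from the Avro key
//                                       schema when not declared
//   DEBEZIUM_MONGO JSON              -> defaults to `_id` when not declared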

pub(super) fn bind_source_watermark(
    session: &SessionImpl,
    name: String,
    source_watermarks: Vec<SourceWatermark>,
    column_catalogs: &[ColumnCatalog],
) -> Result<Vec<WatermarkDesc>> {
    let mut binder = Binder::new_for_ddl(session);
    binder.bind_columns_to_context(name.clone(), column_catalogs)?;

    let watermark_descs = source_watermarks
        .into_iter()
        .map(|source_watermark| {
            let col_name = source_watermark.column.real_value();
            let watermark_idx = binder.get_column_binding_index(name.clone(), &col_name)?;

            let expr = binder.bind_expr(&source_watermark.expr)?;
            let watermark_col_type = column_catalogs[watermark_idx].data_type();
            let watermark_expr_type = &expr.return_type();
            if watermark_col_type != watermark_expr_type {
                Err(RwError::from(ErrorCode::BindError(
                    format!("The return value type of the watermark expression must be identical to the watermark column data type. Current data type of watermark return value: `{}`, column `{}`", watermark_expr_type, watermark_col_type),
                )))
            } else {
                let expr_proto = expr.to_expr_proto();
                Ok::<_, RwError>(WatermarkDesc {
                    watermark_idx: watermark_idx as u32,
                    expr: Some(expr_proto),
                })
            }
        })
        .try_collect()?;
    Ok(watermark_descs)
}

/// Connector-specific schema check; currently only the Nexmark connector has extra constraints.
pub(super) fn check_format_encode(
    props: &WithOptionsSecResolved,
    row_id_index: Option<usize>,
    columns: &[ColumnCatalog],
) -> Result<()> {
    let Some(connector) = props.get_connector() else {
        return Ok(());
    };

    if connector == NEXMARK_CONNECTOR {
        check_nexmark_schema(props, row_id_index, columns)
    } else {
        Ok(())
    }
}

pub fn bind_connector_props(
    handler_args: &HandlerArgs,
    format_encode: &FormatEncodeOptions,
    is_create_source: bool,
) -> Result<WithOptions> {
    let mut with_properties = handler_args.with_options.clone().into_connector_props();
    validate_compatibility(format_encode, &mut with_properties)?;
    let create_cdc_source_job = with_properties.is_shareable_cdc_connector();

    if !is_create_source && with_properties.is_shareable_only_cdc_connector() {
        return Err(RwError::from(ProtocolError(format!(
            "connector {} does not support `CREATE TABLE`, please use `CREATE SOURCE` instead",
            with_properties.get_connector().unwrap(),
        ))));
    }
    if is_create_source && create_cdc_source_job {
        if let Some(value) = with_properties.get(AUTO_SCHEMA_CHANGE_KEY)
            && value.parse::<bool>().map_err(|_| {
                ErrorCode::InvalidInputSyntax(format!(
                    "invalid value of '{}' option",
                    AUTO_SCHEMA_CHANGE_KEY
                ))
            })?
        {
            Feature::CdcAutoSchemaChange.check_available()?;
        }

        // Set the default snapshot and sharing modes for a shared CDC source job.
        with_properties.insert(CDC_SNAPSHOT_MODE_KEY.into(), CDC_SNAPSHOT_BACKFILL.into());
        with_properties.insert(CDC_SHARING_MODE_KEY.into(), "true".into());
        if with_properties.enable_transaction_metadata() {
            with_properties.insert(CDC_TRANSACTIONAL_KEY.into(), "true".into());
        }
        if !with_properties.contains_key(CDC_WAIT_FOR_STREAMING_START_TIMEOUT) {
            with_properties.insert(
                CDC_WAIT_FOR_STREAMING_START_TIMEOUT.into(),
                handler_args
                    .session
                    .config()
                    .cdc_source_wait_streaming_start_timeout()
                    .to_string(),
            );
        }
    }
    if with_properties.is_mysql_cdc_connector() {
        // MySQL CDC requires a unique `server.id` for each client; generate a random one if
        // the user did not specify it.
        with_properties
            .entry("server.id".to_owned())
            .or_insert(rand::rng().random_range(1..u32::MAX).to_string());
    }
    Ok(with_properties)
}
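
// Illustrative sketch (added by the editor): for `CREATE SOURCE ... WITH (connector = 'mysql-cdc', ...)`,
// `bind_connector_props` leaves the user-specified properties intact and additionally fills in,
// when absent:
//   - `CDC_SNAPSHOT_MODE_KEY`                -> `CDC_SNAPSHOT_BACKFILL`
//   - `CDC_SHARING_MODE_KEY`                 -> "true"
//   - `CDC_TRANSACTIONAL_KEY`                -> "true" (if transaction metadata is enabled)
//   - `CDC_WAIT_FOR_STREAMING_START_TIMEOUT` -> the session's `cdc_source_wait_streaming_start_timeout`
//   - `server.id`                            -> a random u32 (MySQL only)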

/// Strategy for handling the columns defined in SQL when columns can also be resolved from the
/// external schema (e.g. a schema registry).
pub enum SqlColumnStrategy {
    /// Take the columns defined in SQL as-is, without checking them against the columns
    /// resolved from the external schema, typically because the definition has already been
    /// purified.
    FollowUnchecked,

    /// Take the columns defined in SQL, but check every regular (non-generated) column against
    /// the external schema and reject unknown columns or data type mismatches.
    FollowChecked,

    /// Ignore the regular columns defined in SQL and use the columns resolved from the external
    /// schema, keeping only the generated columns (and the wildcard position) from SQL.
    Ignore,
}
#[allow(clippy::too_many_arguments)]
/// Binds a `CREATE SOURCE` statement, or the connector part of a
/// `CREATE TABLE ... WITH (connector = ...)` statement, into a `SourceCatalog`.
pub async fn bind_create_source_or_table_with_connector(
    handler_args: HandlerArgs,
    full_name: ObjectName,
    format_encode: FormatEncodeOptions,
    with_properties: WithOptions,
    sql_columns_defs: &[ColumnDef],
    constraints: Vec<TableConstraint>,
    wildcard_idx: Option<usize>,
    source_watermarks: Vec<SourceWatermark>,
    columns_from_resolve_source: Option<Vec<ColumnCatalog>>,
    source_info: StreamSourceInfo,
    include_column_options: IncludeOption,
    col_id_gen: &mut ColumnIdGenerator,
    create_source_type: CreateSourceType,
    source_rate_limit: Option<u32>,
    sql_column_strategy: SqlColumnStrategy,
) -> Result<SourceCatalog> {
    let session = &handler_args.session;
    let db_name: &str = &session.database();
    let (schema_name, source_name) = Binder::resolve_schema_qualified_name(db_name, &full_name)?;
    let (database_id, schema_id) =
        session.get_database_and_schema_id_for_create(schema_name.clone())?;

    let is_create_source = create_source_type != CreateSourceType::Table;
    if !is_create_source && with_properties.is_iceberg_connector() {
        return Err(ErrorCode::BindError(
            "can't CREATE TABLE with iceberg connector\n\nHint: use CREATE SOURCE instead"
                .to_owned(),
        )
        .into());
    }

    if is_create_source {
        if with_properties.is_batch_connector() {
            return Err(ErrorCode::BindError(
                "can't CREATE SOURCE with refreshable batch connector\n\nHint: use CREATE TABLE instead"
                    .to_owned(),
            )
            .into());
        }

        match format_encode.format {
            Format::Debezium | Format::DebeziumMongo | Format::Maxwell | Format::Canal => {
                return Err(ErrorCode::BindError(format!(
                    "can't CREATE SOURCE with FORMAT {}.\n\nHint: use CREATE TABLE instead\n\n{}",
                    format_encode.format,
                    hint_format_encode(&format_encode)
                ))
                .into());
            }
            Format::Upsert => {
                notice_to_user(format!(
                    "Streaming queries on sources with `FORMAT {}` may have limitations. If your query isn't supported, consider using `CREATE TABLE` instead.",
                    format_encode.format
                ));
            }
            _ => {}
        }
    }

    let sql_pk_names = bind_sql_pk_names(sql_columns_defs, bind_table_constraints(&constraints)?)?;

    // The columns of an iceberg source are inferred from the iceberg table schema.
    if with_properties.is_iceberg_connector() && !sql_columns_defs.is_empty() {
        return Err(RwError::from(InvalidInputSyntax(
            r#"Schema is automatically inferred for iceberg source and should not be specified

HINT: use `CREATE SOURCE <name> WITH (...)` instead of `CREATE SOURCE <name> (<columns>) WITH (...)`."#.to_owned(),
        )));
    }
    let columns_from_sql = bind_sql_columns(sql_columns_defs, false)?;

    let mut columns = bind_all_columns(
        &format_encode,
        columns_from_resolve_source,
        columns_from_sql,
        sql_columns_defs,
        wildcard_idx,
        sql_column_strategy,
    )?;

    // Add additional columns requested by the `INCLUDE ...` clause.
    handle_addition_columns(
        Some(&format_encode),
        &with_properties,
        include_column_options,
        &mut columns,
        false,
    )?;

    if columns.is_empty() {
        return Err(RwError::from(ProtocolError(
            "Schema definition is required, either from SQL or schema registry.".to_owned(),
        )));
    }

    if is_create_source {
        // Add the legacy hidden timestamp column (e.g. for Kafka) if applicable.
        check_and_add_timestamp_column(&with_properties, &mut columns);

        // Shared non-CDC sources need partition/offset columns; add them as hidden columns
        // if they are not already included.
        if create_source_type == CreateSourceType::SharedNonCdc {
            let (columns_exist, additional_columns) = source_add_partition_offset_cols(
                &columns,
                &with_properties.get_connector().unwrap(),
                true,
            );
            for (existed, c) in columns_exist.into_iter().zip_eq_fast(additional_columns) {
                if !existed {
                    columns.push(ColumnCatalog::hidden(c));
                }
            }
        }
    }

    let mut with_properties = with_properties;
    resolve_privatelink_in_with_option(&mut with_properties)?;

    if session
        .env()
        .system_params_manager()
        .get_params()
        .load()
        .enforce_secret()
        && Feature::SecretManagement.check_available().is_ok()
    {
        ConnectorProperties::enforce_secret_source(&with_properties)?;
    }

    let (with_properties, connection_type, connector_conn_ref) =
        resolve_connection_ref_and_secret_ref(
            with_properties,
            session,
            Some(TelemetryDatabaseObject::Source),
        )?;
    ensure_connection_type_allowed(connection_type, &SOURCE_ALLOWED_CONNECTION_CONNECTOR)?;

    // If a connection is referenced, the connector must match the connection type.
    if !matches!(connection_type, PbConnectionType::Unspecified) {
        let Some(connector) = with_properties.get_connector() else {
            return Err(RwError::from(ProtocolError(format!(
                "missing field '{}' in WITH clause",
                UPSTREAM_SOURCE_KEY
            ))));
        };
        check_connector_match_connection_type(connector.as_str(), &connection_type)?;
    }

    let pk_names = bind_source_pk(
        &format_encode,
        &source_info,
        &mut columns,
        sql_pk_names,
        &with_properties,
    )
    .await?;

    if let Some(duplicated_name) = columns.iter().map(|c| c.name()).duplicates().next() {
        return Err(ErrorCode::InvalidInputSyntax(format!(
            "column \"{}\" specified more than once",
            duplicated_name
        ))
        .into());
    }

    // Assign column ids; for `CREATE SOURCE`, keep the original data type untouched.
    for c in &mut columns {
        let original_data_type = c.data_type().clone();
        col_id_gen.generate(c)?;
        if is_create_source {
            c.column_desc.data_type = original_data_type;
        }
    }
    debug_assert_column_ids_distinct(&columns);

    let (mut columns, pk_col_ids, row_id_index) =
        bind_pk_and_row_id_on_relation(columns, pk_names, true)?;

    let watermark_descs =
        bind_source_watermark(session, source_name.clone(), source_watermarks, &columns)?;
    // At most one watermark is currently supported on a source.
    assert!(watermark_descs.len() <= 1);

    bind_sql_column_constraints(
        session,
        source_name.clone(),
        &mut columns,
        sql_columns_defs,
        &pk_col_ids,
    )?;
    check_format_encode(&with_properties, row_id_index, &columns)?;

    let definition = handler_args.normalized_sql.clone();

    let associated_table_id = if is_create_source {
        None
    } else {
        Some(TableId::placeholder())
    };
    let source = SourceCatalog {
        id: TableId::placeholder().table_id,
        name: source_name,
        schema_id,
        database_id,
        columns,
        pk_col_ids,
        append_only: row_id_index.is_some(),
        owner: session.user_id(),
        info: source_info,
        row_id_index,
        with_properties,
        watermark_descs,
        associated_table_id,
        definition,
        connection_id: connector_conn_ref,
        created_at_epoch: None,
        initialized_at_epoch: None,
        version: INITIAL_SOURCE_VERSION_ID,
        created_at_cluster_version: None,
        initialized_at_cluster_version: None,
        rate_limit: source_rate_limit,
    };
    Ok(source)
}

pub async fn handle_create_source(
    mut handler_args: HandlerArgs,
    stmt: CreateSourceStatement,
) -> Result<RwPgResponse> {
    let session = handler_args.session.clone();
    let overwrite_options = OverwriteOptions::new(&mut handler_args);

    if let Either::Right(resp) = session.check_relation_name_duplicated(
        stmt.source_name.clone(),
        StatementType::CREATE_SOURCE,
        stmt.if_not_exists,
    )? {
        return Ok(resp);
    }

    if handler_args.with_options.is_empty() {
        return Err(RwError::from(InvalidInputSyntax(
            "missing WITH clause".to_owned(),
        )));
    }

    let format_encode = stmt.format_encode.into_v2_with_warning();
    let with_properties = bind_connector_props(&handler_args, &format_encode, true)?;

    let create_source_type = CreateSourceType::for_newly_created(&session, &*with_properties);
    let (columns_from_resolve_source, source_info) = bind_columns_from_source(
        &session,
        &format_encode,
        Either::Left(&with_properties),
        create_source_type,
    )
    .await?;
    let mut col_id_gen = ColumnIdGenerator::new_initial();

    if stmt.columns.iter().any(|col| {
        col.options
            .iter()
            .any(|def| matches!(def.option, ColumnOption::NotNull))
    }) {
        return Err(RwError::from(InvalidInputSyntax(
            "NOT NULL constraint is not supported in source schema".to_owned(),
        )));
    }

    let source_catalog = bind_create_source_or_table_with_connector(
        handler_args.clone(),
        stmt.source_name,
        format_encode,
        with_properties,
        &stmt.columns,
        stmt.constraints,
        stmt.wildcard_idx,
        stmt.source_watermarks,
        columns_from_resolve_source,
        source_info,
        stmt.include_column_options,
        &mut col_id_gen,
        create_source_type,
        overwrite_options.source_rate_limit,
        SqlColumnStrategy::FollowChecked,
    )
    .await?;

    // A temporary source only lives in the session and is not persisted in the catalog.
    if stmt.temporary {
        if session.get_temporary_source(&source_catalog.name).is_some() {
            return Err(CatalogError::duplicated("source", source_catalog.name.clone()).into());
        }
        session.create_temporary_source(source_catalog);
        return Ok(PgResponse::empty_result(StatementType::CREATE_SOURCE));
    }

    let source = source_catalog.to_prost();

    let catalog_writer = session.catalog_writer()?;

    if create_source_type.is_shared() {
        let graph = generate_stream_graph_for_source(handler_args, source_catalog)?;
        catalog_writer
            .create_source(source, Some(graph), stmt.if_not_exists)
            .await?;
    } else {
        // For a non-shared source, only the catalog entry is created; no streaming job is needed.
        catalog_writer
            .create_source(source, None, stmt.if_not_exists)
            .await?;
    }

    Ok(PgResponse::empty_result(StatementType::CREATE_SOURCE))
}

pub(super) fn generate_stream_graph_for_source(
    handler_args: HandlerArgs,
    source_catalog: SourceCatalog,
) -> Result<PbStreamFragmentGraph> {
    let context = OptimizerContext::from_handler_args(handler_args);
    let source_node = LogicalSource::with_catalog(
        Rc::new(source_catalog),
        SourceNodeKind::CreateSharedSource,
        context.into(),
        None,
    )?;

    let stream_plan = source_node.to_stream(&mut ToStreamContext::new(false))?;
    let graph = build_graph(stream_plan, Some(GraphJobType::Source))?;
    Ok(graph)
}

#[cfg(test)]
pub mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use risingwave_common::catalog::{
        DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, ROW_ID_COLUMN_NAME,
    };
    use risingwave_common::types::{DataType, StructType};

    use crate::catalog::root_catalog::SchemaPath;
    use crate::catalog::source_catalog::SourceCatalog;
    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    const GET_COLUMN_FROM_CATALOG: fn(&Arc<SourceCatalog>) -> HashMap<&str, DataType> =
        |catalog: &Arc<SourceCatalog>| -> HashMap<&str, DataType> {
            catalog
                .columns
                .iter()
                .map(|col| (col.name(), col.data_type().clone()))
                .collect::<HashMap<&str, DataType>>()
        };

    #[tokio::test]
    async fn test_create_source_handler() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let sql = format!(
            r#"CREATE SOURCE t
    WITH (connector = 'kinesis')
    FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        let (source, _) = catalog_reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
            .unwrap();
        assert_eq!(source.name, "t");

        let columns = GET_COLUMN_FROM_CATALOG(source);

        let city_type = StructType::new(vec![
            ("address", DataType::Varchar),
            ("zipcode", DataType::Varchar),
        ])
        .into();
        let expected_columns = maplit::hashmap! {
            ROW_ID_COLUMN_NAME => DataType::Serial,
            "id" => DataType::Int32,
            "zipcode" => DataType::Int64,
            "rate" => DataType::Float32,
            "country" => StructType::new(
                vec![("address", DataType::Varchar), ("city", city_type), ("zipcode", DataType::Varchar)],
            )
            .into(),
        };
        assert_eq!(columns, expected_columns, "{columns:#?}");
    }

    #[tokio::test]
    async fn test_duplicate_props_options() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let sql = format!(
            r#"CREATE SOURCE t
    WITH (
        connector = 'kinesis',
        aws.region='user_test_topic',
        endpoint='172.10.1.1:9090,172.10.1.2:9090',
        aws.credentials.access_key_id = 'your_access_key_1',
        aws.credentials.secret_access_key = 'your_secret_key_1'
    )
    FORMAT PLAIN ENCODE PROTOBUF (
        message = '.test.TestRecord',
        aws.credentials.access_key_id = 'your_access_key_2',
        aws.credentials.secret_access_key = 'your_secret_key_2',
        schema.location = 'file://{}',
    )"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        let (source, _) = catalog_reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
            .unwrap();
        assert_eq!(source.name, "t");

        // Options given in the FORMAT ... ENCODE ... clause end up in `format_encode_options`.
        assert_eq!(
            source
                .info
                .format_encode_options
                .get("aws.credentials.access_key_id")
                .unwrap(),
            "your_access_key_2"
        );
        assert_eq!(
            source
                .info
                .format_encode_options
                .get("aws.credentials.secret_access_key")
                .unwrap(),
            "your_secret_key_2"
        );

        // Options given in the WITH clause end up in `with_properties`, independently.
        assert_eq!(
            source
                .with_properties
                .get("aws.credentials.access_key_id")
                .unwrap(),
            "your_access_key_1"
        );
        assert_eq!(
            source
                .with_properties
                .get("aws.credentials.secret_access_key")
                .unwrap(),
            "your_secret_key_1"
        );

        // `schema.location` belongs only to the format/encode options.
        assert!(!source.with_properties.contains_key("schema.location"));
    }

    #[tokio::test]
    async fn test_multi_table_cdc_create_source_handler() {
        let sql =
            "CREATE SOURCE t2 WITH (connector = 'mysql-cdc') FORMAT PLAIN ENCODE JSON".to_owned();
        let frontend = LocalFrontend::new(Default::default()).await;
        let session = frontend.session_ref();

        frontend
            .run_sql_with_session(session.clone(), sql)
            .await
            .unwrap();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        let (source, _) = catalog_reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t2")
            .unwrap();
        assert_eq!(source.name, "t2");

        let columns = source
            .columns
            .iter()
            .map(|col| (col.name(), col.data_type().clone()))
            .collect::<Vec<(&str, DataType)>>();

        expect_test::expect![[r#"
            [
                (
                    "payload",
                    Jsonb,
                ),
                (
                    "_rw_offset",
                    Varchar,
                ),
                (
                    "_rw_table_name",
                    Varchar,
                ),
                (
                    "_row_id",
                    Serial,
                ),
            ]
        "#]]
        .assert_debug_eq(&columns);
    }

    #[tokio::test]
    async fn test_source_addition_columns() {
        // Test the `INCLUDE` clause on a Kafka source.
        let sql =
            "CREATE SOURCE s (v1 int) include key as _rw_kafka_key with (connector = 'kafka') format plain encode json".to_owned();
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();
        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let (source, _) = catalog_reader
            .get_source_by_name(
                DEFAULT_DATABASE_NAME,
                SchemaPath::Name(DEFAULT_SCHEMA_NAME),
                "s",
            )
            .unwrap();
        assert_eq!(source.name, "s");

        let columns = source
            .columns
            .iter()
            .map(|col| (col.name(), col.data_type().clone()))
            .collect::<Vec<(&str, DataType)>>();

        expect_test::expect![[r#"
            [
                (
                    "v1",
                    Int32,
                ),
                (
                    "_rw_kafka_key",
                    Bytea,
                ),
                (
                    "_rw_kafka_timestamp",
                    Timestamptz,
                ),
                (
                    "_rw_kafka_partition",
                    Varchar,
                ),
                (
                    "_rw_kafka_offset",
                    Varchar,
                ),
                (
                    "_row_id",
                    Serial,
                ),
            ]
        "#]]
        .assert_debug_eq(&columns);

        let sql =
            "CREATE SOURCE s3 (v1 int) include timestamp 'header1' as header_col with (connector = 'kafka') format plain encode json".to_owned();
        match frontend.run_sql(sql).await {
            Err(e) => {
                assert_eq!(
                    e.to_string(),
                    "Protocol error: Only header column can have inner field, but got \"timestamp\""
                )
            }
            _ => unreachable!(),
        }
    }
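
    #[tokio::test]
    async fn test_upsert_source_requires_key_as_pk() {
        // Illustrative sketch added by the editor (not part of the original test suite):
        // `FORMAT UPSERT` requires an `INCLUDE KEY` column that is also the primary key,
        // so a statement without `INCLUDE KEY` should be rejected (see `bind_source_pk`).
        let sql =
            "CREATE SOURCE s4 (v1 int) WITH (connector = 'kafka') FORMAT UPSERT ENCODE JSON"
                .to_owned();
        let frontend = LocalFrontend::new(Default::default()).await;
        let res = frontend.run_sql(sql).await;
        assert!(
            res.is_err(),
            "expected an error about the missing INCLUDE KEY clause"
        );
    }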
}