use std::collections::{BTreeMap, HashMap, HashSet};
use std::rc::Rc;
use std::sync::LazyLock;

use anyhow::{Context, anyhow};
use either::Either;
use external_schema::debezium::extract_debezium_avro_table_pk_columns;
use external_schema::nexmark::check_nexmark_schema;
use itertools::Itertools;
use maplit::{convert_args, hashmap, hashset};
use pgwire::pg_response::{PgResponse, StatementType};
use rand::Rng;
use risingwave_common::array::arrow::{IcebergArrowConvert, arrow_schema_iceberg};
use risingwave_common::bail_not_implemented;
use risingwave_common::catalog::{
    ColumnCatalog, ColumnDesc, ColumnId, INITIAL_SOURCE_VERSION_ID, KAFKA_TIMESTAMP_COLUMN_NAME,
    ROW_ID_COLUMN_NAME, TableId, debug_assert_column_ids_distinct,
};
use risingwave_common::license::Feature;
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::types::DataType;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_connector::parser::additional_columns::{
    build_additional_column_desc, get_supported_additional_columns,
    source_add_partition_offset_cols,
};
use risingwave_connector::parser::{
    AvroParserConfig, DEBEZIUM_IGNORE_KEY, DebeziumAvroParserConfig, ProtobufParserConfig,
    SchemaLocation, SpecificParserConfig, TimestamptzHandling,
    fetch_json_schema_and_map_to_columns,
};
use risingwave_connector::schema::AWS_GLUE_SCHEMA_ARN_KEY;
use risingwave_connector::schema::schema_registry::{
    SCHEMA_REGISTRY_BACKOFF_DURATION_KEY, SCHEMA_REGISTRY_BACKOFF_FACTOR_KEY,
    SCHEMA_REGISTRY_MAX_DELAY_KEY, SCHEMA_REGISTRY_PASSWORD, SCHEMA_REGISTRY_RETRIES_MAX_KEY,
    SCHEMA_REGISTRY_USERNAME, SchemaRegistryConfig, name_strategy_from_str,
};
use risingwave_connector::source::cdc::{
    CDC_MONGODB_STRONG_SCHEMA_KEY, CDC_SHARING_MODE_KEY, CDC_SNAPSHOT_BACKFILL,
    CDC_SNAPSHOT_MODE_KEY, CDC_TRANSACTIONAL_KEY, CDC_WAIT_FOR_STREAMING_START_TIMEOUT,
    CITUS_CDC_CONNECTOR, MONGODB_CDC_CONNECTOR, MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR,
    SQL_SERVER_CDC_CONNECTOR,
};
use risingwave_connector::source::datagen::DATAGEN_CONNECTOR;
use risingwave_connector::source::iceberg::ICEBERG_CONNECTOR;
use risingwave_connector::source::nexmark::source::{EventType, get_event_data_types_with_names};
use risingwave_connector::source::test_source::TEST_CONNECTOR;
use risingwave_connector::source::{
    AZBLOB_CONNECTOR, ConnectorProperties, GCS_CONNECTOR, GOOGLE_PUBSUB_CONNECTOR, KAFKA_CONNECTOR,
    KINESIS_CONNECTOR, LEGACY_S3_CONNECTOR, MQTT_CONNECTOR, NATS_CONNECTOR, NEXMARK_CONNECTOR,
    OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR, PULSAR_CONNECTOR,
};
pub use risingwave_connector::source::{UPSTREAM_SOURCE_KEY, WEBHOOK_CONNECTOR};
use risingwave_connector::{AUTO_SCHEMA_CHANGE_KEY, WithPropertiesExt};
use risingwave_pb::catalog::connection_params::PbConnectionType;
use risingwave_pb::catalog::{PbSchemaRegistryNameStrategy, StreamSourceInfo, WatermarkDesc};
use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
use risingwave_pb::plan_common::source_refresh_mode::{RefreshMode, SourceRefreshModeStreaming};
use risingwave_pb::plan_common::{EncodeType, FormatType, SourceRefreshMode};
use risingwave_pb::stream_plan::PbStreamFragmentGraph;
use risingwave_pb::telemetry::TelemetryDatabaseObject;
use risingwave_sqlparser::ast::{
    AstString, ColumnDef, ColumnOption, CreateSourceStatement, Encode, Format, FormatEncodeOptions,
    ObjectName, SourceWatermark, SqlOptionValue, TableConstraint, Value, get_delimiter,
};
use risingwave_sqlparser::parser::{IncludeOption, IncludeOptionItem};
use thiserror_ext::AsReport;

use super::RwPgResponse;
use crate::binder::Binder;
use crate::catalog::CatalogError;
use crate::catalog::source_catalog::SourceCatalog;
use crate::error::ErrorCode::{self, Deprecated, InvalidInputSyntax, NotSupported, ProtocolError};
use crate::error::{Result, RwError};
use crate::expr::Expr;
use crate::handler::HandlerArgs;
use crate::handler::create_table::{
    ColumnIdGenerator, bind_pk_and_row_id_on_relation, bind_sql_column_constraints,
    bind_sql_columns, bind_sql_pk_names, bind_table_constraints,
};
use crate::handler::util::{
    SourceSchemaCompatExt, check_connector_match_connection_type, ensure_connection_type_allowed,
};
use crate::optimizer::plan_node::generic::SourceNodeKind;
use crate::optimizer::plan_node::{LogicalSource, ToStream, ToStreamContext};
use crate::session::SessionImpl;
use crate::session::current::notice_to_user;
use crate::utils::{
    OverwriteOptions, resolve_connection_ref_and_secret_ref, resolve_privatelink_in_with_option,
    resolve_secret_ref_in_with_options, resolve_source_refresh_mode_in_with_option,
};
use crate::{OptimizerContext, WithOptions, WithOptionsSecResolved, bind_data_type, build_graph};

mod external_schema;
pub use external_schema::{
    bind_columns_from_source, get_schema_location, schema_has_schema_registry,
};
mod validate;
pub use validate::validate_compatibility;
use validate::{SOURCE_ALLOWED_CONNECTION_CONNECTOR, SOURCE_ALLOWED_CONNECTION_SCHEMA_REGISTRY};
mod additional_column;
use additional_column::check_and_add_timestamp_column;
pub use additional_column::handle_addition_columns;
use risingwave_common::catalog::ICEBERG_SOURCE_PREFIX;
use risingwave_common::id::SourceId;

use crate::stream_fragmenter::GraphJobType;

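/// Returns only the non-generated columns from a user-defined SQL schema,
/// e.g. for `(v1 INT, v2 INT AS v1 + 1)` only `v1` is kept.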
fn non_generated_sql_columns(columns: &[ColumnDef]) -> Vec<ColumnDef> {
    columns
        .iter()
        .filter(|c| !c.is_generated())
        .cloned()
        .collect()
}

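/// Removes `key` from `format_encode_options`, returning its value as an
/// [`AstString`] if present.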
fn try_consume_string_from_options(
    format_encode_options: &mut BTreeMap<String, String>,
    key: &str,
) -> Option<AstString> {
    format_encode_options.remove(key).map(AstString)
}

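/// Consumes all schema-registry related options (credentials and retry /
/// backoff settings) from `format_encode_options`, discarding their values.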
fn try_consume_schema_registry_config_from_options(
    format_encode_options: &mut BTreeMap<String, String>,
) {
    [
        SCHEMA_REGISTRY_USERNAME,
        SCHEMA_REGISTRY_PASSWORD,
        SCHEMA_REGISTRY_MAX_DELAY_KEY,
        SCHEMA_REGISTRY_BACKOFF_DURATION_KEY,
        SCHEMA_REGISTRY_BACKOFF_FACTOR_KEY,
        SCHEMA_REGISTRY_RETRIES_MAX_KEY,
    ]
    .iter()
    .for_each(|key| {
        try_consume_string_from_options(format_encode_options, key);
    });
}

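/// Like [`try_consume_string_from_options`], but returns a `ProtocolError` if
/// the key is missing.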
fn consume_string_from_options(
    format_encode_options: &mut BTreeMap<String, String>,
    key: &str,
) -> Result<AstString> {
    try_consume_string_from_options(format_encode_options, key).ok_or(RwError::from(
        ProtocolError(format!("missing field {} in options", key)),
    ))
}

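/// Drops every `aws.`-prefixed option from `format_encode_options`.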
fn consume_aws_config_from_options(format_encode_options: &mut BTreeMap<String, String>) {
    format_encode_options.retain(|key, _| !key.starts_with("aws."))
}

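/// How the source being created (or replaced) will be planned, which mainly
/// determines whether it runs as a *shared* source, i.e. a standalone
/// streaming job whose output multiple downstream jobs can reuse.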
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CreateSourceType {
    /// A shareable CDC source, e.g. created by `CREATE SOURCE` with a MySQL or
    /// Postgres CDC connector.
    SharedCdc,
    /// A shareable non-CDC source (e.g. Kafka), when shared sources are enabled.
    SharedNonCdc,
    /// A non-shared source.
    NonShared,
    /// The source part of `CREATE TABLE ... WITH (connector = ...)`.
    Table,
}

impl CreateSourceType {
    pub fn for_newly_created(
        session: &SessionImpl,
        with_properties: &impl WithPropertiesExt,
    ) -> Self {
        if with_properties.is_shareable_cdc_connector() {
            CreateSourceType::SharedCdc
        } else if with_properties.is_shareable_non_cdc_connector()
            && session
                .env()
                .streaming_config()
                .developer
                .enable_shared_source
            && session.config().streaming_use_shared_source()
        {
            CreateSourceType::SharedNonCdc
        } else {
            CreateSourceType::NonShared
        }
    }

    pub fn for_replace(catalog: &SourceCatalog) -> Self {
        if !catalog.info.is_shared() {
            CreateSourceType::NonShared
        } else if catalog.with_properties.is_shareable_cdc_connector() {
            CreateSourceType::SharedCdc
        } else {
            CreateSourceType::SharedNonCdc
        }
    }

    pub fn is_shared(&self) -> bool {
        matches!(
            self,
            CreateSourceType::SharedCdc | CreateSourceType::SharedNonCdc
        )
    }
}

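/// Resolves the final schema of a source by reconciling the columns resolved
/// from the external source (`cols_from_source`, e.g. from a schema registry)
/// with the user-defined columns from SQL (`cols_from_sql`), according to
/// `sql_column_strategy`. When the source columns win, generated columns from
/// SQL are merged in at the wildcard position. An illustrative example,
/// assuming a Kafka topic whose schema is registered in a schema registry:
///
/// ```sql
/// -- Data columns are taken from the registered schema and expanded at the
/// -- position of the wildcard, before the generated column `gen`.
/// CREATE TABLE t (*, gen INT AS 1)
/// WITH (connector = 'kafka', ...)
/// FORMAT PLAIN ENCODE AVRO (schema.registry = '...');
/// ```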
pub(crate) fn bind_all_columns(
    format_encode: &FormatEncodeOptions,
    cols_from_source: Option<Vec<ColumnCatalog>>,
    cols_from_sql: Vec<ColumnCatalog>,
    col_defs_from_sql: &[ColumnDef],
    wildcard_idx: Option<usize>,
    sql_column_strategy: SqlColumnStrategy,
) -> Result<Vec<ColumnCatalog>> {
    if let Some(cols_from_source) = cols_from_source {
        // Columns can be resolved from the source; generated columns, however,
        // can only ever come from the SQL definition.
        let generated_cols_from_sql = cols_from_sql
            .iter()
            .filter(|c| {
                col_defs_from_sql
                    .iter()
                    .find(|d| d.name.real_value() == c.name())
                    .unwrap()
                    .is_generated()
            })
            .cloned()
            .collect_vec();

        #[allow(clippy::collapsible_else_if)]
        match sql_column_strategy {
            // Take the columns from SQL as-is, without checking them against
            // the columns from the source.
            SqlColumnStrategy::FollowUnchecked => {
                assert!(
                    wildcard_idx.is_none(),
                    "wildcard still exists while strategy is `FollowUnchecked`, not correctly purified?"
                );
                return Ok(cols_from_sql);
            }

            // Ignore the (non-generated) columns from SQL; fall through to use
            // the columns from the source.
            SqlColumnStrategy::Ignore => {}

            SqlColumnStrategy::FollowChecked => {
                let has_regular_cols_from_sql =
                    generated_cols_from_sql.len() != cols_from_sql.len();

                if has_regular_cols_from_sql {
                    if wildcard_idx.is_some() {
                        // A wildcard cannot be combined with regular columns.
                        return Err(RwError::from(NotSupported(
                            "When there's a wildcard (\"*\"), \
                             only generated columns are allowed in user-defined schema from SQL"
                                .to_owned(),
                            "Remove the non-generated columns".to_owned(),
                        )));
                    } else {
                        // Check that each regular column from SQL exists in the
                        // source with the same data type.
                        for col in &cols_from_sql {
                            if generated_cols_from_sql.contains(col) {
                                continue;
                            }
                            let Some(col_from_source) =
                                cols_from_source.iter().find(|c| c.name() == col.name())
                            else {
                                return Err(RwError::from(ProtocolError(format!(
                                    "Column \"{}\" is defined in SQL but not found in the source",
                                    col.name()
                                ))));
                            };

                            if col_from_source.data_type() != col.data_type() {
                                return Err(RwError::from(ProtocolError(format!(
                                    "Data type mismatch for column \"{}\". \
                                     Defined in SQL as \"{}\", but found in the source as \"{}\"",
                                    col.name(),
                                    col.data_type(),
                                    col_from_source.data_type()
                                ))));
                            }
                        }
                        return Ok(cols_from_sql);
                    }
                } else {
                    if wildcard_idx.is_some() {
                        // Only generated columns plus a wildcard: fall through
                        // and expand the wildcard below.
                    } else {
                        notice_to_user(
                            "Neither wildcard (\"*\") nor regular (non-generated) columns appear in the user-defined schema from SQL. \
                            For backward compatibility, all columns from the source will be included at the beginning. \
                            For clarity, consider adding a wildcard (\"*\") to indicate where the columns from the source should be included, \
                            or specifying the columns you want to include from the source.",
                        );
                    }
                }
            }
        }

        // Place the columns from the source at the wildcard position (at the
        // beginning if no wildcard is given), keeping the generated columns
        // from SQL around them.
        let wildcard_idx = wildcard_idx.unwrap_or(0).min(generated_cols_from_sql.len());

        let mut merged_cols = generated_cols_from_sql;
        let merged_cols_r = merged_cols.split_off(wildcard_idx);
        merged_cols.extend(cols_from_source);
        merged_cols.extend(merged_cols_r);

        Ok(merged_cols)
    } else {
        if wildcard_idx.is_some() {
            return Err(RwError::from(NotSupported(
                "Wildcard in the user-defined schema is only allowed when columns can be resolved from an external schema".to_owned(),
                "Remove the wildcard or use a source with external schema".to_owned(),
            )));
        }
        let non_generated_sql_defined_columns = non_generated_sql_columns(col_defs_from_sql);

        match (&format_encode.format, &format_encode.row_encode) {
            (Format::DebeziumMongo, Encode::Json) => {
                let strong_schema = format_encode
                    .row_options
                    .iter()
                    .find(|k| k.name.real_value().to_lowercase() == CDC_MONGODB_STRONG_SCHEMA_KEY)
                    .map(|k| matches!(k.value, SqlOptionValue::Value(Value::Boolean(true))))
                    .unwrap_or(false);

                // In strong schema mode, `_id` must be the first column and the
                // remaining columns are bound as defined.
                if strong_schema {
                    let (_, id_column) = non_generated_sql_defined_columns
                        .iter()
                        .enumerate()
                        .find(|(idx, col)| *idx == 0 && col.name.real_value() == "_id")
                        .ok_or_else(|| {
                            RwError::from(ProtocolError(
                                "The `_id` column of the source with row format DebeziumMongoJson must be defined as the first column in SQL".to_owned(),
                            ))
                        })?;

                    let id_data_type = bind_data_type(id_column.data_type.as_ref().unwrap())?;
                    if !matches!(
                        id_data_type,
                        DataType::Varchar | DataType::Int32 | DataType::Int64 | DataType::Jsonb
                    ) {
                        return Err(RwError::from(ProtocolError(
                            "The `_id` column of the source with row format DebeziumMongoJson must be one of [Jsonb | Varchar | Int32 | Int64]".to_owned(),
                        )));
                    }

                    let mut columns = Vec::with_capacity(non_generated_sql_defined_columns.len());
                    columns.push(ColumnCatalog {
                        column_desc: ColumnDesc::named("_id", 0.into(), id_data_type),
                        is_hidden: false,
                    });

                    // Bind the remaining columns, offsetting the column ID by 1
                    // to account for `_id` at ID 0.
                    for (idx, col) in non_generated_sql_defined_columns
                        .into_iter()
                        .skip(1)
                        .enumerate()
                    {
                        columns.push(ColumnCatalog {
                            column_desc: ColumnDesc::named(
                                col.name.real_value(),
                                (idx as i32 + 1).into(),
                                bind_data_type(col.data_type.as_ref().unwrap())?,
                            ),
                            is_hidden: false,
                        });
                    }

                    return Ok(columns);
                }

                let mut columns = vec![
                    ColumnCatalog {
                        column_desc: ColumnDesc::named("_id", 0.into(), DataType::Varchar),
                        is_hidden: false,
                    },
                    ColumnCatalog {
                        column_desc: ColumnDesc::named("payload", 1.into(), DataType::Jsonb),
                        is_hidden: false,
                    },
                ];

                if non_generated_sql_defined_columns.len() != 2
                    || non_generated_sql_defined_columns[0].name.real_value() != columns[0].name()
                    || non_generated_sql_defined_columns[1].name.real_value() != columns[1].name()
                {
                    return Err(RwError::from(ProtocolError(
                        "The non-generated columns of the source with row format DebeziumMongoJson \
                         must be (_id [Jsonb | Varchar | Int32 | Int64], payload jsonb)."
                            .to_owned(),
                    )));
                }

                let key_data_type = bind_data_type(
                    non_generated_sql_defined_columns[0]
                        .data_type
                        .as_ref()
                        .unwrap(),
                )?;
                match key_data_type {
                    DataType::Jsonb | DataType::Varchar | DataType::Int32 | DataType::Int64 => {
                        columns[0].column_desc.data_type = key_data_type;
                    }
                    _ => {
                        return Err(RwError::from(ProtocolError(
                            "The `_id` column of the source with row format DebeziumMongoJson \
                             must be one of [Jsonb | Varchar | Int32 | Int64]"
                                .to_owned(),
                        )));
                    }
                }

                let value_data_type = bind_data_type(
                    non_generated_sql_defined_columns[1]
                        .data_type
                        .as_ref()
                        .unwrap(),
                )?;
                if !matches!(value_data_type, DataType::Jsonb) {
                    return Err(RwError::from(ProtocolError(
                        "The `payload` column of the source with row format DebeziumMongoJson \
                         must be of type Jsonb"
                            .to_owned(),
                    )));
                }
                Ok(columns)
            }
            (Format::Plain, Encode::Bytes) => {
                let err = Err(RwError::from(ProtocolError(
                    "ENCODE BYTES only accepts one BYTEA type column".to_owned(),
                )));
                if non_generated_sql_defined_columns.len() == 1 {
                    let col_data_type = bind_data_type(
                        non_generated_sql_defined_columns[0]
                            .data_type
                            .as_ref()
                            .unwrap(),
                    )?;
                    if col_data_type == DataType::Bytea {
                        Ok(cols_from_sql)
                    } else {
                        err
                    }
                } else {
                    err
                }
            }
            (_, _) => Ok(cols_from_sql),
        }
    }
}

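/// Renders the hint shown when a `FORMAT ... ENCODE ...` combination requires
/// `INCLUDE KEY` with the key column used as the primary key.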
fn hint_format_encode(format_encode: &FormatEncodeOptions) -> String {
    format!(
        r#"Hint: For FORMAT {0} ENCODE {1}, INCLUDE KEY must be specified and the key column must be used as primary key.
example:
    CREATE TABLE <table_name> ( PRIMARY KEY ([rw_key | <key_name>]) )
    INCLUDE KEY [AS <key_name>]
    WITH (...)
    FORMAT {0} ENCODE {1}{2}
"#,
        format_encode.format,
        format_encode.row_encode,
        if format_encode.row_encode == Encode::Json || format_encode.row_encode == Encode::Bytes {
            "".to_owned()
        } else {
            " (...)".to_owned()
        }
    )
}

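/// Determines the primary key columns of the source based on the
/// `FORMAT ... ENCODE ...` combination: e.g. `UPSERT` formats require the
/// `INCLUDE KEY` column as the sole primary key, while `DEBEZIUM AVRO` can
/// extract the key columns from the Avro key schema if none are given in SQL.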
pub(crate) async fn bind_source_pk(
    format_encode: &FormatEncodeOptions,
    source_info: &StreamSourceInfo,
    columns: &mut [ColumnCatalog],
    sql_defined_pk_names: Vec<String>,
    with_properties: &WithOptionsSecResolved,
) -> Result<Vec<String>> {
    let sql_defined_pk = !sql_defined_pk_names.is_empty();
    let include_key_column_name: Option<String> = {
        // Find the column, if any, that was added via `INCLUDE KEY`.
        columns.iter().find_map(|catalog| {
            if matches!(
                catalog.column_desc.additional_column.column_type,
                Some(AdditionalColumnType::Key(_))
            ) {
                Some(catalog.name().to_owned())
            } else {
                None
            }
        })
    };
    let additional_column_names = columns
        .iter()
        .filter_map(|col| {
            if col.column_desc.additional_column.column_type.is_some() {
                Some(col.name().to_owned())
            } else {
                None
            }
        })
        .collect_vec();

    let res = match (&format_encode.format, &format_encode.row_encode) {
        (Format::Native, Encode::Native) | (Format::None, Encode::None) | (Format::Plain, _) => {
            sql_defined_pk_names
        }

        // For upsert formats, a single primary key is required and it must be
        // the column from `INCLUDE KEY`.
        (Format::Upsert, Encode::Json | Encode::Avro | Encode::Protobuf) => {
            if let Some(ref key_column_name) = include_key_column_name
                && sql_defined_pk
            {
                if sql_defined_pk_names.len() != 1
                    || !key_column_name.eq(sql_defined_pk_names[0].as_str())
                {
                    return Err(RwError::from(ProtocolError(format!(
                        "Only \"{}\" can be used as primary key\n\n{}",
                        key_column_name,
                        hint_format_encode(format_encode)
                    ))));
                }
                sql_defined_pk_names
            } else {
                return if let Some(include_key_column_name) = include_key_column_name {
                    Err(RwError::from(ProtocolError(format!(
                        "Primary key must be specified as \"{}\"\n\n{}",
                        include_key_column_name,
                        hint_format_encode(format_encode)
                    ))))
                } else {
                    Err(RwError::from(ProtocolError(format!(
                        "INCLUDE KEY clause not set\n\n{}",
                        hint_format_encode(format_encode)
                    ))))
                };
            }
        }

        (Format::Debezium, Encode::Json) => {
            if !additional_column_names.is_empty() {
                return Err(RwError::from(ProtocolError(format!(
                    "FORMAT DEBEZIUM forbids additional columns, but got {:?}",
                    additional_column_names
                ))));
            }
            if !sql_defined_pk {
                return Err(RwError::from(ProtocolError(
                    "Primary key must be specified when creating source with FORMAT DEBEZIUM."
                        .to_owned(),
                )));
            }
            sql_defined_pk_names
        }
        (Format::Debezium, Encode::Avro) => {
            if !additional_column_names.is_empty() {
                return Err(RwError::from(ProtocolError(format!(
                    "FORMAT DEBEZIUM forbids additional columns, but got {:?}",
                    additional_column_names
                ))));
            }
            if sql_defined_pk {
                sql_defined_pk_names
            } else {
                let pk_names =
                    extract_debezium_avro_table_pk_columns(source_info, with_properties).await?;
                // Each extracted pk column must be present in the row schema.
                for pk_name in &pk_names {
                    columns
                        .iter()
                        .find(|c: &&ColumnCatalog| c.name().eq(pk_name))
                        .ok_or_else(|| {
                            RwError::from(ProtocolError(format!(
                                "Avro key column {} does not exist in the Avro row schema",
                                pk_name
                            )))
                        })?;
                }
                pk_names
            }
        }
        (Format::DebeziumMongo, Encode::Json) => {
            if sql_defined_pk {
                sql_defined_pk_names
            } else {
                vec!["_id".to_owned()]
            }
        }

        (Format::Maxwell, Encode::Json) => {
            if !additional_column_names.is_empty() {
                return Err(RwError::from(ProtocolError(format!(
                    "FORMAT MAXWELL forbids additional columns, but got {:?}",
                    additional_column_names
                ))));
            }
            if !sql_defined_pk {
                return Err(RwError::from(ProtocolError(
                    "Primary key must be specified when creating source with FORMAT MAXWELL ENCODE JSON.".to_owned(),
                )));
            }
            sql_defined_pk_names
        }

        (Format::Canal, Encode::Json) => {
            if !additional_column_names.is_empty() {
                return Err(RwError::from(ProtocolError(format!(
                    "FORMAT CANAL forbids additional columns, but got {:?}",
                    additional_column_names
                ))));
            }
            if !sql_defined_pk {
                return Err(RwError::from(ProtocolError(
                    "Primary key must be specified when creating source with FORMAT CANAL ENCODE JSON.".to_owned(),
                )));
            }
            sql_defined_pk_names
        }
        (format, encoding) => {
            return Err(RwError::from(ProtocolError(format!(
                "Unknown combination {:?} {:?}",
                format, encoding
            ))));
        }
    };
    Ok(res)
}

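/// Binds `WATERMARK FOR ... AS ...` clauses into [`WatermarkDesc`]s, checking
/// that each watermark expression's return type matches the data type of the
/// column it is declared on.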
pub(super) fn bind_source_watermark(
    session: &SessionImpl,
    name: String,
    source_watermarks: Vec<SourceWatermark>,
    column_catalogs: &[ColumnCatalog],
) -> Result<Vec<WatermarkDesc>> {
    let mut binder = Binder::new_for_ddl(session);
    binder.bind_columns_to_context(name.clone(), column_catalogs)?;

    let watermark_descs = source_watermarks
        .into_iter()
        .map(|source_watermark| {
            let col_name = source_watermark.column.real_value();
            let watermark_idx = binder.get_column_binding_index(name.clone(), &col_name)?;

            let expr = binder.bind_expr(&source_watermark.expr)?;
            let watermark_col_type = column_catalogs[watermark_idx].data_type();
            let watermark_expr_type = &expr.return_type();
            if watermark_col_type != watermark_expr_type {
                Err(RwError::from(ErrorCode::BindError(format!(
                    "The return type of the watermark expression must be identical to the watermark column's data type, \
                     but the expression returns `{}` while the column is of type `{}`",
                    watermark_expr_type, watermark_col_type
                ))))
            } else {
                let expr_proto = expr.to_expr_proto();
                Ok::<_, RwError>(WatermarkDesc {
                    watermark_idx: watermark_idx as u32,
                    expr: Some(expr_proto),
                })
            }
        })
        .try_collect()?;
    Ok(watermark_descs)
}

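/// Connector-specific schema validation; currently this only checks the
/// fixed per-event-type schema of the Nexmark connector.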
pub(super) fn check_format_encode(
    props: &WithOptionsSecResolved,
    row_id_index: Option<usize>,
    columns: &[ColumnCatalog],
) -> Result<()> {
    let Some(connector) = props.get_connector() else {
        return Ok(());
    };

    if connector == NEXMARK_CONNECTOR {
        check_nexmark_schema(props, row_id_index, columns)
    } else {
        Ok(())
    }
}

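/// Extracts and validates connector properties from the `WITH` clause. For
/// shareable CDC sources this also fills in defaults: snapshot (backfill)
/// mode, sharing mode, transactional metadata, the streaming start timeout,
/// and a random MySQL `server.id` if none is provided.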
pub fn bind_connector_props(
    handler_args: &HandlerArgs,
    format_encode: &FormatEncodeOptions,
    is_create_source: bool,
) -> Result<(WithOptions, SourceRefreshMode)> {
    let mut with_properties = handler_args.with_options.clone().into_connector_props();
    validate_compatibility(format_encode, &mut with_properties)?;
    let refresh_mode = {
        let refresh_mode = resolve_source_refresh_mode_in_with_option(&mut with_properties)?;
        if is_create_source && refresh_mode.is_some() {
            return Err(RwError::from(ProtocolError(
                "`refresh_mode` is only supported for CREATE TABLE".to_owned(),
            )));
        }

        refresh_mode.unwrap_or(SourceRefreshMode {
            refresh_mode: Some(RefreshMode::Streaming(SourceRefreshModeStreaming {})),
        })
    };

    let create_cdc_source_job = with_properties.is_shareable_cdc_connector();

    if !is_create_source && with_properties.is_shareable_only_cdc_connector() {
        return Err(RwError::from(ProtocolError(format!(
            "connector {} does not support `CREATE TABLE`, please use `CREATE SOURCE` instead",
            with_properties.get_connector().unwrap(),
        ))));
    }
    if is_create_source && create_cdc_source_job {
        if let Some(value) = with_properties.get(AUTO_SCHEMA_CHANGE_KEY)
            && value.parse::<bool>().map_err(|_| {
                ErrorCode::InvalidInputSyntax(format!(
                    "invalid value of '{}' option",
                    AUTO_SCHEMA_CHANGE_KEY
                ))
            })?
        {
            Feature::CdcAutoSchemaChange.check_available()?;
        }

        // Set the CDC source to backfill (snapshot) mode and enable sharing mode.
        with_properties.insert(CDC_SNAPSHOT_MODE_KEY.into(), CDC_SNAPSHOT_BACKFILL.into());
        with_properties.insert(CDC_SHARING_MODE_KEY.into(), "true".into());
        // Enable transactional metadata when the connector supports it.
        if with_properties.enable_transaction_metadata() {
            with_properties.insert(CDC_TRANSACTIONAL_KEY.into(), "true".into());
        }
        // Fall back to the session config for the streaming start timeout.
        if !with_properties.contains_key(CDC_WAIT_FOR_STREAMING_START_TIMEOUT) {
            with_properties.insert(
                CDC_WAIT_FOR_STREAMING_START_TIMEOUT.into(),
                handler_args
                    .session
                    .config()
                    .cdc_source_wait_streaming_start_timeout()
                    .to_string(),
            );
        }
    }
    if with_properties.is_mysql_cdc_connector() {
        // Generate a random `server.id` for the MySQL CDC connector if not
        // specified; it must be unique within the MySQL replication topology.
        with_properties
            .entry("server.id".to_owned())
            .or_insert(rand::rng().random_range(1..u32::MAX).to_string());
    }
    Ok((with_properties, refresh_mode))
}

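/// How user-defined columns from SQL should be handled when columns can also
/// be resolved from the external source (e.g. a schema registry).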
pub enum SqlColumnStrategy {
    /// Follow the columns from SQL without checking them against the source,
    /// e.g. when the statement has already been purified.
    FollowUnchecked,

    /// Follow the columns from SQL, but check them against the columns
    /// resolved from the source and reject mismatches.
    FollowChecked,

    /// Ignore the (non-generated) columns from SQL and use the columns
    /// resolved from the source.
    Ignore,
}

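/// Binds a `CREATE SOURCE` statement, or the connector part of a
/// `CREATE TABLE ... WITH (connector = ...)` statement, into a
/// [`SourceCatalog`].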
#[allow(clippy::too_many_arguments)]
pub async fn bind_create_source_or_table_with_connector(
    handler_args: HandlerArgs,
    full_name: ObjectName,
    format_encode: FormatEncodeOptions,
    with_properties: WithOptions,
    sql_columns_defs: &[ColumnDef],
    constraints: Vec<TableConstraint>,
    wildcard_idx: Option<usize>,
    source_watermarks: Vec<SourceWatermark>,
    columns_from_resolve_source: Option<Vec<ColumnCatalog>>,
    source_info: StreamSourceInfo,
    include_column_options: IncludeOption,
    col_id_gen: &mut ColumnIdGenerator,
    create_source_type: CreateSourceType,
    source_rate_limit: Option<u32>,
    sql_column_strategy: SqlColumnStrategy,
    refresh_mode: SourceRefreshMode,
) -> Result<SourceCatalog> {
    let session = &handler_args.session;
    let db_name: &str = &session.database();
    let (schema_name, source_name) = Binder::resolve_schema_qualified_name(db_name, &full_name)?;
    let (database_id, schema_id) =
        session.get_database_and_schema_id_for_create(schema_name.clone())?;

    let is_create_source = create_source_type != CreateSourceType::Table;

    if is_create_source {
        if with_properties.is_batch_connector() {
            return Err(ErrorCode::BindError(
                "can't CREATE SOURCE with refreshable batch connector\n\nHint: use CREATE TABLE instead"
                    .to_owned(),
            )
            .into());
        }

        match format_encode.format {
            Format::Debezium | Format::DebeziumMongo | Format::Maxwell | Format::Canal => {
                return Err(ErrorCode::BindError(format!(
                    "can't CREATE SOURCE with FORMAT {}.\n\nHint: use CREATE TABLE instead\n\n{}",
                    format_encode.format,
                    hint_format_encode(&format_encode)
                ))
                .into());
            }
            Format::Upsert => {
                notice_to_user(format!(
                    "Streaming queries on sources with `FORMAT {}` may have limitations. If your query isn't supported, consider using `CREATE TABLE` instead.",
                    format_encode.format
                ));
            }
            _ => {}
        }
    }

    let sql_pk_names = bind_sql_pk_names(sql_columns_defs, bind_table_constraints(&constraints)?)?;

    // The schema of an iceberg source is always resolved from the iceberg table itself.
    if with_properties.is_iceberg_connector() && !sql_columns_defs.is_empty() {
        return Err(RwError::from(InvalidInputSyntax(
            r#"Schema is automatically inferred for iceberg source and should not be specified

HINT: use `CREATE SOURCE <name> WITH (...)` instead of `CREATE SOURCE <name> (<columns>) WITH (...)`."#.to_owned(),
        )));
    }
    let columns_from_sql = bind_sql_columns(sql_columns_defs, false)?;

    let mut columns = bind_all_columns(
        &format_encode,
        columns_from_resolve_source,
        columns_from_sql,
        sql_columns_defs,
        wildcard_idx,
        sql_column_strategy,
    )?;

    // Add the additional columns requested by the `INCLUDE ...` clause.
    handle_addition_columns(
        Some(&format_encode),
        &with_properties,
        include_column_options,
        &mut columns,
        false,
    )?;

    if columns.is_empty() {
        return Err(RwError::from(ProtocolError(
            "Schema definition is required, either from SQL or schema registry.".to_owned(),
        )));
    }

    if is_create_source {
        // Add the connector-specific hidden timestamp column if applicable
        // (e.g. `_rw_kafka_timestamp` for Kafka).
        check_and_add_timestamp_column(&with_properties, &mut columns);

        // For shared non-CDC sources, eagerly add the hidden partition and
        // offset columns so that downstream jobs can reuse the ingested data.
        if create_source_type == CreateSourceType::SharedNonCdc {
            let (columns_exist, additional_columns) = source_add_partition_offset_cols(
                &columns,
                &with_properties.get_connector().unwrap(),
                true,
            );
            for (existed, c) in columns_exist.into_iter().zip_eq_fast(additional_columns) {
                if !existed {
                    columns.push(ColumnCatalog::hidden(c));
                }
            }
        }
    }

    let mut with_properties = with_properties;
    // Resolve any privatelink entries in the WITH options.
    resolve_privatelink_in_with_option(&mut with_properties)?;

    // If the `enforce_secret` system parameter is on (and the feature is
    // available), sensitive properties must be passed as secret references.
    if session
        .env()
        .system_params_manager()
        .get_params()
        .load()
        .enforce_secret()
        && Feature::SecretManagement.check_available().is_ok()
    {
        ConnectorProperties::enforce_secret_source(&with_properties)?;
    }

    let (with_properties, connection_type, connector_conn_ref) =
        resolve_connection_ref_and_secret_ref(
            with_properties,
            session,
            Some(TelemetryDatabaseObject::Source),
        )?;
    ensure_connection_type_allowed(connection_type, &SOURCE_ALLOWED_CONNECTION_CONNECTOR)?;

    // If the source is created through a connection, the connector must match
    // the connection type.
    if !matches!(connection_type, PbConnectionType::Unspecified) {
        let Some(connector) = with_properties.get_connector() else {
            return Err(RwError::from(ProtocolError(format!(
                "missing field '{}' in WITH clause",
                UPSTREAM_SOURCE_KEY
            ))));
        };
        check_connector_match_connection_type(connector.as_str(), &connection_type)?;
    }

    let pk_names = bind_source_pk(
        &format_encode,
        &source_info,
        &mut columns,
        sql_pk_names,
        &with_properties,
    )
    .await?;

    // Columns must not be defined more than once.
    if let Some(duplicated_name) = columns.iter().map(|c| c.name()).duplicates().next() {
        return Err(ErrorCode::InvalidInputSyntax(format!(
            "column \"{}\" specified more than once",
            duplicated_name
        ))
        .into());
    }

    for c in &mut columns {
        let original_data_type = c.data_type().clone();
        col_id_gen.generate(c)?;
        // For `CREATE SOURCE`, restore the original data type so that the ID
        // generator does not persist nested field IDs into the source schema.
        if is_create_source {
            c.column_desc.data_type = original_data_type;
        }
    }
    debug_assert_column_ids_distinct(&columns);

    let (mut columns, pk_col_ids, row_id_index) =
        bind_pk_and_row_id_on_relation(columns, pk_names, true)?;

    let watermark_descs =
        bind_source_watermark(session, source_name.clone(), source_watermarks, &columns)?;
    // Only one watermark per source is supported for now.
    assert!(watermark_descs.len() <= 1);

    bind_sql_column_constraints(
        session,
        source_name.clone(),
        &mut columns,
        sql_columns_defs,
        &pk_col_ids,
    )?;
    check_format_encode(&with_properties, row_id_index, &columns)?;

    let definition = handler_args.normalized_sql.clone();

    let associated_table_id = if is_create_source {
        None
    } else {
        Some(TableId::placeholder())
    };
    let source = SourceCatalog {
        id: SourceId::placeholder(),
        name: source_name,
        schema_id,
        database_id,
        columns,
        pk_col_ids,
        append_only: row_id_index.is_some(),
        owner: session.user_id(),
        info: source_info,
        row_id_index,
        with_properties,
        watermark_descs,
        associated_table_id,
        definition,
        connection_id: connector_conn_ref,
        created_at_epoch: None,
        initialized_at_epoch: None,
        version: INITIAL_SOURCE_VERSION_ID,
        created_at_cluster_version: None,
        initialized_at_cluster_version: None,
        rate_limit: source_rate_limit,
        refresh_mode: Some(refresh_mode),
    };
    Ok(source)
}

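/// Handles `CREATE SOURCE`: binds the statement into a [`SourceCatalog`] and
/// registers it, additionally building and submitting a stream fragment graph
/// if the source is shared.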
pub async fn handle_create_source(
    mut handler_args: HandlerArgs,
    stmt: CreateSourceStatement,
) -> Result<RwPgResponse> {
    let session = handler_args.session.clone();
    let overwrite_options = OverwriteOptions::new(&mut handler_args);

    if let Either::Right(resp) = session.check_relation_name_duplicated(
        stmt.source_name.clone(),
        StatementType::CREATE_SOURCE,
        stmt.if_not_exists,
    )? {
        return Ok(resp);
    }

    if stmt
        .source_name
        .base_name()
        .starts_with(ICEBERG_SOURCE_PREFIX)
    {
        return Err(RwError::from(InvalidInputSyntax(format!(
            "Source name cannot start with reserved prefix '{}'",
            ICEBERG_SOURCE_PREFIX
        ))));
    }

    if handler_args.with_options.is_empty() {
        return Err(RwError::from(InvalidInputSyntax(
            "missing WITH clause".to_owned(),
        )));
    }

    let format_encode = stmt.format_encode.into_v2_with_warning();
    let (with_properties, refresh_mode) =
        bind_connector_props(&handler_args, &format_encode, true)?;

    let create_source_type = CreateSourceType::for_newly_created(&session, &*with_properties);
    let (columns_from_resolve_source, source_info) = bind_columns_from_source(
        &session,
        &format_encode,
        Either::Left(&with_properties),
        create_source_type,
    )
    .await?;
    let mut col_id_gen = ColumnIdGenerator::new_initial();

    if stmt.columns.iter().any(|col| {
        col.options
            .iter()
            .any(|def| matches!(def.option, ColumnOption::NotNull))
    }) {
        return Err(RwError::from(InvalidInputSyntax(
            "NOT NULL constraint is not supported in source schema".to_owned(),
        )));
    }

    let source_catalog = bind_create_source_or_table_with_connector(
        handler_args.clone(),
        stmt.source_name,
        format_encode,
        with_properties,
        &stmt.columns,
        stmt.constraints,
        stmt.wildcard_idx,
        stmt.source_watermarks,
        columns_from_resolve_source,
        source_info,
        stmt.include_column_options,
        &mut col_id_gen,
        create_source_type,
        overwrite_options.source_rate_limit,
        SqlColumnStrategy::FollowChecked,
        refresh_mode,
    )
    .await?;

    // A temporary source lives only in the session and is not persisted.
    if stmt.temporary {
        if session.get_temporary_source(&source_catalog.name).is_some() {
            return Err(CatalogError::duplicated("source", source_catalog.name.clone()).into());
        }
        session.create_temporary_source(source_catalog);
        return Ok(PgResponse::empty_result(StatementType::CREATE_SOURCE));
    }

    let source = source_catalog.to_prost();

    let catalog_writer = session.catalog_writer()?;

    if create_source_type.is_shared() {
        let graph = generate_stream_graph_for_source(handler_args, source_catalog)?;
        catalog_writer
            .create_source(source, Some(graph), stmt.if_not_exists)
            .await?;
    } else {
        // For other sources, only a catalog entry is created; no streaming job is needed.
        catalog_writer
            .create_source(source, None, stmt.if_not_exists)
            .await?;
    }

    Ok(PgResponse::empty_result(StatementType::CREATE_SOURCE))
}

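/// Builds the stream fragment graph for a shared source, i.e. the standalone
/// streaming job that ingests data once and lets downstream jobs reuse it.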
pub(super) fn generate_stream_graph_for_source(
    handler_args: HandlerArgs,
    source_catalog: SourceCatalog,
) -> Result<PbStreamFragmentGraph> {
    let context = OptimizerContext::from_handler_args(handler_args);
    let source_node = LogicalSource::with_catalog(
        Rc::new(source_catalog),
        SourceNodeKind::CreateSharedSource,
        context.into(),
        None,
    )?;

    let stream_plan = source_node.to_stream(&mut ToStreamContext::new(false))?;
    let graph = build_graph(stream_plan, Some(GraphJobType::Source))?;
    Ok(graph)
}

#[cfg(test)]
pub mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use risingwave_common::catalog::{
        DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, ROW_ID_COLUMN_NAME,
    };
    use risingwave_common::types::{DataType, StructType};

    use crate::catalog::root_catalog::SchemaPath;
    use crate::catalog::source_catalog::SourceCatalog;
    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    const GET_COLUMN_FROM_CATALOG: fn(&Arc<SourceCatalog>) -> HashMap<&str, DataType> =
        |catalog: &Arc<SourceCatalog>| -> HashMap<&str, DataType> {
            catalog
                .columns
                .iter()
                .map(|col| (col.name(), col.data_type().clone()))
                .collect::<HashMap<&str, DataType>>()
        };

    #[tokio::test]
    async fn test_create_source_handler() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let sql = format!(
            r#"CREATE SOURCE t
    WITH (connector = 'kinesis')
    FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        let (source, _) = catalog_reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
            .unwrap();
        assert_eq!(source.name, "t");

        let columns = GET_COLUMN_FROM_CATALOG(source);

        let city_type = StructType::new(vec![
            ("address", DataType::Varchar),
            ("zipcode", DataType::Varchar),
        ])
        .into();
        let expected_columns = maplit::hashmap! {
            ROW_ID_COLUMN_NAME => DataType::Serial,
            "id" => DataType::Int32,
            "zipcode" => DataType::Int64,
            "rate" => DataType::Float32,
            "country" => StructType::new(
                vec![("address", DataType::Varchar), ("city", city_type), ("zipcode", DataType::Varchar)],
            )
            .into(),
        };
        assert_eq!(columns, expected_columns, "{columns:#?}");
    }

    #[tokio::test]
    async fn test_duplicate_props_options() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let sql = format!(
            r#"CREATE SOURCE t
    WITH (
        connector = 'kinesis',
        aws.region='user_test_topic',
        endpoint='172.10.1.1:9090,172.10.1.2:9090',
        aws.credentials.access_key_id = 'your_access_key_1',
        aws.credentials.secret_access_key = 'your_secret_key_1'
    )
    FORMAT PLAIN ENCODE PROTOBUF (
        message = '.test.TestRecord',
        aws.credentials.access_key_id = 'your_access_key_2',
        aws.credentials.secret_access_key = 'your_secret_key_2',
        schema.location = 'file://{}',
    )"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        let (source, _) = catalog_reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
            .unwrap();
        assert_eq!(source.name, "t");

        assert_eq!(
            source
                .info
                .format_encode_options
                .get("aws.credentials.access_key_id")
                .unwrap(),
            "your_access_key_2"
        );
        assert_eq!(
            source
                .info
                .format_encode_options
                .get("aws.credentials.secret_access_key")
                .unwrap(),
            "your_secret_key_2"
        );

        assert_eq!(
            source
                .with_properties
                .get("aws.credentials.access_key_id")
                .unwrap(),
            "your_access_key_1"
        );
        assert_eq!(
            source
                .with_properties
                .get("aws.credentials.secret_access_key")
                .unwrap(),
            "your_secret_key_1"
        );

        assert!(!source.with_properties.contains_key("schema.location"));
    }

    #[tokio::test]
    async fn test_multi_table_cdc_create_source_handler() {
        let sql =
            "CREATE SOURCE t2 WITH (connector = 'mysql-cdc') FORMAT PLAIN ENCODE JSON".to_owned();
        let frontend = LocalFrontend::new(Default::default()).await;
        let session = frontend.session_ref();

        frontend
            .run_sql_with_session(session.clone(), sql)
            .await
            .unwrap();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        let (source, _) = catalog_reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t2")
            .unwrap();
        assert_eq!(source.name, "t2");

        let columns = source
            .columns
            .iter()
            .map(|col| (col.name(), col.data_type().clone()))
            .collect::<Vec<(&str, DataType)>>();

        expect_test::expect![[r#"
            [
                (
                    "payload",
                    Jsonb,
                ),
                (
                    "_rw_offset",
                    Varchar,
                ),
                (
                    "_rw_table_name",
                    Varchar,
                ),
                (
                    "_row_id",
                    Serial,
                ),
            ]
        "#]]
        .assert_debug_eq(&columns);
    }

    #[tokio::test]
    async fn test_source_addition_columns() {
        let sql =
            "CREATE SOURCE s (v1 int) include key as _rw_kafka_key with (connector = 'kafka') format plain encode json".to_owned();
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();
        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let (source, _) = catalog_reader
            .get_source_by_name(
                DEFAULT_DATABASE_NAME,
                SchemaPath::Name(DEFAULT_SCHEMA_NAME),
                "s",
            )
            .unwrap();
        assert_eq!(source.name, "s");

        let columns = source
            .columns
            .iter()
            .map(|col| (col.name(), col.data_type().clone()))
            .collect::<Vec<(&str, DataType)>>();

        expect_test::expect![[r#"
            [
                (
                    "v1",
                    Int32,
                ),
                (
                    "_rw_kafka_key",
                    Bytea,
                ),
                (
                    "_rw_kafka_timestamp",
                    Timestamptz,
                ),
                (
                    "_rw_kafka_partition",
                    Varchar,
                ),
                (
                    "_rw_kafka_offset",
                    Varchar,
                ),
                (
                    "_row_id",
                    Serial,
                ),
            ]
        "#]]
        .assert_debug_eq(&columns);

        let sql =
            "CREATE SOURCE s3 (v1 int) include timestamp 'header1' as header_col with (connector = 'kafka') format plain encode json".to_owned();
        match frontend.run_sql(sql).await {
            Err(e) => {
                assert_eq!(
                    e.to_string(),
                    "Protocol error: Only header column can have inner field, but got \"timestamp\""
                )
            }
            _ => unreachable!(),
        }
    }
}