use std::collections::{BTreeMap, HashMap, HashSet};
use std::rc::Rc;
use std::sync::LazyLock;

use anyhow::{Context, anyhow};
use either::Either;
use external_schema::debezium::extract_debezium_avro_table_pk_columns;
use external_schema::nexmark::check_nexmark_schema;
use itertools::Itertools;
use maplit::{convert_args, hashmap, hashset};
use pgwire::pg_response::{PgResponse, StatementType};
use rand::Rng;
use risingwave_common::array::arrow::{IcebergArrowConvert, arrow_schema_iceberg};
use risingwave_common::bail_not_implemented;
use risingwave_common::catalog::{
    ColumnCatalog, ColumnDesc, ColumnId, INITIAL_SOURCE_VERSION_ID, KAFKA_TIMESTAMP_COLUMN_NAME,
    ROW_ID_COLUMN_NAME, TableId, debug_assert_column_ids_distinct,
};
use risingwave_common::license::Feature;
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::types::DataType;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_connector::parser::additional_columns::{
    build_additional_column_desc, get_supported_additional_columns,
    source_add_partition_offset_cols,
};
use risingwave_connector::parser::{
    AvroParserConfig, DEBEZIUM_IGNORE_KEY, DebeziumAvroParserConfig, ProtobufParserConfig,
    SchemaLocation, SpecificParserConfig, TimestamptzHandling,
    fetch_json_schema_and_map_to_columns,
};
use risingwave_connector::schema::AWS_GLUE_SCHEMA_ARN_KEY;
use risingwave_connector::schema::schema_registry::{
    SCHEMA_REGISTRY_BACKOFF_DURATION_KEY, SCHEMA_REGISTRY_BACKOFF_FACTOR_KEY,
    SCHEMA_REGISTRY_MAX_DELAY_KEY, SCHEMA_REGISTRY_PASSWORD, SCHEMA_REGISTRY_RETRIES_MAX_KEY,
    SCHEMA_REGISTRY_USERNAME, SchemaRegistryConfig, name_strategy_from_str,
};
use risingwave_connector::source::cdc::{
    CDC_MONGODB_STRONG_SCHEMA_KEY, CDC_SHARING_MODE_KEY, CDC_SNAPSHOT_BACKFILL,
    CDC_SNAPSHOT_MODE_KEY, CDC_TRANSACTIONAL_KEY, CDC_WAIT_FOR_STREAMING_START_TIMEOUT,
    CITUS_CDC_CONNECTOR, MONGODB_CDC_CONNECTOR, MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR,
    SQL_SERVER_CDC_CONNECTOR,
};
use risingwave_connector::source::datagen::DATAGEN_CONNECTOR;
use risingwave_connector::source::iceberg::ICEBERG_CONNECTOR;
use risingwave_connector::source::nexmark::source::{EventType, get_event_data_types_with_names};
use risingwave_connector::source::test_source::TEST_CONNECTOR;
pub use risingwave_connector::source::{
    ADBC_SNOWFLAKE_CONNECTOR, UPSTREAM_SOURCE_KEY, WEBHOOK_CONNECTOR,
};
use risingwave_connector::source::{
    AZBLOB_CONNECTOR, ConnectorProperties, GCS_CONNECTOR, GOOGLE_PUBSUB_CONNECTOR, KAFKA_CONNECTOR,
    KINESIS_CONNECTOR, LEGACY_S3_CONNECTOR, MQTT_CONNECTOR, NATS_CONNECTOR, NEXMARK_CONNECTOR,
    OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR, PULSAR_CONNECTOR,
};
use risingwave_connector::{AUTO_SCHEMA_CHANGE_KEY, WithPropertiesExt};
use risingwave_pb::catalog::connection_params::PbConnectionType;
use risingwave_pb::catalog::{PbSchemaRegistryNameStrategy, StreamSourceInfo, WatermarkDesc};
use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
use risingwave_pb::plan_common::source_refresh_mode::{RefreshMode, SourceRefreshModeStreaming};
use risingwave_pb::plan_common::{EncodeType, FormatType, SourceRefreshMode};
use risingwave_pb::stream_plan::PbStreamFragmentGraph;
use risingwave_pb::telemetry::TelemetryDatabaseObject;
use risingwave_sqlparser::ast::{
    AstString, ColumnDef, ColumnOption, CreateSourceStatement, Encode, Format, FormatEncodeOptions,
    ObjectName, SourceWatermark, SqlOptionValue, TableConstraint, Value, get_delimiter,
};
use risingwave_sqlparser::parser::{IncludeOption, IncludeOptionItem};
use thiserror_ext::AsReport;

use super::RwPgResponse;
use crate::binder::Binder;
use crate::catalog::CatalogError;
use crate::catalog::source_catalog::SourceCatalog;
use crate::error::ErrorCode::{self, Deprecated, InvalidInputSyntax, NotSupported, ProtocolError};
use crate::error::{Result, RwError};
use crate::expr::Expr;
use crate::handler::HandlerArgs;
use crate::handler::create_table::{
    ColumnIdGenerator, bind_pk_and_row_id_on_relation, bind_sql_column_constraints,
    bind_sql_columns, bind_sql_pk_names, bind_table_constraints,
};
use crate::handler::util::{
    SourceSchemaCompatExt, check_connector_match_connection_type, ensure_connection_type_allowed,
};
use crate::optimizer::plan_node::generic::SourceNodeKind;
use crate::optimizer::plan_node::{LogicalSource, ToStream, ToStreamContext};
use crate::session::SessionImpl;
use crate::session::current::notice_to_user;
use crate::utils::{
    OverwriteOptions, resolve_connection_ref_and_secret_ref, resolve_privatelink_in_with_option,
    resolve_secret_ref_in_with_options, resolve_source_refresh_mode_in_with_option,
};
use crate::{OptimizerContext, WithOptions, WithOptionsSecResolved, bind_data_type, build_graph};

mod external_schema;
pub use external_schema::{
    bind_columns_from_source, get_schema_location, schema_has_schema_registry,
};
mod validate;
pub use validate::validate_compatibility;
use validate::{SOURCE_ALLOWED_CONNECTION_CONNECTOR, SOURCE_ALLOWED_CONNECTION_SCHEMA_REGISTRY};
mod additional_column;
use additional_column::check_and_add_timestamp_column;
pub use additional_column::handle_addition_columns;
use risingwave_common::catalog::ICEBERG_SOURCE_PREFIX;
use risingwave_common::id::SourceId;

use crate::stream_fragmenter::GraphJobType;

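/// Returns only the non-generated column definitions from the user-defined SQL schema.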
fn non_generated_sql_columns(columns: &[ColumnDef]) -> Vec<ColumnDef> {
    columns
        .iter()
        .filter(|c| !c.is_generated())
        .cloned()
        .collect()
}

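/// Removes `key` from the format-encode options and returns its value, if present.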
fn try_consume_string_from_options(
    format_encode_options: &mut BTreeMap<String, String>,
    key: &str,
) -> Option<AstString> {
    format_encode_options.remove(key).map(AstString)
}

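/// Consumes the schema-registry credential and retry/backoff options from the
/// format-encode options, if present.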
fn try_consume_schema_registry_config_from_options(
    format_encode_options: &mut BTreeMap<String, String>,
) {
    [
        SCHEMA_REGISTRY_USERNAME,
        SCHEMA_REGISTRY_PASSWORD,
        SCHEMA_REGISTRY_MAX_DELAY_KEY,
        SCHEMA_REGISTRY_BACKOFF_DURATION_KEY,
        SCHEMA_REGISTRY_BACKOFF_FACTOR_KEY,
        SCHEMA_REGISTRY_RETRIES_MAX_KEY,
    ]
    .iter()
    .for_each(|key| {
        try_consume_string_from_options(format_encode_options, key);
    });
}

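/// Like [`try_consume_string_from_options`], but returns a protocol error if `key` is missing.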
fn consume_string_from_options(
    format_encode_options: &mut BTreeMap<String, String>,
    key: &str,
) -> Result<AstString> {
    try_consume_string_from_options(format_encode_options, key).ok_or(RwError::from(ProtocolError(
        format!("missing field {} in options", key),
    )))
}

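/// Drops all `aws.`-prefixed keys from the format-encode options.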
fn consume_aws_config_from_options(format_encode_options: &mut BTreeMap<String, String>) {
    format_encode_options.retain(|key, _| !key.starts_with("aws."))
}

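/// The kind of source being created, which determines whether a dedicated (shared)
/// streaming job is built for it.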
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CreateSourceType {
    SharedCdc,
    SharedNonCdc,
    NonShared,
    Table,
}

impl CreateSourceType {
    pub fn for_newly_created(
        session: &SessionImpl,
        with_properties: &impl WithPropertiesExt,
    ) -> Self {
        if with_properties.is_shareable_cdc_connector() {
            CreateSourceType::SharedCdc
        } else if with_properties.is_shareable_non_cdc_connector()
            && session
                .env()
                .streaming_config()
                .developer
                .enable_shared_source
            && session.config().streaming_use_shared_source()
        {
            CreateSourceType::SharedNonCdc
        } else {
            CreateSourceType::NonShared
        }
    }

    pub fn for_replace(catalog: &SourceCatalog) -> Self {
        if !catalog.info.is_shared() {
            CreateSourceType::NonShared
        } else if catalog.with_properties.is_shareable_cdc_connector() {
            CreateSourceType::SharedCdc
        } else {
            CreateSourceType::SharedNonCdc
        }
    }

    pub fn is_shared(&self) -> bool {
        matches!(
            self,
            CreateSourceType::SharedCdc | CreateSourceType::SharedNonCdc
        )
    }
}

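/// Resolves the final column catalog by merging the columns resolved from the external
/// source (if any) with the user-defined SQL columns, according to `sql_column_strategy`
/// and the position of the wildcard (`*`), if present.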
pub(crate) fn bind_all_columns(
    format_encode: &FormatEncodeOptions,
    cols_from_source: Option<Vec<ColumnCatalog>>,
    cols_from_sql: Vec<ColumnCatalog>,
    col_defs_from_sql: &[ColumnDef],
    wildcard_idx: Option<usize>,
    sql_column_strategy: SqlColumnStrategy,
) -> Result<Vec<ColumnCatalog>> {
    if let Some(cols_from_source) = cols_from_source {
        let generated_cols_from_sql = cols_from_sql
            .iter()
            .filter(|c| {
                col_defs_from_sql
                    .iter()
                    .find(|d| d.name.real_value() == c.name())
                    .unwrap()
                    .is_generated()
            })
            .cloned()
            .collect_vec();

        #[allow(clippy::collapsible_else_if)]
        match sql_column_strategy {
            SqlColumnStrategy::FollowUnchecked => {
                assert!(
                    wildcard_idx.is_none(),
                    "wildcard still exists while strategy is Follows, not correctly purified?"
                );
                return Ok(cols_from_sql);
            }

            SqlColumnStrategy::Ignore => {}

            SqlColumnStrategy::FollowChecked => {
                let has_regular_cols_from_sql =
                    generated_cols_from_sql.len() != cols_from_sql.len();

                if has_regular_cols_from_sql {
                    if wildcard_idx.is_some() {
                        return Err(RwError::from(NotSupported(
                            "When there's a wildcard (\"*\"), \
                             only generated columns are allowed in user-defined schema from SQL"
                                .to_owned(),
                            "Remove the non-generated columns".to_owned(),
                        )));
                    } else {
                        for col in &cols_from_sql {
                            if generated_cols_from_sql.contains(col) {
                                continue;
                            }
                            let Some(col_from_source) =
                                cols_from_source.iter().find(|c| c.name() == col.name())
                            else {
                                return Err(RwError::from(ProtocolError(format!(
                                    "Column \"{}\" is defined in SQL but not found in the source",
                                    col.name()
                                ))));
                            };

                            if col_from_source.data_type() != col.data_type() {
                                return Err(RwError::from(ProtocolError(format!(
                                    "Data type mismatch for column \"{}\". \
                                     Defined in SQL as \"{}\", but found in the source as \"{}\"",
                                    col.name(),
                                    col.data_type(),
                                    col_from_source.data_type()
                                ))));
                            }
                        }
                        return Ok(cols_from_sql);
                    }
                } else {
                    if wildcard_idx.is_some() {
                        // Only generated columns are defined in SQL and a wildcard is present;
                        // the columns from the source are expanded at the wildcard below.
                    } else {
                        notice_to_user("\
                            Neither wildcard (\"*\") nor regular (non-generated) columns appear in the user-defined schema from SQL. \
                            For backward compatibility, all columns from the source will be included at the beginning. \
                            For clarity, consider adding a wildcard (\"*\") to indicate where the columns from the source should be included, \
                            or specifying the columns you want to include from the source.
                        ");
                    }
                }
            }
        }

        let wildcard_idx = wildcard_idx.unwrap_or(0).min(generated_cols_from_sql.len());

        let mut merged_cols = generated_cols_from_sql;
        let merged_cols_r = merged_cols.split_off(wildcard_idx);
        merged_cols.extend(cols_from_source);
        merged_cols.extend(merged_cols_r);

        Ok(merged_cols)
    } else {
        if wildcard_idx.is_some() {
            return Err(RwError::from(NotSupported(
                "Wildcard in user-defined schema is only allowed when there exists columns from external schema".to_owned(),
                "Remove the wildcard or use a source with external schema".to_owned(),
            )));
        }
        let non_generated_sql_defined_columns = non_generated_sql_columns(col_defs_from_sql);

        match (&format_encode.format, &format_encode.row_encode) {
            (Format::DebeziumMongo, Encode::Json) => {
                let strong_schema = format_encode
                    .row_options
                    .iter()
                    .find(|k| k.name.real_value().to_lowercase() == CDC_MONGODB_STRONG_SCHEMA_KEY)
                    .map(|k| matches!(k.value, SqlOptionValue::Value(Value::Boolean(true))))
                    .unwrap_or(false);

                if strong_schema {
                    let (_, id_column) = non_generated_sql_defined_columns
                        .iter()
                        .enumerate()
                        .find(|(idx, col)| *idx == 0 && col.name.real_value() == "_id")
                        .ok_or_else(|| {
                            RwError::from(ProtocolError(
                                "The `_id` column of the source with row format DebeziumMongoJson must be defined as the first column in SQL".to_owned(),
                            ))
                        })?;

                    let id_data_type = bind_data_type(id_column.data_type.as_ref().unwrap())?;
                    if !matches!(
                        id_data_type,
                        DataType::Varchar | DataType::Int32 | DataType::Int64 | DataType::Jsonb
                    ) {
                        return Err(RwError::from(ProtocolError(
                            "the `_id` column of the source with row format DebeziumMongoJson must be [Jsonb | Varchar | Int32 | Int64]".to_owned(),
                        )));
                    }

                    let mut columns = Vec::with_capacity(non_generated_sql_defined_columns.len());
                    columns.push(ColumnCatalog {
                        column_desc: ColumnDesc::named("_id", 0.into(), id_data_type),
                        is_hidden: false,
                    });

                    for (idx, col) in non_generated_sql_defined_columns
                        .into_iter()
                        .skip(1)
                        .enumerate()
                    {
                        columns.push(ColumnCatalog {
                            column_desc: ColumnDesc::named(
                                col.name.real_value(),
                                (idx as i32).into(),
                                bind_data_type(col.data_type.as_ref().unwrap())?,
                            ),
                            is_hidden: false,
                        });
                    }

                    return Ok(columns);
                }

                let mut columns = vec![
                    ColumnCatalog {
                        column_desc: ColumnDesc::named("_id", 0.into(), DataType::Varchar),
                        is_hidden: false,
                    },
                    ColumnCatalog {
                        column_desc: ColumnDesc::named("payload", 0.into(), DataType::Jsonb),
                        is_hidden: false,
                    },
                ];

                if non_generated_sql_defined_columns.len() != 2
                    || non_generated_sql_defined_columns[0].name.real_value() != columns[0].name()
                    || non_generated_sql_defined_columns[1].name.real_value() != columns[1].name()
                {
                    return Err(RwError::from(ProtocolError(
                        "the non-generated columns of the source with row format DebeziumMongoJson \
                         must be (_id [Jsonb | Varchar | Int32 | Int64], payload jsonb)."
                            .to_owned(),
                    )));
                }
                let key_data_type = bind_data_type(
                    non_generated_sql_defined_columns[0]
                        .data_type
                        .as_ref()
                        .unwrap(),
                )?;
                match key_data_type {
                    DataType::Jsonb | DataType::Varchar | DataType::Int32 | DataType::Int64 => {
                        columns[0].column_desc.data_type = key_data_type;
                    }
                    _ => {
                        return Err(RwError::from(ProtocolError(
                            "the `_id` column of the source with row format DebeziumMongoJson \
                             must be [Jsonb | Varchar | Int32 | Int64]"
                                .to_owned(),
                        )));
                    }
                }

                let value_data_type = bind_data_type(
                    non_generated_sql_defined_columns[1]
                        .data_type
                        .as_ref()
                        .unwrap(),
                )?;
                if !matches!(value_data_type, DataType::Jsonb) {
                    return Err(RwError::from(ProtocolError(
                        "the `payload` column of the source with row format DebeziumMongoJson \
                         must be Jsonb datatype"
                            .to_owned(),
                    )));
                }
                Ok(columns)
            }
            (Format::Plain, Encode::Bytes) => {
                let err = Err(RwError::from(ProtocolError(
                    "ENCODE BYTES only accepts one BYTEA type column".to_owned(),
                )));
                if non_generated_sql_defined_columns.len() == 1 {
                    let col_data_type = bind_data_type(
                        non_generated_sql_defined_columns[0]
                            .data_type
                            .as_ref()
                            .unwrap(),
                    )?;
                    if col_data_type == DataType::Bytea {
                        Ok(cols_from_sql)
                    } else {
                        err
                    }
                } else {
                    err
                }
            }
            (_, _) => Ok(cols_from_sql),
        }
    }
}

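/// Builds the hint shown when a FORMAT/ENCODE combination requires `INCLUDE KEY`
/// with the key column used as the primary key.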
fn hint_format_encode(format_encode: &FormatEncodeOptions) -> String {
    format!(
        r#"Hint: For FORMAT {0} ENCODE {1}, INCLUDE KEY must be specified and the key column must be used as primary key.
example:
    CREATE TABLE <table_name> ( PRIMARY KEY ([rw_key | <key_name>]) )
    INCLUDE KEY [AS <key_name>]
    WITH (...)
    FORMAT {0} ENCODE {1}{2}
"#,
        format_encode.format,
        format_encode.row_encode,
        if format_encode.row_encode == Encode::Json || format_encode.row_encode == Encode::Bytes {
            "".to_owned()
        } else {
            " (...)".to_owned()
        }
    )
}

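/// Derives the primary key column names for the source from the FORMAT/ENCODE
/// combination, the user-defined primary key, and any `INCLUDE KEY` column.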
pub(crate) async fn bind_source_pk(
    format_encode: &FormatEncodeOptions,
    source_info: &StreamSourceInfo,
    columns: &mut [ColumnCatalog],
    sql_defined_pk_names: Vec<String>,
    with_properties: &WithOptionsSecResolved,
) -> Result<Vec<String>> {
    let sql_defined_pk = !sql_defined_pk_names.is_empty();
    let include_key_column_name: Option<String> = {
        columns.iter().find_map(|catalog| {
            if matches!(
                catalog.column_desc.additional_column.column_type,
                Some(AdditionalColumnType::Key(_))
            ) {
                Some(catalog.name().to_owned())
            } else {
                None
            }
        })
    };
    let additional_column_names = columns
        .iter()
        .filter_map(|col| {
            if col.column_desc.additional_column.column_type.is_some() {
                Some(col.name().to_owned())
            } else {
                None
            }
        })
        .collect_vec();

    let res = match (&format_encode.format, &format_encode.row_encode) {
        (Format::Native, Encode::Native) | (Format::None, Encode::None) | (Format::Plain, _) => {
            sql_defined_pk_names
        }

        (Format::Upsert, Encode::Json | Encode::Avro | Encode::Protobuf) => {
            if let Some(ref key_column_name) = include_key_column_name
                && sql_defined_pk
            {
                if sql_defined_pk_names.len() != 1
                    || !key_column_name.eq(sql_defined_pk_names[0].as_str())
                {
                    return Err(RwError::from(ProtocolError(format!(
                        "Only \"{}\" can be used as primary key\n\n{}",
                        key_column_name,
                        hint_format_encode(format_encode)
                    ))));
                }
                sql_defined_pk_names
            } else {
                return if let Some(include_key_column_name) = include_key_column_name {
                    Err(RwError::from(ProtocolError(format!(
                        "Primary key must be specified as \"{}\"\n\n{}",
                        include_key_column_name,
                        hint_format_encode(format_encode)
                    ))))
                } else {
                    Err(RwError::from(ProtocolError(format!(
                        "INCLUDE KEY clause not set\n\n{}",
                        hint_format_encode(format_encode)
                    ))))
                };
            }
        }

        (Format::Debezium, Encode::Json) => {
            if !additional_column_names.is_empty() {
                return Err(RwError::from(ProtocolError(format!(
                    "FORMAT DEBEZIUM forbids additional columns, but got {:?}",
                    additional_column_names
                ))));
            }
            if !sql_defined_pk {
                return Err(RwError::from(ProtocolError(
                    "Primary key must be specified when creating source with FORMAT DEBEZIUM."
                        .to_owned(),
                )));
            }
            sql_defined_pk_names
        }
        (Format::Debezium, Encode::Avro) => {
            if !additional_column_names.is_empty() {
                return Err(RwError::from(ProtocolError(format!(
                    "FORMAT DEBEZIUM forbids additional columns, but got {:?}",
                    additional_column_names
                ))));
            }
            if sql_defined_pk {
                sql_defined_pk_names
            } else {
                let pk_names =
                    extract_debezium_avro_table_pk_columns(source_info, with_properties).await?;
                for pk_name in &pk_names {
                    columns
                        .iter()
                        .find(|c: &&ColumnCatalog| c.name().eq(pk_name))
                        .ok_or_else(|| {
                            RwError::from(ProtocolError(format!(
                                "avro's key column {} does not exist in avro's row schema",
                                pk_name
                            )))
                        })?;
                }
                pk_names
            }
        }
        (Format::DebeziumMongo, Encode::Json) => {
            if sql_defined_pk {
                sql_defined_pk_names
            } else {
                vec!["_id".to_owned()]
            }
        }

        (Format::Maxwell, Encode::Json) => {
            if !additional_column_names.is_empty() {
                return Err(RwError::from(ProtocolError(format!(
                    "FORMAT MAXWELL forbids additional columns, but got {:?}",
                    additional_column_names
                ))));
            }
            if !sql_defined_pk {
                return Err(RwError::from(ProtocolError(
                    "Primary key must be specified when creating source with FORMAT MAXWELL ENCODE JSON.".to_owned(),
                )));
            }
            sql_defined_pk_names
        }

        (Format::Canal, Encode::Json) => {
            if !additional_column_names.is_empty() {
                return Err(RwError::from(ProtocolError(format!(
                    "FORMAT CANAL forbids additional columns, but got {:?}",
                    additional_column_names
                ))));
            }
            if !sql_defined_pk {
                return Err(RwError::from(ProtocolError(
                    "Primary key must be specified when creating source with FORMAT CANAL ENCODE JSON.".to_owned(),
                )));
            }
            sql_defined_pk_names
        }
        (format, encoding) => {
            return Err(RwError::from(ProtocolError(format!(
                "Unknown combination {:?} {:?}",
                format, encoding
            ))));
        }
    };
    Ok(res)
}

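/// Binds watermark definitions against the bound columns, checking that each watermark
/// expression returns the same type as its watermark column.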
pub(super) fn bind_source_watermark(
    session: &SessionImpl,
    name: String,
    source_watermarks: Vec<SourceWatermark>,
    column_catalogs: &[ColumnCatalog],
) -> Result<Vec<WatermarkDesc>> {
    let mut binder = Binder::new_for_ddl(session);
    binder.bind_columns_to_context(name.clone(), column_catalogs)?;

    let watermark_descs = source_watermarks
        .into_iter()
        .map(|source_watermark| {
            let col_name = source_watermark.column.real_value();
            let watermark_idx = binder.get_column_binding_index(name.clone(), &col_name)?;

            let expr = binder.bind_expr(&source_watermark.expr)?;
            let watermark_col_type = column_catalogs[watermark_idx].data_type();
            let watermark_expr_type = &expr.return_type();
            if watermark_col_type != watermark_expr_type {
                Err(RwError::from(ErrorCode::BindError(format!(
                    "The return value type of the watermark expression must be identical to the watermark column data type. Current data type of the watermark expression: `{}`, of the watermark column: `{}`",
                    watermark_expr_type, watermark_col_type
                ))))
            } else {
                let expr_proto = expr.to_expr_proto();
                Ok::<_, RwError>(WatermarkDesc {
                    watermark_idx: watermark_idx as u32,
                    expr: Some(expr_proto),
                    with_ttl: source_watermark.with_ttl,
                })
            }
        })
        .try_collect()?;
    Ok(watermark_descs)
}

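/// Performs connector-specific schema checks; currently only the nexmark connector is checked.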
pub(super) fn check_format_encode(
    props: &WithOptionsSecResolved,
    row_id_index: Option<usize>,
    columns: &[ColumnCatalog],
) -> Result<()> {
    let Some(connector) = props.get_connector() else {
        return Ok(());
    };

    if connector == NEXMARK_CONNECTOR {
        check_nexmark_schema(props, row_id_index, columns)
    } else {
        Ok(())
    }
}

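/// Extracts and validates the connector properties from the WITH clause, resolving the
/// refresh mode and filling in defaults for shared CDC source jobs.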
pub fn bind_connector_props(
    handler_args: &HandlerArgs,
    format_encode: &FormatEncodeOptions,
    is_create_source: bool,
) -> Result<(WithOptions, SourceRefreshMode)> {
    let mut with_properties = handler_args.with_options.clone().into_connector_props();
    validate_compatibility(format_encode, &mut with_properties)?;
    let refresh_mode = {
        let refresh_mode = resolve_source_refresh_mode_in_with_option(&mut with_properties)?;
        if is_create_source && refresh_mode.is_some() {
            return Err(RwError::from(ProtocolError(
                "`refresh_mode` is only supported for CREATE TABLE".to_owned(),
            )));
        }

        refresh_mode.unwrap_or(SourceRefreshMode {
            refresh_mode: Some(RefreshMode::Streaming(SourceRefreshModeStreaming {})),
        })
    };

    let create_cdc_source_job = with_properties.is_shareable_cdc_connector();

    if !is_create_source && with_properties.is_shareable_only_cdc_connector() {
        return Err(RwError::from(ProtocolError(format!(
            "connector {} does not support `CREATE TABLE`, please use `CREATE SOURCE` instead",
            with_properties.get_connector().unwrap(),
        ))));
    }
    if is_create_source && create_cdc_source_job {
        if let Some(value) = with_properties.get(AUTO_SCHEMA_CHANGE_KEY)
            && value.parse::<bool>().map_err(|_| {
                ErrorCode::InvalidInputSyntax(format!(
                    "invalid value of '{}' option",
                    AUTO_SCHEMA_CHANGE_KEY
                ))
            })?
        {
            Feature::CdcAutoSchemaChange.check_available()?;
        }

        // Fill in the default properties for a shared CDC source job.
        with_properties.insert(CDC_SNAPSHOT_MODE_KEY.into(), CDC_SNAPSHOT_BACKFILL.into());
        with_properties.insert(CDC_SHARING_MODE_KEY.into(), "true".into());
        if with_properties.enable_transaction_metadata() {
            with_properties.insert(CDC_TRANSACTIONAL_KEY.into(), "true".into());
        }
        if !with_properties.contains_key(CDC_WAIT_FOR_STREAMING_START_TIMEOUT) {
            with_properties.insert(
                CDC_WAIT_FOR_STREAMING_START_TIMEOUT.into(),
                handler_args
                    .session
                    .config()
                    .cdc_source_wait_streaming_start_timeout()
                    .to_string(),
            );
        }
    }
    if with_properties.is_mysql_cdc_connector() {
        // Assign a random `server.id` for the MySQL CDC connector if not specified.
        with_properties
            .entry("server.id".to_owned())
            .or_insert(rand::rng().random_range(1..u32::MAX).to_string());
    }
    Ok((with_properties, refresh_mode))
}

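/// Strategy for handling the user-defined SQL columns when columns can also be resolved
/// from the external source: follow the SQL columns without checking (`FollowUnchecked`),
/// follow them but check names and types against the source (`FollowChecked`), or ignore
/// all but the generated SQL columns (`Ignore`).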
pub enum SqlColumnStrategy {
    FollowUnchecked,

    FollowChecked,

    Ignore,
}

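/// Shared binding logic for `CREATE SOURCE` and `CREATE TABLE ... WITH (connector = ...)`:
/// binds columns, primary key, watermarks and connector properties into a [`SourceCatalog`].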
#[allow(clippy::too_many_arguments)]
pub async fn bind_create_source_or_table_with_connector(
    handler_args: HandlerArgs,
    full_name: ObjectName,
    format_encode: FormatEncodeOptions,
    with_properties: WithOptions,
    sql_columns_defs: &[ColumnDef],
    constraints: Vec<TableConstraint>,
    wildcard_idx: Option<usize>,
    source_watermarks: Vec<SourceWatermark>,
    columns_from_resolve_source: Option<Vec<ColumnCatalog>>,
    source_info: StreamSourceInfo,
    include_column_options: IncludeOption,
    col_id_gen: &mut ColumnIdGenerator,
    create_source_type: CreateSourceType,
    source_rate_limit: Option<u32>,
    sql_column_strategy: SqlColumnStrategy,
    refresh_mode: SourceRefreshMode,
) -> Result<SourceCatalog> {
    let session = &handler_args.session;
    let db_name: &str = &session.database();
    let (schema_name, source_name) = Binder::resolve_schema_qualified_name(db_name, &full_name)?;
    let (database_id, schema_id) =
        session.get_database_and_schema_id_for_create(schema_name.clone())?;

    let is_create_source = create_source_type != CreateSourceType::Table;

    if is_create_source {
        if with_properties.is_batch_connector() {
            return Err(ErrorCode::BindError(
                "can't CREATE SOURCE with refreshable batch connector\n\nHint: use CREATE TABLE instead"
                    .to_owned(),
            )
            .into());
        }

        match format_encode.format {
            Format::Debezium | Format::DebeziumMongo | Format::Maxwell | Format::Canal => {
                return Err(ErrorCode::BindError(format!(
                    "can't CREATE SOURCE with FORMAT {}.\n\nHint: use CREATE TABLE instead\n\n{}",
                    format_encode.format,
                    hint_format_encode(&format_encode)
                ))
                .into());
            }
            Format::Upsert => {
                notice_to_user(format!(
                    "Streaming queries on sources with `FORMAT {}` may have limitations. If your query isn't supported, consider using `CREATE TABLE` instead.",
                    format_encode.format
                ));
            }
            _ => {}
        }
    }

    let sql_pk_names = bind_sql_pk_names(sql_columns_defs, bind_table_constraints(&constraints)?)?;

    if with_properties.is_iceberg_connector() && !sql_columns_defs.is_empty() {
        return Err(RwError::from(InvalidInputSyntax(
            r#"Schema is automatically inferred for iceberg source and should not be specified

HINT: use `CREATE SOURCE <name> WITH (...)` instead of `CREATE SOURCE <name> (<columns>) WITH (...)`."#.to_owned(),
        )));
    }

    if with_properties.is_batch_connector()
        && with_properties
            .get(UPSTREAM_SOURCE_KEY)
            .is_some_and(|s| s.eq_ignore_ascii_case(ADBC_SNOWFLAKE_CONNECTOR))
        && !sql_columns_defs.is_empty()
    {
        return Err(RwError::from(InvalidInputSyntax(
            r#"Schema is automatically inferred for ADBC Snowflake source and should not be specified

HINT: use `CREATE TABLE <name> WITH (...)` instead of `CREATE TABLE <name> (<columns>) WITH (...)`."#.to_owned(),
        )));
    }
    let columns_from_sql = bind_sql_columns(sql_columns_defs, false)?;

    let mut columns = bind_all_columns(
        &format_encode,
        columns_from_resolve_source,
        columns_from_sql,
        sql_columns_defs,
        wildcard_idx,
        sql_column_strategy,
    )?;

    handle_addition_columns(
        Some(&format_encode),
        &with_properties,
        include_column_options,
        &mut columns,
        false,
    )?;

    if columns.is_empty() {
        return Err(RwError::from(ProtocolError(
            "Schema definition is required, either from SQL or schema registry.".to_owned(),
        )));
    }

    if is_create_source {
        check_and_add_timestamp_column(&with_properties, &mut columns);

        if create_source_type == CreateSourceType::SharedNonCdc {
            let (columns_exist, additional_columns) = source_add_partition_offset_cols(
                &columns,
                &with_properties.get_connector().unwrap(),
                true,
            );
            for (existed, c) in columns_exist.into_iter().zip_eq_fast(additional_columns) {
                if !existed {
                    columns.push(ColumnCatalog::hidden(c));
                }
            }
        }
    }

    let mut with_properties = with_properties;
    resolve_privatelink_in_with_option(&mut with_properties)?;

    if session
        .env()
        .system_params_manager()
        .get_params()
        .load()
        .enforce_secret()
        && Feature::SecretManagement.check_available().is_ok()
    {
        ConnectorProperties::enforce_secret_source(&with_properties)?;
    }

    let (with_properties, connection_type, connector_conn_ref) =
        resolve_connection_ref_and_secret_ref(
            with_properties,
            session,
            Some(TelemetryDatabaseObject::Source),
        )?;
    ensure_connection_type_allowed(connection_type, &SOURCE_ALLOWED_CONNECTION_CONNECTOR)?;

    if !matches!(connection_type, PbConnectionType::Unspecified) {
        let Some(connector) = with_properties.get_connector() else {
            return Err(RwError::from(ProtocolError(format!(
                "missing field '{}' in WITH clause",
                UPSTREAM_SOURCE_KEY
            ))));
        };
        check_connector_match_connection_type(connector.as_str(), &connection_type)?;
    }

    let pk_names = bind_source_pk(
        &format_encode,
        &source_info,
        &mut columns,
        sql_pk_names,
        &with_properties,
    )
    .await?;

    if let Some(duplicated_name) = columns.iter().map(|c| c.name()).duplicates().next() {
        return Err(ErrorCode::InvalidInputSyntax(format!(
            "column \"{}\" specified more than once",
            duplicated_name
        ))
        .into());
    }

    for c in &mut columns {
        let original_data_type = c.data_type().clone();
        col_id_gen.generate(c)?;
        if is_create_source {
            c.column_desc.data_type = original_data_type;
        }
    }
    debug_assert_column_ids_distinct(&columns);

    let (mut columns, pk_col_ids, row_id_index) =
        bind_pk_and_row_id_on_relation(columns, pk_names, true)?;

    let watermark_descs =
        bind_source_watermark(session, source_name.clone(), source_watermarks, &columns)?;
    assert!(watermark_descs.len() <= 1);
    if is_create_source && watermark_descs.iter().any(|d| d.with_ttl) {
        return Err(ErrorCode::NotSupported(
            "WITH TTL is not supported in WATERMARK clause for CREATE SOURCE.".to_owned(),
            "Use `CREATE TABLE ... WATERMARK ... WITH TTL` instead.".to_owned(),
        )
        .into());
    }

    let append_only = row_id_index.is_some();
    if is_create_source && !append_only && !watermark_descs.is_empty() {
        return Err(ErrorCode::NotSupported(
            "Defining watermarks on source requires the source connector to be append only."
                .to_owned(),
            "Use the key words `FORMAT PLAIN`".to_owned(),
        )
        .into());
    }

    bind_sql_column_constraints(
        session,
        source_name.clone(),
        &mut columns,
        sql_columns_defs,
        &pk_col_ids,
    )?;
    check_format_encode(&with_properties, row_id_index, &columns)?;

    let definition = handler_args.normalized_sql.clone();

    let associated_table_id = if is_create_source {
        None
    } else {
        Some(TableId::placeholder())
    };
    let source = SourceCatalog {
        id: SourceId::placeholder(),
        name: source_name,
        schema_id,
        database_id,
        columns,
        pk_col_ids,
        append_only,
        owner: session.user_id(),
        info: source_info,
        row_id_index,
        with_properties,
        watermark_descs,
        associated_table_id,
        definition,
        connection_id: connector_conn_ref,
        created_at_epoch: None,
        initialized_at_epoch: None,
        version: INITIAL_SOURCE_VERSION_ID,
        created_at_cluster_version: None,
        initialized_at_cluster_version: None,
        rate_limit: source_rate_limit,
        refresh_mode: Some(refresh_mode),
    };
    Ok(source)
}

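/// Handles `CREATE SOURCE` statements, e.g. (illustrative, mirroring the tests below):
///
/// ```sql
/// CREATE SOURCE s (v1 int)
/// INCLUDE key AS _rw_kafka_key
/// WITH (connector = 'kafka')
/// FORMAT PLAIN ENCODE JSON;
/// ```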
pub async fn handle_create_source(
    mut handler_args: HandlerArgs,
    stmt: CreateSourceStatement,
) -> Result<RwPgResponse> {
    let session = handler_args.session.clone();
    let overwrite_options = OverwriteOptions::new(&mut handler_args);

    if let Either::Right(resp) = session.check_relation_name_duplicated(
        stmt.source_name.clone(),
        StatementType::CREATE_SOURCE,
        stmt.if_not_exists,
    )? {
        return Ok(resp);
    }

    if stmt
        .source_name
        .base_name()
        .starts_with(ICEBERG_SOURCE_PREFIX)
    {
        return Err(RwError::from(InvalidInputSyntax(format!(
            "Source name cannot start with reserved prefix '{}'",
            ICEBERG_SOURCE_PREFIX
        ))));
    }

    if handler_args.with_options.is_empty() {
        return Err(RwError::from(InvalidInputSyntax(
            "missing WITH clause".to_owned(),
        )));
    }

    let format_encode = stmt.format_encode.into_v2_with_warning();
    let (with_properties, refresh_mode) =
        bind_connector_props(&handler_args, &format_encode, true)?;

    let create_source_type = CreateSourceType::for_newly_created(&session, &*with_properties);
    let (columns_from_resolve_source, source_info) = bind_columns_from_source(
        &session,
        &format_encode,
        Either::Left(&with_properties),
        create_source_type,
    )
    .await?;
    let mut col_id_gen = ColumnIdGenerator::new_initial();

    if stmt.columns.iter().any(|col| {
        col.options
            .iter()
            .any(|def| matches!(def.option, ColumnOption::NotNull))
    }) {
        return Err(RwError::from(InvalidInputSyntax(
            "NOT NULL constraint is not supported in source schema".to_owned(),
        )));
    }

    let source_catalog = bind_create_source_or_table_with_connector(
        handler_args.clone(),
        stmt.source_name,
        format_encode,
        with_properties,
        &stmt.columns,
        stmt.constraints,
        stmt.wildcard_idx,
        stmt.source_watermarks,
        columns_from_resolve_source,
        source_info,
        stmt.include_column_options,
        &mut col_id_gen,
        create_source_type,
        overwrite_options.source_rate_limit,
        SqlColumnStrategy::FollowChecked,
        refresh_mode,
    )
    .await?;

    if stmt.temporary {
        if session.get_temporary_source(&source_catalog.name).is_some() {
            return Err(CatalogError::duplicated("source", source_catalog.name.clone()).into());
        }
        session.create_temporary_source(source_catalog);
        return Ok(PgResponse::empty_result(StatementType::CREATE_SOURCE));
    }

    let source = source_catalog.to_prost();

    let catalog_writer = session.catalog_writer()?;

    if create_source_type.is_shared() {
        let graph = generate_stream_graph_for_source(handler_args, source_catalog)?;
        catalog_writer
            .create_source(source, Some(graph), stmt.if_not_exists)
            .await?;
    } else {
        catalog_writer
            .create_source(source, None, stmt.if_not_exists)
            .await?;
    }

    Ok(PgResponse::empty_result(StatementType::CREATE_SOURCE))
}

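/// Builds the stream fragment graph for a shared source job from its catalog.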
pub(super) fn generate_stream_graph_for_source(
    handler_args: HandlerArgs,
    source_catalog: SourceCatalog,
) -> Result<PbStreamFragmentGraph> {
    let context = OptimizerContext::from_handler_args(handler_args);
    let source_node = LogicalSource::with_catalog(
        Rc::new(source_catalog),
        SourceNodeKind::CreateSharedSource,
        context.into(),
        None,
    )?;

    let stream_plan = source_node.to_stream(&mut ToStreamContext::new(false))?;
    let graph = build_graph(stream_plan, Some(GraphJobType::Source))?;
    Ok(graph)
}

#[cfg(test)]
pub mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use risingwave_common::catalog::{
        DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, ROW_ID_COLUMN_NAME,
    };
    use risingwave_common::types::{DataType, StructType};

    use crate::catalog::root_catalog::SchemaPath;
    use crate::catalog::source_catalog::SourceCatalog;
    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    const GET_COLUMN_FROM_CATALOG: fn(&Arc<SourceCatalog>) -> HashMap<&str, DataType> =
        |catalog: &Arc<SourceCatalog>| -> HashMap<&str, DataType> {
            catalog
                .columns
                .iter()
                .map(|col| (col.name(), col.data_type().clone()))
                .collect::<HashMap<&str, DataType>>()
        };

    #[tokio::test]
    async fn test_create_source_handler() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let sql = format!(
            r#"CREATE SOURCE t
    WITH (connector = 'kinesis')
    FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        let (source, _) = catalog_reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
            .unwrap();
        assert_eq!(source.name, "t");

        let columns = GET_COLUMN_FROM_CATALOG(source);

        let city_type = StructType::new(vec![
            ("address", DataType::Varchar),
            ("zipcode", DataType::Varchar),
        ])
        .into();
        let expected_columns = maplit::hashmap! {
            ROW_ID_COLUMN_NAME => DataType::Serial,
            "id" => DataType::Int32,
            "zipcode" => DataType::Int64,
            "rate" => DataType::Float32,
            "country" => StructType::new(
                vec![("address", DataType::Varchar),("city", city_type),("zipcode", DataType::Varchar)],
            )
            .into(),
        };
        assert_eq!(columns, expected_columns, "{columns:#?}");
    }

    #[tokio::test]
    async fn test_duplicate_props_options() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let sql = format!(
            r#"CREATE SOURCE t
    WITH (
        connector = 'kinesis',
        aws.region='user_test_topic',
        endpoint='172.10.1.1:9090,172.10.1.2:9090',
        aws.credentials.access_key_id = 'your_access_key_1',
        aws.credentials.secret_access_key = 'your_secret_key_1'
    )
    FORMAT PLAIN ENCODE PROTOBUF (
        message = '.test.TestRecord',
        aws.credentials.access_key_id = 'your_access_key_2',
        aws.credentials.secret_access_key = 'your_secret_key_2',
        schema.location = 'file://{}',
    )"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        let (source, _) = catalog_reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
            .unwrap();
        assert_eq!(source.name, "t");

        // Options in `FORMAT ... ENCODE ...` end up in the format-encode options.
        assert_eq!(
            source
                .info
                .format_encode_options
                .get("aws.credentials.access_key_id")
                .unwrap(),
            "your_access_key_2"
        );
        assert_eq!(
            source
                .info
                .format_encode_options
                .get("aws.credentials.secret_access_key")
                .unwrap(),
            "your_secret_key_2"
        );

        // Options in `WITH (...)` end up in the connector properties.
        assert_eq!(
            source
                .with_properties
                .get("aws.credentials.access_key_id")
                .unwrap(),
            "your_access_key_1"
        );
        assert_eq!(
            source
                .with_properties
                .get("aws.credentials.secret_access_key")
                .unwrap(),
            "your_secret_key_1"
        );

        // Format-encode options should not leak into the connector properties.
        assert!(!source.with_properties.contains_key("schema.location"));
    }

    #[tokio::test]
    async fn test_multi_table_cdc_create_source_handler() {
        let sql =
            "CREATE SOURCE t2 WITH (connector = 'mysql-cdc') FORMAT PLAIN ENCODE JSON".to_owned();
        let frontend = LocalFrontend::new(Default::default()).await;
        let session = frontend.session_ref();

        frontend
            .run_sql_with_session(session.clone(), sql)
            .await
            .unwrap();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        let (source, _) = catalog_reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t2")
            .unwrap();
        assert_eq!(source.name, "t2");

        let columns = source
            .columns
            .iter()
            .map(|col| (col.name(), col.data_type().clone()))
            .collect::<Vec<(&str, DataType)>>();

        expect_test::expect![[r#"
            [
                (
                    "payload",
                    Jsonb,
                ),
                (
                    "_rw_offset",
                    Varchar,
                ),
                (
                    "_rw_table_name",
                    Varchar,
                ),
                (
                    "_row_id",
                    Serial,
                ),
            ]
        "#]]
        .assert_debug_eq(&columns);
    }

    #[tokio::test]
    async fn test_source_addition_columns() {
        // Columns requested via the `INCLUDE` clause.
        let sql =
            "CREATE SOURCE s (v1 int) include key as _rw_kafka_key with (connector = 'kafka') format plain encode json".to_owned();
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();
        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let (source, _) = catalog_reader
            .get_source_by_name(
                DEFAULT_DATABASE_NAME,
                SchemaPath::Name(DEFAULT_SCHEMA_NAME),
                "s",
            )
            .unwrap();
        assert_eq!(source.name, "s");

        let columns = source
            .columns
            .iter()
            .map(|col| (col.name(), col.data_type().clone()))
            .collect::<Vec<(&str, DataType)>>();

        expect_test::expect![[r#"
            [
                (
                    "v1",
                    Int32,
                ),
                (
                    "_rw_kafka_key",
                    Bytea,
                ),
                (
                    "_rw_kafka_timestamp",
                    Timestamptz,
                ),
                (
                    "_rw_kafka_partition",
                    Varchar,
                ),
                (
                    "_rw_kafka_offset",
                    Varchar,
                ),
                (
                    "_row_id",
                    Serial,
                ),
            ]
        "#]]
        .assert_debug_eq(&columns);

        let sql =
            "CREATE SOURCE s3 (v1 int) include timestamp 'header1' as header_col with (connector = 'kafka') format plain encode json".to_owned();
        match frontend.run_sql(sql).await {
            Err(e) => {
                assert_eq!(
                    e.to_string(),
                    "Protocol error: Only header column can have inner field, but got \"timestamp\""
                )
            }
            _ => unreachable!(),
        }
    }
}