1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::rc::Rc;
17use std::sync::LazyLock;
18
19use anyhow::{Context, anyhow};
20use either::Either;
21use external_schema::debezium::extract_debezium_avro_table_pk_columns;
22use external_schema::nexmark::check_nexmark_schema;
23use itertools::Itertools;
24use maplit::{convert_args, hashmap, hashset};
25use pgwire::pg_response::{PgResponse, StatementType};
26use rand::Rng;
27use risingwave_common::array::arrow::{IcebergArrowConvert, arrow_schema_iceberg};
28use risingwave_common::bail_not_implemented;
29use risingwave_common::catalog::{
30 ColumnCatalog, ColumnDesc, ColumnId, INITIAL_SOURCE_VERSION_ID, KAFKA_TIMESTAMP_COLUMN_NAME,
31 ROW_ID_COLUMN_NAME, TableId, debug_assert_column_ids_distinct,
32};
33use risingwave_common::license::Feature;
34use risingwave_common::secret::LocalSecretManager;
35use risingwave_common::system_param::reader::SystemParamsRead;
36use risingwave_common::types::DataType;
37use risingwave_common::util::iter_util::ZipEqFast;
38use risingwave_connector::parser::additional_columns::{
39 build_additional_column_desc, get_supported_additional_columns,
40 source_add_partition_offset_cols,
41};
42use risingwave_connector::parser::{
43 AvroParserConfig, DEBEZIUM_IGNORE_KEY, DebeziumAvroParserConfig, ProtobufParserConfig,
44 SchemaLocation, SpecificParserConfig, TimestamptzHandling,
45 fetch_json_schema_and_map_to_columns,
46};
47use risingwave_connector::schema::AWS_GLUE_SCHEMA_ARN_KEY;
48use risingwave_connector::schema::schema_registry::{
49 SCHEMA_REGISTRY_BACKOFF_DURATION_KEY, SCHEMA_REGISTRY_BACKOFF_FACTOR_KEY,
50 SCHEMA_REGISTRY_MAX_DELAY_KEY, SCHEMA_REGISTRY_PASSWORD, SCHEMA_REGISTRY_RETRIES_MAX_KEY,
51 SCHEMA_REGISTRY_USERNAME, SchemaRegistryConfig, name_strategy_from_str,
52};
53use risingwave_connector::source::cdc::{
54 CDC_MONGODB_STRONG_SCHEMA_KEY, CDC_SHARING_MODE_KEY, CDC_SNAPSHOT_BACKFILL,
55 CDC_SNAPSHOT_MODE_KEY, CDC_TRANSACTIONAL_KEY, CDC_WAIT_FOR_STREAMING_START_TIMEOUT,
56 CITUS_CDC_CONNECTOR, MONGODB_CDC_CONNECTOR, MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR,
57 SQL_SERVER_CDC_CONNECTOR,
58};
59use risingwave_connector::source::datagen::DATAGEN_CONNECTOR;
60use risingwave_connector::source::iceberg::ICEBERG_CONNECTOR;
61use risingwave_connector::source::nexmark::source::{EventType, get_event_data_types_with_names};
62use risingwave_connector::source::test_source::TEST_CONNECTOR;
63pub use risingwave_connector::source::{
64 ADBC_SNOWFLAKE_CONNECTOR, UPSTREAM_SOURCE_KEY, WEBHOOK_CONNECTOR,
65};
66use risingwave_connector::source::{
67 AZBLOB_CONNECTOR, ConnectorProperties, GCS_CONNECTOR, GOOGLE_PUBSUB_CONNECTOR, KAFKA_CONNECTOR,
68 KINESIS_CONNECTOR, LEGACY_S3_CONNECTOR, MQTT_CONNECTOR, NATS_CONNECTOR, NEXMARK_CONNECTOR,
69 OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR, PULSAR_CONNECTOR,
70};
71use risingwave_connector::{AUTO_SCHEMA_CHANGE_KEY, WithPropertiesExt};
72use risingwave_pb::catalog::connection_params::PbConnectionType;
73use risingwave_pb::catalog::{PbSchemaRegistryNameStrategy, StreamSourceInfo, WatermarkDesc};
74use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
75use risingwave_pb::plan_common::source_refresh_mode::{RefreshMode, SourceRefreshModeStreaming};
76use risingwave_pb::plan_common::{EncodeType, FormatType, SourceRefreshMode};
77use risingwave_pb::stream_plan::PbStreamFragmentGraph;
78use risingwave_pb::telemetry::TelemetryDatabaseObject;
79use risingwave_sqlparser::ast::{
80 AstString, ColumnDef, ColumnOption, CreateSourceStatement, Encode, Format, FormatEncodeOptions,
81 ObjectName, SourceWatermark, SqlOptionValue, TableConstraint, Value, get_delimiter,
82};
83use risingwave_sqlparser::parser::{IncludeOption, IncludeOptionItem};
84use thiserror_ext::AsReport;
85
86use super::RwPgResponse;
87use crate::binder::Binder;
88use crate::catalog::CatalogError;
89use crate::catalog::source_catalog::SourceCatalog;
90use crate::error::ErrorCode::{self, Deprecated, InvalidInputSyntax, NotSupported, ProtocolError};
91use crate::error::{Result, RwError};
92use crate::expr::Expr;
93use crate::handler::HandlerArgs;
94use crate::handler::create_table::{
95 ColumnIdGenerator, bind_pk_and_row_id_on_relation, bind_sql_column_constraints,
96 bind_sql_columns, bind_sql_pk_names, bind_table_constraints,
97};
98use crate::handler::util::{
99 SourceSchemaCompatExt, check_connector_match_connection_type, ensure_connection_type_allowed,
100};
101use crate::optimizer::plan_node::generic::SourceNodeKind;
102use crate::optimizer::plan_node::{LogicalSource, ToStream, ToStreamContext};
103use crate::session::SessionImpl;
104use crate::session::current::notice_to_user;
105use crate::utils::{
106 OverwriteOptions, resolve_connection_ref_and_secret_ref, resolve_privatelink_in_with_option,
107 resolve_secret_ref_in_with_options, resolve_source_refresh_mode_in_with_option,
108};
109use crate::{OptimizerContext, WithOptions, WithOptionsSecResolved, bind_data_type, build_graph};
110
111mod external_schema;
112pub use external_schema::{
113 bind_columns_from_source, get_schema_location, schema_has_schema_registry,
114};
115mod validate;
116pub use validate::validate_compatibility;
117use validate::{SOURCE_ALLOWED_CONNECTION_CONNECTOR, SOURCE_ALLOWED_CONNECTION_SCHEMA_REGISTRY};
118mod additional_column;
119use additional_column::check_and_add_timestamp_column;
120pub use additional_column::handle_addition_columns;
121use risingwave_common::catalog::ICEBERG_SOURCE_PREFIX;
122use risingwave_common::id::SourceId;
123
124use crate::stream_fragmenter::GraphJobType;
125
126fn non_generated_sql_columns(columns: &[ColumnDef]) -> Vec<ColumnDef> {
127 columns
128 .iter()
129 .filter(|c| !c.is_generated())
130 .cloned()
131 .collect()
132}
133
134fn try_consume_string_from_options(
135 format_encode_options: &mut BTreeMap<String, String>,
136 key: &str,
137) -> Option<AstString> {
138 format_encode_options.remove(key).map(AstString)
139}
140
141fn try_consume_schema_registry_config_from_options(
142 format_encode_options: &mut BTreeMap<String, String>,
143) {
144 [
145 SCHEMA_REGISTRY_USERNAME,
146 SCHEMA_REGISTRY_PASSWORD,
147 SCHEMA_REGISTRY_MAX_DELAY_KEY,
148 SCHEMA_REGISTRY_BACKOFF_DURATION_KEY,
149 SCHEMA_REGISTRY_BACKOFF_FACTOR_KEY,
150 SCHEMA_REGISTRY_RETRIES_MAX_KEY,
151 ]
152 .iter()
153 .for_each(|key| {
154 try_consume_string_from_options(format_encode_options, key);
155 });
156}
157
158fn consume_string_from_options(
159 format_encode_options: &mut BTreeMap<String, String>,
160 key: &str,
161) -> Result<AstString> {
162 try_consume_string_from_options(format_encode_options, key).ok_or(RwError::from(ProtocolError(
163 format!("missing field {} in options", key),
164 )))
165}
166
167fn consume_aws_config_from_options(format_encode_options: &mut BTreeMap<String, String>) {
168 format_encode_options.retain(|key, _| !key.starts_with("aws."))
169}
170
171#[derive(Debug, Clone, Copy, PartialEq, Eq)]
172pub enum CreateSourceType {
173 SharedCdc,
174 SharedNonCdc,
176 NonShared,
177 Table,
179}
180
181impl CreateSourceType {
182 pub fn for_newly_created(
184 session: &SessionImpl,
185 with_properties: &impl WithPropertiesExt,
186 ) -> Self {
187 if with_properties.is_shareable_cdc_connector() {
188 CreateSourceType::SharedCdc
189 } else if with_properties.is_shareable_non_cdc_connector()
190 && session
191 .env()
192 .streaming_config()
193 .developer
194 .enable_shared_source
195 && session.config().streaming_use_shared_source()
196 {
197 CreateSourceType::SharedNonCdc
198 } else {
199 CreateSourceType::NonShared
200 }
201 }
202
203 pub fn for_replace(catalog: &SourceCatalog) -> Self {
204 if !catalog.info.is_shared() {
205 CreateSourceType::NonShared
206 } else if catalog.with_properties.is_shareable_cdc_connector() {
207 CreateSourceType::SharedCdc
208 } else {
209 CreateSourceType::SharedNonCdc
210 }
211 }
212
213 pub fn is_shared(&self) -> bool {
214 matches!(
215 self,
216 CreateSourceType::SharedCdc | CreateSourceType::SharedNonCdc
217 )
218 }
219}
220
221pub(crate) fn bind_all_columns(
223 format_encode: &FormatEncodeOptions,
224 cols_from_source: Option<Vec<ColumnCatalog>>,
225 cols_from_sql: Vec<ColumnCatalog>,
226 col_defs_from_sql: &[ColumnDef],
227 wildcard_idx: Option<usize>,
228 sql_column_strategy: SqlColumnStrategy,
229) -> Result<Vec<ColumnCatalog>> {
230 if let Some(cols_from_source) = cols_from_source {
231 let generated_cols_from_sql = cols_from_sql
234 .iter()
235 .filter(|c| {
236 col_defs_from_sql
237 .iter()
238 .find(|d| d.name.real_value() == c.name())
239 .unwrap()
240 .is_generated()
241 })
242 .cloned()
243 .collect_vec();
244
245 #[allow(clippy::collapsible_else_if)]
246 match sql_column_strategy {
247 SqlColumnStrategy::FollowUnchecked => {
249 assert!(
250 wildcard_idx.is_none(),
251 "wildcard still exists while strategy is Follows, not correctly purified?"
252 );
253 return Ok(cols_from_sql);
254 }
255
256 SqlColumnStrategy::Ignore => {}
258
259 SqlColumnStrategy::FollowChecked => {
260 let has_regular_cols_from_sql =
261 generated_cols_from_sql.len() != cols_from_sql.len();
262
263 if has_regular_cols_from_sql {
264 if wildcard_idx.is_some() {
265 return Err(RwError::from(NotSupported(
267 "When there's a wildcard (\"*\"), \
268 only generated columns are allowed in user-defined schema from SQL"
269 .to_owned(),
270 "Remove the non-generated columns".to_owned(),
271 )));
272 } else {
273 for col in &cols_from_sql {
276 if generated_cols_from_sql.contains(col) {
277 continue;
278 }
279 let Some(col_from_source) =
280 cols_from_source.iter().find(|c| c.name() == col.name())
281 else {
282 return Err(RwError::from(ProtocolError(format!(
283 "Column \"{}\" is defined in SQL but not found in the source",
284 col.name()
285 ))));
286 };
287
288 if col_from_source.data_type() != col.data_type() {
289 return Err(RwError::from(ProtocolError(format!(
290 "Data type mismatch for column \"{}\". \
291 Defined in SQL as \"{}\", but found in the source as \"{}\"",
292 col.name(),
293 col.data_type(),
294 col_from_source.data_type()
295 ))));
296 }
297 }
298 return Ok(cols_from_sql);
299 }
300 } else {
301 if wildcard_idx.is_some() {
302 } else {
306 notice_to_user("\
314 Neither wildcard (\"*\") nor regular (non-generated) columns appear in the user-defined schema from SQL. \
315 For backward compatibility, all columns from the source will be included at the beginning. \
316 For clarity, consider adding a wildcard (\"*\") to indicate where the columns from the source should be included, \
317 or specifying the columns you want to include from the source.
318 ");
319 }
320 }
321 }
322 }
323
324 let wildcard_idx = wildcard_idx.unwrap_or(0).min(generated_cols_from_sql.len());
329
330 let mut merged_cols = generated_cols_from_sql;
332 let merged_cols_r = merged_cols.split_off(wildcard_idx);
333 merged_cols.extend(cols_from_source);
334 merged_cols.extend(merged_cols_r);
335
336 Ok(merged_cols)
337 } else {
338 if wildcard_idx.is_some() {
339 return Err(RwError::from(NotSupported(
340 "Wildcard in user-defined schema is only allowed when there exists columns from external schema".to_owned(),
341 "Remove the wildcard or use a source with external schema".to_owned(),
342 )));
343 }
344 let non_generated_sql_defined_columns = non_generated_sql_columns(col_defs_from_sql);
345
346 match (&format_encode.format, &format_encode.row_encode) {
347 (Format::DebeziumMongo, Encode::Json) => {
348 let strong_schema = format_encode
349 .row_options
350 .iter()
351 .find(|k| k.name.real_value().to_lowercase() == CDC_MONGODB_STRONG_SCHEMA_KEY)
352 .map(|k| matches!(k.value, SqlOptionValue::Value(Value::Boolean(true))))
353 .unwrap_or(false);
354
355 if strong_schema {
357 let (_, id_column) = non_generated_sql_defined_columns
358 .iter()
359 .enumerate()
360 .find(|(idx, col)| *idx == 0 && col.name.real_value() == "_id")
361 .ok_or_else(|| {
362 RwError::from(ProtocolError(
363 "The `_id` column of the source with row format DebeziumMongoJson must be defined as the first column in SQL".to_owned(),
364 ))
365 })?;
366
367 let id_data_type = bind_data_type(id_column.data_type.as_ref().unwrap())?;
368 if !matches!(
369 id_data_type,
370 DataType::Varchar | DataType::Int32 | DataType::Int64 | DataType::Jsonb
371 ) {
372 return Err(RwError::from(ProtocolError(
373 "the `_id` column of the source with row format DebeziumMongoJson must be [Jsonb | Varchar | Int32 | Int64]".to_owned(),
374 )));
375 }
376
377 let mut columns = Vec::with_capacity(non_generated_sql_defined_columns.len());
378 columns.push(
379 ColumnCatalog {
381 column_desc: ColumnDesc::named("_id", 0.into(), id_data_type),
382 is_hidden: false,
383 },
384 );
385
386 for (idx, col) in non_generated_sql_defined_columns
388 .into_iter()
389 .skip(1)
391 .enumerate()
392 {
393 columns.push(ColumnCatalog {
394 column_desc: ColumnDesc::named(
395 col.name.real_value(),
396 (idx as i32).into(),
397 bind_data_type(col.data_type.as_ref().unwrap())?,
398 ),
399 is_hidden: false,
400 });
401 }
402
403 return Ok(columns);
404 }
405
406 let mut columns = vec![
407 ColumnCatalog {
408 column_desc: ColumnDesc::named("_id", 0.into(), DataType::Varchar),
409 is_hidden: false,
410 },
411 ColumnCatalog {
412 column_desc: ColumnDesc::named("payload", 0.into(), DataType::Jsonb),
413 is_hidden: false,
414 },
415 ];
416
417 if non_generated_sql_defined_columns.len() != 2
418 || non_generated_sql_defined_columns[0].name.real_value() != columns[0].name()
419 || non_generated_sql_defined_columns[1].name.real_value() != columns[1].name()
420 {
421 return Err(RwError::from(ProtocolError(
422 "the not generated columns of the source with row format DebeziumMongoJson
423 must be (_id [Jsonb | Varchar | Int32 | Int64], payload jsonb)."
424 .to_owned(),
425 )));
426 }
427 let key_data_type = bind_data_type(
429 non_generated_sql_defined_columns[0]
430 .data_type
431 .as_ref()
432 .unwrap(),
433 )?;
434 match key_data_type {
435 DataType::Jsonb | DataType::Varchar | DataType::Int32 | DataType::Int64 => {
436 columns[0].column_desc.data_type = key_data_type;
437 }
438 _ => {
439 return Err(RwError::from(ProtocolError(
440 "the `_id` column of the source with row format DebeziumMongoJson
441 must be [Jsonb | Varchar | Int32 | Int64]"
442 .to_owned(),
443 )));
444 }
445 }
446
447 let value_data_type = bind_data_type(
449 non_generated_sql_defined_columns[1]
450 .data_type
451 .as_ref()
452 .unwrap(),
453 )?;
454 if !matches!(value_data_type, DataType::Jsonb) {
455 return Err(RwError::from(ProtocolError(
456 "the `payload` column of the source with row format DebeziumMongoJson
457 must be Jsonb datatype"
458 .to_owned(),
459 )));
460 }
461 Ok(columns)
462 }
463 (Format::Plain, Encode::Bytes) => {
464 let err = Err(RwError::from(ProtocolError(
465 "ENCODE BYTES only accepts one BYTEA type column".to_owned(),
466 )));
467 if non_generated_sql_defined_columns.len() == 1 {
468 let col_data_type = bind_data_type(
470 non_generated_sql_defined_columns[0]
471 .data_type
472 .as_ref()
473 .unwrap(),
474 )?;
475 if col_data_type == DataType::Bytea {
476 Ok(cols_from_sql)
477 } else {
478 err
479 }
480 } else {
481 err
482 }
483 }
484 (_, _) => Ok(cols_from_sql),
485 }
486 }
487}
488
489fn hint_format_encode(format_encode: &FormatEncodeOptions) -> String {
491 format!(
492 r#"Hint: For FORMAT {0} ENCODE {1}, INCLUDE KEY must be specified and the key column must be used as primary key.
493example:
494 CREATE TABLE <table_name> ( PRIMARY KEY ([rw_key | <key_name>]) )
495 INCLUDE KEY [AS <key_name>]
496 WITH (...)
497 FORMAT {0} ENCODE {1}{2}
498"#,
499 format_encode.format,
500 format_encode.row_encode,
501 if format_encode.row_encode == Encode::Json || format_encode.row_encode == Encode::Bytes {
502 "".to_owned()
503 } else {
504 " (...)".to_owned()
505 }
506 )
507}
508
509pub(crate) async fn bind_source_pk(
512 format_encode: &FormatEncodeOptions,
513 source_info: &StreamSourceInfo,
514 columns: &mut [ColumnCatalog],
515 sql_defined_pk_names: Vec<String>,
516 with_properties: &WithOptionsSecResolved,
517) -> Result<Vec<String>> {
518 let sql_defined_pk = !sql_defined_pk_names.is_empty();
519 let include_key_column_name: Option<String> = {
520 columns.iter().find_map(|catalog| {
523 if matches!(
524 catalog.column_desc.additional_column.column_type,
525 Some(AdditionalColumnType::Key(_))
526 ) {
527 Some(catalog.name().to_owned())
528 } else {
529 None
530 }
531 })
532 };
533 let additional_column_names = columns
534 .iter()
535 .filter_map(|col| {
536 if col.column_desc.additional_column.column_type.is_some() {
537 Some(col.name().to_owned())
538 } else {
539 None
540 }
541 })
542 .collect_vec();
543
544 let res = match (&format_encode.format, &format_encode.row_encode) {
545 (Format::Native, Encode::Native) | (Format::None, Encode::None) | (Format::Plain, _) => {
546 sql_defined_pk_names
547 }
548
549 (Format::Upsert, Encode::Json | Encode::Avro | Encode::Protobuf) => {
552 if let Some(ref key_column_name) = include_key_column_name
553 && sql_defined_pk
554 {
555 if sql_defined_pk_names.len() != 1
560 || !key_column_name.eq(sql_defined_pk_names[0].as_str())
561 {
562 return Err(RwError::from(ProtocolError(format!(
563 "Only \"{}\" can be used as primary key\n\n{}",
564 key_column_name,
565 hint_format_encode(format_encode)
566 ))));
567 }
568 sql_defined_pk_names
569 } else {
570 return if let Some(include_key_column_name) = include_key_column_name {
572 Err(RwError::from(ProtocolError(format!(
573 "Primary key must be specified to {}\n\n{}",
574 include_key_column_name,
575 hint_format_encode(format_encode)
576 ))))
577 } else {
578 Err(RwError::from(ProtocolError(format!(
579 "INCLUDE KEY clause not set\n\n{}",
580 hint_format_encode(format_encode)
581 ))))
582 };
583 }
584 }
585
586 (Format::Debezium, Encode::Json) => {
587 if !additional_column_names.is_empty() {
588 return Err(RwError::from(ProtocolError(format!(
589 "FORMAT DEBEZIUM forbids additional columns, but got {:?}",
590 additional_column_names
591 ))));
592 }
593 if !sql_defined_pk {
594 return Err(RwError::from(ProtocolError(
595 "Primary key must be specified when creating source with FORMAT DEBEZIUM."
596 .to_owned(),
597 )));
598 }
599 sql_defined_pk_names
600 }
601 (Format::Debezium, Encode::Avro) => {
602 if !additional_column_names.is_empty() {
603 return Err(RwError::from(ProtocolError(format!(
604 "FORMAT DEBEZIUM forbids additional columns, but got {:?}",
605 additional_column_names
606 ))));
607 }
608 if sql_defined_pk {
609 sql_defined_pk_names
610 } else {
611 let pk_names =
612 extract_debezium_avro_table_pk_columns(source_info, with_properties).await?;
613 for pk_name in &pk_names {
615 columns
616 .iter()
617 .find(|c: &&ColumnCatalog| c.name().eq(pk_name))
618 .ok_or_else(|| {
619 RwError::from(ProtocolError(format!(
620 "avro's key column {} not exists in avro's row schema",
621 pk_name
622 )))
623 })?;
624 }
625 pk_names
626 }
627 }
628 (Format::DebeziumMongo, Encode::Json) => {
629 if sql_defined_pk {
630 sql_defined_pk_names
631 } else {
632 vec!["_id".to_owned()]
633 }
634 }
635
636 (Format::Maxwell, Encode::Json) => {
637 if !additional_column_names.is_empty() {
638 return Err(RwError::from(ProtocolError(format!(
639 "FORMAT MAXWELL forbids additional columns, but got {:?}",
640 additional_column_names
641 ))));
642 }
643 if !sql_defined_pk {
644 return Err(RwError::from(ProtocolError(
645 "Primary key must be specified when creating source with FORMAT MAXWELL ENCODE JSON.".to_owned(),
646 )));
647 }
648 sql_defined_pk_names
649 }
650
651 (Format::Canal, Encode::Json) => {
652 if !additional_column_names.is_empty() {
653 return Err(RwError::from(ProtocolError(format!(
654 "FORMAT CANAL forbids additional columns, but got {:?}",
655 additional_column_names
656 ))));
657 }
658 if !sql_defined_pk {
659 return Err(RwError::from(ProtocolError(
660 "Primary key must be specified when creating source with FORMAT CANAL ENCODE JSON.".to_owned(),
661 )));
662 }
663 sql_defined_pk_names
664 }
665 (format, encoding) => {
666 return Err(RwError::from(ProtocolError(format!(
667 "Unknown combination {:?} {:?}",
668 format, encoding
669 ))));
670 }
671 };
672 Ok(res)
673}
674
675pub(super) fn bind_source_watermark(
676 session: &SessionImpl,
677 name: String,
678 source_watermarks: Vec<SourceWatermark>,
679 column_catalogs: &[ColumnCatalog],
680) -> Result<Vec<WatermarkDesc>> {
681 let mut binder = Binder::new_for_ddl(session);
682 binder.bind_columns_to_context(name.clone(), column_catalogs)?;
683
684 let watermark_descs = source_watermarks
685 .into_iter()
686 .map(|source_watermark| {
687 let col_name = source_watermark.column.real_value();
688 let watermark_idx = binder.get_column_binding_index(name.clone(), &col_name)?;
689
690 let expr = binder.bind_expr(&source_watermark.expr)?;
691 let watermark_col_type = column_catalogs[watermark_idx].data_type();
692 let watermark_expr_type = &expr.return_type();
693 if watermark_col_type != watermark_expr_type {
694 Err(RwError::from(ErrorCode::BindError(
695 format!("The return value type of the watermark expression must be identical to the watermark column data type. Current data type of watermark return value: `{}`, column `{}`",watermark_expr_type, watermark_col_type),
696 )))
697 } else {
698 let expr_proto = expr.to_expr_proto();
699 Ok::<_, RwError>(WatermarkDesc {
700 watermark_idx: watermark_idx as u32,
701 expr: Some(expr_proto),
702 })
703 }
704 })
705 .try_collect()?;
706 Ok(watermark_descs)
707}
708
709pub(super) fn check_format_encode(
715 props: &WithOptionsSecResolved,
716 row_id_index: Option<usize>,
717 columns: &[ColumnCatalog],
718) -> Result<()> {
719 let Some(connector) = props.get_connector() else {
720 return Ok(());
721 };
722
723 if connector == NEXMARK_CONNECTOR {
724 check_nexmark_schema(props, row_id_index, columns)
725 } else {
726 Ok(())
727 }
728}
729
730pub fn bind_connector_props(
731 handler_args: &HandlerArgs,
732 format_encode: &FormatEncodeOptions,
733 is_create_source: bool,
734) -> Result<(WithOptions, SourceRefreshMode)> {
735 let mut with_properties = handler_args.with_options.clone().into_connector_props();
736 validate_compatibility(format_encode, &mut with_properties)?;
737 let refresh_mode = {
738 let refresh_mode = resolve_source_refresh_mode_in_with_option(&mut with_properties)?;
739 if is_create_source && refresh_mode.is_some() {
740 return Err(RwError::from(ProtocolError(
741 "`refresh_mode` only supported for CREATE TABLE".to_owned(),
742 )));
743 }
744
745 refresh_mode.unwrap_or(SourceRefreshMode {
746 refresh_mode: Some(RefreshMode::Streaming(SourceRefreshModeStreaming {})),
747 })
748 };
749
750 let create_cdc_source_job = with_properties.is_shareable_cdc_connector();
751
752 if !is_create_source && with_properties.is_shareable_only_cdc_connector() {
753 return Err(RwError::from(ProtocolError(format!(
754 "connector {} does not support `CREATE TABLE`, please use `CREATE SOURCE` instead",
755 with_properties.get_connector().unwrap(),
756 ))));
757 }
758 if is_create_source && create_cdc_source_job {
759 if let Some(value) = with_properties.get(AUTO_SCHEMA_CHANGE_KEY)
760 && value.parse::<bool>().map_err(|_| {
761 ErrorCode::InvalidInputSyntax(format!(
762 "invalid value of '{}' option",
763 AUTO_SCHEMA_CHANGE_KEY
764 ))
765 })?
766 {
767 Feature::CdcAutoSchemaChange.check_available()?;
768 }
769
770 with_properties.insert(CDC_SNAPSHOT_MODE_KEY.into(), CDC_SNAPSHOT_BACKFILL.into());
772 with_properties.insert(CDC_SHARING_MODE_KEY.into(), "true".into());
774 if with_properties.enable_transaction_metadata() {
776 with_properties.insert(CDC_TRANSACTIONAL_KEY.into(), "true".into());
777 }
778 if !with_properties.contains_key(CDC_WAIT_FOR_STREAMING_START_TIMEOUT) {
780 with_properties.insert(
781 CDC_WAIT_FOR_STREAMING_START_TIMEOUT.into(),
782 handler_args
783 .session
784 .config()
785 .cdc_source_wait_streaming_start_timeout()
786 .to_string(),
787 );
788 }
789 }
790 if with_properties.is_mysql_cdc_connector() {
791 with_properties
795 .entry("server.id".to_owned())
796 .or_insert(rand::rng().random_range(1..u32::MAX).to_string());
797 }
798 Ok((with_properties, refresh_mode))
799}
800
801pub enum SqlColumnStrategy {
804 FollowUnchecked,
809
810 FollowChecked,
817
818 Ignore,
823}
824
825#[allow(clippy::too_many_arguments)]
828pub async fn bind_create_source_or_table_with_connector(
829 handler_args: HandlerArgs,
830 full_name: ObjectName,
831 format_encode: FormatEncodeOptions,
832 with_properties: WithOptions,
833 sql_columns_defs: &[ColumnDef],
834 constraints: Vec<TableConstraint>,
835 wildcard_idx: Option<usize>,
836 source_watermarks: Vec<SourceWatermark>,
837 columns_from_resolve_source: Option<Vec<ColumnCatalog>>,
838 source_info: StreamSourceInfo,
839 include_column_options: IncludeOption,
840 col_id_gen: &mut ColumnIdGenerator,
841 create_source_type: CreateSourceType,
842 source_rate_limit: Option<u32>,
843 sql_column_strategy: SqlColumnStrategy,
844 refresh_mode: SourceRefreshMode,
845) -> Result<SourceCatalog> {
846 let session = &handler_args.session;
847 let db_name: &str = &session.database();
848 let (schema_name, source_name) = Binder::resolve_schema_qualified_name(db_name, &full_name)?;
849 let (database_id, schema_id) =
850 session.get_database_and_schema_id_for_create(schema_name.clone())?;
851
852 let is_create_source = create_source_type != CreateSourceType::Table;
853
854 if is_create_source {
855 if with_properties.is_batch_connector() {
857 return Err(ErrorCode::BindError(
858 "can't CREATE SOURCE with refreshable batch connector\n\nHint: use CREATE TABLE instead"
859 .to_owned(),
860 )
861 .into());
862 }
863
864 match format_encode.format {
865 Format::Debezium | Format::DebeziumMongo | Format::Maxwell | Format::Canal => {
867 return Err(ErrorCode::BindError(format!(
868 "can't CREATE SOURCE with FORMAT {}.\n\nHint: use CREATE TABLE instead\n\n{}",
869 format_encode.format,
870 hint_format_encode(&format_encode)
871 ))
872 .into());
873 }
874 Format::Upsert => {
876 notice_to_user(format!(
877 "Streaming queries on sources with `FORMAT {}` may have limitations. If your query isn't supported, consider using `CREATE TABLE` instead.",
878 format_encode.format
879 ));
880 }
881 _ => {}
882 }
883 }
884
885 let sql_pk_names = bind_sql_pk_names(sql_columns_defs, bind_table_constraints(&constraints)?)?;
886
887 if with_properties.is_iceberg_connector() && !sql_columns_defs.is_empty() {
891 return Err(RwError::from(InvalidInputSyntax(
892 r#"Schema is automatically inferred for iceberg source and should not be specified
893
894HINT: use `CREATE SOURCE <name> WITH (...)` instead of `CREATE SOURCE <name> (<columns>) WITH (...)`."#.to_owned(),
895 )));
896 }
897
898 if with_properties.is_batch_connector()
900 && with_properties
901 .get(UPSTREAM_SOURCE_KEY)
902 .is_some_and(|s| s.eq_ignore_ascii_case(ADBC_SNOWFLAKE_CONNECTOR))
903 && !sql_columns_defs.is_empty()
904 {
905 return Err(RwError::from(InvalidInputSyntax(
906 r#"Schema is automatically inferred for ADBC Snowflake source and should not be specified
907
908HINT: use `CREATE TABLE <name> WITH (...)` instead of `CREATE TABLE <name> (<columns>) WITH (...)`."#.to_owned(),
909 )));
910 }
911 let columns_from_sql = bind_sql_columns(sql_columns_defs, false)?;
912
913 let mut columns = bind_all_columns(
914 &format_encode,
915 columns_from_resolve_source,
916 columns_from_sql,
917 sql_columns_defs,
918 wildcard_idx,
919 sql_column_strategy,
920 )?;
921
922 handle_addition_columns(
924 Some(&format_encode),
925 &with_properties,
926 include_column_options,
927 &mut columns,
928 false,
929 )?;
930
931 if columns.is_empty() {
932 return Err(RwError::from(ProtocolError(
933 "Schema definition is required, either from SQL or schema registry.".to_owned(),
934 )));
935 }
936
937 if is_create_source {
939 check_and_add_timestamp_column(&with_properties, &mut columns);
941
942 if create_source_type == CreateSourceType::SharedNonCdc {
945 let (columns_exist, additional_columns) = source_add_partition_offset_cols(
946 &columns,
947 &with_properties.get_connector().unwrap(),
948 true, );
950 for (existed, c) in columns_exist.into_iter().zip_eq_fast(additional_columns) {
951 if !existed {
952 columns.push(ColumnCatalog::hidden(c));
953 }
954 }
955 }
956 }
957
958 let mut with_properties = with_properties;
960 resolve_privatelink_in_with_option(&mut with_properties)?;
961
962 if session
964 .env()
965 .system_params_manager()
966 .get_params()
967 .load()
968 .enforce_secret()
969 && Feature::SecretManagement.check_available().is_ok()
970 {
971 ConnectorProperties::enforce_secret_source(&with_properties)?;
973 }
974
975 let (with_properties, connection_type, connector_conn_ref) =
976 resolve_connection_ref_and_secret_ref(
977 with_properties,
978 session,
979 Some(TelemetryDatabaseObject::Source),
980 )?;
981 ensure_connection_type_allowed(connection_type, &SOURCE_ALLOWED_CONNECTION_CONNECTOR)?;
982
983 if !matches!(connection_type, PbConnectionType::Unspecified) {
985 let Some(connector) = with_properties.get_connector() else {
986 return Err(RwError::from(ProtocolError(format!(
987 "missing field '{}' in WITH clause",
988 UPSTREAM_SOURCE_KEY
989 ))));
990 };
991 check_connector_match_connection_type(connector.as_str(), &connection_type)?;
992 }
993
994 let pk_names = bind_source_pk(
995 &format_encode,
996 &source_info,
997 &mut columns,
998 sql_pk_names,
999 &with_properties,
1000 )
1001 .await?;
1002
1003 if let Some(duplicated_name) = columns.iter().map(|c| c.name()).duplicates().next() {
1006 return Err(ErrorCode::InvalidInputSyntax(format!(
1007 "column \"{}\" specified more than once",
1008 duplicated_name
1009 ))
1010 .into());
1011 }
1012
1013 for c in &mut columns {
1015 let original_data_type = c.data_type().clone();
1016 col_id_gen.generate(c)?;
1017 if is_create_source {
1021 c.column_desc.data_type = original_data_type;
1022 }
1023 }
1024 debug_assert_column_ids_distinct(&columns);
1025
1026 let (mut columns, pk_col_ids, row_id_index) =
1027 bind_pk_and_row_id_on_relation(columns, pk_names, true)?;
1028
1029 let watermark_descs =
1030 bind_source_watermark(session, source_name.clone(), source_watermarks, &columns)?;
1031 assert!(watermark_descs.len() <= 1);
1033
1034 let append_only = row_id_index.is_some();
1035 if is_create_source && !append_only && !watermark_descs.is_empty() {
1036 return Err(ErrorCode::NotSupported(
1037 "Defining watermarks on source requires the source connector to be append only."
1038 .to_owned(),
1039 "Use the key words `FORMAT PLAIN`".to_owned(),
1040 )
1041 .into());
1042 }
1043
1044 bind_sql_column_constraints(
1045 session,
1046 source_name.clone(),
1047 &mut columns,
1048 sql_columns_defs,
1050 &pk_col_ids,
1051 )?;
1052 check_format_encode(&with_properties, row_id_index, &columns)?;
1053
1054 let definition = handler_args.normalized_sql.clone();
1055
1056 let associated_table_id = if is_create_source {
1057 None
1058 } else {
1059 Some(TableId::placeholder())
1060 };
1061 let source = SourceCatalog {
1062 id: SourceId::placeholder(),
1063 name: source_name,
1064 schema_id,
1065 database_id,
1066 columns,
1067 pk_col_ids,
1068 append_only,
1069 owner: session.user_id(),
1070 info: source_info,
1071 row_id_index,
1072 with_properties,
1073 watermark_descs,
1074 associated_table_id,
1075 definition,
1076 connection_id: connector_conn_ref,
1077 created_at_epoch: None,
1078 initialized_at_epoch: None,
1079 version: INITIAL_SOURCE_VERSION_ID,
1080 created_at_cluster_version: None,
1081 initialized_at_cluster_version: None,
1082 rate_limit: source_rate_limit,
1083 refresh_mode: Some(refresh_mode),
1084 };
1085 Ok(source)
1086}
1087
1088pub async fn handle_create_source(
1089 mut handler_args: HandlerArgs,
1090 stmt: CreateSourceStatement,
1091) -> Result<RwPgResponse> {
1092 let session = handler_args.session.clone();
1093 let overwrite_options = OverwriteOptions::new(&mut handler_args);
1094
1095 if let Either::Right(resp) = session.check_relation_name_duplicated(
1096 stmt.source_name.clone(),
1097 StatementType::CREATE_SOURCE,
1098 stmt.if_not_exists,
1099 )? {
1100 return Ok(resp);
1101 }
1102
1103 if stmt
1104 .source_name
1105 .base_name()
1106 .starts_with(ICEBERG_SOURCE_PREFIX)
1107 {
1108 return Err(RwError::from(InvalidInputSyntax(format!(
1109 "Source name cannot start with reserved prefix '{}'",
1110 ICEBERG_SOURCE_PREFIX
1111 ))));
1112 }
1113
1114 if handler_args.with_options.is_empty() {
1115 return Err(RwError::from(InvalidInputSyntax(
1116 "missing WITH clause".to_owned(),
1117 )));
1118 }
1119
1120 let format_encode = stmt.format_encode.into_v2_with_warning();
1121 let (with_properties, refresh_mode) =
1122 bind_connector_props(&handler_args, &format_encode, true)?;
1123
1124 let create_source_type = CreateSourceType::for_newly_created(&session, &*with_properties);
1125 let (columns_from_resolve_source, source_info) = bind_columns_from_source(
1126 &session,
1127 &format_encode,
1128 Either::Left(&with_properties),
1129 create_source_type,
1130 )
1131 .await?;
1132 let mut col_id_gen = ColumnIdGenerator::new_initial();
1133
1134 if stmt.columns.iter().any(|col| {
1135 col.options
1136 .iter()
1137 .any(|def| matches!(def.option, ColumnOption::NotNull))
1138 }) {
1139 return Err(RwError::from(InvalidInputSyntax(
1140 "NOT NULL constraint is not supported in source schema".to_owned(),
1141 )));
1142 }
1143
1144 let source_catalog = bind_create_source_or_table_with_connector(
1145 handler_args.clone(),
1146 stmt.source_name,
1147 format_encode,
1148 with_properties,
1149 &stmt.columns,
1150 stmt.constraints,
1151 stmt.wildcard_idx,
1152 stmt.source_watermarks,
1153 columns_from_resolve_source,
1154 source_info,
1155 stmt.include_column_options,
1156 &mut col_id_gen,
1157 create_source_type,
1158 overwrite_options.source_rate_limit,
1159 SqlColumnStrategy::FollowChecked,
1160 refresh_mode,
1161 )
1162 .await?;
1163
1164 if stmt.temporary {
1166 if session.get_temporary_source(&source_catalog.name).is_some() {
1167 return Err(CatalogError::duplicated("source", source_catalog.name.clone()).into());
1168 }
1169 session.create_temporary_source(source_catalog);
1170 return Ok(PgResponse::empty_result(StatementType::CREATE_SOURCE));
1171 }
1172
1173 let source = source_catalog.to_prost();
1174
1175 let catalog_writer = session.catalog_writer()?;
1176
1177 if create_source_type.is_shared() {
1178 let graph = generate_stream_graph_for_source(handler_args, source_catalog)?;
1179 catalog_writer
1180 .create_source(source, Some(graph), stmt.if_not_exists)
1181 .await?;
1182 } else {
1183 catalog_writer
1185 .create_source(source, None, stmt.if_not_exists)
1186 .await?;
1187 }
1188
1189 Ok(PgResponse::empty_result(StatementType::CREATE_SOURCE))
1190}
1191
1192pub(super) fn generate_stream_graph_for_source(
1193 handler_args: HandlerArgs,
1194 source_catalog: SourceCatalog,
1195) -> Result<PbStreamFragmentGraph> {
1196 let context = OptimizerContext::from_handler_args(handler_args);
1197 let source_node = LogicalSource::with_catalog(
1198 Rc::new(source_catalog),
1199 SourceNodeKind::CreateSharedSource,
1200 context.into(),
1201 None,
1202 )?;
1203
1204 let stream_plan = source_node.to_stream(&mut ToStreamContext::new(false))?;
1205 let graph = build_graph(stream_plan, Some(GraphJobType::Source))?;
1206 Ok(graph)
1207}
1208
1209#[cfg(test)]
1210pub mod tests {
1211 use std::collections::HashMap;
1212 use std::sync::Arc;
1213
1214 use risingwave_common::catalog::{
1215 DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, ROW_ID_COLUMN_NAME,
1216 };
1217 use risingwave_common::types::{DataType, StructType};
1218
1219 use crate::catalog::root_catalog::SchemaPath;
1220 use crate::catalog::source_catalog::SourceCatalog;
1221 use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};
1222
1223 const GET_COLUMN_FROM_CATALOG: fn(&Arc<SourceCatalog>) -> HashMap<&str, DataType> =
1224 |catalog: &Arc<SourceCatalog>| -> HashMap<&str, DataType> {
1225 catalog
1226 .columns
1227 .iter()
1228 .map(|col| (col.name(), col.data_type().clone()))
1229 .collect::<HashMap<&str, DataType>>()
1230 };
1231
1232 #[tokio::test]
1233 async fn test_create_source_handler() {
1234 let proto_file = create_proto_file(PROTO_FILE_DATA);
1235 let sql = format!(
1236 r#"CREATE SOURCE t
1237 WITH (connector = 'kinesis')
1238 FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
1239 proto_file.path().to_str().unwrap()
1240 );
1241 let frontend = LocalFrontend::new(Default::default()).await;
1242 frontend.run_sql(sql).await.unwrap();
1243
1244 let session = frontend.session_ref();
1245 let catalog_reader = session.env().catalog_reader().read_guard();
1246 let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);
1247
1248 let (source, _) = catalog_reader
1250 .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
1251 .unwrap();
1252 assert_eq!(source.name, "t");
1253
1254 let columns = GET_COLUMN_FROM_CATALOG(source);
1255
1256 let city_type = StructType::new(vec![
1257 ("address", DataType::Varchar),
1258 ("zipcode", DataType::Varchar),
1259 ])
1260 .into();
1262 let expected_columns = maplit::hashmap! {
1263 ROW_ID_COLUMN_NAME => DataType::Serial,
1264 "id" => DataType::Int32,
1265 "zipcode" => DataType::Int64,
1266 "rate" => DataType::Float32,
1267 "country" => StructType::new(
1268 vec![("address", DataType::Varchar),("city", city_type),("zipcode", DataType::Varchar)],
1269 )
1270 .into(),
1272 };
1273 assert_eq!(columns, expected_columns, "{columns:#?}");
1274 }
1275
1276 #[tokio::test]
1277 async fn test_duplicate_props_options() {
1278 let proto_file = create_proto_file(PROTO_FILE_DATA);
1279 let sql = format!(
1280 r#"CREATE SOURCE t
1281 WITH (
1282 connector = 'kinesis',
1283 aws.region='user_test_topic',
1284 endpoint='172.10.1.1:9090,172.10.1.2:9090',
1285 aws.credentials.access_key_id = 'your_access_key_1',
1286 aws.credentials.secret_access_key = 'your_secret_key_1'
1287 )
1288 FORMAT PLAIN ENCODE PROTOBUF (
1289 message = '.test.TestRecord',
1290 aws.credentials.access_key_id = 'your_access_key_2',
1291 aws.credentials.secret_access_key = 'your_secret_key_2',
1292 schema.location = 'file://{}',
1293 )"#,
1294 proto_file.path().to_str().unwrap()
1295 );
1296 let frontend = LocalFrontend::new(Default::default()).await;
1297 frontend.run_sql(sql).await.unwrap();
1298
1299 let session = frontend.session_ref();
1300 let catalog_reader = session.env().catalog_reader().read_guard();
1301 let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);
1302
1303 let (source, _) = catalog_reader
1305 .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
1306 .unwrap();
1307 assert_eq!(source.name, "t");
1308
1309 assert_eq!(
1311 source
1312 .info
1313 .format_encode_options
1314 .get("aws.credentials.access_key_id")
1315 .unwrap(),
1316 "your_access_key_2"
1317 );
1318 assert_eq!(
1319 source
1320 .info
1321 .format_encode_options
1322 .get("aws.credentials.secret_access_key")
1323 .unwrap(),
1324 "your_secret_key_2"
1325 );
1326
1327 assert_eq!(
1329 source
1330 .with_properties
1331 .get("aws.credentials.access_key_id")
1332 .unwrap(),
1333 "your_access_key_1"
1334 );
1335 assert_eq!(
1336 source
1337 .with_properties
1338 .get("aws.credentials.secret_access_key")
1339 .unwrap(),
1340 "your_secret_key_1"
1341 );
1342
1343 assert!(!source.with_properties.contains_key("schema.location"));
1345 }
1346
1347 #[tokio::test]
1348 async fn test_multi_table_cdc_create_source_handler() {
1349 let sql =
1350 "CREATE SOURCE t2 WITH (connector = 'mysql-cdc') FORMAT PLAIN ENCODE JSON".to_owned();
1351 let frontend = LocalFrontend::new(Default::default()).await;
1352 let session = frontend.session_ref();
1353
1354 frontend
1355 .run_sql_with_session(session.clone(), sql)
1356 .await
1357 .unwrap();
1358 let catalog_reader = session.env().catalog_reader().read_guard();
1359 let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);
1360
1361 let (source, _) = catalog_reader
1363 .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t2")
1364 .unwrap();
1365 assert_eq!(source.name, "t2");
1366
1367 let columns = source
1368 .columns
1369 .iter()
1370 .map(|col| (col.name(), col.data_type().clone()))
1371 .collect::<Vec<(&str, DataType)>>();
1372
1373 expect_test::expect![[r#"
1374 [
1375 (
1376 "payload",
1377 Jsonb,
1378 ),
1379 (
1380 "_rw_offset",
1381 Varchar,
1382 ),
1383 (
1384 "_rw_table_name",
1385 Varchar,
1386 ),
1387 (
1388 "_row_id",
1389 Serial,
1390 ),
1391 ]
1392 "#]]
1393 .assert_debug_eq(&columns);
1394 }
1395
1396 #[tokio::test]
1397 async fn test_source_addition_columns() {
1398 let sql =
1400 "CREATE SOURCE s (v1 int) include key as _rw_kafka_key with (connector = 'kafka') format plain encode json".to_owned();
1401 let frontend = LocalFrontend::new(Default::default()).await;
1402 frontend.run_sql(sql).await.unwrap();
1403 let session = frontend.session_ref();
1404 let catalog_reader = session.env().catalog_reader().read_guard();
1405 let (source, _) = catalog_reader
1406 .get_source_by_name(
1407 DEFAULT_DATABASE_NAME,
1408 SchemaPath::Name(DEFAULT_SCHEMA_NAME),
1409 "s",
1410 )
1411 .unwrap();
1412 assert_eq!(source.name, "s");
1413
1414 let columns = source
1415 .columns
1416 .iter()
1417 .map(|col| (col.name(), col.data_type().clone()))
1418 .collect::<Vec<(&str, DataType)>>();
1419
1420 expect_test::expect![[r#"
1421 [
1422 (
1423 "v1",
1424 Int32,
1425 ),
1426 (
1427 "_rw_kafka_key",
1428 Bytea,
1429 ),
1430 (
1431 "_rw_kafka_timestamp",
1432 Timestamptz,
1433 ),
1434 (
1435 "_rw_kafka_partition",
1436 Varchar,
1437 ),
1438 (
1439 "_rw_kafka_offset",
1440 Varchar,
1441 ),
1442 (
1443 "_row_id",
1444 Serial,
1445 ),
1446 ]
1447 "#]]
1448 .assert_debug_eq(&columns);
1449
1450 let sql =
1451 "CREATE SOURCE s3 (v1 int) include timestamp 'header1' as header_col with (connector = 'kafka') format plain encode json".to_owned();
1452 match frontend.run_sql(sql).await {
1453 Err(e) => {
1454 assert_eq!(
1455 e.to_string(),
1456 "Protocol error: Only header column can have inner field, but got \"timestamp\""
1457 )
1458 }
1459 _ => unreachable!(),
1460 }
1461 }
1462}