// risingwave_frontend/handler/create_source.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::rc::Rc;
17use std::sync::LazyLock;
18
19use anyhow::{Context, anyhow};
20use either::Either;
21use external_schema::debezium::extract_debezium_avro_table_pk_columns;
22use external_schema::nexmark::check_nexmark_schema;
23use itertools::Itertools;
24use maplit::{convert_args, hashmap, hashset};
25use pgwire::pg_response::{PgResponse, StatementType};
26use rand::Rng;
27use risingwave_common::array::arrow::{IcebergArrowConvert, arrow_schema_iceberg};
28use risingwave_common::bail_not_implemented;
29use risingwave_common::catalog::{
30    ColumnCatalog, ColumnDesc, ColumnId, INITIAL_SOURCE_VERSION_ID, KAFKA_TIMESTAMP_COLUMN_NAME,
31    ROW_ID_COLUMN_NAME, TableId, debug_assert_column_ids_distinct,
32};
33use risingwave_common::license::Feature;
34use risingwave_common::secret::LocalSecretManager;
35use risingwave_common::system_param::reader::SystemParamsRead;
36use risingwave_common::types::DataType;
37use risingwave_common::util::iter_util::ZipEqFast;
38use risingwave_connector::parser::additional_columns::{
39    build_additional_column_desc, get_supported_additional_columns,
40    source_add_partition_offset_cols,
41};
42use risingwave_connector::parser::{
43    AvroParserConfig, DEBEZIUM_IGNORE_KEY, DebeziumAvroParserConfig, ProtobufParserConfig,
44    SchemaLocation, SpecificParserConfig, TimestamptzHandling,
45    fetch_json_schema_and_map_to_columns,
46};
47use risingwave_connector::schema::AWS_GLUE_SCHEMA_ARN_KEY;
48use risingwave_connector::schema::schema_registry::{
49    SCHEMA_REGISTRY_BACKOFF_DURATION_KEY, SCHEMA_REGISTRY_BACKOFF_FACTOR_KEY,
50    SCHEMA_REGISTRY_MAX_DELAY_KEY, SCHEMA_REGISTRY_PASSWORD, SCHEMA_REGISTRY_RETRIES_MAX_KEY,
51    SCHEMA_REGISTRY_USERNAME, SchemaRegistryConfig, name_strategy_from_str,
52};
53use risingwave_connector::source::cdc::{
54    CDC_MONGODB_STRONG_SCHEMA_KEY, CDC_SHARING_MODE_KEY, CDC_SNAPSHOT_BACKFILL,
55    CDC_SNAPSHOT_MODE_KEY, CDC_TRANSACTIONAL_KEY, CDC_WAIT_FOR_STREAMING_START_TIMEOUT,
56    CITUS_CDC_CONNECTOR, MONGODB_CDC_CONNECTOR, MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR,
57    SQL_SERVER_CDC_CONNECTOR,
58};
59use risingwave_connector::source::datagen::DATAGEN_CONNECTOR;
60use risingwave_connector::source::iceberg::ICEBERG_CONNECTOR;
61use risingwave_connector::source::nexmark::source::{EventType, get_event_data_types_with_names};
62use risingwave_connector::source::test_source::TEST_CONNECTOR;
63use risingwave_connector::source::{
64    AZBLOB_CONNECTOR, ConnectorProperties, GCS_CONNECTOR, GOOGLE_PUBSUB_CONNECTOR, KAFKA_CONNECTOR,
65    KINESIS_CONNECTOR, LEGACY_S3_CONNECTOR, MQTT_CONNECTOR, NATS_CONNECTOR, NEXMARK_CONNECTOR,
66    OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR, PULSAR_CONNECTOR,
67};
68pub use risingwave_connector::source::{UPSTREAM_SOURCE_KEY, WEBHOOK_CONNECTOR};
69use risingwave_connector::{AUTO_SCHEMA_CHANGE_KEY, WithPropertiesExt};
70use risingwave_pb::catalog::connection_params::PbConnectionType;
71use risingwave_pb::catalog::{PbSchemaRegistryNameStrategy, StreamSourceInfo, WatermarkDesc};
72use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
73use risingwave_pb::plan_common::source_refresh_mode::{RefreshMode, SourceRefreshModeStreaming};
74use risingwave_pb::plan_common::{EncodeType, FormatType, SourceRefreshMode};
75use risingwave_pb::stream_plan::PbStreamFragmentGraph;
76use risingwave_pb::telemetry::TelemetryDatabaseObject;
77use risingwave_sqlparser::ast::{
78    AstString, ColumnDef, ColumnOption, CreateSourceStatement, Encode, Format, FormatEncodeOptions,
79    ObjectName, SourceWatermark, SqlOptionValue, TableConstraint, Value, get_delimiter,
80};
81use risingwave_sqlparser::parser::{IncludeOption, IncludeOptionItem};
82use thiserror_ext::AsReport;
83
84use super::RwPgResponse;
85use crate::binder::Binder;
86use crate::catalog::CatalogError;
87use crate::catalog::source_catalog::SourceCatalog;
88use crate::error::ErrorCode::{self, Deprecated, InvalidInputSyntax, NotSupported, ProtocolError};
89use crate::error::{Result, RwError};
90use crate::expr::Expr;
91use crate::handler::HandlerArgs;
92use crate::handler::create_table::{
93    ColumnIdGenerator, bind_pk_and_row_id_on_relation, bind_sql_column_constraints,
94    bind_sql_columns, bind_sql_pk_names, bind_table_constraints,
95};
96use crate::handler::util::{
97    SourceSchemaCompatExt, check_connector_match_connection_type, ensure_connection_type_allowed,
98};
99use crate::optimizer::plan_node::generic::SourceNodeKind;
100use crate::optimizer::plan_node::{LogicalSource, ToStream, ToStreamContext};
101use crate::session::SessionImpl;
102use crate::session::current::notice_to_user;
103use crate::utils::{
104    OverwriteOptions, resolve_connection_ref_and_secret_ref, resolve_privatelink_in_with_option,
105    resolve_secret_ref_in_with_options, resolve_source_refresh_mode_in_with_option,
106};
107use crate::{OptimizerContext, WithOptions, WithOptionsSecResolved, bind_data_type, build_graph};
108
109mod external_schema;
110pub use external_schema::{
111    bind_columns_from_source, get_schema_location, schema_has_schema_registry,
112};
113mod validate;
114pub use validate::validate_compatibility;
115use validate::{SOURCE_ALLOWED_CONNECTION_CONNECTOR, SOURCE_ALLOWED_CONNECTION_SCHEMA_REGISTRY};
116mod additional_column;
117use additional_column::check_and_add_timestamp_column;
118pub use additional_column::handle_addition_columns;
119use risingwave_common::catalog::ICEBERG_SOURCE_PREFIX;
120use risingwave_common::id::SourceId;
121
122use crate::stream_fragmenter::GraphJobType;
123
124fn non_generated_sql_columns(columns: &[ColumnDef]) -> Vec<ColumnDef> {
125    columns
126        .iter()
127        .filter(|c| !c.is_generated())
128        .cloned()
129        .collect()
130}
131
132fn try_consume_string_from_options(
133    format_encode_options: &mut BTreeMap<String, String>,
134    key: &str,
135) -> Option<AstString> {
136    format_encode_options.remove(key).map(AstString)
137}
138
139fn try_consume_schema_registry_config_from_options(
140    format_encode_options: &mut BTreeMap<String, String>,
141) {
142    [
143        SCHEMA_REGISTRY_USERNAME,
144        SCHEMA_REGISTRY_PASSWORD,
145        SCHEMA_REGISTRY_MAX_DELAY_KEY,
146        SCHEMA_REGISTRY_BACKOFF_DURATION_KEY,
147        SCHEMA_REGISTRY_BACKOFF_FACTOR_KEY,
148        SCHEMA_REGISTRY_RETRIES_MAX_KEY,
149    ]
150    .iter()
151    .for_each(|key| {
152        try_consume_string_from_options(format_encode_options, key);
153    });
154}
155
156fn consume_string_from_options(
157    format_encode_options: &mut BTreeMap<String, String>,
158    key: &str,
159) -> Result<AstString> {
160    try_consume_string_from_options(format_encode_options, key).ok_or(RwError::from(ProtocolError(
161        format!("missing field {} in options", key),
162    )))
163}
164
165fn consume_aws_config_from_options(format_encode_options: &mut BTreeMap<String, String>) {
166    format_encode_options.retain(|key, _| !key.starts_with("aws."))
167}
168
169#[derive(Debug, Clone, Copy, PartialEq, Eq)]
170pub enum CreateSourceType {
171    SharedCdc,
172    /// e.g., shared Kafka source
173    SharedNonCdc,
174    NonShared,
175    /// create table with connector
176    Table,
177}
178
179impl CreateSourceType {
    /// Note: shouldn't be used for `ALTER SOURCE`, since session variables should not affect an existing source. We should respect the original type instead.
181    pub fn for_newly_created(
182        session: &SessionImpl,
183        with_properties: &impl WithPropertiesExt,
184    ) -> Self {
185        if with_properties.is_shareable_cdc_connector() {
186            CreateSourceType::SharedCdc
187        } else if with_properties.is_shareable_non_cdc_connector()
188            && session
189                .env()
190                .streaming_config()
191                .developer
192                .enable_shared_source
193            && session.config().streaming_use_shared_source()
194        {
195            CreateSourceType::SharedNonCdc
196        } else {
197            CreateSourceType::NonShared
198        }
199    }
200
201    pub fn for_replace(catalog: &SourceCatalog) -> Self {
202        if !catalog.info.is_shared() {
203            CreateSourceType::NonShared
204        } else if catalog.with_properties.is_shareable_cdc_connector() {
205            CreateSourceType::SharedCdc
206        } else {
207            CreateSourceType::SharedNonCdc
208        }
209    }
210
211    pub fn is_shared(&self) -> bool {
212        matches!(
213            self,
214            CreateSourceType::SharedCdc | CreateSourceType::SharedNonCdc
215        )
216    }
217}
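
// A rough sketch of how `for_newly_created` classifies statements (connector names are just
// examples; the actual checks live in `WithPropertiesExt` and the session config read above):
//
//     CREATE SOURCE s WITH (connector = 'mysql-cdc', ...)  -- SharedCdc
//     CREATE SOURCE s WITH (connector = 'kafka', ...)      -- SharedNonCdc, if shared source is
//                                                           --   enabled for this session,
//                                                           --   NonShared otherwise
//
// `Table` is never returned here; it is passed directly by the `CREATE TABLE ... WITH
// (connector = ...)` code path (see `create_table.rs`).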
218
/// Bind columns from both the source and the SQL definition.
220pub(crate) fn bind_all_columns(
221    format_encode: &FormatEncodeOptions,
222    cols_from_source: Option<Vec<ColumnCatalog>>,
223    cols_from_sql: Vec<ColumnCatalog>,
224    col_defs_from_sql: &[ColumnDef],
225    wildcard_idx: Option<usize>,
226    sql_column_strategy: SqlColumnStrategy,
227) -> Result<Vec<ColumnCatalog>> {
228    if let Some(cols_from_source) = cols_from_source {
        // Need to check `col_defs` to see if a column is generated, as we haven't bound the
        // `GeneratedColumnDesc` in `ColumnCatalog` yet.
231        let generated_cols_from_sql = cols_from_sql
232            .iter()
233            .filter(|c| {
234                col_defs_from_sql
235                    .iter()
236                    .find(|d| d.name.real_value() == c.name())
237                    .unwrap()
238                    .is_generated()
239            })
240            .cloned()
241            .collect_vec();
242
243        #[allow(clippy::collapsible_else_if)]
244        match sql_column_strategy {
245            // Ignore `cols_from_source`, follow `cols_from_sql` without checking.
246            SqlColumnStrategy::FollowUnchecked => {
247                assert!(
248                    wildcard_idx.is_none(),
                    "wildcard still exists while the strategy is `FollowUnchecked`; was the SQL not correctly purified?"
250                );
251                return Ok(cols_from_sql);
252            }
253
254            // Will merge `generated_cols_from_sql` into `cols_from_source`.
255            SqlColumnStrategy::Ignore => {}
256
257            SqlColumnStrategy::FollowChecked => {
258                let has_regular_cols_from_sql =
259                    generated_cols_from_sql.len() != cols_from_sql.len();
260
261                if has_regular_cols_from_sql {
262                    if wildcard_idx.is_some() {
263                        // (*, normal_column INT)
264                        return Err(RwError::from(NotSupported(
265                            "When there's a wildcard (\"*\"), \
266                             only generated columns are allowed in user-defined schema from SQL"
267                                .to_owned(),
268                            "Remove the non-generated columns".to_owned(),
269                        )));
270                    } else {
271                        // (normal_column INT)
272                        // Follow `cols_from_sql` with name & type checking.
273                        for col in &cols_from_sql {
274                            if generated_cols_from_sql.contains(col) {
275                                continue;
276                            }
277                            let Some(col_from_source) =
278                                cols_from_source.iter().find(|c| c.name() == col.name())
279                            else {
280                                return Err(RwError::from(ProtocolError(format!(
281                                    "Column \"{}\" is defined in SQL but not found in the source",
282                                    col.name()
283                                ))));
284                            };
285
286                            if col_from_source.data_type() != col.data_type() {
287                                return Err(RwError::from(ProtocolError(format!(
288                                    "Data type mismatch for column \"{}\". \
289                                     Defined in SQL as \"{}\", but found in the source as \"{}\"",
290                                    col.name(),
291                                    col.data_type(),
292                                    col_from_source.data_type()
293                                ))));
294                            }
295                        }
296                        return Ok(cols_from_sql);
297                    }
298                } else {
299                    if wildcard_idx.is_some() {
300                        // (*)
301                        // (*, generated_column INT)
302                        // Good, expand the wildcard later.
303                    } else {
304                        // ()
305                        // (generated_column INT)
306                        // Interpreted as if there's a wildcard for backward compatibility.
                        // TODO: the behavior is not entirely consistent, making it impossible to ingest no
                        //       columns from the source (though that is not useful in practice). Currently we
                        //       just send a notice to the user, but we may want to interpret it as empty
                        //       columns in the future.
311                        notice_to_user("\
312                            Neither wildcard (\"*\") nor regular (non-generated) columns appear in the user-defined schema from SQL. \
313                            For backward compatibility, all columns from the source will be included at the beginning. \
314                            For clarity, consider adding a wildcard (\"*\") to indicate where the columns from the source should be included, \
315                            or specifying the columns you want to include from the source.
316                        ");
317                    }
318                }
319            }
320        }
321
322        // In some cases the wildcard may be absent:
323        // - plan based on a purified SQL
324        // - interpret `()` as `(*)` for backward compatibility (see notice above)
325        // Default to 0 to expand the wildcard at the beginning.
326        let wildcard_idx = wildcard_idx.unwrap_or(0).min(generated_cols_from_sql.len());
327
328        // Merge `generated_cols_from_sql` with `cols_from_source`.
329        let mut merged_cols = generated_cols_from_sql;
330        let merged_cols_r = merged_cols.split_off(wildcard_idx);
331        merged_cols.extend(cols_from_source);
332        merged_cols.extend(merged_cols_r);
333
334        Ok(merged_cols)
335    } else {
336        if wildcard_idx.is_some() {
337            return Err(RwError::from(NotSupported(
                "Wildcard in user-defined schema is only allowed when there are columns from an external schema".to_owned(),
339                "Remove the wildcard or use a source with external schema".to_owned(),
340            )));
341        }
342        let non_generated_sql_defined_columns = non_generated_sql_columns(col_defs_from_sql);
343
344        match (&format_encode.format, &format_encode.row_encode) {
345            (Format::DebeziumMongo, Encode::Json) => {
346                let strong_schema = format_encode
347                    .row_options
348                    .iter()
349                    .find(|k| k.name.real_value().to_lowercase() == CDC_MONGODB_STRONG_SCHEMA_KEY)
350                    .map(|k| matches!(k.value, SqlOptionValue::Value(Value::Boolean(true))))
351                    .unwrap_or(false);
352
                // strong schema requires an `_id` column at the first position with a specific type
354                if strong_schema {
355                    let (_, id_column) = non_generated_sql_defined_columns
356                        .iter()
357                        .enumerate()
358                        .find(|(idx, col)| *idx == 0 && col.name.real_value() == "_id")
359                        .ok_or_else(|| {
360                            RwError::from(ProtocolError(
361                                "The `_id` column of the source with row format DebeziumMongoJson must be defined as the first column in SQL".to_owned(),
362                            ))
363                        })?;
364
365                    let id_data_type = bind_data_type(id_column.data_type.as_ref().unwrap())?;
366                    if !matches!(
367                        id_data_type,
368                        DataType::Varchar | DataType::Int32 | DataType::Int64 | DataType::Jsonb
369                    ) {
370                        return Err(RwError::from(ProtocolError(
371                            "the `_id` column of the source with row format DebeziumMongoJson must be [Jsonb | Varchar | Int32 | Int64]".to_owned(),
372                        )));
373                    }
374
375                    let mut columns = Vec::with_capacity(non_generated_sql_defined_columns.len());
376                    columns.push(
377                        // id column
378                        ColumnCatalog {
379                            column_desc: ColumnDesc::named("_id", 0.into(), id_data_type),
380                            is_hidden: false,
381                        },
382                    );
383
384                    // bind rest of the columns
385                    for (idx, col) in non_generated_sql_defined_columns
386                        .into_iter()
387                        // skip the first column
388                        .skip(1)
389                        .enumerate()
390                    {
391                        columns.push(ColumnCatalog {
392                            column_desc: ColumnDesc::named(
393                                col.name.real_value(),
394                                (idx as i32).into(),
395                                bind_data_type(col.data_type.as_ref().unwrap())?,
396                            ),
397                            is_hidden: false,
398                        });
399                    }
400
401                    return Ok(columns);
402                }
403
404                let mut columns = vec![
405                    ColumnCatalog {
406                        column_desc: ColumnDesc::named("_id", 0.into(), DataType::Varchar),
407                        is_hidden: false,
408                    },
409                    ColumnCatalog {
410                        column_desc: ColumnDesc::named("payload", 0.into(), DataType::Jsonb),
411                        is_hidden: false,
412                    },
413                ];
414
415                if non_generated_sql_defined_columns.len() != 2
416                    || non_generated_sql_defined_columns[0].name.real_value() != columns[0].name()
417                    || non_generated_sql_defined_columns[1].name.real_value() != columns[1].name()
418                {
419                    return Err(RwError::from(ProtocolError(
                        "the non-generated columns of the source with row format DebeziumMongoJson
        must be (_id [Jsonb | Varchar | Int32 | Int64], payload jsonb)."
422                            .to_owned(),
423                    )));
424                }
425                // ok to unwrap since it was checked at `bind_sql_columns`
426                let key_data_type = bind_data_type(
427                    non_generated_sql_defined_columns[0]
428                        .data_type
429                        .as_ref()
430                        .unwrap(),
431                )?;
432                match key_data_type {
433                    DataType::Jsonb | DataType::Varchar | DataType::Int32 | DataType::Int64 => {
434                        columns[0].column_desc.data_type = key_data_type;
435                    }
436                    _ => {
437                        return Err(RwError::from(ProtocolError(
438                            "the `_id` column of the source with row format DebeziumMongoJson
439        must be [Jsonb | Varchar | Int32 | Int64]"
440                                .to_owned(),
441                        )));
442                    }
443                }
444
445                // ok to unwrap since it was checked at `bind_sql_columns`
446                let value_data_type = bind_data_type(
447                    non_generated_sql_defined_columns[1]
448                        .data_type
449                        .as_ref()
450                        .unwrap(),
451                )?;
452                if !matches!(value_data_type, DataType::Jsonb) {
453                    return Err(RwError::from(ProtocolError(
454                        "the `payload` column of the source with row format DebeziumMongoJson
455        must be Jsonb datatype"
456                            .to_owned(),
457                    )));
458                }
459                Ok(columns)
460            }
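            // For reference, a minimal sketch of the two accepted layouts above (connector
            // options elided; the strong-schema option key is written here as `strong_schema`,
            // i.e. the value of `CDC_MONGODB_STRONG_SCHEMA_KEY`):
            //
            //     -- default layout: `_id` key plus a catch-all `payload` column
            //     CREATE TABLE t (_id VARCHAR PRIMARY KEY, payload JSONB)
            //     WITH (connector = 'mongodb-cdc', ...)
            //     FORMAT DEBEZIUM_MONGO ENCODE JSON;
            //
            //     -- strong schema: `_id` first, then explicitly typed fields
            //     CREATE TABLE t (_id JSONB PRIMARY KEY, name VARCHAR, age INT)
            //     WITH (connector = 'mongodb-cdc', ...)
            //     FORMAT DEBEZIUM_MONGO ENCODE JSON (strong_schema = true);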
461            (Format::Plain, Encode::Bytes) => {
462                let err = Err(RwError::from(ProtocolError(
463                    "ENCODE BYTES only accepts one BYTEA type column".to_owned(),
464                )));
465                if non_generated_sql_defined_columns.len() == 1 {
                    // ok to unwrap `data_type` since it was checked at `bind_sql_columns`
467                    let col_data_type = bind_data_type(
468                        non_generated_sql_defined_columns[0]
469                            .data_type
470                            .as_ref()
471                            .unwrap(),
472                    )?;
473                    if col_data_type == DataType::Bytea {
474                        Ok(cols_from_sql)
475                    } else {
476                        err
477                    }
478                } else {
479                    err
480                }
481            }
482            (_, _) => Ok(cols_from_sql),
483        }
484    }
485}
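
// A minimal sketch of the wildcard merge above, assuming the external schema resolves to
// columns (a, b) and `genN` are generated columns defined in SQL:
//
//     (gen1 INT AS ..., *, gen2 INT AS ...)  =>  (gen1, a, b, gen2)
//     (*)  or  ()                            =>  (a, b)
//     (gen1 INT AS ...)                      =>  (a, b, gen1)  -- implicit wildcard at the front,
//                                                              --   with the notice above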
486
/// TODO: perhaps putting the hint in a notice would be better, as the error message format might not be that reliable.
488fn hint_format_encode(format_encode: &FormatEncodeOptions) -> String {
489    format!(
490        r#"Hint: For FORMAT {0} ENCODE {1}, INCLUDE KEY must be specified and the key column must be used as primary key.
491example:
492    CREATE TABLE <table_name> ( PRIMARY KEY ([rw_key | <key_name>]) )
493    INCLUDE KEY [AS <key_name>]
494    WITH (...)
495    FORMAT {0} ENCODE {1}{2}
496"#,
497        format_encode.format,
498        format_encode.row_encode,
499        if format_encode.row_encode == Encode::Json || format_encode.row_encode == Encode::Bytes {
500            "".to_owned()
501        } else {
502            " (...)".to_owned()
503        }
504    )
505}
506
/// Bind the primary key from the source. Add the key column to the table columns if necessary.
/// Returns `pk_names`.
509pub(crate) async fn bind_source_pk(
510    format_encode: &FormatEncodeOptions,
511    source_info: &StreamSourceInfo,
512    columns: &mut [ColumnCatalog],
513    sql_defined_pk_names: Vec<String>,
514    with_properties: &WithOptionsSecResolved,
515) -> Result<Vec<String>> {
516    let sql_defined_pk = !sql_defined_pk_names.is_empty();
517    let include_key_column_name: Option<String> = {
        // Iterate over the columns to check whether any additional column comes from the key part,
        // and return that key column's name if it exists.
520        columns.iter().find_map(|catalog| {
521            if matches!(
522                catalog.column_desc.additional_column.column_type,
523                Some(AdditionalColumnType::Key(_))
524            ) {
525                Some(catalog.name().to_owned())
526            } else {
527                None
528            }
529        })
530    };
531    let additional_column_names = columns
532        .iter()
533        .filter_map(|col| {
534            if col.column_desc.additional_column.column_type.is_some() {
535                Some(col.name().to_owned())
536            } else {
537                None
538            }
539        })
540        .collect_vec();
541
542    let res = match (&format_encode.format, &format_encode.row_encode) {
543        (Format::Native, Encode::Native) | (Format::None, Encode::None) | (Format::Plain, _) => {
544            sql_defined_pk_names
545        }
546
        // For all upsert formats, we accept one and only one key column as the primary key.
        // The additional KEY column must be set in this case and must be the primary key.
549        (Format::Upsert, Encode::Json | Encode::Avro | Encode::Protobuf) => {
550            if let Some(ref key_column_name) = include_key_column_name
551                && sql_defined_pk
552            {
553                // pk is set. check if it's valid
554
                // The column name has been converted to its real value in `handle_addition_columns`,
                // so we don't ignore ASCII case here.
557                if sql_defined_pk_names.len() != 1
558                    || !key_column_name.eq(sql_defined_pk_names[0].as_str())
559                {
560                    return Err(RwError::from(ProtocolError(format!(
561                        "Only \"{}\" can be used as primary key\n\n{}",
562                        key_column_name,
563                        hint_format_encode(format_encode)
564                    ))));
565                }
566                sql_defined_pk_names
567            } else {
568                // pk not set, or even key not included
569                return if let Some(include_key_column_name) = include_key_column_name {
570                    Err(RwError::from(ProtocolError(format!(
                        "Primary key must be specified as \"{}\"\n\n{}",
572                        include_key_column_name,
573                        hint_format_encode(format_encode)
574                    ))))
575                } else {
576                    Err(RwError::from(ProtocolError(format!(
577                        "INCLUDE KEY clause not set\n\n{}",
578                        hint_format_encode(format_encode)
579                    ))))
580                };
581            }
582        }
583
584        (Format::Debezium, Encode::Json) => {
585            if !additional_column_names.is_empty() {
586                return Err(RwError::from(ProtocolError(format!(
587                    "FORMAT DEBEZIUM forbids additional columns, but got {:?}",
588                    additional_column_names
589                ))));
590            }
591            if !sql_defined_pk {
592                return Err(RwError::from(ProtocolError(
593                    "Primary key must be specified when creating source with FORMAT DEBEZIUM."
594                        .to_owned(),
595                )));
596            }
597            sql_defined_pk_names
598        }
599        (Format::Debezium, Encode::Avro) => {
600            if !additional_column_names.is_empty() {
601                return Err(RwError::from(ProtocolError(format!(
602                    "FORMAT DEBEZIUM forbids additional columns, but got {:?}",
603                    additional_column_names
604                ))));
605            }
606            if sql_defined_pk {
607                sql_defined_pk_names
608            } else {
                // extract pk(s) from the schema registry
                let pk_names =
                    extract_debezium_avro_table_pk_columns(source_info, with_properties).await?;
612                for pk_name in &pk_names {
613                    columns
614                        .iter()
615                        .find(|c: &&ColumnCatalog| c.name().eq(pk_name))
616                        .ok_or_else(|| {
617                            RwError::from(ProtocolError(format!(
                                "Avro key column {} does not exist in the Avro row schema",
619                                pk_name
620                            )))
621                        })?;
622                }
623                pk_names
624            }
625        }
626        (Format::DebeziumMongo, Encode::Json) => {
627            if sql_defined_pk {
628                sql_defined_pk_names
629            } else {
630                vec!["_id".to_owned()]
631            }
632        }
633
634        (Format::Maxwell, Encode::Json) => {
635            if !additional_column_names.is_empty() {
636                return Err(RwError::from(ProtocolError(format!(
637                    "FORMAT MAXWELL forbids additional columns, but got {:?}",
638                    additional_column_names
639                ))));
640            }
641            if !sql_defined_pk {
642                return Err(RwError::from(ProtocolError(
643    "Primary key must be specified when creating source with FORMAT MAXWELL ENCODE JSON.".to_owned(),
644    )));
645            }
646            sql_defined_pk_names
647        }
648
649        (Format::Canal, Encode::Json) => {
650            if !additional_column_names.is_empty() {
651                return Err(RwError::from(ProtocolError(format!(
652                    "FORMAT CANAL forbids additional columns, but got {:?}",
653                    additional_column_names
654                ))));
655            }
656            if !sql_defined_pk {
657                return Err(RwError::from(ProtocolError(
658    "Primary key must be specified when creating source with FORMAT CANAL ENCODE JSON.".to_owned(),
659    )));
660            }
661            sql_defined_pk_names
662        }
663        (format, encoding) => {
664            return Err(RwError::from(ProtocolError(format!(
665                "Unknown combination {:?} {:?}",
666                format, encoding
667            ))));
668        }
669    };
670    Ok(res)
671}
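
// For example, under FORMAT UPSERT the checks above accept a definition roughly like the
// following (topic, broker and registry URL are placeholders):
//
//     CREATE TABLE t (PRIMARY KEY (rw_key))
//     INCLUDE KEY AS rw_key
//     WITH (connector = 'kafka', topic = '...', properties.bootstrap.server = '...')
//     FORMAT UPSERT ENCODE AVRO (schema.registry = '...');
//
// i.e. the column introduced by `INCLUDE KEY` is the one and only primary key column.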
672
673pub(super) fn bind_source_watermark(
674    session: &SessionImpl,
675    name: String,
676    source_watermarks: Vec<SourceWatermark>,
677    column_catalogs: &[ColumnCatalog],
678) -> Result<Vec<WatermarkDesc>> {
679    let mut binder = Binder::new_for_ddl(session);
680    binder.bind_columns_to_context(name.clone(), column_catalogs)?;
681
682    let watermark_descs = source_watermarks
683        .into_iter()
684        .map(|source_watermark| {
685            let col_name = source_watermark.column.real_value();
686            let watermark_idx = binder.get_column_binding_index(name.clone(), &col_name)?;
687
688            let expr = binder.bind_expr(&source_watermark.expr)?;
689            let watermark_col_type = column_catalogs[watermark_idx].data_type();
690            let watermark_expr_type = &expr.return_type();
691            if watermark_col_type != watermark_expr_type {
692                Err(RwError::from(ErrorCode::BindError(
                    format!("The return type of the watermark expression must be identical to the watermark column's data type. Watermark expression return type: `{}`, column type: `{}`", watermark_expr_type, watermark_col_type),
694                )))
695            } else {
696                let expr_proto = expr.to_expr_proto();
697                Ok::<_, RwError>(WatermarkDesc {
698                    watermark_idx: watermark_idx as u32,
699                    expr: Some(expr_proto),
700                })
701            }
702        })
703        .try_collect()?;
704    Ok(watermark_descs)
705}
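
// For example, a watermark bound by this function may originate from a definition like the
// following (column names and the interval are illustrative):
//
//     CREATE SOURCE s (
//         event_time TIMESTAMPTZ,
//         v INT,
//         WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
//     ) WITH (connector = 'kafka', ...) FORMAT PLAIN ENCODE JSON;
//
// The expression's return type must match the watermark column's type, as checked above.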
706
/// Performs early-stage checking in the frontend to see if the schema of the given `columns` is
/// compatible with the connector extracted from the properties.
709///
710/// One should only call this function after all properties of all columns are resolved, like
711/// generated column descriptors.
712pub(super) fn check_format_encode(
713    props: &WithOptionsSecResolved,
714    row_id_index: Option<usize>,
715    columns: &[ColumnCatalog],
716) -> Result<()> {
717    let Some(connector) = props.get_connector() else {
718        return Ok(());
719    };
720
721    if connector == NEXMARK_CONNECTOR {
722        check_nexmark_schema(props, row_id_index, columns)
723    } else {
724        Ok(())
725    }
726}
727
728pub fn bind_connector_props(
729    handler_args: &HandlerArgs,
730    format_encode: &FormatEncodeOptions,
731    is_create_source: bool,
732) -> Result<(WithOptions, SourceRefreshMode)> {
733    let mut with_properties = handler_args.with_options.clone().into_connector_props();
734    validate_compatibility(format_encode, &mut with_properties)?;
735    let refresh_mode = {
736        let refresh_mode = resolve_source_refresh_mode_in_with_option(&mut with_properties)?;
737        if is_create_source && refresh_mode.is_some() {
738            return Err(RwError::from(ProtocolError(
                "`refresh_mode` is only supported for CREATE TABLE".to_owned(),
740            )));
741        }
742
743        refresh_mode.unwrap_or(SourceRefreshMode {
744            refresh_mode: Some(RefreshMode::Streaming(SourceRefreshModeStreaming {})),
745        })
746    };
747
748    let create_cdc_source_job = with_properties.is_shareable_cdc_connector();
749
750    if !is_create_source && with_properties.is_shareable_only_cdc_connector() {
751        return Err(RwError::from(ProtocolError(format!(
752            "connector {} does not support `CREATE TABLE`, please use `CREATE SOURCE` instead",
753            with_properties.get_connector().unwrap(),
754        ))));
755    }
756    if is_create_source && create_cdc_source_job {
757        if let Some(value) = with_properties.get(AUTO_SCHEMA_CHANGE_KEY)
758            && value.parse::<bool>().map_err(|_| {
759                ErrorCode::InvalidInputSyntax(format!(
760                    "invalid value of '{}' option",
761                    AUTO_SCHEMA_CHANGE_KEY
762                ))
763            })?
764        {
765            Feature::CdcAutoSchemaChange.check_available()?;
766        }
767
768        // set connector to backfill mode
769        with_properties.insert(CDC_SNAPSHOT_MODE_KEY.into(), CDC_SNAPSHOT_BACKFILL.into());
770        // enable cdc sharing mode, which will capture all tables in the given `database.name`
771        with_properties.insert(CDC_SHARING_MODE_KEY.into(), "true".into());
772        // enable transactional cdc
773        if with_properties.enable_transaction_metadata() {
774            with_properties.insert(CDC_TRANSACTIONAL_KEY.into(), "true".into());
775        }
776        // Only set CDC_WAIT_FOR_STREAMING_START_TIMEOUT if not already specified by user.
777        if !with_properties.contains_key(CDC_WAIT_FOR_STREAMING_START_TIMEOUT) {
778            with_properties.insert(
779                CDC_WAIT_FOR_STREAMING_START_TIMEOUT.into(),
780                handler_args
781                    .session
782                    .config()
783                    .cdc_source_wait_streaming_start_timeout()
784                    .to_string(),
785            );
786        }
787    }
788    if with_properties.is_mysql_cdc_connector() {
        // Generate a random server id for the MySQL CDC source if needed.
        // `server.id` must be in the range from 1 to 2^32 - 1, and MUST be unique across the whole
        // replication group (that is, different from any other server id used by any master or slave).
792        with_properties
793            .entry("server.id".to_owned())
794            .or_insert(rand::rng().random_range(1..u32::MAX).to_string());
795    }
796    Ok((with_properties, refresh_mode))
797}
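
// A rough sketch of the rewriting above for a shared CDC source: given
//
//     CREATE SOURCE mysql_src WITH (connector = 'mysql-cdc', hostname = '...', ...);
//
// the returned `WithOptions` additionally carry `CDC_SNAPSHOT_MODE_KEY` set to
// `CDC_SNAPSHOT_BACKFILL`, `CDC_SHARING_MODE_KEY` set to "true", possibly
// `CDC_TRANSACTIONAL_KEY` set to "true", a default `CDC_WAIT_FOR_STREAMING_START_TIMEOUT`
// taken from the session config, and (for MySQL) a randomly generated `server.id`.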
798
/// When the schema can be inferred from an external system (like a schema registry),
/// how should the regular (i.e., non-generated) columns defined in SQL be handled?
801pub enum SqlColumnStrategy {
802    /// Follow all columns defined in SQL, ignore the columns from external system.
803    /// This ensures that no accidental side effect will change the schema.
804    ///
805    /// This is the behavior when re-planning the target table of `SINK INTO`.
806    FollowUnchecked,
807
    /// Follow all columns defined in SQL, checking them against the columns from the external system by name & type.
809    /// This ensures that no accidental side effect will change the schema.
810    ///
811    /// This is the behavior when creating a new table or source with a resolvable schema;
812    /// adding, or dropping columns on that table.
813    // TODO(purify): `ALTER SOURCE` currently has its own code path and does not check.
814    FollowChecked,
815
    /// Merge the generated columns defined in SQL with the columns from the external system. If
    /// there are also regular columns defined in SQL, ignore them silently.
818    ///
819    /// This is the behavior when `REFRESH SCHEMA` atop the purified SQL.
820    Ignore,
821}
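
// To illustrate with a schema-registry source whose external schema resolves to
// (a INT, b VARCHAR) and a SQL definition of (a INT, gen INT AS a + 1):
//
// - `FollowUnchecked`: keep (a, gen) exactly as written, without consulting the registry.
// - `FollowChecked`:   keep (a, gen), but verify that `a` exists upstream with a matching type.
// - `Ignore`:          silently ignore the regular column `a` written in SQL and merge the
//                      generated column with the registry columns, yielding (a, b, gen) where
//                      `a` and `b` come from the registry.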
822
/// Entrypoint for binding the source connector.
824/// Common logic shared by `CREATE SOURCE` and `CREATE TABLE`.
825#[allow(clippy::too_many_arguments)]
826pub async fn bind_create_source_or_table_with_connector(
827    handler_args: HandlerArgs,
828    full_name: ObjectName,
829    format_encode: FormatEncodeOptions,
830    with_properties: WithOptions,
831    sql_columns_defs: &[ColumnDef],
832    constraints: Vec<TableConstraint>,
833    wildcard_idx: Option<usize>,
834    source_watermarks: Vec<SourceWatermark>,
835    columns_from_resolve_source: Option<Vec<ColumnCatalog>>,
836    source_info: StreamSourceInfo,
837    include_column_options: IncludeOption,
838    col_id_gen: &mut ColumnIdGenerator,
839    create_source_type: CreateSourceType,
840    source_rate_limit: Option<u32>,
841    sql_column_strategy: SqlColumnStrategy,
842    refresh_mode: SourceRefreshMode,
843) -> Result<SourceCatalog> {
844    let session = &handler_args.session;
845    let db_name: &str = &session.database();
846    let (schema_name, source_name) = Binder::resolve_schema_qualified_name(db_name, &full_name)?;
847    let (database_id, schema_id) =
848        session.get_database_and_schema_id_for_create(schema_name.clone())?;
849
850    let is_create_source = create_source_type != CreateSourceType::Table;
851
852    if is_create_source {
853        // reject refreshable batch source
854        if with_properties.is_batch_connector() {
855            return Err(ErrorCode::BindError(
856            "can't CREATE SOURCE with refreshable batch connector\n\nHint: use CREATE TABLE instead"
857                .to_owned(),
858        )
859        .into());
860        }
861
862        match format_encode.format {
863            // reject unsupported formats for CREATE SOURCE
864            Format::Debezium | Format::DebeziumMongo | Format::Maxwell | Format::Canal => {
865                return Err(ErrorCode::BindError(format!(
866                    "can't CREATE SOURCE with FORMAT {}.\n\nHint: use CREATE TABLE instead\n\n{}",
867                    format_encode.format,
868                    hint_format_encode(&format_encode)
869                ))
870                .into());
871            }
872            // hint limitations for some other formats
873            Format::Upsert => {
874                notice_to_user(format!(
875                    "Streaming queries on sources with `FORMAT {}` may have limitations. If your query isn't supported, consider using `CREATE TABLE` instead.",
876                    format_encode.format
877                ));
878            }
879            _ => {}
880        }
881    }
882
883    let sql_pk_names = bind_sql_pk_names(sql_columns_defs, bind_table_constraints(&constraints)?)?;
884
    // FIXME: ideally we could support it, but the current way of handling iceberg additional columns is problematic.
    // They are treated as normal user columns, so they will be lost if we allow users to specify columns.
    // See `extract_iceberg_columns`.
888    if with_properties.is_iceberg_connector() && !sql_columns_defs.is_empty() {
889        return Err(RwError::from(InvalidInputSyntax(
890            r#"Schema is automatically inferred for iceberg source and should not be specified
891
892HINT: use `CREATE SOURCE <name> WITH (...)` instead of `CREATE SOURCE <name> (<columns>) WITH (...)`."#.to_owned(),
893        )));
894    }
895    let columns_from_sql = bind_sql_columns(sql_columns_defs, false)?;
896
897    let mut columns = bind_all_columns(
898        &format_encode,
899        columns_from_resolve_source,
900        columns_from_sql,
901        sql_columns_defs,
902        wildcard_idx,
903        sql_column_strategy,
904    )?;
905
    // Add additional columns before binding the pk, because `FORMAT UPSERT` requires the key column.
907    handle_addition_columns(
908        Some(&format_encode),
909        &with_properties,
910        include_column_options,
911        &mut columns,
912        false,
913    )?;
914
915    if columns.is_empty() {
916        return Err(RwError::from(ProtocolError(
917            "Schema definition is required, either from SQL or schema registry.".to_owned(),
918        )));
919    }
920
    // Compatible with the behavior of adding a hidden column `_rw_kafka_timestamp` to each message from a Kafka source.
922    if is_create_source {
        // Must be called after `handle_addition_columns`.
924        check_and_add_timestamp_column(&with_properties, &mut columns);
925
        // For shared sources, we will include partition and offset cols in the SourceExecutor's *output*, to be used by the SourceBackfillExecutor.
        // For shared CDC sources, the schema is different. See `ColumnCatalog::debezium_cdc_source_cols()` and `CDC_BACKFILL_TABLE_ADDITIONAL_COLUMNS`.
928        if create_source_type == CreateSourceType::SharedNonCdc {
929            let (columns_exist, additional_columns) = source_add_partition_offset_cols(
930                &columns,
931                &with_properties.get_connector().unwrap(),
932                true, // col_id filled below at col_id_gen.generate
933            );
934            for (existed, c) in columns_exist.into_iter().zip_eq_fast(additional_columns) {
935                if !existed {
936                    columns.push(ColumnCatalog::hidden(c));
937                }
938            }
939        }
940    }
941
942    // resolve privatelink connection for Kafka
943    let mut with_properties = with_properties;
944    resolve_privatelink_in_with_option(&mut with_properties)?;
945
946    // check the system parameter `enforce_secret`
947    if session
948        .env()
949        .system_params_manager()
950        .get_params()
951        .load()
952        .enforce_secret()
953        && Feature::SecretManagement.check_available().is_ok()
954    {
        // Enforce using secrets for some sensitive props on cloud.
956        ConnectorProperties::enforce_secret_source(&with_properties)?;
957    }
958
959    let (with_properties, connection_type, connector_conn_ref) =
960        resolve_connection_ref_and_secret_ref(
961            with_properties,
962            session,
963            Some(TelemetryDatabaseObject::Source),
964        )?;
965    ensure_connection_type_allowed(connection_type, &SOURCE_ALLOWED_CONNECTION_CONNECTOR)?;
966
    // If not using a connection, we don't need to check whether the connector matches the connection type.
968    if !matches!(connection_type, PbConnectionType::Unspecified) {
969        let Some(connector) = with_properties.get_connector() else {
970            return Err(RwError::from(ProtocolError(format!(
971                "missing field '{}' in WITH clause",
972                UPSTREAM_SOURCE_KEY
973            ))));
974        };
975        check_connector_match_connection_type(connector.as_str(), &connection_type)?;
976    }
977
978    let pk_names = bind_source_pk(
979        &format_encode,
980        &source_info,
981        &mut columns,
982        sql_pk_names,
983        &with_properties,
984    )
985    .await?;
986
    // The user may specify a generated or additional column with the same name as one from the external schema.
    // Ensure duplicated column names are rejected here.
989    if let Some(duplicated_name) = columns.iter().map(|c| c.name()).duplicates().next() {
990        return Err(ErrorCode::InvalidInputSyntax(format!(
991            "column \"{}\" specified more than once",
992            duplicated_name
993        ))
994        .into());
995    }
996
997    // XXX: why do we use col_id_gen here? It doesn't seem to be very necessary.
998    for c in &mut columns {
999        let original_data_type = c.data_type().clone();
1000        col_id_gen.generate(c)?;
        // TODO: For now we restore the data type for `CREATE SOURCE`, so that the nested field ids are kept unset.
1002        //       This behavior is inconsistent with `CREATE TABLE`, and should be fixed once we refactor
1003        //       `ALTER SOURCE` to also use `ColumnIdGenerator` in the future.
1004        if is_create_source {
1005            c.column_desc.data_type = original_data_type;
1006        }
1007    }
1008    debug_assert_column_ids_distinct(&columns);
1009
1010    let (mut columns, pk_col_ids, row_id_index) =
1011        bind_pk_and_row_id_on_relation(columns, pk_names, true)?;
1012
1013    let watermark_descs =
1014        bind_source_watermark(session, source_name.clone(), source_watermarks, &columns)?;
    // TODO(yuhao): allow multiple watermarks on a source.
1016    assert!(watermark_descs.len() <= 1);
1017
1018    bind_sql_column_constraints(
1019        session,
1020        source_name.clone(),
1021        &mut columns,
1022        // TODO(st1page): pass the ref
1023        sql_columns_defs,
1024        &pk_col_ids,
1025    )?;
1026    check_format_encode(&with_properties, row_id_index, &columns)?;
1027
1028    let definition = handler_args.normalized_sql.clone();
1029
1030    let associated_table_id = if is_create_source {
1031        None
1032    } else {
1033        Some(TableId::placeholder())
1034    };
1035    let source = SourceCatalog {
1036        id: SourceId::placeholder(),
1037        name: source_name,
1038        schema_id,
1039        database_id,
1040        columns,
1041        pk_col_ids,
1042        append_only: row_id_index.is_some(),
1043        owner: session.user_id(),
1044        info: source_info,
1045        row_id_index,
1046        with_properties,
1047        watermark_descs,
1048        associated_table_id,
1049        definition,
1050        connection_id: connector_conn_ref,
1051        created_at_epoch: None,
1052        initialized_at_epoch: None,
1053        version: INITIAL_SOURCE_VERSION_ID,
1054        created_at_cluster_version: None,
1055        initialized_at_cluster_version: None,
1056        rate_limit: source_rate_limit,
1057        refresh_mode: Some(refresh_mode),
1058    };
1059    Ok(source)
1060}
1061
1062pub async fn handle_create_source(
1063    mut handler_args: HandlerArgs,
1064    stmt: CreateSourceStatement,
1065) -> Result<RwPgResponse> {
1066    let session = handler_args.session.clone();
1067    let overwrite_options = OverwriteOptions::new(&mut handler_args);
1068
1069    if let Either::Right(resp) = session.check_relation_name_duplicated(
1070        stmt.source_name.clone(),
1071        StatementType::CREATE_SOURCE,
1072        stmt.if_not_exists,
1073    )? {
1074        return Ok(resp);
1075    }
1076
1077    if stmt
1078        .source_name
1079        .base_name()
1080        .starts_with(ICEBERG_SOURCE_PREFIX)
1081    {
1082        return Err(RwError::from(InvalidInputSyntax(format!(
1083            "Source name cannot start with reserved prefix '{}'",
1084            ICEBERG_SOURCE_PREFIX
1085        ))));
1086    }
1087
1088    if handler_args.with_options.is_empty() {
1089        return Err(RwError::from(InvalidInputSyntax(
1090            "missing WITH clause".to_owned(),
1091        )));
1092    }
1093
1094    let format_encode = stmt.format_encode.into_v2_with_warning();
1095    let (with_properties, refresh_mode) =
1096        bind_connector_props(&handler_args, &format_encode, true)?;
1097
1098    let create_source_type = CreateSourceType::for_newly_created(&session, &*with_properties);
1099    let (columns_from_resolve_source, source_info) = bind_columns_from_source(
1100        &session,
1101        &format_encode,
1102        Either::Left(&with_properties),
1103        create_source_type,
1104    )
1105    .await?;
1106    let mut col_id_gen = ColumnIdGenerator::new_initial();
1107
1108    if stmt.columns.iter().any(|col| {
1109        col.options
1110            .iter()
1111            .any(|def| matches!(def.option, ColumnOption::NotNull))
1112    }) {
1113        return Err(RwError::from(InvalidInputSyntax(
1114            "NOT NULL constraint is not supported in source schema".to_owned(),
1115        )));
1116    }
1117
1118    let source_catalog = bind_create_source_or_table_with_connector(
1119        handler_args.clone(),
1120        stmt.source_name,
1121        format_encode,
1122        with_properties,
1123        &stmt.columns,
1124        stmt.constraints,
1125        stmt.wildcard_idx,
1126        stmt.source_watermarks,
1127        columns_from_resolve_source,
1128        source_info,
1129        stmt.include_column_options,
1130        &mut col_id_gen,
1131        create_source_type,
1132        overwrite_options.source_rate_limit,
1133        SqlColumnStrategy::FollowChecked,
1134        refresh_mode,
1135    )
1136    .await?;
1137
1138    // If it is a temporary source, put it into SessionImpl.
1139    if stmt.temporary {
1140        if session.get_temporary_source(&source_catalog.name).is_some() {
1141            return Err(CatalogError::duplicated("source", source_catalog.name.clone()).into());
1142        }
1143        session.create_temporary_source(source_catalog);
1144        return Ok(PgResponse::empty_result(StatementType::CREATE_SOURCE));
1145    }
1146
1147    let source = source_catalog.to_prost();
1148
1149    let catalog_writer = session.catalog_writer()?;
1150
1151    if create_source_type.is_shared() {
1152        let graph = generate_stream_graph_for_source(handler_args, source_catalog)?;
1153        catalog_writer
1154            .create_source(source, Some(graph), stmt.if_not_exists)
1155            .await?;
1156    } else {
        // For other sources, we don't create a streaming job.
1158        catalog_writer
1159            .create_source(source, None, stmt.if_not_exists)
1160            .await?;
1161    }
1162
1163    Ok(PgResponse::empty_result(StatementType::CREATE_SOURCE))
1164}
1165
1166pub(super) fn generate_stream_graph_for_source(
1167    handler_args: HandlerArgs,
1168    source_catalog: SourceCatalog,
1169) -> Result<PbStreamFragmentGraph> {
1170    let context = OptimizerContext::from_handler_args(handler_args);
1171    let source_node = LogicalSource::with_catalog(
1172        Rc::new(source_catalog),
1173        SourceNodeKind::CreateSharedSource,
1174        context.into(),
1175        None,
1176    )?;
1177
1178    let stream_plan = source_node.to_stream(&mut ToStreamContext::new(false))?;
1179    let graph = build_graph(stream_plan, Some(GraphJobType::Source))?;
1180    Ok(graph)
1181}
1182
1183#[cfg(test)]
1184pub mod tests {
1185    use std::collections::HashMap;
1186    use std::sync::Arc;
1187
1188    use risingwave_common::catalog::{
1189        DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, ROW_ID_COLUMN_NAME,
1190    };
1191    use risingwave_common::types::{DataType, StructType};
1192
1193    use crate::catalog::root_catalog::SchemaPath;
1194    use crate::catalog::source_catalog::SourceCatalog;
1195    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};
1196
1197    const GET_COLUMN_FROM_CATALOG: fn(&Arc<SourceCatalog>) -> HashMap<&str, DataType> =
1198        |catalog: &Arc<SourceCatalog>| -> HashMap<&str, DataType> {
1199            catalog
1200                .columns
1201                .iter()
1202                .map(|col| (col.name(), col.data_type().clone()))
1203                .collect::<HashMap<&str, DataType>>()
1204        };
1205
1206    #[tokio::test]
1207    async fn test_create_source_handler() {
1208        let proto_file = create_proto_file(PROTO_FILE_DATA);
1209        let sql = format!(
1210            r#"CREATE SOURCE t
1211    WITH (connector = 'kinesis')
1212    FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
1213            proto_file.path().to_str().unwrap()
1214        );
1215        let frontend = LocalFrontend::new(Default::default()).await;
1216        frontend.run_sql(sql).await.unwrap();
1217
1218        let session = frontend.session_ref();
1219        let catalog_reader = session.env().catalog_reader().read_guard();
1220        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);
1221
1222        // Check source exists.
1223        let (source, _) = catalog_reader
1224            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
1225            .unwrap();
1226        assert_eq!(source.name, "t");
1227
1228        let columns = GET_COLUMN_FROM_CATALOG(source);
1229
1230        let city_type = StructType::new(vec![
1231            ("address", DataType::Varchar),
1232            ("zipcode", DataType::Varchar),
1233        ])
1234        // .with_ids([5, 6].map(ColumnId::new))
1235        .into();
1236        let expected_columns = maplit::hashmap! {
1237            ROW_ID_COLUMN_NAME => DataType::Serial,
1238            "id" => DataType::Int32,
1239            "zipcode" => DataType::Int64,
1240            "rate" => DataType::Float32,
1241            "country" => StructType::new(
1242                vec![("address", DataType::Varchar),("city", city_type),("zipcode", DataType::Varchar)],
1243            )
1244            // .with_ids([3, 4, 7].map(ColumnId::new))
1245            .into(),
1246        };
1247        assert_eq!(columns, expected_columns, "{columns:#?}");
1248    }
1249
1250    #[tokio::test]
1251    async fn test_duplicate_props_options() {
1252        let proto_file = create_proto_file(PROTO_FILE_DATA);
1253        let sql = format!(
1254            r#"CREATE SOURCE t
1255    WITH (
1256        connector = 'kinesis',
1257        aws.region='user_test_topic',
1258        endpoint='172.10.1.1:9090,172.10.1.2:9090',
1259        aws.credentials.access_key_id = 'your_access_key_1',
1260        aws.credentials.secret_access_key = 'your_secret_key_1'
1261    )
1262    FORMAT PLAIN ENCODE PROTOBUF (
1263        message = '.test.TestRecord',
1264        aws.credentials.access_key_id = 'your_access_key_2',
1265        aws.credentials.secret_access_key = 'your_secret_key_2',
1266        schema.location = 'file://{}',
1267    )"#,
1268            proto_file.path().to_str().unwrap()
1269        );
1270        let frontend = LocalFrontend::new(Default::default()).await;
1271        frontend.run_sql(sql).await.unwrap();
1272
1273        let session = frontend.session_ref();
1274        let catalog_reader = session.env().catalog_reader().read_guard();
1275        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);
1276
1277        // Check source exists.
1278        let (source, _) = catalog_reader
1279            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
1280            .unwrap();
1281        assert_eq!(source.name, "t");
1282
1283        // AwsAuth params exist in options.
1284        assert_eq!(
1285            source
1286                .info
1287                .format_encode_options
1288                .get("aws.credentials.access_key_id")
1289                .unwrap(),
1290            "your_access_key_2"
1291        );
1292        assert_eq!(
1293            source
1294                .info
1295                .format_encode_options
1296                .get("aws.credentials.secret_access_key")
1297                .unwrap(),
1298            "your_secret_key_2"
1299        );
1300
1301        // AwsAuth params exist in props.
1302        assert_eq!(
1303            source
1304                .with_properties
1305                .get("aws.credentials.access_key_id")
1306                .unwrap(),
1307            "your_access_key_1"
1308        );
1309        assert_eq!(
1310            source
1311                .with_properties
1312                .get("aws.credentials.secret_access_key")
1313                .unwrap(),
1314            "your_secret_key_1"
1315        );
1316
1317        // Options are not merged into props.
1318        assert!(!source.with_properties.contains_key("schema.location"));
1319    }
1320
1321    #[tokio::test]
1322    async fn test_multi_table_cdc_create_source_handler() {
1323        let sql =
1324            "CREATE SOURCE t2 WITH (connector = 'mysql-cdc') FORMAT PLAIN ENCODE JSON".to_owned();
1325        let frontend = LocalFrontend::new(Default::default()).await;
1326        let session = frontend.session_ref();
1327
1328        frontend
1329            .run_sql_with_session(session.clone(), sql)
1330            .await
1331            .unwrap();
1332        let catalog_reader = session.env().catalog_reader().read_guard();
1333        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);
1334
1335        // Check source exists.
1336        let (source, _) = catalog_reader
1337            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t2")
1338            .unwrap();
1339        assert_eq!(source.name, "t2");
1340
1341        let columns = source
1342            .columns
1343            .iter()
1344            .map(|col| (col.name(), col.data_type().clone()))
1345            .collect::<Vec<(&str, DataType)>>();
1346
1347        expect_test::expect![[r#"
1348            [
1349                (
1350                    "payload",
1351                    Jsonb,
1352                ),
1353                (
1354                    "_rw_offset",
1355                    Varchar,
1356                ),
1357                (
1358                    "_rw_table_name",
1359                    Varchar,
1360                ),
1361                (
1362                    "_row_id",
1363                    Serial,
1364                ),
1365            ]
1366        "#]]
1367        .assert_debug_eq(&columns);
1368    }
1369
1370    #[tokio::test]
1371    async fn test_source_addition_columns() {
1372        // test derive include column for format plain
1373        let sql =
1374            "CREATE SOURCE s (v1 int) include key as _rw_kafka_key with (connector = 'kafka') format plain encode json".to_owned();
1375        let frontend = LocalFrontend::new(Default::default()).await;
1376        frontend.run_sql(sql).await.unwrap();
1377        let session = frontend.session_ref();
1378        let catalog_reader = session.env().catalog_reader().read_guard();
1379        let (source, _) = catalog_reader
1380            .get_source_by_name(
1381                DEFAULT_DATABASE_NAME,
1382                SchemaPath::Name(DEFAULT_SCHEMA_NAME),
1383                "s",
1384            )
1385            .unwrap();
1386        assert_eq!(source.name, "s");
1387
1388        let columns = source
1389            .columns
1390            .iter()
1391            .map(|col| (col.name(), col.data_type().clone()))
1392            .collect::<Vec<(&str, DataType)>>();
1393
1394        expect_test::expect![[r#"
1395            [
1396                (
1397                    "v1",
1398                    Int32,
1399                ),
1400                (
1401                    "_rw_kafka_key",
1402                    Bytea,
1403                ),
1404                (
1405                    "_rw_kafka_timestamp",
1406                    Timestamptz,
1407                ),
1408                (
1409                    "_rw_kafka_partition",
1410                    Varchar,
1411                ),
1412                (
1413                    "_rw_kafka_offset",
1414                    Varchar,
1415                ),
1416                (
1417                    "_row_id",
1418                    Serial,
1419                ),
1420            ]
1421        "#]]
1422        .assert_debug_eq(&columns);
1423
1424        let sql =
1425            "CREATE SOURCE s3 (v1 int) include timestamp 'header1' as header_col with (connector = 'kafka') format plain encode json".to_owned();
1426        match frontend.run_sql(sql).await {
1427            Err(e) => {
1428                assert_eq!(
1429                    e.to_string(),
1430                    "Protocol error: Only header column can have inner field, but got \"timestamp\""
1431                )
1432            }
1433            _ => unreachable!(),
1434        }
1435    }
1436}