risingwave_frontend/handler/create_source.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::rc::Rc;
17use std::sync::LazyLock;
18
19use anyhow::{Context, anyhow};
20use either::Either;
21use external_schema::debezium::extract_debezium_avro_table_pk_columns;
22use external_schema::nexmark::check_nexmark_schema;
23use itertools::Itertools;
24use maplit::{convert_args, hashmap, hashset};
25use pgwire::pg_response::{PgResponse, StatementType};
26use rand::Rng;
27use risingwave_common::array::arrow::{IcebergArrowConvert, arrow_schema_iceberg};
28use risingwave_common::bail_not_implemented;
29use risingwave_common::catalog::{
30    ColumnCatalog, ColumnDesc, ColumnId, INITIAL_SOURCE_VERSION_ID, KAFKA_TIMESTAMP_COLUMN_NAME,
31    ROW_ID_COLUMN_NAME, TableId, debug_assert_column_ids_distinct,
32};
33use risingwave_common::license::Feature;
34use risingwave_common::secret::LocalSecretManager;
35use risingwave_common::system_param::reader::SystemParamsRead;
36use risingwave_common::types::DataType;
37use risingwave_common::util::iter_util::ZipEqFast;
38use risingwave_connector::parser::additional_columns::{
39    build_additional_column_desc, get_supported_additional_columns,
40    source_add_partition_offset_cols,
41};
42use risingwave_connector::parser::{
43    AvroParserConfig, DEBEZIUM_IGNORE_KEY, DebeziumAvroParserConfig, ProtobufParserConfig,
44    SchemaLocation, SpecificParserConfig, TimestamptzHandling,
45    fetch_json_schema_and_map_to_columns,
46};
47use risingwave_connector::schema::AWS_GLUE_SCHEMA_ARN_KEY;
48use risingwave_connector::schema::schema_registry::{
49    SCHEMA_REGISTRY_BACKOFF_DURATION_KEY, SCHEMA_REGISTRY_BACKOFF_FACTOR_KEY,
50    SCHEMA_REGISTRY_MAX_DELAY_KEY, SCHEMA_REGISTRY_PASSWORD, SCHEMA_REGISTRY_RETRIES_MAX_KEY,
51    SCHEMA_REGISTRY_USERNAME, SchemaRegistryConfig, name_strategy_from_str,
52};
53use risingwave_connector::source::cdc::{
54    CDC_MONGODB_STRONG_SCHEMA_KEY, CDC_SHARING_MODE_KEY, CDC_SNAPSHOT_BACKFILL,
55    CDC_SNAPSHOT_MODE_KEY, CDC_TRANSACTIONAL_KEY, CDC_WAIT_FOR_STREAMING_START_TIMEOUT,
56    CITUS_CDC_CONNECTOR, MONGODB_CDC_CONNECTOR, MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR,
57    SQL_SERVER_CDC_CONNECTOR,
58};
59use risingwave_connector::source::datagen::DATAGEN_CONNECTOR;
60use risingwave_connector::source::iceberg::ICEBERG_CONNECTOR;
61use risingwave_connector::source::nexmark::source::{EventType, get_event_data_types_with_names};
62use risingwave_connector::source::test_source::TEST_CONNECTOR;
63use risingwave_connector::source::{
64    AZBLOB_CONNECTOR, ConnectorProperties, GCS_CONNECTOR, GOOGLE_PUBSUB_CONNECTOR, KAFKA_CONNECTOR,
65    KINESIS_CONNECTOR, LEGACY_S3_CONNECTOR, MQTT_CONNECTOR, NATS_CONNECTOR, NEXMARK_CONNECTOR,
66    OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR, PULSAR_CONNECTOR,
67};
68pub use risingwave_connector::source::{UPSTREAM_SOURCE_KEY, WEBHOOK_CONNECTOR};
69use risingwave_connector::{AUTO_SCHEMA_CHANGE_KEY, WithPropertiesExt};
70use risingwave_pb::catalog::connection_params::PbConnectionType;
71use risingwave_pb::catalog::{PbSchemaRegistryNameStrategy, StreamSourceInfo, WatermarkDesc};
72use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
73use risingwave_pb::plan_common::{EncodeType, FormatType};
74use risingwave_pb::stream_plan::PbStreamFragmentGraph;
75use risingwave_pb::telemetry::TelemetryDatabaseObject;
76use risingwave_sqlparser::ast::{
77    AstString, ColumnDef, ColumnOption, CreateSourceStatement, Encode, Format, FormatEncodeOptions,
78    ObjectName, SourceWatermark, SqlOptionValue, TableConstraint, Value, get_delimiter,
79};
80use risingwave_sqlparser::parser::{IncludeOption, IncludeOptionItem};
81use thiserror_ext::AsReport;
82
83use super::RwPgResponse;
84use crate::binder::Binder;
85use crate::catalog::CatalogError;
86use crate::catalog::source_catalog::SourceCatalog;
87use crate::error::ErrorCode::{self, Deprecated, InvalidInputSyntax, NotSupported, ProtocolError};
88use crate::error::{Result, RwError};
89use crate::expr::Expr;
90use crate::handler::HandlerArgs;
91use crate::handler::create_table::{
92    ColumnIdGenerator, bind_pk_and_row_id_on_relation, bind_sql_column_constraints,
93    bind_sql_columns, bind_sql_pk_names, bind_table_constraints,
94};
95use crate::handler::util::{
96    SourceSchemaCompatExt, check_connector_match_connection_type, ensure_connection_type_allowed,
97};
98use crate::optimizer::plan_node::generic::SourceNodeKind;
99use crate::optimizer::plan_node::{LogicalSource, ToStream, ToStreamContext};
100use crate::session::SessionImpl;
101use crate::session::current::notice_to_user;
102use crate::utils::{
103    OverwriteOptions, resolve_connection_ref_and_secret_ref, resolve_privatelink_in_with_option,
104    resolve_secret_ref_in_with_options,
105};
106use crate::{OptimizerContext, WithOptions, WithOptionsSecResolved, bind_data_type, build_graph};
107
108mod external_schema;
109pub use external_schema::{
110    bind_columns_from_source, get_schema_location, schema_has_schema_registry,
111};
112mod validate;
113pub use validate::validate_compatibility;
114use validate::{SOURCE_ALLOWED_CONNECTION_CONNECTOR, SOURCE_ALLOWED_CONNECTION_SCHEMA_REGISTRY};
115mod additional_column;
116use additional_column::check_and_add_timestamp_column;
117pub use additional_column::handle_addition_columns;
118
119use crate::stream_fragmenter::GraphJobType;
120
121fn non_generated_sql_columns(columns: &[ColumnDef]) -> Vec<ColumnDef> {
122    columns
123        .iter()
124        .filter(|c| !c.is_generated())
125        .cloned()
126        .collect()
127}
128
129fn try_consume_string_from_options(
130    format_encode_options: &mut BTreeMap<String, String>,
131    key: &str,
132) -> Option<AstString> {
133    format_encode_options.remove(key).map(AstString)
134}
135
136fn try_consume_schema_registry_config_from_options(
137    format_encode_options: &mut BTreeMap<String, String>,
138) {
139    [
140        SCHEMA_REGISTRY_USERNAME,
141        SCHEMA_REGISTRY_PASSWORD,
142        SCHEMA_REGISTRY_MAX_DELAY_KEY,
143        SCHEMA_REGISTRY_BACKOFF_DURATION_KEY,
144        SCHEMA_REGISTRY_BACKOFF_FACTOR_KEY,
145        SCHEMA_REGISTRY_RETRIES_MAX_KEY,
146    ]
147    .iter()
148    .for_each(|key| {
149        try_consume_string_from_options(format_encode_options, key);
150    });
151}
152
153fn consume_string_from_options(
154    format_encode_options: &mut BTreeMap<String, String>,
155    key: &str,
156) -> Result<AstString> {
157    try_consume_string_from_options(format_encode_options, key).ok_or(RwError::from(ProtocolError(
158        format!("missing field {} in options", key),
159    )))
160}
161
162fn consume_aws_config_from_options(format_encode_options: &mut BTreeMap<String, String>) {
163    format_encode_options.retain(|key, _| !key.starts_with("aws."))
164}
165
166#[derive(Debug, Clone, Copy, PartialEq, Eq)]
167pub enum CreateSourceType {
168    SharedCdc,
169    /// e.g., shared Kafka source
170    SharedNonCdc,
171    NonShared,
172    /// create table with connector
173    Table,
174}
175
176impl CreateSourceType {
    /// Note: shouldn't be used for `ALTER SOURCE`, since session variables should not affect an existing source. We should respect the original type instead.
178    pub fn for_newly_created(
179        session: &SessionImpl,
180        with_properties: &impl WithPropertiesExt,
181    ) -> Self {
182        if with_properties.is_shareable_cdc_connector() {
183            CreateSourceType::SharedCdc
184        } else if with_properties.is_shareable_non_cdc_connector()
185            && session
186                .env()
187                .streaming_config()
188                .developer
189                .enable_shared_source
190            && session.config().streaming_use_shared_source()
191        {
192            CreateSourceType::SharedNonCdc
193        } else {
194            CreateSourceType::NonShared
195        }
196    }
197
198    pub fn for_replace(catalog: &SourceCatalog) -> Self {
199        if !catalog.info.is_shared() {
200            CreateSourceType::NonShared
201        } else if catalog.with_properties.is_shareable_cdc_connector() {
202            CreateSourceType::SharedCdc
203        } else {
204            CreateSourceType::SharedNonCdc
205        }
206    }
207
208    pub fn is_shared(&self) -> bool {
209        matches!(
210            self,
211            CreateSourceType::SharedCdc | CreateSourceType::SharedNonCdc
212        )
213    }
214}
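// For orientation, a rough (non-authoritative) sketch of how the variants above line up with
// user statements. The WITH clauses are abbreviated placeholders, and the actual classification
// of `SharedNonCdc` also depends on session/config flags such as `streaming_use_shared_source`:
//
//   CREATE SOURCE s WITH (connector = 'mysql-cdc', ...)   => SharedCdc
//   CREATE SOURCE s WITH (connector = 'kafka', ...)       => SharedNonCdc (when shared sources are enabled)
//   CREATE SOURCE s WITH (connector = 'datagen', ...)     => NonShared
//   CREATE TABLE  t WITH (connector = 'kafka', ...)       => Table (chosen by the CREATE TABLE code path)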
215
/// Bind columns from both the source and the SQL definition.
217pub(crate) fn bind_all_columns(
218    format_encode: &FormatEncodeOptions,
219    cols_from_source: Option<Vec<ColumnCatalog>>,
220    cols_from_sql: Vec<ColumnCatalog>,
221    col_defs_from_sql: &[ColumnDef],
222    wildcard_idx: Option<usize>,
223    sql_column_strategy: SqlColumnStrategy,
224) -> Result<Vec<ColumnCatalog>> {
225    if let Some(cols_from_source) = cols_from_source {
        // Need to check `col_defs` to see if a column is generated, as we haven't bound the
        // `GeneratedColumnDesc` in `ColumnCatalog` yet.
228        let generated_cols_from_sql = cols_from_sql
229            .iter()
230            .filter(|c| {
231                col_defs_from_sql
232                    .iter()
233                    .find(|d| d.name.real_value() == c.name())
234                    .unwrap()
235                    .is_generated()
236            })
237            .cloned()
238            .collect_vec();
239
240        #[allow(clippy::collapsible_else_if)]
241        match sql_column_strategy {
242            // Ignore `cols_from_source`, follow `cols_from_sql` without checking.
243            SqlColumnStrategy::FollowUnchecked => {
244                assert!(
245                    wildcard_idx.is_none(),
246                    "wildcard still exists while strategy is Follows, not correctly purified?"
247                );
248                return Ok(cols_from_sql);
249            }
250
251            // Will merge `generated_cols_from_sql` into `cols_from_source`.
252            SqlColumnStrategy::Ignore => {}
253
254            SqlColumnStrategy::FollowChecked => {
255                let has_regular_cols_from_sql =
256                    generated_cols_from_sql.len() != cols_from_sql.len();
257
258                if has_regular_cols_from_sql {
259                    if wildcard_idx.is_some() {
260                        // (*, normal_column INT)
261                        return Err(RwError::from(NotSupported(
262                            "When there's a wildcard (\"*\"), \
263                             only generated columns are allowed in user-defined schema from SQL"
264                                .to_owned(),
265                            "Remove the non-generated columns".to_owned(),
266                        )));
267                    } else {
268                        // (normal_column INT)
269                        // Follow `cols_from_sql` with name & type checking.
270                        for col in &cols_from_sql {
271                            if generated_cols_from_sql.contains(col) {
272                                continue;
273                            }
274                            let Some(col_from_source) =
275                                cols_from_source.iter().find(|c| c.name() == col.name())
276                            else {
277                                return Err(RwError::from(ProtocolError(format!(
278                                    "Column \"{}\" is defined in SQL but not found in the source",
279                                    col.name()
280                                ))));
281                            };
282
283                            if col_from_source.data_type() != col.data_type() {
284                                return Err(RwError::from(ProtocolError(format!(
285                                    "Data type mismatch for column \"{}\". \
286                                     Defined in SQL as \"{}\", but found in the source as \"{}\"",
287                                    col.name(),
288                                    col.data_type(),
289                                    col_from_source.data_type()
290                                ))));
291                            }
292                        }
293                        return Ok(cols_from_sql);
294                    }
295                } else {
296                    if wildcard_idx.is_some() {
297                        // (*)
298                        // (*, generated_column INT)
299                        // Good, expand the wildcard later.
300                    } else {
301                        // ()
302                        // (generated_column INT)
303                        // Interpreted as if there's a wildcard for backward compatibility.
                        // TODO: the behavior is not that consistent, making it impossible to ingest no
                        //       columns from the source (though not useful in practice). Currently we
                        //       just notify the user, but we may want to interpret it as empty columns
                        //       in the future.
308                        notice_to_user("\
309                            Neither wildcard (\"*\") nor regular (non-generated) columns appear in the user-defined schema from SQL. \
310                            For backward compatibility, all columns from the source will be included at the beginning. \
311                            For clarity, consider adding a wildcard (\"*\") to indicate where the columns from the source should be included, \
312                            or specifying the columns you want to include from the source.
313                        ");
314                    }
315                }
316            }
317        }
318
319        // In some cases the wildcard may be absent:
320        // - plan based on a purified SQL
321        // - interpret `()` as `(*)` for backward compatibility (see notice above)
322        // Default to 0 to expand the wildcard at the beginning.
323        let wildcard_idx = wildcard_idx.unwrap_or(0).min(generated_cols_from_sql.len());
324
325        // Merge `generated_cols_from_sql` with `cols_from_source`.
326        let mut merged_cols = generated_cols_from_sql;
327        let merged_cols_r = merged_cols.split_off(wildcard_idx);
328        merged_cols.extend(cols_from_source);
329        merged_cols.extend(merged_cols_r);
330
331        Ok(merged_cols)
332    } else {
333        if wildcard_idx.is_some() {
334            return Err(RwError::from(NotSupported(
335                "Wildcard in user-defined schema is only allowed when there exists columns from external schema".to_owned(),
336                "Remove the wildcard or use a source with external schema".to_owned(),
337            )));
338        }
339        let non_generated_sql_defined_columns = non_generated_sql_columns(col_defs_from_sql);
340
341        match (&format_encode.format, &format_encode.row_encode) {
342            (Format::DebeziumMongo, Encode::Json) => {
343                let strong_schema = format_encode
344                    .row_options
345                    .iter()
346                    .find(|k| k.name.real_value().to_lowercase() == CDC_MONGODB_STRONG_SCHEMA_KEY)
347                    .map(|k| matches!(k.value, SqlOptionValue::Value(Value::Boolean(true))))
348                    .unwrap_or(false);
349
                // strong schema requires an `_id` column in the first position with a specific type
351                if strong_schema {
352                    let (_, id_column) = non_generated_sql_defined_columns
353                        .iter()
354                        .enumerate()
355                        .find(|(idx, col)| *idx == 0 && col.name.real_value() == "_id")
356                        .ok_or_else(|| {
357                            RwError::from(ProtocolError(
358                                "The `_id` column of the source with row format DebeziumMongoJson must be defined as the first column in SQL".to_owned(),
359                            ))
360                        })?;
361
362                    let id_data_type = bind_data_type(id_column.data_type.as_ref().unwrap())?;
363                    if !matches!(
364                        id_data_type,
365                        DataType::Varchar | DataType::Int32 | DataType::Int64 | DataType::Jsonb
366                    ) {
367                        return Err(RwError::from(ProtocolError(
368                            "the `_id` column of the source with row format DebeziumMongoJson must be [Jsonb | Varchar | Int32 | Int64]".to_owned(),
369                        )));
370                    }
371
372                    let mut columns = Vec::with_capacity(non_generated_sql_defined_columns.len());
373                    columns.push(
374                        // id column
375                        ColumnCatalog {
376                            column_desc: ColumnDesc::named("_id", 0.into(), id_data_type),
377                            is_hidden: false,
378                        },
379                    );
380
381                    // bind rest of the columns
382                    for (idx, col) in non_generated_sql_defined_columns
383                        .into_iter()
384                        // skip the first column
385                        .skip(1)
386                        .enumerate()
387                    {
388                        columns.push(ColumnCatalog {
389                            column_desc: ColumnDesc::named(
390                                col.name.real_value(),
391                                (idx as i32).into(),
392                                bind_data_type(col.data_type.as_ref().unwrap())?,
393                            ),
394                            is_hidden: false,
395                        });
396                    }
397
398                    return Ok(columns);
399                }
400
401                let mut columns = vec![
402                    ColumnCatalog {
403                        column_desc: ColumnDesc::named("_id", 0.into(), DataType::Varchar),
404                        is_hidden: false,
405                    },
406                    ColumnCatalog {
407                        column_desc: ColumnDesc::named("payload", 0.into(), DataType::Jsonb),
408                        is_hidden: false,
409                    },
410                ];
411
412                if non_generated_sql_defined_columns.len() != 2
413                    || non_generated_sql_defined_columns[0].name.real_value() != columns[0].name()
414                    || non_generated_sql_defined_columns[1].name.real_value() != columns[1].name()
415                {
416                    return Err(RwError::from(ProtocolError(
417                        "the not generated columns of the source with row format DebeziumMongoJson
418        must be (_id [Jsonb | Varchar | Int32 | Int64], payload jsonb)."
419                            .to_owned(),
420                    )));
421                }
422                // ok to unwrap since it was checked at `bind_sql_columns`
423                let key_data_type = bind_data_type(
424                    non_generated_sql_defined_columns[0]
425                        .data_type
426                        .as_ref()
427                        .unwrap(),
428                )?;
429                match key_data_type {
430                    DataType::Jsonb | DataType::Varchar | DataType::Int32 | DataType::Int64 => {
431                        columns[0].column_desc.data_type = key_data_type.clone();
432                    }
433                    _ => {
434                        return Err(RwError::from(ProtocolError(
435                            "the `_id` column of the source with row format DebeziumMongoJson
436        must be [Jsonb | Varchar | Int32 | Int64]"
437                                .to_owned(),
438                        )));
439                    }
440                }
441
442                // ok to unwrap since it was checked at `bind_sql_columns`
443                let value_data_type = bind_data_type(
444                    non_generated_sql_defined_columns[1]
445                        .data_type
446                        .as_ref()
447                        .unwrap(),
448                )?;
449                if !matches!(value_data_type, DataType::Jsonb) {
450                    return Err(RwError::from(ProtocolError(
451                        "the `payload` column of the source with row format DebeziumMongoJson
452        must be Jsonb datatype"
453                            .to_owned(),
454                    )));
455                }
456                Ok(columns)
457            }
458            (Format::Plain, Encode::Bytes) => {
459                let err = Err(RwError::from(ProtocolError(
460                    "ENCODE BYTES only accepts one BYTEA type column".to_owned(),
461                )));
462                if non_generated_sql_defined_columns.len() == 1 {
                    // ok to unwrap `data_type` since it was checked at `bind_sql_columns`
464                    let col_data_type = bind_data_type(
465                        non_generated_sql_defined_columns[0]
466                            .data_type
467                            .as_ref()
468                            .unwrap(),
469                    )?;
470                    if col_data_type == DataType::Bytea {
471                        Ok(cols_from_sql)
472                    } else {
473                        err
474                    }
475                } else {
476                    err
477                }
478            }
479            (_, _) => Ok(cols_from_sql),
480        }
481    }
482}
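// A hedged illustration of the `FollowChecked` merging logic above. Suppose the external schema
// (e.g. resolved from a schema registry) yields columns (a INT, b VARCHAR); the column names and
// the generated expression below are made up for illustration:
//
//   (*)                      => (a, b)
//   (*, gen INT AS a + 1)    => (a, b, gen)
//   (gen INT AS a + 1)       => (a, b, gen)   -- treated like (*, ...) for backward compatibility,
//                                             -- with a notice to the user
//   (a INT)                  => (a)           -- follows SQL, after name/type checking against the source
//   (*, a INT)               => error         -- regular columns are not allowed together with "*"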
483
/// TODO: perhaps putting the hint in a notice would be better; the error message format might not be that reliable.
485fn hint_format_encode(format_encode: &FormatEncodeOptions) -> String {
486    format!(
487        r#"Hint: For FORMAT {0} ENCODE {1}, INCLUDE KEY must be specified and the key column must be used as primary key.
488example:
489    CREATE TABLE <table_name> ( PRIMARY KEY ([rw_key | <key_name>]) )
490    INCLUDE KEY [AS <key_name>]
491    WITH (...)
492    FORMAT {0} ENCODE {1}{2}
493"#,
494        format_encode.format,
495        format_encode.row_encode,
496        if format_encode.row_encode == Encode::Json || format_encode.row_encode == Encode::Bytes {
497            "".to_owned()
498        } else {
499            " (...)".to_owned()
500        }
501    )
502}
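// For reference, the hint above renders roughly like the following for FORMAT UPSERT ENCODE AVRO
// (the trailing "(...)" stands for encode options such as the schema registry location):
//
//   Hint: For FORMAT UPSERT ENCODE AVRO, INCLUDE KEY must be specified and the key column must be used as primary key.
//   example:
//       CREATE TABLE <table_name> ( PRIMARY KEY ([rw_key | <key_name>]) )
//       INCLUDE KEY [AS <key_name>]
//       WITH (...)
//       FORMAT UPSERT ENCODE AVRO (...)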
503
/// Bind the primary key for the source, validating it against the FORMAT/ENCODE requirements
/// and any additional (INCLUDE) columns. Return `pk_names`.
506pub(crate) async fn bind_source_pk(
507    format_encode: &FormatEncodeOptions,
508    source_info: &StreamSourceInfo,
509    columns: &mut [ColumnCatalog],
510    sql_defined_pk_names: Vec<String>,
511    with_properties: &WithOptionsSecResolved,
512) -> Result<Vec<String>> {
513    let sql_defined_pk = !sql_defined_pk_names.is_empty();
514    let include_key_column_name: Option<String> = {
        // iterate over the columns to check whether any additional column comes from the key part,
        // and return that key column's name if it exists
517        columns.iter().find_map(|catalog| {
518            if matches!(
519                catalog.column_desc.additional_column.column_type,
520                Some(AdditionalColumnType::Key(_))
521            ) {
522                Some(catalog.name().to_owned())
523            } else {
524                None
525            }
526        })
527    };
528    let additional_column_names = columns
529        .iter()
530        .filter_map(|col| {
531            if col.column_desc.additional_column.column_type.is_some() {
532                Some(col.name().to_owned())
533            } else {
534                None
535            }
536        })
537        .collect_vec();
538
539    let res = match (&format_encode.format, &format_encode.row_encode) {
540        (Format::Native, Encode::Native) | (Format::None, Encode::None) | (Format::Plain, _) => {
541            sql_defined_pk_names
542        }
543
        // For all UPSERT formats, we accept one and only one key column as the primary key.
        // The additional KEY column must be included in this case and must be the primary key.
546        (Format::Upsert, Encode::Json | Encode::Avro | Encode::Protobuf) => {
547            if let Some(ref key_column_name) = include_key_column_name
548                && sql_defined_pk
549            {
                // pk is set. check if it's valid

                // the column name has been converted to its real value in `handle_addition_columns`,
                // so we don't ignore ASCII case here
554                if sql_defined_pk_names.len() != 1
555                    || !key_column_name.eq(sql_defined_pk_names[0].as_str())
556                {
557                    return Err(RwError::from(ProtocolError(format!(
558                        "Only \"{}\" can be used as primary key\n\n{}",
559                        key_column_name,
560                        hint_format_encode(format_encode)
561                    ))));
562                }
563                sql_defined_pk_names
564            } else {
565                // pk not set, or even key not included
566                return if let Some(include_key_column_name) = include_key_column_name {
567                    Err(RwError::from(ProtocolError(format!(
568                        "Primary key must be specified to {}\n\n{}",
569                        include_key_column_name,
570                        hint_format_encode(format_encode)
571                    ))))
572                } else {
573                    Err(RwError::from(ProtocolError(format!(
574                        "INCLUDE KEY clause not set\n\n{}",
575                        hint_format_encode(format_encode)
576                    ))))
577                };
578            }
579        }
580
581        (Format::Debezium, Encode::Json) => {
582            if !additional_column_names.is_empty() {
583                return Err(RwError::from(ProtocolError(format!(
584                    "FORMAT DEBEZIUM forbids additional columns, but got {:?}",
585                    additional_column_names
586                ))));
587            }
588            if !sql_defined_pk {
589                return Err(RwError::from(ProtocolError(
590                    "Primary key must be specified when creating source with FORMAT DEBEZIUM."
591                        .to_owned(),
592                )));
593            }
594            sql_defined_pk_names
595        }
596        (Format::Debezium, Encode::Avro) => {
597            if !additional_column_names.is_empty() {
598                return Err(RwError::from(ProtocolError(format!(
599                    "FORMAT DEBEZIUM forbids additional columns, but got {:?}",
600                    additional_column_names
601                ))));
602            }
603            if sql_defined_pk {
604                sql_defined_pk_names
605            } else {
606                let pk_names =
607                    extract_debezium_avro_table_pk_columns(source_info, with_properties).await?;
608                // extract pk(s) from schema registry
609                for pk_name in &pk_names {
610                    columns
611                        .iter()
612                        .find(|c: &&ColumnCatalog| c.name().eq(pk_name))
613                        .ok_or_else(|| {
614                            RwError::from(ProtocolError(format!(
615                                "avro's key column {} not exists in avro's row schema",
616                                pk_name
617                            )))
618                        })?;
619                }
620                pk_names
621            }
622        }
623        (Format::DebeziumMongo, Encode::Json) => {
624            if sql_defined_pk {
625                sql_defined_pk_names
626            } else {
627                vec!["_id".to_owned()]
628            }
629        }
630
631        (Format::Maxwell, Encode::Json) => {
632            if !additional_column_names.is_empty() {
633                return Err(RwError::from(ProtocolError(format!(
634                    "FORMAT MAXWELL forbids additional columns, but got {:?}",
635                    additional_column_names
636                ))));
637            }
638            if !sql_defined_pk {
                return Err(RwError::from(ProtocolError(
                    "Primary key must be specified when creating source with FORMAT MAXWELL ENCODE JSON."
                        .to_owned(),
                )));
642            }
643            sql_defined_pk_names
644        }
645
646        (Format::Canal, Encode::Json) => {
647            if !additional_column_names.is_empty() {
648                return Err(RwError::from(ProtocolError(format!(
649                    "FORMAT CANAL forbids additional columns, but got {:?}",
650                    additional_column_names
651                ))));
652            }
653            if !sql_defined_pk {
                return Err(RwError::from(ProtocolError(
                    "Primary key must be specified when creating source with FORMAT CANAL ENCODE JSON."
                        .to_owned(),
                )));
657            }
658            sql_defined_pk_names
659        }
660        (format, encoding) => {
661            return Err(RwError::from(ProtocolError(format!(
662                "Unknown combination {:?} {:?}",
663                format, encoding
664            ))));
665        }
666    };
667    Ok(res)
668}
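// A hedged end-to-end example of the UPSERT rule enforced above (the Kafka properties are
// placeholders, not a complete configuration): the included key column must be declared as the
// one and only primary key.
//
//   CREATE TABLE kt (
//       v1 INT,
//       PRIMARY KEY (rw_key)
//   )
//   INCLUDE KEY AS rw_key
//   WITH (connector = 'kafka', topic = 't', properties.bootstrap.server = 'broker:9092')
//   FORMAT UPSERT ENCODE JSON;
//
// Omitting INCLUDE KEY, or declaring a different column as the primary key, is rejected with the
// hint produced by `hint_format_encode`.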
669
670pub(super) fn bind_source_watermark(
671    session: &SessionImpl,
672    name: String,
673    source_watermarks: Vec<SourceWatermark>,
674    column_catalogs: &[ColumnCatalog],
675) -> Result<Vec<WatermarkDesc>> {
676    let mut binder = Binder::new_for_ddl(session);
677    binder.bind_columns_to_context(name.clone(), column_catalogs)?;
678
679    let watermark_descs = source_watermarks
680        .into_iter()
681        .map(|source_watermark| {
682            let col_name = source_watermark.column.real_value();
683            let watermark_idx = binder.get_column_binding_index(name.clone(), &col_name)?;
684
685            let expr = binder.bind_expr(&source_watermark.expr)?;
686            let watermark_col_type = column_catalogs[watermark_idx].data_type();
687            let watermark_expr_type = &expr.return_type();
688            if watermark_col_type != watermark_expr_type {
689                Err(RwError::from(ErrorCode::BindError(
690                    format!("The return value type of the watermark expression must be identical to the watermark column data type. Current data type of watermark return value: `{}`, column `{}`",watermark_expr_type, watermark_col_type),
691                )))
692            } else {
693                let expr_proto = expr.to_expr_proto();
694                Ok::<_, RwError>(WatermarkDesc {
695                    watermark_idx: watermark_idx as u32,
696                    expr: Some(expr_proto),
697                })
698            }
699        })
700        .try_collect()?;
701    Ok(watermark_descs)
702}
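// A minimal, hedged example of the watermark binding above (connector properties are placeholders).
// The watermark expression must return the same type as the watermark column, e.g.:
//
//   CREATE SOURCE s (
//       event_time TIMESTAMPTZ,
//       v INT,
//       WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
//   )
//   WITH (connector = 'kafka', topic = 't', properties.bootstrap.server = 'broker:9092')
//   FORMAT PLAIN ENCODE JSON;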
703
/// Performs early-stage checking in the frontend to see if the schema of the given `columns` is
/// compatible with the connector extracted from the properties.
706///
707/// One should only call this function after all properties of all columns are resolved, like
708/// generated column descriptors.
709pub(super) fn check_format_encode(
710    props: &WithOptionsSecResolved,
711    row_id_index: Option<usize>,
712    columns: &[ColumnCatalog],
713) -> Result<()> {
714    let Some(connector) = props.get_connector() else {
715        return Ok(());
716    };
717
718    if connector == NEXMARK_CONNECTOR {
719        check_nexmark_schema(props, row_id_index, columns)
720    } else {
721        Ok(())
722    }
723}
724
725pub fn bind_connector_props(
726    handler_args: &HandlerArgs,
727    format_encode: &FormatEncodeOptions,
728    is_create_source: bool,
729) -> Result<WithOptions> {
730    let mut with_properties = handler_args.with_options.clone().into_connector_props();
731    validate_compatibility(format_encode, &mut with_properties)?;
732    let create_cdc_source_job = with_properties.is_shareable_cdc_connector();
733
734    if !is_create_source && with_properties.is_shareable_only_cdc_connector() {
735        return Err(RwError::from(ProtocolError(format!(
736            "connector {} does not support `CREATE TABLE`, please use `CREATE SOURCE` instead",
737            with_properties.get_connector().unwrap(),
738        ))));
739    }
740    if is_create_source && create_cdc_source_job {
741        if let Some(value) = with_properties.get(AUTO_SCHEMA_CHANGE_KEY)
742            && value.parse::<bool>().map_err(|_| {
743                ErrorCode::InvalidInputSyntax(format!(
744                    "invalid value of '{}' option",
745                    AUTO_SCHEMA_CHANGE_KEY
746                ))
747            })?
748        {
749            Feature::CdcAutoSchemaChange.check_available()?;
750        }
751
752        // set connector to backfill mode
753        with_properties.insert(CDC_SNAPSHOT_MODE_KEY.into(), CDC_SNAPSHOT_BACKFILL.into());
754        // enable cdc sharing mode, which will capture all tables in the given `database.name`
755        with_properties.insert(CDC_SHARING_MODE_KEY.into(), "true".into());
756        // enable transactional cdc
757        if with_properties.enable_transaction_metadata() {
758            with_properties.insert(CDC_TRANSACTIONAL_KEY.into(), "true".into());
759        }
760        // Only set CDC_WAIT_FOR_STREAMING_START_TIMEOUT if not already specified by user.
761        if !with_properties.contains_key(CDC_WAIT_FOR_STREAMING_START_TIMEOUT) {
762            with_properties.insert(
763                CDC_WAIT_FOR_STREAMING_START_TIMEOUT.into(),
764                handler_args
765                    .session
766                    .config()
767                    .cdc_source_wait_streaming_start_timeout()
768                    .to_string(),
769            );
770        }
771    }
772    if with_properties.is_mysql_cdc_connector() {
        // Generate a random server id for the MySQL CDC source if needed.
        // `server.id` must be in the range from 1 to 2^32 - 1, and MUST be unique across the whole
        // replication group (that is, different from any other server id being used by any master or slave).
776        with_properties
777            .entry("server.id".to_owned())
778            .or_insert(rand::rng().random_range(1..u32::MAX).to_string());
779    }
780    Ok(with_properties)
781}
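// Sketch of the property rewriting performed above for a shared CDC source. The WITH clause is
// abbreviated and illustrative; the exact key/value literals live in the imported constants:
//
//   user WITH clause:  connector = 'mysql-cdc', hostname = '...', database.name = 'mydb', ...
//   after binding:     + CDC_SNAPSHOT_MODE_KEY  => CDC_SNAPSHOT_BACKFILL
//                      + CDC_SHARING_MODE_KEY   => "true"
//                      + CDC_TRANSACTIONAL_KEY  => "true"  (when transaction metadata is enabled)
//                      + CDC_WAIT_FOR_STREAMING_START_TIMEOUT (from the session config, if unset)
//                      + "server.id"            => random value (MySQL CDC only, if unset)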
782
/// When the schema can be inferred from an external system (like a schema registry),
/// how should the regular (i.e., non-generated) columns defined in SQL be handled?
785pub enum SqlColumnStrategy {
    /// Follow all columns defined in SQL, ignoring the columns from the external system.
787    /// This ensures that no accidental side effect will change the schema.
788    ///
789    /// This is the behavior when re-planning the target table of `SINK INTO`.
790    FollowUnchecked,
791
    /// Follow all columns defined in SQL, checking them against the columns from the external system by name & type.
793    /// This ensures that no accidental side effect will change the schema.
794    ///
795    /// This is the behavior when creating a new table or source with a resolvable schema;
796    /// adding, or dropping columns on that table.
797    // TODO(purify): `ALTER SOURCE` currently has its own code path and does not check.
798    FollowChecked,
799
    /// Merge the generated columns defined in SQL with the columns from the external system. If there
    /// are also regular columns defined in SQL, silently ignore them.
802    ///
803    /// This is the behavior when `REFRESH SCHEMA` atop the purified SQL.
804    Ignore,
805}
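// A rough mapping from DDL operations to the strategies above, for orientation only; the
// authoritative choice is made at each call site:
//
//   CREATE TABLE/SOURCE with a resolvable schema, adding or dropping columns  => FollowChecked
//   re-planning the target table of SINK INTO                                 => FollowUnchecked
//   REFRESH SCHEMA atop the purified SQL                                      => Ignore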
806
807/// Entrypoint for binding source connector.
808/// Common logic shared by `CREATE SOURCE` and `CREATE TABLE`.
809#[allow(clippy::too_many_arguments)]
810pub async fn bind_create_source_or_table_with_connector(
811    handler_args: HandlerArgs,
812    full_name: ObjectName,
813    format_encode: FormatEncodeOptions,
814    with_properties: WithOptions,
815    sql_columns_defs: &[ColumnDef],
816    constraints: Vec<TableConstraint>,
817    wildcard_idx: Option<usize>,
818    source_watermarks: Vec<SourceWatermark>,
819    columns_from_resolve_source: Option<Vec<ColumnCatalog>>,
820    source_info: StreamSourceInfo,
821    include_column_options: IncludeOption,
822    col_id_gen: &mut ColumnIdGenerator,
823    create_source_type: CreateSourceType,
824    source_rate_limit: Option<u32>,
825    sql_column_strategy: SqlColumnStrategy,
826) -> Result<SourceCatalog> {
827    let session = &handler_args.session;
828    let db_name: &str = &session.database();
829    let (schema_name, source_name) = Binder::resolve_schema_qualified_name(db_name, &full_name)?;
830    let (database_id, schema_id) =
831        session.get_database_and_schema_id_for_create(schema_name.clone())?;
832
833    let is_create_source = create_source_type != CreateSourceType::Table;
834    if !is_create_source && with_properties.is_iceberg_connector() {
835        return Err(ErrorCode::BindError(
836            "can't CREATE TABLE with iceberg connector\n\nHint: use CREATE SOURCE instead"
837                .to_owned(),
838        )
839        .into());
840    }
841
842    if is_create_source {
843        // reject refreshable batch source
844        if with_properties.is_batch_connector() {
845            return Err(ErrorCode::BindError(
846            "can't CREATE SOURCE with refreshable batch connector\n\nHint: use CREATE TABLE instead"
847                .to_owned(),
848        )
849        .into());
850        }
851
852        // reject unsupported formats for CREATE SOURCE
853        match format_encode.format {
854            Format::Upsert
855            | Format::Debezium
856            | Format::DebeziumMongo
857            | Format::Maxwell
858            | Format::Canal => {
859                return Err(ErrorCode::BindError(format!(
860                    "can't CREATE SOURCE with FORMAT {}.\n\nHint: use CREATE TABLE instead\n\n{}",
861                    format_encode.format,
862                    hint_format_encode(&format_encode)
863                ))
864                .into());
865            }
866            _ => {
867                // TODO: enhance error message for other formats
868            }
869        }
870    }
871
872    let sql_pk_names = bind_sql_pk_names(sql_columns_defs, bind_table_constraints(&constraints)?)?;
873
    // FIXME: ideally we could support it, but the current way of handling iceberg additional columns is problematic.
    // They are treated as normal user columns, so they will be lost if we allow the user to specify columns.
876    // See `extract_iceberg_columns`
877    if with_properties.is_iceberg_connector() && !sql_columns_defs.is_empty() {
878        return Err(RwError::from(InvalidInputSyntax(
879            r#"Schema is automatically inferred for iceberg source and should not be specified
880
881HINT: use `CREATE SOURCE <name> WITH (...)` instead of `CREATE SOURCE <name> (<columns>) WITH (...)`."#.to_owned(),
882        )));
883    }
884    let columns_from_sql = bind_sql_columns(sql_columns_defs, false)?;
885
886    let mut columns = bind_all_columns(
887        &format_encode,
888        columns_from_resolve_source,
889        columns_from_sql,
890        sql_columns_defs,
891        wildcard_idx,
892        sql_column_strategy,
893    )?;
894
    // add additional columns before binding the pk, because `FORMAT UPSERT` requires the key column
896    handle_addition_columns(
897        Some(&format_encode),
898        &with_properties,
899        include_column_options,
900        &mut columns,
901        false,
902    )?;
903
904    if columns.is_empty() {
905        return Err(RwError::from(ProtocolError(
906            "Schema definition is required, either from SQL or schema registry.".to_owned(),
907        )));
908    }
909
    // Keep compatible with the legacy behavior of adding a hidden column `_rw_kafka_timestamp` to each message from a Kafka source.
911    if is_create_source {
        // must be called after `handle_addition_columns`
913        check_and_add_timestamp_column(&with_properties, &mut columns);
914
915        // For shared sources, we will include partition and offset cols in the SourceExecutor's *output*, to be used by the SourceBackfillExecutor.
916        // For shared CDC source, the schema is different. See ColumnCatalog::debezium_cdc_source_cols(), CDC_BACKFILL_TABLE_ADDITIONAL_COLUMNS
917        if create_source_type == CreateSourceType::SharedNonCdc {
918            let (columns_exist, additional_columns) = source_add_partition_offset_cols(
919                &columns,
920                &with_properties.get_connector().unwrap(),
921                true, // col_id filled below at col_id_gen.generate
922            );
923            for (existed, c) in columns_exist.into_iter().zip_eq_fast(additional_columns) {
924                if !existed {
925                    columns.push(ColumnCatalog::hidden(c));
926                }
927            }
928        }
929    }
930
931    // resolve privatelink connection for Kafka
932    let mut with_properties = with_properties;
933    resolve_privatelink_in_with_option(&mut with_properties)?;
934
935    // check the system parameter `enforce_secret`
936    if session
937        .env()
938        .system_params_manager()
939        .get_params()
940        .load()
941        .enforce_secret()
942        && Feature::SecretManagement.check_available().is_ok()
943    {
944        // check enforce using secret for some props on cloud
945        ConnectorProperties::enforce_secret_source(&with_properties)?;
946    }
947
948    let (with_properties, connection_type, connector_conn_ref) =
949        resolve_connection_ref_and_secret_ref(
950            with_properties,
951            session,
952            Some(TelemetryDatabaseObject::Source),
953        )?;
954    ensure_connection_type_allowed(connection_type, &SOURCE_ALLOWED_CONNECTION_CONNECTOR)?;
955
    // if not using a connection, we don't need to check that the connector matches the connection type
957    if !matches!(connection_type, PbConnectionType::Unspecified) {
958        let Some(connector) = with_properties.get_connector() else {
959            return Err(RwError::from(ProtocolError(format!(
960                "missing field '{}' in WITH clause",
961                UPSTREAM_SOURCE_KEY
962            ))));
963        };
964        check_connector_match_connection_type(connector.as_str(), &connection_type)?;
965    }
966
967    let pk_names = bind_source_pk(
968        &format_encode,
969        &source_info,
970        &mut columns,
971        sql_pk_names,
972        &with_properties,
973    )
974    .await?;
975
976    if is_create_source && !pk_names.is_empty() {
977        return Err(ErrorCode::InvalidInputSyntax(
978            "Source does not support PRIMARY KEY constraint, please use \"CREATE TABLE\" instead"
979                .to_owned(),
980        )
981        .into());
982    }
983
984    // User may specify a generated or additional column with the same name as one from the external schema.
985    // Ensure duplicated column names are handled here.
986    if let Some(duplicated_name) = columns.iter().map(|c| c.name()).duplicates().next() {
987        return Err(ErrorCode::InvalidInputSyntax(format!(
988            "column \"{}\" specified more than once",
989            duplicated_name
990        ))
991        .into());
992    }
993
994    // XXX: why do we use col_id_gen here? It doesn't seem to be very necessary.
995    for c in &mut columns {
996        let original_data_type = c.data_type().clone();
997        col_id_gen.generate(c)?;
        // TODO: For now we restore the data type for `CREATE SOURCE`, so as to keep the nested field ids unset.
        //       This behavior is inconsistent with `CREATE TABLE`, and should be fixed once we refactor
        //       `ALTER SOURCE` to also use `ColumnIdGenerator` in the future.
1001        if is_create_source {
1002            c.column_desc.data_type = original_data_type;
1003        }
1004    }
1005    debug_assert_column_ids_distinct(&columns);
1006
1007    let (mut columns, pk_col_ids, row_id_index) =
1008        bind_pk_and_row_id_on_relation(columns, pk_names, true)?;
1009
1010    let watermark_descs =
1011        bind_source_watermark(session, source_name.clone(), source_watermarks, &columns)?;
1012    // TODO(yuhao): allow multiple watermark on source.
1013    assert!(watermark_descs.len() <= 1);
1014
1015    bind_sql_column_constraints(
1016        session,
1017        source_name.clone(),
1018        &mut columns,
1019        // TODO(st1page): pass the ref
1020        sql_columns_defs,
1021        &pk_col_ids,
1022    )?;
1023    check_format_encode(&with_properties, row_id_index, &columns)?;
1024
1025    let definition = handler_args.normalized_sql.clone();
1026
1027    let associated_table_id = if is_create_source {
1028        None
1029    } else {
1030        Some(TableId::placeholder())
1031    };
1032    let source = SourceCatalog {
1033        id: TableId::placeholder().table_id,
1034        name: source_name,
1035        schema_id,
1036        database_id,
1037        columns,
1038        pk_col_ids,
1039        append_only: row_id_index.is_some(),
1040        owner: session.user_id(),
1041        info: source_info,
1042        row_id_index,
1043        with_properties,
1044        watermark_descs,
1045        associated_table_id,
1046        definition,
1047        connection_id: connector_conn_ref,
1048        created_at_epoch: None,
1049        initialized_at_epoch: None,
1050        version: INITIAL_SOURCE_VERSION_ID,
1051        created_at_cluster_version: None,
1052        initialized_at_cluster_version: None,
1053        rate_limit: source_rate_limit,
1054    };
1055    Ok(source)
1056}
1057
1058pub async fn handle_create_source(
1059    mut handler_args: HandlerArgs,
1060    stmt: CreateSourceStatement,
1061) -> Result<RwPgResponse> {
1062    let session = handler_args.session.clone();
1063    let overwrite_options = OverwriteOptions::new(&mut handler_args);
1064
1065    if let Either::Right(resp) = session.check_relation_name_duplicated(
1066        stmt.source_name.clone(),
1067        StatementType::CREATE_SOURCE,
1068        stmt.if_not_exists,
1069    )? {
1070        return Ok(resp);
1071    }
1072
1073    if handler_args.with_options.is_empty() {
1074        return Err(RwError::from(InvalidInputSyntax(
1075            "missing WITH clause".to_owned(),
1076        )));
1077    }
1078
1079    let format_encode = stmt.format_encode.into_v2_with_warning();
1080    let with_properties = bind_connector_props(&handler_args, &format_encode, true)?;
1081
1082    let create_source_type = CreateSourceType::for_newly_created(&session, &*with_properties);
1083    let (columns_from_resolve_source, source_info) = bind_columns_from_source(
1084        &session,
1085        &format_encode,
1086        Either::Left(&with_properties),
1087        create_source_type,
1088    )
1089    .await?;
1090    let mut col_id_gen = ColumnIdGenerator::new_initial();
1091
1092    if stmt.columns.iter().any(|col| {
1093        col.options
1094            .iter()
1095            .any(|def| matches!(def.option, ColumnOption::NotNull))
1096    }) {
1097        return Err(RwError::from(InvalidInputSyntax(
1098            "NOT NULL constraint is not supported in source schema".to_owned(),
1099        )));
1100    }
1101
1102    let source_catalog = bind_create_source_or_table_with_connector(
1103        handler_args.clone(),
1104        stmt.source_name,
1105        format_encode,
1106        with_properties,
1107        &stmt.columns,
1108        stmt.constraints,
1109        stmt.wildcard_idx,
1110        stmt.source_watermarks,
1111        columns_from_resolve_source,
1112        source_info,
1113        stmt.include_column_options,
1114        &mut col_id_gen,
1115        create_source_type,
1116        overwrite_options.source_rate_limit,
1117        SqlColumnStrategy::FollowChecked,
1118    )
1119    .await?;
1120
1121    // If it is a temporary source, put it into SessionImpl.
1122    if stmt.temporary {
1123        if session.get_temporary_source(&source_catalog.name).is_some() {
1124            return Err(CatalogError::duplicated("source", source_catalog.name.clone()).into());
1125        }
1126        session.create_temporary_source(source_catalog);
1127        return Ok(PgResponse::empty_result(StatementType::CREATE_SOURCE));
1128    }
1129
1130    let source = source_catalog.to_prost();
1131
1132    let catalog_writer = session.catalog_writer()?;
1133
1134    if create_source_type.is_shared() {
1135        let graph = generate_stream_graph_for_source(handler_args, source_catalog)?;
1136        catalog_writer
1137            .create_source(source, Some(graph), stmt.if_not_exists)
1138            .await?;
1139    } else {
1140        // For other sources we don't create a streaming job
1141        catalog_writer
1142            .create_source(source, None, stmt.if_not_exists)
1143            .await?;
1144    }
1145
1146    Ok(PgResponse::empty_result(StatementType::CREATE_SOURCE))
1147}
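// A hedged usage example for the handler above (connector properties are placeholders). For a
// shareable connector such as Kafka, a stream fragment graph is additionally built and submitted
// together with the source; for a non-shared source only the catalog entry is created:
//
//   CREATE SOURCE s (v1 INT, v2 VARCHAR)
//   WITH (connector = 'kafka', topic = 't', properties.bootstrap.server = 'broker:9092')
//   FORMAT PLAIN ENCODE JSON;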
1148
1149pub(super) fn generate_stream_graph_for_source(
1150    handler_args: HandlerArgs,
1151    source_catalog: SourceCatalog,
1152) -> Result<PbStreamFragmentGraph> {
1153    let context = OptimizerContext::from_handler_args(handler_args);
1154    let source_node = LogicalSource::with_catalog(
1155        Rc::new(source_catalog),
1156        SourceNodeKind::CreateSharedSource,
1157        context.into(),
1158        None,
1159    )?;
1160
1161    let stream_plan = source_node.to_stream(&mut ToStreamContext::new(false))?;
1162    let graph = build_graph(stream_plan, Some(GraphJobType::Source))?;
1163    Ok(graph)
1164}
1165
1166#[cfg(test)]
1167pub mod tests {
1168    use std::collections::HashMap;
1169    use std::sync::Arc;
1170
1171    use risingwave_common::catalog::{
1172        DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, ROW_ID_COLUMN_NAME,
1173    };
1174    use risingwave_common::types::{DataType, StructType};
1175
1176    use crate::catalog::root_catalog::SchemaPath;
1177    use crate::catalog::source_catalog::SourceCatalog;
1178    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};
1179
1180    const GET_COLUMN_FROM_CATALOG: fn(&Arc<SourceCatalog>) -> HashMap<&str, DataType> =
1181        |catalog: &Arc<SourceCatalog>| -> HashMap<&str, DataType> {
1182            catalog
1183                .columns
1184                .iter()
1185                .map(|col| (col.name(), col.data_type().clone()))
1186                .collect::<HashMap<&str, DataType>>()
1187        };
1188
1189    #[tokio::test]
1190    async fn test_create_source_handler() {
1191        let proto_file = create_proto_file(PROTO_FILE_DATA);
1192        let sql = format!(
1193            r#"CREATE SOURCE t
1194    WITH (connector = 'kinesis')
1195    FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
1196            proto_file.path().to_str().unwrap()
1197        );
1198        let frontend = LocalFrontend::new(Default::default()).await;
1199        frontend.run_sql(sql).await.unwrap();
1200
1201        let session = frontend.session_ref();
1202        let catalog_reader = session.env().catalog_reader().read_guard();
1203        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);
1204
1205        // Check source exists.
1206        let (source, _) = catalog_reader
1207            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
1208            .unwrap();
1209        assert_eq!(source.name, "t");
1210
1211        let columns = GET_COLUMN_FROM_CATALOG(source);
1212
1213        let city_type = StructType::new(vec![
1214            ("address", DataType::Varchar),
1215            ("zipcode", DataType::Varchar),
1216        ])
1217        // .with_ids([5, 6].map(ColumnId::new))
1218        .into();
1219        let expected_columns = maplit::hashmap! {
1220            ROW_ID_COLUMN_NAME => DataType::Serial,
1221            "id" => DataType::Int32,
1222            "zipcode" => DataType::Int64,
1223            "rate" => DataType::Float32,
1224            "country" => StructType::new(
1225                vec![("address", DataType::Varchar),("city", city_type),("zipcode", DataType::Varchar)],
1226            )
1227            // .with_ids([3, 4, 7].map(ColumnId::new))
1228            .into(),
1229        };
1230        assert_eq!(columns, expected_columns, "{columns:#?}");
1231    }
1232
1233    #[tokio::test]
1234    async fn test_duplicate_props_options() {
1235        let proto_file = create_proto_file(PROTO_FILE_DATA);
1236        let sql = format!(
1237            r#"CREATE SOURCE t
1238    WITH (
1239        connector = 'kinesis',
1240        aws.region='user_test_topic',
1241        endpoint='172.10.1.1:9090,172.10.1.2:9090',
1242        aws.credentials.access_key_id = 'your_access_key_1',
1243        aws.credentials.secret_access_key = 'your_secret_key_1'
1244    )
1245    FORMAT PLAIN ENCODE PROTOBUF (
1246        message = '.test.TestRecord',
1247        aws.credentials.access_key_id = 'your_access_key_2',
1248        aws.credentials.secret_access_key = 'your_secret_key_2',
1249        schema.location = 'file://{}',
1250    )"#,
1251            proto_file.path().to_str().unwrap()
1252        );
1253        let frontend = LocalFrontend::new(Default::default()).await;
1254        frontend.run_sql(sql).await.unwrap();
1255
1256        let session = frontend.session_ref();
1257        let catalog_reader = session.env().catalog_reader().read_guard();
1258        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);
1259
1260        // Check source exists.
1261        let (source, _) = catalog_reader
1262            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
1263            .unwrap();
1264        assert_eq!(source.name, "t");
1265
1266        // AwsAuth params exist in options.
1267        assert_eq!(
1268            source
1269                .info
1270                .format_encode_options
1271                .get("aws.credentials.access_key_id")
1272                .unwrap(),
1273            "your_access_key_2"
1274        );
1275        assert_eq!(
1276            source
1277                .info
1278                .format_encode_options
1279                .get("aws.credentials.secret_access_key")
1280                .unwrap(),
1281            "your_secret_key_2"
1282        );
1283
1284        // AwsAuth params exist in props.
1285        assert_eq!(
1286            source
1287                .with_properties
1288                .get("aws.credentials.access_key_id")
1289                .unwrap(),
1290            "your_access_key_1"
1291        );
1292        assert_eq!(
1293            source
1294                .with_properties
1295                .get("aws.credentials.secret_access_key")
1296                .unwrap(),
1297            "your_secret_key_1"
1298        );
1299
1300        // Options are not merged into props.
1301        assert!(!source.with_properties.contains_key("schema.location"));
1302    }
1303
1304    #[tokio::test]
1305    async fn test_multi_table_cdc_create_source_handler() {
1306        let sql =
1307            "CREATE SOURCE t2 WITH (connector = 'mysql-cdc') FORMAT PLAIN ENCODE JSON".to_owned();
1308        let frontend = LocalFrontend::new(Default::default()).await;
1309        let session = frontend.session_ref();
1310
1311        frontend
1312            .run_sql_with_session(session.clone(), sql)
1313            .await
1314            .unwrap();
1315        let catalog_reader = session.env().catalog_reader().read_guard();
1316        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);
1317
1318        // Check source exists.
1319        let (source, _) = catalog_reader
1320            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t2")
1321            .unwrap();
1322        assert_eq!(source.name, "t2");
1323
1324        let columns = source
1325            .columns
1326            .iter()
1327            .map(|col| (col.name(), col.data_type().clone()))
1328            .collect::<Vec<(&str, DataType)>>();
1329
1330        expect_test::expect![[r#"
1331            [
1332                (
1333                    "payload",
1334                    Jsonb,
1335                ),
1336                (
1337                    "_rw_offset",
1338                    Varchar,
1339                ),
1340                (
1341                    "_rw_table_name",
1342                    Varchar,
1343                ),
1344                (
1345                    "_row_id",
1346                    Serial,
1347                ),
1348            ]
1349        "#]]
1350        .assert_debug_eq(&columns);
1351    }
1352
1353    #[tokio::test]
1354    async fn test_source_addition_columns() {
1355        // test derive include column for format plain
1356        let sql =
1357            "CREATE SOURCE s (v1 int) include key as _rw_kafka_key with (connector = 'kafka') format plain encode json".to_owned();
1358        let frontend = LocalFrontend::new(Default::default()).await;
1359        frontend.run_sql(sql).await.unwrap();
1360        let session = frontend.session_ref();
1361        let catalog_reader = session.env().catalog_reader().read_guard();
1362        let (source, _) = catalog_reader
1363            .get_source_by_name(
1364                DEFAULT_DATABASE_NAME,
1365                SchemaPath::Name(DEFAULT_SCHEMA_NAME),
1366                "s",
1367            )
1368            .unwrap();
1369        assert_eq!(source.name, "s");
1370
1371        let columns = source
1372            .columns
1373            .iter()
1374            .map(|col| (col.name(), col.data_type().clone()))
1375            .collect::<Vec<(&str, DataType)>>();
1376
1377        expect_test::expect![[r#"
1378            [
1379                (
1380                    "v1",
1381                    Int32,
1382                ),
1383                (
1384                    "_rw_kafka_key",
1385                    Bytea,
1386                ),
1387                (
1388                    "_rw_kafka_timestamp",
1389                    Timestamptz,
1390                ),
1391                (
1392                    "_rw_kafka_partition",
1393                    Varchar,
1394                ),
1395                (
1396                    "_rw_kafka_offset",
1397                    Varchar,
1398                ),
1399                (
1400                    "_row_id",
1401                    Serial,
1402                ),
1403            ]
1404        "#]]
1405        .assert_debug_eq(&columns);
1406
1407        let sql =
1408            "CREATE SOURCE s3 (v1 int) include timestamp 'header1' as header_col with (connector = 'kafka') format plain encode json".to_owned();
1409        match frontend.run_sql(sql).await {
1410            Err(e) => {
1411                assert_eq!(
1412                    e.to_string(),
1413                    "Protocol error: Only header column can have inner field, but got \"timestamp\""
1414                )
1415            }
1416            _ => unreachable!(),
1417        }
1418    }
1419}