risingwave_frontend/handler/create_source.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::rc::Rc;
17use std::sync::LazyLock;
18
19use anyhow::{Context, anyhow};
20use either::Either;
21use external_schema::debezium::extract_debezium_avro_table_pk_columns;
22use external_schema::nexmark::check_nexmark_schema;
23use itertools::Itertools;
24use maplit::{convert_args, hashmap, hashset};
25use pgwire::pg_response::{PgResponse, StatementType};
26use rand::Rng;
27use risingwave_common::array::arrow::{IcebergArrowConvert, arrow_schema_iceberg};
28use risingwave_common::bail_not_implemented;
29use risingwave_common::catalog::{
30    ColumnCatalog, ColumnDesc, ColumnId, INITIAL_SOURCE_VERSION_ID, KAFKA_TIMESTAMP_COLUMN_NAME,
31    ROW_ID_COLUMN_NAME, TableId, debug_assert_column_ids_distinct,
32};
33use risingwave_common::license::Feature;
34use risingwave_common::secret::LocalSecretManager;
35use risingwave_common::system_param::reader::SystemParamsRead;
36use risingwave_common::types::DataType;
37use risingwave_common::util::iter_util::ZipEqFast;
38use risingwave_connector::parser::additional_columns::{
39    build_additional_column_desc, get_supported_additional_columns,
40    source_add_partition_offset_cols,
41};
42use risingwave_connector::parser::{
43    AvroParserConfig, DEBEZIUM_IGNORE_KEY, DebeziumAvroParserConfig, ProtobufParserConfig,
44    SchemaLocation, SpecificParserConfig, TimestamptzHandling,
45    fetch_json_schema_and_map_to_columns,
46};
47use risingwave_connector::schema::AWS_GLUE_SCHEMA_ARN_KEY;
48use risingwave_connector::schema::schema_registry::{
49    SCHEMA_REGISTRY_BACKOFF_DURATION_KEY, SCHEMA_REGISTRY_BACKOFF_FACTOR_KEY,
50    SCHEMA_REGISTRY_MAX_DELAY_KEY, SCHEMA_REGISTRY_PASSWORD, SCHEMA_REGISTRY_RETRIES_MAX_KEY,
51    SCHEMA_REGISTRY_USERNAME, SchemaRegistryConfig, name_strategy_from_str,
52};
53use risingwave_connector::source::cdc::{
54    CDC_MONGODB_STRONG_SCHEMA_KEY, CDC_SHARING_MODE_KEY, CDC_SNAPSHOT_BACKFILL,
55    CDC_SNAPSHOT_MODE_KEY, CDC_TRANSACTIONAL_KEY, CDC_WAIT_FOR_STREAMING_START_TIMEOUT,
56    CITUS_CDC_CONNECTOR, MONGODB_CDC_CONNECTOR, MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR,
57    SQL_SERVER_CDC_CONNECTOR,
58};
59use risingwave_connector::source::datagen::DATAGEN_CONNECTOR;
60use risingwave_connector::source::iceberg::ICEBERG_CONNECTOR;
61use risingwave_connector::source::nexmark::source::{EventType, get_event_data_types_with_names};
62use risingwave_connector::source::test_source::TEST_CONNECTOR;
63pub use risingwave_connector::source::{
64    ADBC_SNOWFLAKE_CONNECTOR, UPSTREAM_SOURCE_KEY, WEBHOOK_CONNECTOR,
65};
66use risingwave_connector::source::{
67    AZBLOB_CONNECTOR, ConnectorProperties, GCS_CONNECTOR, GOOGLE_PUBSUB_CONNECTOR, KAFKA_CONNECTOR,
68    KINESIS_CONNECTOR, LEGACY_S3_CONNECTOR, MQTT_CONNECTOR, NATS_CONNECTOR, NEXMARK_CONNECTOR,
69    OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR, PULSAR_CONNECTOR,
70};
71use risingwave_connector::{AUTO_SCHEMA_CHANGE_KEY, WithPropertiesExt};
72use risingwave_pb::catalog::connection_params::PbConnectionType;
73use risingwave_pb::catalog::{PbSchemaRegistryNameStrategy, StreamSourceInfo, WatermarkDesc};
74use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
75use risingwave_pb::plan_common::source_refresh_mode::{RefreshMode, SourceRefreshModeStreaming};
76use risingwave_pb::plan_common::{EncodeType, FormatType, SourceRefreshMode};
77use risingwave_pb::stream_plan::PbStreamFragmentGraph;
78use risingwave_pb::telemetry::TelemetryDatabaseObject;
79use risingwave_sqlparser::ast::{
80    AstString, ColumnDef, ColumnOption, CreateSourceStatement, Encode, Format, FormatEncodeOptions,
81    ObjectName, SourceWatermark, SqlOptionValue, TableConstraint, Value, get_delimiter,
82};
83use risingwave_sqlparser::parser::{IncludeOption, IncludeOptionItem};
84use thiserror_ext::AsReport;
85
86use super::RwPgResponse;
87use crate::binder::Binder;
88use crate::catalog::CatalogError;
89use crate::catalog::source_catalog::SourceCatalog;
90use crate::error::ErrorCode::{self, Deprecated, InvalidInputSyntax, NotSupported, ProtocolError};
91use crate::error::{Result, RwError};
92use crate::expr::Expr;
93use crate::handler::HandlerArgs;
94use crate::handler::create_table::{
95    ColumnIdGenerator, bind_pk_and_row_id_on_relation, bind_sql_column_constraints,
96    bind_sql_columns, bind_sql_pk_names, bind_table_constraints,
97};
98use crate::handler::util::{
99    SourceSchemaCompatExt, check_connector_match_connection_type, ensure_connection_type_allowed,
100};
101use crate::optimizer::plan_node::generic::SourceNodeKind;
102use crate::optimizer::plan_node::{LogicalSource, ToStream, ToStreamContext};
103use crate::session::SessionImpl;
104use crate::session::current::notice_to_user;
105use crate::utils::{
106    OverwriteOptions, resolve_connection_ref_and_secret_ref, resolve_privatelink_in_with_option,
107    resolve_secret_ref_in_with_options, resolve_source_refresh_mode_in_with_option,
108};
109use crate::{OptimizerContext, WithOptions, WithOptionsSecResolved, bind_data_type, build_graph};
110
111mod external_schema;
112pub use external_schema::{
113    bind_columns_from_source, get_schema_location, schema_has_schema_registry,
114};
115mod validate;
116pub use validate::validate_compatibility;
117use validate::{SOURCE_ALLOWED_CONNECTION_CONNECTOR, SOURCE_ALLOWED_CONNECTION_SCHEMA_REGISTRY};
118mod additional_column;
119use additional_column::check_and_add_timestamp_column;
120pub use additional_column::handle_addition_columns;
121use risingwave_common::catalog::ICEBERG_SOURCE_PREFIX;
122use risingwave_common::id::SourceId;
123
124use crate::stream_fragmenter::GraphJobType;
125
126fn non_generated_sql_columns(columns: &[ColumnDef]) -> Vec<ColumnDef> {
127    columns
128        .iter()
129        .filter(|c| !c.is_generated())
130        .cloned()
131        .collect()
132}
133
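/// Removes `key` from `format_encode_options`, wrapping the removed value in [`AstString`] if it
/// was present. A minimal sketch of the consume-on-read behavior (hypothetical key name; the block
/// is marked `ignore` and is illustrative only):
///
/// ```ignore
/// use std::collections::BTreeMap;
///
/// let mut opts = BTreeMap::from([("some.key".to_owned(), "some value".to_owned())]);
/// // The first lookup consumes the entry and returns the wrapped value.
/// let taken = try_consume_string_from_options(&mut opts, "some.key");
/// assert_eq!(taken.map(|s| s.0), Some("some value".to_owned()));
/// // Subsequent lookups return `None` because the key has been removed.
/// assert!(try_consume_string_from_options(&mut opts, "some.key").is_none());
/// ```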
134fn try_consume_string_from_options(
135    format_encode_options: &mut BTreeMap<String, String>,
136    key: &str,
137) -> Option<AstString> {
138    format_encode_options.remove(key).map(AstString)
139}
140
141fn try_consume_schema_registry_config_from_options(
142    format_encode_options: &mut BTreeMap<String, String>,
143) {
144    [
145        SCHEMA_REGISTRY_USERNAME,
146        SCHEMA_REGISTRY_PASSWORD,
147        SCHEMA_REGISTRY_MAX_DELAY_KEY,
148        SCHEMA_REGISTRY_BACKOFF_DURATION_KEY,
149        SCHEMA_REGISTRY_BACKOFF_FACTOR_KEY,
150        SCHEMA_REGISTRY_RETRIES_MAX_KEY,
151    ]
152    .iter()
153    .for_each(|key| {
154        try_consume_string_from_options(format_encode_options, key);
155    });
156}
157
158fn consume_string_from_options(
159    format_encode_options: &mut BTreeMap<String, String>,
160    key: &str,
161) -> Result<AstString> {
162    try_consume_string_from_options(format_encode_options, key).ok_or(RwError::from(ProtocolError(
163        format!("missing field {} in options", key),
164    )))
165}
166
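/// Drops all `aws.`-prefixed keys (e.g. `aws.credentials.access_key_id`) from the
/// format-encode options.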
167fn consume_aws_config_from_options(format_encode_options: &mut BTreeMap<String, String>) {
168    format_encode_options.retain(|key, _| !key.starts_with("aws."))
169}
170
171#[derive(Debug, Clone, Copy, PartialEq, Eq)]
172pub enum CreateSourceType {
173    SharedCdc,
174    /// e.g., shared Kafka source
175    SharedNonCdc,
176    NonShared,
177    /// create table with connector
178    Table,
179}
180
181impl CreateSourceType {
182    /// Note: shouldn't be used for `ALTER SOURCE`, since session variables should not affect existing sources. We should respect the original type instead.
183    pub fn for_newly_created(
184        session: &SessionImpl,
185        with_properties: &impl WithPropertiesExt,
186    ) -> Self {
187        if with_properties.is_shareable_cdc_connector() {
188            CreateSourceType::SharedCdc
189        } else if with_properties.is_shareable_non_cdc_connector()
190            && session
191                .env()
192                .streaming_config()
193                .developer
194                .enable_shared_source
195            && session.config().streaming_use_shared_source()
196        {
197            CreateSourceType::SharedNonCdc
198        } else {
199            CreateSourceType::NonShared
200        }
201    }
202
203    pub fn for_replace(catalog: &SourceCatalog) -> Self {
204        if !catalog.info.is_shared() {
205            CreateSourceType::NonShared
206        } else if catalog.with_properties.is_shareable_cdc_connector() {
207            CreateSourceType::SharedCdc
208        } else {
209            CreateSourceType::SharedNonCdc
210        }
211    }
212
213    pub fn is_shared(&self) -> bool {
214        matches!(
215            self,
216            CreateSourceType::SharedCdc | CreateSourceType::SharedNonCdc
217        )
218    }
219}
220
221/// Bind columns from both the source schema and the SQL-defined columns.
222pub(crate) fn bind_all_columns(
223    format_encode: &FormatEncodeOptions,
224    cols_from_source: Option<Vec<ColumnCatalog>>,
225    cols_from_sql: Vec<ColumnCatalog>,
226    col_defs_from_sql: &[ColumnDef],
227    wildcard_idx: Option<usize>,
228    sql_column_strategy: SqlColumnStrategy,
229) -> Result<Vec<ColumnCatalog>> {
230    if let Some(cols_from_source) = cols_from_source {
231        // Need to check `col_defs` to see if a column is generated, as we haven't bound the
232        // `GeneratedColumnDesc` in `ColumnCatalog` yet.
233        let generated_cols_from_sql = cols_from_sql
234            .iter()
235            .filter(|c| {
236                col_defs_from_sql
237                    .iter()
238                    .find(|d| d.name.real_value() == c.name())
239                    .unwrap()
240                    .is_generated()
241            })
242            .cloned()
243            .collect_vec();
244
245        #[allow(clippy::collapsible_else_if)]
246        match sql_column_strategy {
247            // Ignore `cols_from_source`, follow `cols_from_sql` without checking.
248            SqlColumnStrategy::FollowUnchecked => {
249                assert!(
250                    wildcard_idx.is_none(),
251                    "wildcard still exists while strategy is FollowUnchecked, not correctly purified?"
252                );
253                return Ok(cols_from_sql);
254            }
255
256            // Will merge `generated_cols_from_sql` into `cols_from_source`.
257            SqlColumnStrategy::Ignore => {}
258
259            SqlColumnStrategy::FollowChecked => {
260                let has_regular_cols_from_sql =
261                    generated_cols_from_sql.len() != cols_from_sql.len();
262
263                if has_regular_cols_from_sql {
264                    if wildcard_idx.is_some() {
265                        // (*, normal_column INT)
266                        return Err(RwError::from(NotSupported(
267                            "When there's a wildcard (\"*\"), \
268                             only generated columns are allowed in user-defined schema from SQL"
269                                .to_owned(),
270                            "Remove the non-generated columns".to_owned(),
271                        )));
272                    } else {
273                        // (normal_column INT)
274                        // Follow `cols_from_sql` with name & type checking.
275                        for col in &cols_from_sql {
276                            if generated_cols_from_sql.contains(col) {
277                                continue;
278                            }
279                            let Some(col_from_source) =
280                                cols_from_source.iter().find(|c| c.name() == col.name())
281                            else {
282                                return Err(RwError::from(ProtocolError(format!(
283                                    "Column \"{}\" is defined in SQL but not found in the source",
284                                    col.name()
285                                ))));
286                            };
287
288                            if col_from_source.data_type() != col.data_type() {
289                                return Err(RwError::from(ProtocolError(format!(
290                                    "Data type mismatch for column \"{}\". \
291                                     Defined in SQL as \"{}\", but found in the source as \"{}\"",
292                                    col.name(),
293                                    col.data_type(),
294                                    col_from_source.data_type()
295                                ))));
296                            }
297                        }
298                        return Ok(cols_from_sql);
299                    }
300                } else {
301                    if wildcard_idx.is_some() {
302                        // (*)
303                        // (*, generated_column INT)
304                        // Good, expand the wildcard later.
305                    } else {
306                        // ()
307                        // (generated_column INT)
308                        // Interpreted as if there's a wildcard for backward compatibility.
309                        // TODO: the behavior is not entirely consistent, making it impossible to ingest no
310                        //       columns from the source (though not useful in practice). Currently we
311                        //       just send a notice to the user, but we may want to interpret it as empty
312                        //       columns in the future.
313                        notice_to_user("\
314                            Neither wildcard (\"*\") nor regular (non-generated) columns appear in the user-defined schema from SQL. \
315                            For backward compatibility, all columns from the source will be included at the beginning. \
316                            For clarity, consider adding a wildcard (\"*\") to indicate where the columns from the source should be included, \
317                            or specifying the columns you want to include from the source.
318                        ");
319                    }
320                }
321            }
322        }
323
324        // In some cases the wildcard may be absent:
325        // - plan based on a purified SQL
326        // - interpret `()` as `(*)` for backward compatibility (see notice above)
327        // Default to 0 to expand the wildcard at the beginning.
328        let wildcard_idx = wildcard_idx.unwrap_or(0).min(generated_cols_from_sql.len());
329
330        // Merge `generated_cols_from_sql` with `cols_from_source`.
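        // For example, with a definition like `(gen1 INT AS ..., *, gen2 INT AS ...)` where
        // `wildcard_idx == 1`, the result is `[gen1, <all columns from source>, gen2]`.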
331        let mut merged_cols = generated_cols_from_sql;
332        let merged_cols_r = merged_cols.split_off(wildcard_idx);
333        merged_cols.extend(cols_from_source);
334        merged_cols.extend(merged_cols_r);
335
336        Ok(merged_cols)
337    } else {
338        if wildcard_idx.is_some() {
339            return Err(RwError::from(NotSupported(
340                "Wildcard in user-defined schema is only allowed when there exist columns from the external schema".to_owned(),
341                "Remove the wildcard or use a source with external schema".to_owned(),
342            )));
343        }
344        let non_generated_sql_defined_columns = non_generated_sql_columns(col_defs_from_sql);
345
346        match (&format_encode.format, &format_encode.row_encode) {
347            (Format::DebeziumMongo, Encode::Json) => {
348                let strong_schema = format_encode
349                    .row_options
350                    .iter()
351                    .find(|k| k.name.real_value().to_lowercase() == CDC_MONGODB_STRONG_SCHEMA_KEY)
352                    .map(|k| matches!(k.value, SqlOptionValue::Value(Value::Boolean(true))))
353                    .unwrap_or(false);
354
355                // strong schema requires an `_id` column in the first position with a specific type
356                if strong_schema {
357                    let (_, id_column) = non_generated_sql_defined_columns
358                        .iter()
359                        .enumerate()
360                        .find(|(idx, col)| *idx == 0 && col.name.real_value() == "_id")
361                        .ok_or_else(|| {
362                            RwError::from(ProtocolError(
363                                "The `_id` column of the source with row format DebeziumMongoJson must be defined as the first column in SQL".to_owned(),
364                            ))
365                        })?;
366
367                    let id_data_type = bind_data_type(id_column.data_type.as_ref().unwrap())?;
368                    if !matches!(
369                        id_data_type,
370                        DataType::Varchar | DataType::Int32 | DataType::Int64 | DataType::Jsonb
371                    ) {
372                        return Err(RwError::from(ProtocolError(
373                            "the `_id` column of the source with row format DebeziumMongoJson must be [Jsonb | Varchar | Int32 | Int64]".to_owned(),
374                        )));
375                    }
376
377                    let mut columns = Vec::with_capacity(non_generated_sql_defined_columns.len());
378                    columns.push(
379                        // id column
380                        ColumnCatalog {
381                            column_desc: ColumnDesc::named("_id", 0.into(), id_data_type),
382                            is_hidden: false,
383                        },
384                    );
385
386                    // bind rest of the columns
387                    for (idx, col) in non_generated_sql_defined_columns
388                        .into_iter()
389                        // skip the first column
390                        .skip(1)
391                        .enumerate()
392                    {
393                        columns.push(ColumnCatalog {
394                            column_desc: ColumnDesc::named(
395                                col.name.real_value(),
396                                (idx as i32).into(),
397                                bind_data_type(col.data_type.as_ref().unwrap())?,
398                            ),
399                            is_hidden: false,
400                        });
401                    }
402
403                    return Ok(columns);
404                }
405
406                let mut columns = vec![
407                    ColumnCatalog {
408                        column_desc: ColumnDesc::named("_id", 0.into(), DataType::Varchar),
409                        is_hidden: false,
410                    },
411                    ColumnCatalog {
412                        column_desc: ColumnDesc::named("payload", 0.into(), DataType::Jsonb),
413                        is_hidden: false,
414                    },
415                ];
416
417                if non_generated_sql_defined_columns.len() != 2
418                    || non_generated_sql_defined_columns[0].name.real_value() != columns[0].name()
419                    || non_generated_sql_defined_columns[1].name.real_value() != columns[1].name()
420                {
421                    return Err(RwError::from(ProtocolError(
422                        "the non-generated columns of the source with row format DebeziumMongoJson
423        must be (_id [Jsonb | Varchar | Int32 | Int64], payload jsonb)."
424                            .to_owned(),
425                    )));
426                }
427                // ok to unwrap since it was checked at `bind_sql_columns`
428                let key_data_type = bind_data_type(
429                    non_generated_sql_defined_columns[0]
430                        .data_type
431                        .as_ref()
432                        .unwrap(),
433                )?;
434                match key_data_type {
435                    DataType::Jsonb | DataType::Varchar | DataType::Int32 | DataType::Int64 => {
436                        columns[0].column_desc.data_type = key_data_type;
437                    }
438                    _ => {
439                        return Err(RwError::from(ProtocolError(
440                            "the `_id` column of the source with row format DebeziumMongoJson
441        must be [Jsonb | Varchar | Int32 | Int64]"
442                                .to_owned(),
443                        )));
444                    }
445                }
446
447                // ok to unwrap since it was checked at `bind_sql_columns`
448                let value_data_type = bind_data_type(
449                    non_generated_sql_defined_columns[1]
450                        .data_type
451                        .as_ref()
452                        .unwrap(),
453                )?;
454                if !matches!(value_data_type, DataType::Jsonb) {
455                    return Err(RwError::from(ProtocolError(
456                        "the `payload` column of the source with row format DebeziumMongoJson
457        must be of Jsonb data type"
458                            .to_owned(),
459                    )));
460                }
461                Ok(columns)
462            }
463            (Format::Plain, Encode::Bytes) => {
464                let err = Err(RwError::from(ProtocolError(
465                    "ENCODE BYTES only accepts one BYTEA type column".to_owned(),
466                )));
467                if non_generated_sql_defined_columns.len() == 1 {
468                    // ok to unwrap `data_type` since it was checked at `bind_sql_columns`
469                    let col_data_type = bind_data_type(
470                        non_generated_sql_defined_columns[0]
471                            .data_type
472                            .as_ref()
473                            .unwrap(),
474                    )?;
475                    if col_data_type == DataType::Bytea {
476                        Ok(cols_from_sql)
477                    } else {
478                        err
479                    }
480                } else {
481                    err
482                }
483            }
484            (_, _) => Ok(cols_from_sql),
485        }
486    }
487}
488
489/// TODO: perhaps putting the hint in a notice would be better. The error message format might not be that reliable.
490fn hint_format_encode(format_encode: &FormatEncodeOptions) -> String {
491    format!(
492        r#"Hint: For FORMAT {0} ENCODE {1}, INCLUDE KEY must be specified and the key column must be used as primary key.
493example:
494    CREATE TABLE <table_name> ( PRIMARY KEY ([rw_key | <key_name>]) )
495    INCLUDE KEY [AS <key_name>]
496    WITH (...)
497    FORMAT {0} ENCODE {1}{2}
498"#,
499        format_encode.format,
500        format_encode.row_encode,
501        if format_encode.row_encode == Encode::Json || format_encode.row_encode == Encode::Bytes {
502            "".to_owned()
503        } else {
504            " (...)".to_owned()
505        }
506    )
507}
508
509/// Binds the primary key of the source. Adds the key column to the table columns if necessary.
510/// Returns `pk_names`.
511pub(crate) async fn bind_source_pk(
512    format_encode: &FormatEncodeOptions,
513    source_info: &StreamSourceInfo,
514    columns: &mut [ColumnCatalog],
515    sql_defined_pk_names: Vec<String>,
516    with_properties: &WithOptionsSecResolved,
517) -> Result<Vec<String>> {
518    let sql_defined_pk = !sql_defined_pk_names.is_empty();
519    let include_key_column_name: Option<String> = {
520        // iterate over the columns to check whether any additional column comes from the key part;
521        // return that key column's name if it exists
522        columns.iter().find_map(|catalog| {
523            if matches!(
524                catalog.column_desc.additional_column.column_type,
525                Some(AdditionalColumnType::Key(_))
526            ) {
527                Some(catalog.name().to_owned())
528            } else {
529                None
530            }
531        })
532    };
533    let additional_column_names = columns
534        .iter()
535        .filter_map(|col| {
536            if col.column_desc.additional_column.column_type.is_some() {
537                Some(col.name().to_owned())
538            } else {
539                None
540            }
541        })
542        .collect_vec();
543
544    let res = match (&format_encode.format, &format_encode.row_encode) {
545        (Format::Native, Encode::Native) | (Format::None, Encode::None) | (Format::Plain, _) => {
546            sql_defined_pk_names
547        }
548
549        // For all Upsert formats, we accept exactly one key column as the primary key.
550        // The additional KEY column must be included in this case and must be the primary key.
551        (Format::Upsert, Encode::Json | Encode::Avro | Encode::Protobuf) => {
552            if let Some(ref key_column_name) = include_key_column_name
553                && sql_defined_pk
554            {
555                // pk is set. check if it's valid
556
557                // the column names have been converted to their real values in `handle_addition_columns`,
558                // so we don't ignore ASCII case here
559                if sql_defined_pk_names.len() != 1
560                    || !key_column_name.eq(sql_defined_pk_names[0].as_str())
561                {
562                    return Err(RwError::from(ProtocolError(format!(
563                        "Only \"{}\" can be used as primary key\n\n{}",
564                        key_column_name,
565                        hint_format_encode(format_encode)
566                    ))));
567                }
568                sql_defined_pk_names
569            } else {
570                // pk not set, or the key column not even included
571                return if let Some(include_key_column_name) = include_key_column_name {
572                    Err(RwError::from(ProtocolError(format!(
573                        "Primary key must be specified as {}\n\n{}",
574                        include_key_column_name,
575                        hint_format_encode(format_encode)
576                    ))))
577                } else {
578                    Err(RwError::from(ProtocolError(format!(
579                        "INCLUDE KEY clause not set\n\n{}",
580                        hint_format_encode(format_encode)
581                    ))))
582                };
583            }
584        }
585
586        (Format::Debezium, Encode::Json) => {
587            if !additional_column_names.is_empty() {
588                return Err(RwError::from(ProtocolError(format!(
589                    "FORMAT DEBEZIUM forbids additional columns, but got {:?}",
590                    additional_column_names
591                ))));
592            }
593            if !sql_defined_pk {
594                return Err(RwError::from(ProtocolError(
595                    "Primary key must be specified when creating source with FORMAT DEBEZIUM."
596                        .to_owned(),
597                )));
598            }
599            sql_defined_pk_names
600        }
601        (Format::Debezium, Encode::Avro) => {
602            if !additional_column_names.is_empty() {
603                return Err(RwError::from(ProtocolError(format!(
604                    "FORMAT DEBEZIUM forbids additional columns, but got {:?}",
605                    additional_column_names
606                ))));
607            }
608            if sql_defined_pk {
609                sql_defined_pk_names
610            } else {
611                let pk_names =
612                    extract_debezium_avro_table_pk_columns(source_info, with_properties).await?;
613                // extract pk(s) from schema registry
614                for pk_name in &pk_names {
615                    columns
616                        .iter()
617                        .find(|c: &&ColumnCatalog| c.name().eq(pk_name))
618                        .ok_or_else(|| {
619                            RwError::from(ProtocolError(format!(
620                                "avro's key column {} does not exist in avro's row schema",
621                                pk_name
622                            )))
623                        })?;
624                }
625                pk_names
626            }
627        }
628        (Format::DebeziumMongo, Encode::Json) => {
629            if sql_defined_pk {
630                sql_defined_pk_names
631            } else {
632                vec!["_id".to_owned()]
633            }
634        }
635
636        (Format::Maxwell, Encode::Json) => {
637            if !additional_column_names.is_empty() {
638                return Err(RwError::from(ProtocolError(format!(
639                    "FORMAT MAXWELL forbids additional columns, but got {:?}",
640                    additional_column_names
641                ))));
642            }
643            if !sql_defined_pk {
644                return Err(RwError::from(ProtocolError(
645    "Primary key must be specified when creating source with FORMAT MAXWELL ENCODE JSON.".to_owned(),
646    )));
647            }
648            sql_defined_pk_names
649        }
650
651        (Format::Canal, Encode::Json) => {
652            if !additional_column_names.is_empty() {
653                return Err(RwError::from(ProtocolError(format!(
654                    "FORMAT CANAL forbids additional columns, but got {:?}",
655                    additional_column_names
656                ))));
657            }
658            if !sql_defined_pk {
659                return Err(RwError::from(ProtocolError(
660    "Primary key must be specified when creating source with FORMAT CANAL ENCODE JSON.".to_owned(),
661    )));
662            }
663            sql_defined_pk_names
664        }
665        (format, encoding) => {
666            return Err(RwError::from(ProtocolError(format!(
667                "Unknown combination {:?} {:?}",
668                format, encoding
669            ))));
670        }
671    };
672    Ok(res)
673}
674
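/// Binds `WATERMARK FOR <column> AS <expr>` clauses against the bound columns and returns the
/// corresponding `WatermarkDesc`s, e.g. for
/// `CREATE SOURCE s (ts TIMESTAMPTZ, WATERMARK FOR ts AS ts - INTERVAL '5' SECOND) WITH (...)`.
/// The expression must evaluate to the same type as the watermark column.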
675pub(super) fn bind_source_watermark(
676    session: &SessionImpl,
677    name: String,
678    source_watermarks: Vec<SourceWatermark>,
679    column_catalogs: &[ColumnCatalog],
680) -> Result<Vec<WatermarkDesc>> {
681    let mut binder = Binder::new_for_ddl(session);
682    binder.bind_columns_to_context(name.clone(), column_catalogs)?;
683
684    let watermark_descs = source_watermarks
685        .into_iter()
686        .map(|source_watermark| {
687            let col_name = source_watermark.column.real_value();
688            let watermark_idx = binder.get_column_binding_index(name.clone(), &col_name)?;
689
690            let expr = binder.bind_expr(&source_watermark.expr)?;
691            let watermark_col_type = column_catalogs[watermark_idx].data_type();
692            let watermark_expr_type = &expr.return_type();
693            if watermark_col_type != watermark_expr_type {
694                Err(RwError::from(ErrorCode::BindError(
695                    format!("The return value type of the watermark expression must be identical to the watermark column data type. Current data type of watermark return value: `{}`, column `{}`",watermark_expr_type, watermark_col_type),
696                )))
697            } else {
698                let expr_proto = expr.to_expr_proto();
699                Ok::<_, RwError>(WatermarkDesc {
700                    watermark_idx: watermark_idx as u32,
701                    expr: Some(expr_proto),
702                })
703            }
704        })
705        .try_collect()?;
706    Ok(watermark_descs)
707}
708
709/// Performs early-stage checking in the frontend to see if the schema of the given `columns` is
710/// compatible with the connector extracted from the properties.
711///
712/// One should only call this function after all properties of all columns are resolved, like
713/// generated column descriptors.
714pub(super) fn check_format_encode(
715    props: &WithOptionsSecResolved,
716    row_id_index: Option<usize>,
717    columns: &[ColumnCatalog],
718) -> Result<()> {
719    let Some(connector) = props.get_connector() else {
720        return Ok(());
721    };
722
723    if connector == NEXMARK_CONNECTOR {
724        check_nexmark_schema(props, row_id_index, columns)
725    } else {
726        Ok(())
727    }
728}
729
730pub fn bind_connector_props(
731    handler_args: &HandlerArgs,
732    format_encode: &FormatEncodeOptions,
733    is_create_source: bool,
734) -> Result<(WithOptions, SourceRefreshMode)> {
735    let mut with_properties = handler_args.with_options.clone().into_connector_props();
736    validate_compatibility(format_encode, &mut with_properties)?;
737    let refresh_mode = {
738        let refresh_mode = resolve_source_refresh_mode_in_with_option(&mut with_properties)?;
739        if is_create_source && refresh_mode.is_some() {
740            return Err(RwError::from(ProtocolError(
741                "`refresh_mode` is only supported for CREATE TABLE".to_owned(),
742            )));
743        }
744
745        refresh_mode.unwrap_or(SourceRefreshMode {
746            refresh_mode: Some(RefreshMode::Streaming(SourceRefreshModeStreaming {})),
747        })
748    };
749
750    let create_cdc_source_job = with_properties.is_shareable_cdc_connector();
751
752    if !is_create_source && with_properties.is_shareable_only_cdc_connector() {
753        return Err(RwError::from(ProtocolError(format!(
754            "connector {} does not support `CREATE TABLE`, please use `CREATE SOURCE` instead",
755            with_properties.get_connector().unwrap(),
756        ))));
757    }
758    if is_create_source && create_cdc_source_job {
759        if let Some(value) = with_properties.get(AUTO_SCHEMA_CHANGE_KEY)
760            && value.parse::<bool>().map_err(|_| {
761                ErrorCode::InvalidInputSyntax(format!(
762                    "invalid value of '{}' option",
763                    AUTO_SCHEMA_CHANGE_KEY
764                ))
765            })?
766        {
767            Feature::CdcAutoSchemaChange.check_available()?;
768        }
769
770        // set connector to backfill mode
771        with_properties.insert(CDC_SNAPSHOT_MODE_KEY.into(), CDC_SNAPSHOT_BACKFILL.into());
772        // enable cdc sharing mode, which will capture all tables in the given `database.name`
773        with_properties.insert(CDC_SHARING_MODE_KEY.into(), "true".into());
774        // enable transactional cdc
775        if with_properties.enable_transaction_metadata() {
776            with_properties.insert(CDC_TRANSACTIONAL_KEY.into(), "true".into());
777        }
778        // Only set CDC_WAIT_FOR_STREAMING_START_TIMEOUT if not already specified by user.
779        if !with_properties.contains_key(CDC_WAIT_FOR_STREAMING_START_TIMEOUT) {
780            with_properties.insert(
781                CDC_WAIT_FOR_STREAMING_START_TIMEOUT.into(),
782                handler_args
783                    .session
784                    .config()
785                    .cdc_source_wait_streaming_start_timeout()
786                    .to_string(),
787            );
788        }
789    }
790    if with_properties.is_mysql_cdc_connector() {
791        // Generate a random server id for the MySQL CDC source if not specified.
792        // `server.id` must be in the range from 1 to 2^32 - 1 and MUST be unique across the whole replication
793        // group (that is, different from any other server id being used by any master or slave).
794        with_properties
795            .entry("server.id".to_owned())
796            .or_insert(rand::rng().random_range(1..u32::MAX).to_string());
797    }
798    Ok((with_properties, refresh_mode))
799}
800
801/// When the schema can be inferred from an external system (like a schema registry),
802/// how should the regular (i.e., non-generated) columns defined in SQL be handled?
803pub enum SqlColumnStrategy {
804    /// Follow all columns defined in SQL and ignore the columns from the external system.
805    /// This ensures that no accidental side effect will change the schema.
806    ///
807    /// This is the behavior when re-planning the target table of `SINK INTO`.
808    FollowUnchecked,
809
810    /// Follow all columns defined in SQL, checking them against the columns from the external system by name & type.
811    /// This ensures that no accidental side effect will change the schema.
812    ///
813    /// This is the behavior when creating a new table or source with a resolvable schema,
814    /// or when adding or dropping columns on that table.
815    // TODO(purify): `ALTER SOURCE` currently has its own code path and does not check.
816    FollowChecked,
817
818    /// Merge the generated columns defined in SQL with the columns from the external system. If there
819    /// are also regular columns defined in SQL, ignore them silently.
820    ///
821    /// This is the behavior when `REFRESH SCHEMA` atop the purified SQL.
822    Ignore,
823}
824
825/// Entrypoint for binding source connector.
826/// Common logic shared by `CREATE SOURCE` and `CREATE TABLE`.
827#[allow(clippy::too_many_arguments)]
828pub async fn bind_create_source_or_table_with_connector(
829    handler_args: HandlerArgs,
830    full_name: ObjectName,
831    format_encode: FormatEncodeOptions,
832    with_properties: WithOptions,
833    sql_columns_defs: &[ColumnDef],
834    constraints: Vec<TableConstraint>,
835    wildcard_idx: Option<usize>,
836    source_watermarks: Vec<SourceWatermark>,
837    columns_from_resolve_source: Option<Vec<ColumnCatalog>>,
838    source_info: StreamSourceInfo,
839    include_column_options: IncludeOption,
840    col_id_gen: &mut ColumnIdGenerator,
841    create_source_type: CreateSourceType,
842    source_rate_limit: Option<u32>,
843    sql_column_strategy: SqlColumnStrategy,
844    refresh_mode: SourceRefreshMode,
845) -> Result<SourceCatalog> {
846    let session = &handler_args.session;
847    let db_name: &str = &session.database();
848    let (schema_name, source_name) = Binder::resolve_schema_qualified_name(db_name, &full_name)?;
849    let (database_id, schema_id) =
850        session.get_database_and_schema_id_for_create(schema_name.clone())?;
851
852    let is_create_source = create_source_type != CreateSourceType::Table;
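    // `CreateSourceType::Table` corresponds to `CREATE TABLE ... WITH (connector = ...)`;
    // all other variants come from `CREATE SOURCE`.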
853
854    if is_create_source {
855        // reject refreshable batch source
856        if with_properties.is_batch_connector() {
857            return Err(ErrorCode::BindError(
858            "can't CREATE SOURCE with refreshable batch connector\n\nHint: use CREATE TABLE instead"
859                .to_owned(),
860        )
861        .into());
862        }
863
864        match format_encode.format {
865            // reject unsupported formats for CREATE SOURCE
866            Format::Debezium | Format::DebeziumMongo | Format::Maxwell | Format::Canal => {
867                return Err(ErrorCode::BindError(format!(
868                    "can't CREATE SOURCE with FORMAT {}.\n\nHint: use CREATE TABLE instead\n\n{}",
869                    format_encode.format,
870                    hint_format_encode(&format_encode)
871                ))
872                .into());
873            }
874            // hint limitations for some other formats
875            Format::Upsert => {
876                notice_to_user(format!(
877                    "Streaming queries on sources with `FORMAT {}` may have limitations. If your query isn't supported, consider using `CREATE TABLE` instead.",
878                    format_encode.format
879                ));
880            }
881            _ => {}
882        }
883    }
884
885    let sql_pk_names = bind_sql_pk_names(sql_columns_defs, bind_table_constraints(&constraints)?)?;
886
887    // FIXME: ideally we could support it, but the current way of handling iceberg additional columns is problematic.
888    // They are treated as normal user columns, so they will be lost if we allow the user to specify columns.
889    // See `extract_iceberg_columns`.
890    if with_properties.is_iceberg_connector() && !sql_columns_defs.is_empty() {
891        return Err(RwError::from(InvalidInputSyntax(
892            r#"Schema is automatically inferred for iceberg source and should not be specified
893
894HINT: use `CREATE SOURCE <name> WITH (...)` instead of `CREATE SOURCE <name> (<columns>) WITH (...)`."#.to_owned(),
895        )));
896    }
897
898    // Same for ADBC Snowflake connector - schema is automatically inferred
899    if with_properties.is_batch_connector()
900        && with_properties
901            .get(UPSTREAM_SOURCE_KEY)
902            .is_some_and(|s| s.eq_ignore_ascii_case(ADBC_SNOWFLAKE_CONNECTOR))
903        && !sql_columns_defs.is_empty()
904    {
905        return Err(RwError::from(InvalidInputSyntax(
906            r#"Schema is automatically inferred for ADBC Snowflake source and should not be specified
907
908HINT: use `CREATE TABLE <name> WITH (...)` instead of `CREATE TABLE <name> (<columns>) WITH (...)`."#.to_owned(),
909        )));
910    }
911    let columns_from_sql = bind_sql_columns(sql_columns_defs, false)?;
912
913    let mut columns = bind_all_columns(
914        &format_encode,
915        columns_from_resolve_source,
916        columns_from_sql,
917        sql_columns_defs,
918        wildcard_idx,
919        sql_column_strategy,
920    )?;
921
922    // add additional columns before binding the pk, because `format upsert` requires the key column
923    handle_addition_columns(
924        Some(&format_encode),
925        &with_properties,
926        include_column_options,
927        &mut columns,
928        false,
929    )?;
930
931    if columns.is_empty() {
932        return Err(RwError::from(ProtocolError(
933            "Schema definition is required, either from SQL or schema registry.".to_owned(),
934        )));
935    }
936
937    // keep compatible with the behavior of adding a hidden column `_rw_kafka_timestamp` to each message from a Kafka source
938    if is_create_source {
939        // must come after `handle_addition_columns`
940        check_and_add_timestamp_column(&with_properties, &mut columns);
941
942        // For shared sources, we will include partition and offset cols in the SourceExecutor's *output*, to be used by the SourceBackfillExecutor.
943        // For shared CDC source, the schema is different. See ColumnCatalog::debezium_cdc_source_cols(), CDC_BACKFILL_TABLE_ADDITIONAL_COLUMNS
944        if create_source_type == CreateSourceType::SharedNonCdc {
945            let (columns_exist, additional_columns) = source_add_partition_offset_cols(
946                &columns,
947                &with_properties.get_connector().unwrap(),
948                true, // col_id is filled below by `col_id_gen.generate`
949            );
950            for (existed, c) in columns_exist.into_iter().zip_eq_fast(additional_columns) {
951                if !existed {
952                    columns.push(ColumnCatalog::hidden(c));
953                }
954            }
955        }
956    }
957
958    // resolve privatelink connection for Kafka
959    let mut with_properties = with_properties;
960    resolve_privatelink_in_with_option(&mut with_properties)?;
961
962    // check the system parameter `enforce_secret`
963    if session
964        .env()
965        .system_params_manager()
966        .get_params()
967        .load()
968        .enforce_secret()
969        && Feature::SecretManagement.check_available().is_ok()
970    {
971        // enforce using secrets for certain properties on cloud
972        ConnectorProperties::enforce_secret_source(&with_properties)?;
973    }
974
975    let (with_properties, connection_type, connector_conn_ref) =
976        resolve_connection_ref_and_secret_ref(
977            with_properties,
978            session,
979            Some(TelemetryDatabaseObject::Source),
980        )?;
981    ensure_connection_type_allowed(connection_type, &SOURCE_ALLOWED_CONNECTION_CONNECTOR)?;
982
983    // if no connection is used, we don't need to check whether the connector matches the connection type
984    if !matches!(connection_type, PbConnectionType::Unspecified) {
985        let Some(connector) = with_properties.get_connector() else {
986            return Err(RwError::from(ProtocolError(format!(
987                "missing field '{}' in WITH clause",
988                UPSTREAM_SOURCE_KEY
989            ))));
990        };
991        check_connector_match_connection_type(connector.as_str(), &connection_type)?;
992    }
993
994    let pk_names = bind_source_pk(
995        &format_encode,
996        &source_info,
997        &mut columns,
998        sql_pk_names,
999        &with_properties,
1000    )
1001    .await?;
1002
1003    // User may specify a generated or additional column with the same name as one from the external schema.
1004    // Ensure duplicated column names are handled here.
1005    if let Some(duplicated_name) = columns.iter().map(|c| c.name()).duplicates().next() {
1006        return Err(ErrorCode::InvalidInputSyntax(format!(
1007            "column \"{}\" specified more than once",
1008            duplicated_name
1009        ))
1010        .into());
1011    }
1012
1013    // XXX: why do we use col_id_gen here? It doesn't seem to be very necessary.
1014    for c in &mut columns {
1015        let original_data_type = c.data_type().clone();
1016        col_id_gen.generate(c)?;
1017        // TODO: For now we restore the data type for `CREATE SOURCE`, to keep the nested field ids unset.
1018        //       This behavior is inconsistent with `CREATE TABLE`, and should be fixed once we refactor
1019        //       `ALTER SOURCE` to also use `ColumnIdGenerator` in the future.
1020        if is_create_source {
1021            c.column_desc.data_type = original_data_type;
1022        }
1023    }
1024    debug_assert_column_ids_distinct(&columns);
1025
1026    let (mut columns, pk_col_ids, row_id_index) =
1027        bind_pk_and_row_id_on_relation(columns, pk_names, true)?;
1028
1029    let watermark_descs =
1030        bind_source_watermark(session, source_name.clone(), source_watermarks, &columns)?;
1031    // TODO(yuhao): allow multiple watermarks on a source.
1032    assert!(watermark_descs.len() <= 1);
1033
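    // The relation is append-only iff no user-defined primary key exists, in which case a hidden
    // row id column has been added and `row_id_index` is set.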
1034    let append_only = row_id_index.is_some();
1035    if is_create_source && !append_only && !watermark_descs.is_empty() {
1036        return Err(ErrorCode::NotSupported(
1037            "Defining watermarks on source requires the source connector to be append only."
1038                .to_owned(),
1039            "Use the keywords `FORMAT PLAIN`".to_owned(),
1040        )
1041        .into());
1042    }
1043
1044    bind_sql_column_constraints(
1045        session,
1046        source_name.clone(),
1047        &mut columns,
1048        // TODO(st1page): pass the ref
1049        sql_columns_defs,
1050        &pk_col_ids,
1051    )?;
1052    check_format_encode(&with_properties, row_id_index, &columns)?;
1053
1054    let definition = handler_args.normalized_sql.clone();
1055
1056    let associated_table_id = if is_create_source {
1057        None
1058    } else {
1059        Some(TableId::placeholder())
1060    };
1061    let source = SourceCatalog {
1062        id: SourceId::placeholder(),
1063        name: source_name,
1064        schema_id,
1065        database_id,
1066        columns,
1067        pk_col_ids,
1068        append_only,
1069        owner: session.user_id(),
1070        info: source_info,
1071        row_id_index,
1072        with_properties,
1073        watermark_descs,
1074        associated_table_id,
1075        definition,
1076        connection_id: connector_conn_ref,
1077        created_at_epoch: None,
1078        initialized_at_epoch: None,
1079        version: INITIAL_SOURCE_VERSION_ID,
1080        created_at_cluster_version: None,
1081        initialized_at_cluster_version: None,
1082        rate_limit: source_rate_limit,
1083        refresh_mode: Some(refresh_mode),
1084    };
1085    Ok(source)
1086}
1087
1088pub async fn handle_create_source(
1089    mut handler_args: HandlerArgs,
1090    stmt: CreateSourceStatement,
1091) -> Result<RwPgResponse> {
1092    let session = handler_args.session.clone();
1093    let overwrite_options = OverwriteOptions::new(&mut handler_args);
1094
1095    if let Either::Right(resp) = session.check_relation_name_duplicated(
1096        stmt.source_name.clone(),
1097        StatementType::CREATE_SOURCE,
1098        stmt.if_not_exists,
1099    )? {
1100        return Ok(resp);
1101    }
1102
1103    if stmt
1104        .source_name
1105        .base_name()
1106        .starts_with(ICEBERG_SOURCE_PREFIX)
1107    {
1108        return Err(RwError::from(InvalidInputSyntax(format!(
1109            "Source name cannot start with reserved prefix '{}'",
1110            ICEBERG_SOURCE_PREFIX
1111        ))));
1112    }
1113
1114    if handler_args.with_options.is_empty() {
1115        return Err(RwError::from(InvalidInputSyntax(
1116            "missing WITH clause".to_owned(),
1117        )));
1118    }
1119
1120    let format_encode = stmt.format_encode.into_v2_with_warning();
1121    let (with_properties, refresh_mode) =
1122        bind_connector_props(&handler_args, &format_encode, true)?;
1123
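    // Decide whether the new source is shared, i.e. created with a backing streaming job
    // (see `generate_stream_graph_for_source` below), based on the connector type and session config.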
1124    let create_source_type = CreateSourceType::for_newly_created(&session, &*with_properties);
1125    let (columns_from_resolve_source, source_info) = bind_columns_from_source(
1126        &session,
1127        &format_encode,
1128        Either::Left(&with_properties),
1129        create_source_type,
1130    )
1131    .await?;
1132    let mut col_id_gen = ColumnIdGenerator::new_initial();
1133
1134    if stmt.columns.iter().any(|col| {
1135        col.options
1136            .iter()
1137            .any(|def| matches!(def.option, ColumnOption::NotNull))
1138    }) {
1139        return Err(RwError::from(InvalidInputSyntax(
1140            "NOT NULL constraint is not supported in source schema".to_owned(),
1141        )));
1142    }
1143
1144    let source_catalog = bind_create_source_or_table_with_connector(
1145        handler_args.clone(),
1146        stmt.source_name,
1147        format_encode,
1148        with_properties,
1149        &stmt.columns,
1150        stmt.constraints,
1151        stmt.wildcard_idx,
1152        stmt.source_watermarks,
1153        columns_from_resolve_source,
1154        source_info,
1155        stmt.include_column_options,
1156        &mut col_id_gen,
1157        create_source_type,
1158        overwrite_options.source_rate_limit,
1159        SqlColumnStrategy::FollowChecked,
1160        refresh_mode,
1161    )
1162    .await?;
1163
1164    // If it is a temporary source, put it into SessionImpl.
1165    if stmt.temporary {
1166        if session.get_temporary_source(&source_catalog.name).is_some() {
1167            return Err(CatalogError::duplicated("source", source_catalog.name.clone()).into());
1168        }
1169        session.create_temporary_source(source_catalog);
1170        return Ok(PgResponse::empty_result(StatementType::CREATE_SOURCE));
1171    }
1172
1173    let source = source_catalog.to_prost();
1174
1175    let catalog_writer = session.catalog_writer()?;
1176
1177    if create_source_type.is_shared() {
1178        let graph = generate_stream_graph_for_source(handler_args, source_catalog)?;
1179        catalog_writer
1180            .create_source(source, Some(graph), stmt.if_not_exists)
1181            .await?;
1182    } else {
1183        // For other sources we don't create a streaming job
1184        catalog_writer
1185            .create_source(source, None, stmt.if_not_exists)
1186            .await?;
1187    }
1188
1189    Ok(PgResponse::empty_result(StatementType::CREATE_SOURCE))
1190}
1191
1192pub(super) fn generate_stream_graph_for_source(
1193    handler_args: HandlerArgs,
1194    source_catalog: SourceCatalog,
1195) -> Result<PbStreamFragmentGraph> {
1196    let context = OptimizerContext::from_handler_args(handler_args);
1197    let source_node = LogicalSource::with_catalog(
1198        Rc::new(source_catalog),
1199        SourceNodeKind::CreateSharedSource,
1200        context.into(),
1201        None,
1202    )?;
1203
1204    let stream_plan = source_node.to_stream(&mut ToStreamContext::new(false))?;
1205    let graph = build_graph(stream_plan, Some(GraphJobType::Source))?;
1206    Ok(graph)
1207}
1208
1209#[cfg(test)]
1210pub mod tests {
1211    use std::collections::HashMap;
1212    use std::sync::Arc;
1213
1214    use risingwave_common::catalog::{
1215        DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, ROW_ID_COLUMN_NAME,
1216    };
1217    use risingwave_common::types::{DataType, StructType};
1218
1219    use crate::catalog::root_catalog::SchemaPath;
1220    use crate::catalog::source_catalog::SourceCatalog;
1221    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};
1222
1223    const GET_COLUMN_FROM_CATALOG: fn(&Arc<SourceCatalog>) -> HashMap<&str, DataType> =
1224        |catalog: &Arc<SourceCatalog>| -> HashMap<&str, DataType> {
1225            catalog
1226                .columns
1227                .iter()
1228                .map(|col| (col.name(), col.data_type().clone()))
1229                .collect::<HashMap<&str, DataType>>()
1230        };
1231
1232    #[tokio::test]
1233    async fn test_create_source_handler() {
1234        let proto_file = create_proto_file(PROTO_FILE_DATA);
1235        let sql = format!(
1236            r#"CREATE SOURCE t
1237    WITH (connector = 'kinesis')
1238    FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
1239            proto_file.path().to_str().unwrap()
1240        );
1241        let frontend = LocalFrontend::new(Default::default()).await;
1242        frontend.run_sql(sql).await.unwrap();
1243
1244        let session = frontend.session_ref();
1245        let catalog_reader = session.env().catalog_reader().read_guard();
1246        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);
1247
1248        // Check source exists.
1249        let (source, _) = catalog_reader
1250            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
1251            .unwrap();
1252        assert_eq!(source.name, "t");
1253
1254        let columns = GET_COLUMN_FROM_CATALOG(source);
1255
1256        let city_type = StructType::new(vec![
1257            ("address", DataType::Varchar),
1258            ("zipcode", DataType::Varchar),
1259        ])
1260        // .with_ids([5, 6].map(ColumnId::new))
1261        .into();
1262        let expected_columns = maplit::hashmap! {
1263            ROW_ID_COLUMN_NAME => DataType::Serial,
1264            "id" => DataType::Int32,
1265            "zipcode" => DataType::Int64,
1266            "rate" => DataType::Float32,
1267            "country" => StructType::new(
1268                vec![("address", DataType::Varchar),("city", city_type),("zipcode", DataType::Varchar)],
1269            )
1270            // .with_ids([3, 4, 7].map(ColumnId::new))
1271            .into(),
1272        };
1273        assert_eq!(columns, expected_columns, "{columns:#?}");
1274    }
1275
1276    #[tokio::test]
1277    async fn test_duplicate_props_options() {
1278        let proto_file = create_proto_file(PROTO_FILE_DATA);
1279        let sql = format!(
1280            r#"CREATE SOURCE t
1281    WITH (
1282        connector = 'kinesis',
1283        aws.region='user_test_topic',
1284        endpoint='172.10.1.1:9090,172.10.1.2:9090',
1285        aws.credentials.access_key_id = 'your_access_key_1',
1286        aws.credentials.secret_access_key = 'your_secret_key_1'
1287    )
1288    FORMAT PLAIN ENCODE PROTOBUF (
1289        message = '.test.TestRecord',
1290        aws.credentials.access_key_id = 'your_access_key_2',
1291        aws.credentials.secret_access_key = 'your_secret_key_2',
1292        schema.location = 'file://{}',
1293    )"#,
1294            proto_file.path().to_str().unwrap()
1295        );
1296        let frontend = LocalFrontend::new(Default::default()).await;
1297        frontend.run_sql(sql).await.unwrap();
1298
1299        let session = frontend.session_ref();
1300        let catalog_reader = session.env().catalog_reader().read_guard();
1301        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);
1302
1303        // Check source exists.
1304        let (source, _) = catalog_reader
1305            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
1306            .unwrap();
1307        assert_eq!(source.name, "t");
1308
1309        // AwsAuth params exist in options.
1310        assert_eq!(
1311            source
1312                .info
1313                .format_encode_options
1314                .get("aws.credentials.access_key_id")
1315                .unwrap(),
1316            "your_access_key_2"
1317        );
1318        assert_eq!(
1319            source
1320                .info
1321                .format_encode_options
1322                .get("aws.credentials.secret_access_key")
1323                .unwrap(),
1324            "your_secret_key_2"
1325        );
1326
1327        // AwsAuth params exist in props.
1328        assert_eq!(
1329            source
1330                .with_properties
1331                .get("aws.credentials.access_key_id")
1332                .unwrap(),
1333            "your_access_key_1"
1334        );
1335        assert_eq!(
1336            source
1337                .with_properties
1338                .get("aws.credentials.secret_access_key")
1339                .unwrap(),
1340            "your_secret_key_1"
1341        );
1342
1343        // Options are not merged into props.
1344        assert!(!source.with_properties.contains_key("schema.location"));
1345    }
1346
1347    #[tokio::test]
1348    async fn test_multi_table_cdc_create_source_handler() {
1349        let sql =
1350            "CREATE SOURCE t2 WITH (connector = 'mysql-cdc') FORMAT PLAIN ENCODE JSON".to_owned();
1351        let frontend = LocalFrontend::new(Default::default()).await;
1352        let session = frontend.session_ref();
1353
1354        frontend
1355            .run_sql_with_session(session.clone(), sql)
1356            .await
1357            .unwrap();
1358        let catalog_reader = session.env().catalog_reader().read_guard();
1359        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);
1360
1361        // Check source exists.
1362        let (source, _) = catalog_reader
1363            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t2")
1364            .unwrap();
1365        assert_eq!(source.name, "t2");
1366
1367        let columns = source
1368            .columns
1369            .iter()
1370            .map(|col| (col.name(), col.data_type().clone()))
1371            .collect::<Vec<(&str, DataType)>>();
1372
1373        expect_test::expect![[r#"
1374            [
1375                (
1376                    "payload",
1377                    Jsonb,
1378                ),
1379                (
1380                    "_rw_offset",
1381                    Varchar,
1382                ),
1383                (
1384                    "_rw_table_name",
1385                    Varchar,
1386                ),
1387                (
1388                    "_row_id",
1389                    Serial,
1390                ),
1391            ]
1392        "#]]
1393        .assert_debug_eq(&columns);
1394    }
1395
1396    #[tokio::test]
1397    async fn test_source_addition_columns() {
1398        // test deriving the INCLUDE KEY column for FORMAT PLAIN
1399        let sql =
1400            "CREATE SOURCE s (v1 int) include key as _rw_kafka_key with (connector = 'kafka') format plain encode json".to_owned();
1401        let frontend = LocalFrontend::new(Default::default()).await;
1402        frontend.run_sql(sql).await.unwrap();
1403        let session = frontend.session_ref();
1404        let catalog_reader = session.env().catalog_reader().read_guard();
1405        let (source, _) = catalog_reader
1406            .get_source_by_name(
1407                DEFAULT_DATABASE_NAME,
1408                SchemaPath::Name(DEFAULT_SCHEMA_NAME),
1409                "s",
1410            )
1411            .unwrap();
1412        assert_eq!(source.name, "s");
1413
1414        let columns = source
1415            .columns
1416            .iter()
1417            .map(|col| (col.name(), col.data_type().clone()))
1418            .collect::<Vec<(&str, DataType)>>();
1419
1420        expect_test::expect![[r#"
1421            [
1422                (
1423                    "v1",
1424                    Int32,
1425                ),
1426                (
1427                    "_rw_kafka_key",
1428                    Bytea,
1429                ),
1430                (
1431                    "_rw_kafka_timestamp",
1432                    Timestamptz,
1433                ),
1434                (
1435                    "_rw_kafka_partition",
1436                    Varchar,
1437                ),
1438                (
1439                    "_rw_kafka_offset",
1440                    Varchar,
1441                ),
1442                (
1443                    "_row_id",
1444                    Serial,
1445                ),
1446            ]
1447        "#]]
1448        .assert_debug_eq(&columns);
1449
1450        let sql =
1451            "CREATE SOURCE s3 (v1 int) include timestamp 'header1' as header_col with (connector = 'kafka') format plain encode json".to_owned();
1452        match frontend.run_sql(sql).await {
1453            Err(e) => {
1454                assert_eq!(
1455                    e.to_string(),
1456                    "Protocol error: Only header column can have inner field, but got \"timestamp\""
1457                )
1458            }
1459            _ => unreachable!(),
1460        }
1461    }
1462}