risingwave_frontend/handler/create_source.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::rc::Rc;
17use std::sync::LazyLock;
18
19use anyhow::{Context, anyhow};
20use either::Either;
21use external_schema::debezium::extract_debezium_avro_table_pk_columns;
22use external_schema::nexmark::check_nexmark_schema;
23use itertools::Itertools;
24use maplit::{convert_args, hashmap, hashset};
25use pgwire::pg_response::{PgResponse, StatementType};
26use rand::Rng;
27use risingwave_common::array::arrow::{IcebergArrowConvert, arrow_schema_iceberg};
28use risingwave_common::bail_not_implemented;
29use risingwave_common::catalog::{
30    ColumnCatalog, ColumnDesc, ColumnId, INITIAL_SOURCE_VERSION_ID, KAFKA_TIMESTAMP_COLUMN_NAME,
31    ROW_ID_COLUMN_NAME, TableId, debug_assert_column_ids_distinct,
32};
33use risingwave_common::license::Feature;
34use risingwave_common::secret::LocalSecretManager;
35use risingwave_common::system_param::reader::SystemParamsRead;
36use risingwave_common::types::DataType;
37use risingwave_common::util::iter_util::ZipEqFast;
38use risingwave_connector::parser::additional_columns::{
39    build_additional_column_desc, get_supported_additional_columns,
40    source_add_partition_offset_cols,
41};
42use risingwave_connector::parser::{
43    AvroParserConfig, DEBEZIUM_IGNORE_KEY, DebeziumAvroParserConfig, ProtobufParserConfig,
44    SchemaLocation, SpecificParserConfig, TimestamptzHandling,
45    fetch_json_schema_and_map_to_columns,
46};
47use risingwave_connector::schema::AWS_GLUE_SCHEMA_ARN_KEY;
48use risingwave_connector::schema::schema_registry::{
49    SCHEMA_REGISTRY_BACKOFF_DURATION_KEY, SCHEMA_REGISTRY_BACKOFF_FACTOR_KEY,
50    SCHEMA_REGISTRY_MAX_DELAY_KEY, SCHEMA_REGISTRY_PASSWORD, SCHEMA_REGISTRY_RETRIES_MAX_KEY,
51    SCHEMA_REGISTRY_USERNAME, SchemaRegistryConfig, name_strategy_from_str,
52};
53use risingwave_connector::source::cdc::{
54    CDC_MONGODB_STRONG_SCHEMA_KEY, CDC_SHARING_MODE_KEY, CDC_SNAPSHOT_BACKFILL,
55    CDC_SNAPSHOT_MODE_KEY, CDC_TRANSACTIONAL_KEY, CDC_WAIT_FOR_STREAMING_START_TIMEOUT,
56    CITUS_CDC_CONNECTOR, MONGODB_CDC_CONNECTOR, MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR,
57    SQL_SERVER_CDC_CONNECTOR,
58};
59use risingwave_connector::source::datagen::DATAGEN_CONNECTOR;
60use risingwave_connector::source::iceberg::ICEBERG_CONNECTOR;
61use risingwave_connector::source::nexmark::source::{EventType, get_event_data_types_with_names};
62use risingwave_connector::source::test_source::TEST_CONNECTOR;
63pub use risingwave_connector::source::{
64    ADBC_SNOWFLAKE_CONNECTOR, UPSTREAM_SOURCE_KEY, WEBHOOK_CONNECTOR,
65};
66use risingwave_connector::source::{
67    AZBLOB_CONNECTOR, ConnectorProperties, GCS_CONNECTOR, GOOGLE_PUBSUB_CONNECTOR, KAFKA_CONNECTOR,
68    KINESIS_CONNECTOR, LEGACY_S3_CONNECTOR, MQTT_CONNECTOR, NATS_CONNECTOR, NEXMARK_CONNECTOR,
69    OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR, PULSAR_CONNECTOR,
70};
71use risingwave_connector::{AUTO_SCHEMA_CHANGE_KEY, WithPropertiesExt};
72use risingwave_pb::catalog::connection_params::PbConnectionType;
73use risingwave_pb::catalog::{PbSchemaRegistryNameStrategy, StreamSourceInfo, WatermarkDesc};
74use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
75use risingwave_pb::plan_common::source_refresh_mode::{RefreshMode, SourceRefreshModeStreaming};
76use risingwave_pb::plan_common::{EncodeType, FormatType, SourceRefreshMode};
77use risingwave_pb::stream_plan::PbStreamFragmentGraph;
78use risingwave_pb::telemetry::TelemetryDatabaseObject;
79use risingwave_sqlparser::ast::{
80    AstString, ColumnDef, ColumnOption, CreateSourceStatement, Encode, Format, FormatEncodeOptions,
81    ObjectName, SourceWatermark, SqlOptionValue, TableConstraint, Value, get_delimiter,
82};
83use risingwave_sqlparser::parser::{IncludeOption, IncludeOptionItem};
84use thiserror_ext::AsReport;
85
86use super::RwPgResponse;
87use crate::binder::Binder;
88use crate::catalog::CatalogError;
89use crate::catalog::source_catalog::SourceCatalog;
90use crate::error::ErrorCode::{self, Deprecated, InvalidInputSyntax, NotSupported, ProtocolError};
91use crate::error::{Result, RwError};
92use crate::expr::Expr;
93use crate::handler::HandlerArgs;
94use crate::handler::create_table::{
95    ColumnIdGenerator, bind_pk_and_row_id_on_relation, bind_sql_column_constraints,
96    bind_sql_columns, bind_sql_pk_names, bind_table_constraints,
97};
98use crate::handler::util::{
99    SourceSchemaCompatExt, check_connector_match_connection_type, ensure_connection_type_allowed,
100};
101use crate::optimizer::plan_node::generic::SourceNodeKind;
102use crate::optimizer::plan_node::{LogicalSource, ToStream, ToStreamContext};
103use crate::session::SessionImpl;
104use crate::session::current::notice_to_user;
105use crate::utils::{
106    OverwriteOptions, resolve_connection_ref_and_secret_ref, resolve_privatelink_in_with_option,
107    resolve_secret_ref_in_with_options, resolve_source_refresh_mode_in_with_option,
108};
109use crate::{OptimizerContext, WithOptions, WithOptionsSecResolved, bind_data_type, build_graph};
110
111mod external_schema;
112pub use external_schema::{
113    bind_columns_from_source, get_schema_location, schema_has_schema_registry,
114};
115mod validate;
116pub use validate::validate_compatibility;
117use validate::{SOURCE_ALLOWED_CONNECTION_CONNECTOR, SOURCE_ALLOWED_CONNECTION_SCHEMA_REGISTRY};
118mod additional_column;
119use additional_column::check_and_add_timestamp_column;
120pub use additional_column::handle_addition_columns;
121use risingwave_common::catalog::ICEBERG_SOURCE_PREFIX;
122use risingwave_common::id::SourceId;
123
124use crate::stream_fragmenter::GraphJobType;
125
126fn non_generated_sql_columns(columns: &[ColumnDef]) -> Vec<ColumnDef> {
127    columns
128        .iter()
129        .filter(|c| !c.is_generated())
130        .cloned()
131        .collect()
132}
133
134fn try_consume_string_from_options(
135    format_encode_options: &mut BTreeMap<String, String>,
136    key: &str,
137) -> Option<AstString> {
138    format_encode_options.remove(key).map(AstString)
139}
140
141fn try_consume_schema_registry_config_from_options(
142    format_encode_options: &mut BTreeMap<String, String>,
143) {
144    [
145        SCHEMA_REGISTRY_USERNAME,
146        SCHEMA_REGISTRY_PASSWORD,
147        SCHEMA_REGISTRY_MAX_DELAY_KEY,
148        SCHEMA_REGISTRY_BACKOFF_DURATION_KEY,
149        SCHEMA_REGISTRY_BACKOFF_FACTOR_KEY,
150        SCHEMA_REGISTRY_RETRIES_MAX_KEY,
151    ]
152    .iter()
153    .for_each(|key| {
154        try_consume_string_from_options(format_encode_options, key);
155    });
156}
157
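/// Consume the given key from the options, returning an error if it is missing.
///
/// A minimal usage sketch (the key name is illustrative):
///
/// ```ignore
/// let mut opts = BTreeMap::from([("schema.registry.username".to_owned(), "bob".to_owned())]);
/// let user: AstString = consume_string_from_options(&mut opts, SCHEMA_REGISTRY_USERNAME)?;
/// assert!(opts.is_empty()); // the key has been consumed
/// ```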
158fn consume_string_from_options(
159    format_encode_options: &mut BTreeMap<String, String>,
160    key: &str,
161) -> Result<AstString> {
162    try_consume_string_from_options(format_encode_options, key).ok_or(RwError::from(ProtocolError(
163        format!("missing field {} in options", key),
164    )))
165}
166
167fn consume_aws_config_from_options(format_encode_options: &mut BTreeMap<String, String>) {
168    format_encode_options.retain(|key, _| !key.starts_with("aws."))
169}
170
171#[derive(Debug, Clone, Copy, PartialEq, Eq)]
172pub enum CreateSourceType {
173    SharedCdc,
174    /// e.g., shared Kafka source
175    SharedNonCdc,
176    NonShared,
177    /// `CREATE TABLE` with a connector
178    Table,
179}
180
181impl CreateSourceType {
182    /// Note: shouldn't be used for `ALTER SOURCE`, since session variables should not affect an existing source. We should respect the original type instead.
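    ///
    /// For example (a sketch): `connector = 'mysql-cdc'` yields [`Self::SharedCdc`]; a Kafka
    /// source yields [`Self::SharedNonCdc`] when shared sources are enabled in both the
    /// streaming config and the session; otherwise the result is [`Self::NonShared`].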
183    pub fn for_newly_created(
184        session: &SessionImpl,
185        with_properties: &impl WithPropertiesExt,
186    ) -> Self {
187        if with_properties.is_shareable_cdc_connector() {
188            CreateSourceType::SharedCdc
189        } else if with_properties.is_shareable_non_cdc_connector()
190            && session
191                .env()
192                .streaming_config()
193                .developer
194                .enable_shared_source
195            && session.config().streaming_use_shared_source()
196        {
197            CreateSourceType::SharedNonCdc
198        } else {
199            CreateSourceType::NonShared
200        }
201    }
202
203    pub fn for_replace(catalog: &SourceCatalog) -> Self {
204        if !catalog.info.is_shared() {
205            CreateSourceType::NonShared
206        } else if catalog.with_properties.is_shareable_cdc_connector() {
207            CreateSourceType::SharedCdc
208        } else {
209            CreateSourceType::SharedNonCdc
210        }
211    }
212
213    pub fn is_shared(&self) -> bool {
214        matches!(
215            self,
216            CreateSourceType::SharedCdc | CreateSourceType::SharedNonCdc
217        )
218    }
219}
220
221/// Bind columns from both the source and the SQL definition.
222pub(crate) fn bind_all_columns(
223    format_encode: &FormatEncodeOptions,
224    cols_from_source: Option<Vec<ColumnCatalog>>,
225    cols_from_sql: Vec<ColumnCatalog>,
226    col_defs_from_sql: &[ColumnDef],
227    wildcard_idx: Option<usize>,
228    sql_column_strategy: SqlColumnStrategy,
229) -> Result<Vec<ColumnCatalog>> {
230    if let Some(cols_from_source) = cols_from_source {
231        // Need to check `col_defs` to see if a column is generated, as we haven't bound the
232        // `GeneratedColumnDesc` in `ColumnCatalog` yet.
233        let generated_cols_from_sql = cols_from_sql
234            .iter()
235            .filter(|c| {
236                col_defs_from_sql
237                    .iter()
238                    .find(|d| d.name.real_value() == c.name())
239                    .unwrap()
240                    .is_generated()
241            })
242            .cloned()
243            .collect_vec();
244
245        #[allow(clippy::collapsible_else_if)]
246        match sql_column_strategy {
247            // Ignore `cols_from_source`, follow `cols_from_sql` without checking.
248            SqlColumnStrategy::FollowUnchecked => {
249                assert!(
250                    wildcard_idx.is_none(),
251                    "wildcard still exists while strategy is Follows, not correctly purified?"
252                );
253                return Ok(cols_from_sql);
254            }
255
256            // Will merge `generated_cols_from_sql` into `cols_from_source`.
257            SqlColumnStrategy::Ignore => {}
258
259            SqlColumnStrategy::FollowChecked => {
260                let has_regular_cols_from_sql =
261                    generated_cols_from_sql.len() != cols_from_sql.len();
262
263                if has_regular_cols_from_sql {
264                    if wildcard_idx.is_some() {
265                        // (*, normal_column INT)
266                        return Err(RwError::from(NotSupported(
267                            "When there's a wildcard (\"*\"), \
268                             only generated columns are allowed in user-defined schema from SQL"
269                                .to_owned(),
270                            "Remove the non-generated columns".to_owned(),
271                        )));
272                    } else {
273                        // (normal_column INT)
274                        // Follow `cols_from_sql` with name & type checking.
275                        for col in &cols_from_sql {
276                            if generated_cols_from_sql.contains(col) {
277                                continue;
278                            }
279                            let Some(col_from_source) =
280                                cols_from_source.iter().find(|c| c.name() == col.name())
281                            else {
282                                return Err(RwError::from(ProtocolError(format!(
283                                    "Column \"{}\" is defined in SQL but not found in the source",
284                                    col.name()
285                                ))));
286                            };
287
288                            if col_from_source.data_type() != col.data_type() {
289                                return Err(RwError::from(ProtocolError(format!(
290                                    "Data type mismatch for column \"{}\". \
291                                     Defined in SQL as \"{}\", but found in the source as \"{}\"",
292                                    col.name(),
293                                    col.data_type(),
294                                    col_from_source.data_type()
295                                ))));
296                            }
297                        }
298                        return Ok(cols_from_sql);
299                    }
300                } else {
301                    if wildcard_idx.is_some() {
302                        // (*)
303                        // (*, generated_column INT)
304                        // Good, expand the wildcard later.
305                    } else {
306                        // ()
307                        // (generated_column INT)
308                        // Interpreted as if there's a wildcard for backward compatibility.
309                        // TODO: the behavior is not that consistent, making it impossible to ingest no
310                        //       columns from the source (though not useful in practice). Currently we
311        //       just notify the user, but we may want to interpret it as empty columns
312                        //       in the future.
313                        notice_to_user("\
314                            Neither wildcard (\"*\") nor regular (non-generated) columns appear in the user-defined schema from SQL. \
315                            For backward compatibility, all columns from the source will be included at the beginning. \
316                            For clarity, consider adding a wildcard (\"*\") to indicate where the columns from the source should be included, \
317                            or specifying the columns you want to include from the source.
318                        ");
319                    }
320                }
321            }
322        }
323
324        // In some cases the wildcard may be absent:
325        // - plan based on a purified SQL
326        // - interpret `()` as `(*)` for backward compatibility (see notice above)
327        // Default to 0 to expand the wildcard at the beginning.
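        // For example (with illustrative names): given SQL columns `(gen_a INT AS ..., *, gen_b INT AS ...)`
        // and source columns `[c1, c2]`, `wildcard_idx` is 1 and the merged result below is
        // `[gen_a, c1, c2, gen_b]`.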
328        let wildcard_idx = wildcard_idx.unwrap_or(0).min(generated_cols_from_sql.len());
329
330        // Merge `generated_cols_from_sql` with `cols_from_source`.
331        let mut merged_cols = generated_cols_from_sql;
332        let merged_cols_r = merged_cols.split_off(wildcard_idx);
333        merged_cols.extend(cols_from_source);
334        merged_cols.extend(merged_cols_r);
335
336        Ok(merged_cols)
337    } else {
338        if wildcard_idx.is_some() {
339            return Err(RwError::from(NotSupported(
340                "Wildcard in user-defined schema is only allowed when there exists columns from external schema".to_owned(),
341                "Remove the wildcard or use a source with external schema".to_owned(),
342            )));
343        }
344        let non_generated_sql_defined_columns = non_generated_sql_columns(col_defs_from_sql);
345
346        match (&format_encode.format, &format_encode.row_encode) {
347            (Format::DebeziumMongo, Encode::Json) => {
348                let strong_schema = format_encode
349                    .row_options
350                    .iter()
351                    .find(|k| k.name.real_value().to_lowercase() == CDC_MONGODB_STRONG_SCHEMA_KEY)
352                    .map(|k| matches!(k.value, SqlOptionValue::Value(Value::Boolean(true))))
353                    .unwrap_or(false);
354
355                // a strong schema requires an `_id` column at the first position with a specific type
356                if strong_schema {
357                    let (_, id_column) = non_generated_sql_defined_columns
358                        .iter()
359                        .enumerate()
360                        .find(|(idx, col)| *idx == 0 && col.name.real_value() == "_id")
361                        .ok_or_else(|| {
362                            RwError::from(ProtocolError(
363                                "The `_id` column of the source with row format DebeziumMongoJson must be defined as the first column in SQL".to_owned(),
364                            ))
365                        })?;
366
367                    let id_data_type = bind_data_type(id_column.data_type.as_ref().unwrap())?;
368                    if !matches!(
369                        id_data_type,
370                        DataType::Varchar | DataType::Int32 | DataType::Int64 | DataType::Jsonb
371                    ) {
372                        return Err(RwError::from(ProtocolError(
373                            "the `_id` column of the source with row format DebeziumMongoJson must be [Jsonb | Varchar | Int32 | Int64]".to_owned(),
374                        )));
375                    }
376
377                    let mut columns = Vec::with_capacity(non_generated_sql_defined_columns.len());
378                    columns.push(
379                        // id column
380                        ColumnCatalog {
381                            column_desc: ColumnDesc::named("_id", 0.into(), id_data_type),
382                            is_hidden: false,
383                        },
384                    );
385
386                    // bind rest of the columns
387                    for (idx, col) in non_generated_sql_defined_columns
388                        .into_iter()
389                        // skip the first column
390                        .skip(1)
391                        .enumerate()
392                    {
393                        columns.push(ColumnCatalog {
394                            column_desc: ColumnDesc::named(
395                                col.name.real_value(),
396                                (idx as i32).into(),
397                                bind_data_type(col.data_type.as_ref().unwrap())?,
398                            ),
399                            is_hidden: false,
400                        });
401                    }
402
403                    return Ok(columns);
404                }
405
406                let mut columns = vec![
407                    ColumnCatalog {
408                        column_desc: ColumnDesc::named("_id", 0.into(), DataType::Varchar),
409                        is_hidden: false,
410                    },
411                    ColumnCatalog {
412                        column_desc: ColumnDesc::named("payload", 0.into(), DataType::Jsonb),
413                        is_hidden: false,
414                    },
415                ];
416
417                if non_generated_sql_defined_columns.len() != 2
418                    || non_generated_sql_defined_columns[0].name.real_value() != columns[0].name()
419                    || non_generated_sql_defined_columns[1].name.real_value() != columns[1].name()
420                {
421                    return Err(RwError::from(ProtocolError(
422                        "the not generated columns of the source with row format DebeziumMongoJson
423        must be (_id [Jsonb | Varchar | Int32 | Int64], payload jsonb)."
424                            .to_owned(),
425                    )));
426                }
427                // ok to unwrap since it was checked at `bind_sql_columns`
428                let key_data_type = bind_data_type(
429                    non_generated_sql_defined_columns[0]
430                        .data_type
431                        .as_ref()
432                        .unwrap(),
433                )?;
434                match key_data_type {
435                    DataType::Jsonb | DataType::Varchar | DataType::Int32 | DataType::Int64 => {
436                        columns[0].column_desc.data_type = key_data_type;
437                    }
438                    _ => {
439                        return Err(RwError::from(ProtocolError(
440                            "the `_id` column of the source with row format DebeziumMongoJson
441        must be [Jsonb | Varchar | Int32 | Int64]"
442                                .to_owned(),
443                        )));
444                    }
445                }
446
447                // ok to unwrap since it was checked at `bind_sql_columns`
448                let value_data_type = bind_data_type(
449                    non_generated_sql_defined_columns[1]
450                        .data_type
451                        .as_ref()
452                        .unwrap(),
453                )?;
454                if !matches!(value_data_type, DataType::Jsonb) {
455                    return Err(RwError::from(ProtocolError(
456                        "the `payload` column of the source with row format DebeziumMongoJson
457        must be Jsonb datatype"
458                            .to_owned(),
459                    )));
460                }
461                Ok(columns)
462            }
463            (Format::Plain, Encode::Bytes) => {
464                let err = Err(RwError::from(ProtocolError(
465                    "ENCODE BYTES only accepts one BYTEA type column".to_owned(),
466                )));
467                if non_generated_sql_defined_columns.len() == 1 {
468                    // ok to unwrap `data_type` since it was checked at `bind_sql_columns`
469                    let col_data_type = bind_data_type(
470                        non_generated_sql_defined_columns[0]
471                            .data_type
472                            .as_ref()
473                            .unwrap(),
474                    )?;
475                    if col_data_type == DataType::Bytea {
476                        Ok(cols_from_sql)
477                    } else {
478                        err
479                    }
480                } else {
481                    err
482                }
483            }
484            (_, _) => Ok(cols_from_sql),
485        }
486    }
487}
488
489/// TODO: perhaps putting the hint in a notice is better. The error message format might not be that reliable.
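/// For example, for `FORMAT UPSERT ENCODE AVRO` the rendered hint suggests writing
/// `CREATE TABLE <table_name> (PRIMARY KEY ([rw_key | <key_name>])) INCLUDE KEY [AS <key_name>] ... FORMAT UPSERT ENCODE AVRO (...)`.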
490fn hint_format_encode(format_encode: &FormatEncodeOptions) -> String {
491    format!(
492        r#"Hint: For FORMAT {0} ENCODE {1}, INCLUDE KEY must be specified and the key column must be used as primary key.
493example:
494    CREATE TABLE <table_name> ( PRIMARY KEY ([rw_key | <key_name>]) )
495    INCLUDE KEY [AS <key_name>]
496    WITH (...)
497    FORMAT {0} ENCODE {1}{2}
498"#,
499        format_encode.format,
500        format_encode.row_encode,
501        if format_encode.row_encode == Encode::Json || format_encode.row_encode == Encode::Bytes {
502            "".to_owned()
503        } else {
504            " (...)".to_owned()
505        }
506    )
507}
508
509/// Bind the primary key of the source, validating it against the key column (from `INCLUDE KEY`) when the format requires one.
510/// Return `pk_names`.
511pub(crate) async fn bind_source_pk(
512    format_encode: &FormatEncodeOptions,
513    source_info: &StreamSourceInfo,
514    columns: &mut [ColumnCatalog],
515    sql_defined_pk_names: Vec<String>,
516    with_properties: &WithOptionsSecResolved,
517) -> Result<Vec<String>> {
518    let sql_defined_pk = !sql_defined_pk_names.is_empty();
519    let include_key_column_name: Option<String> = {
520        // iterate over the columns to check whether there is an additional column from the key part,
521        // and return the key column name if it exists
522        columns.iter().find_map(|catalog| {
523            if matches!(
524                catalog.column_desc.additional_column.column_type,
525                Some(AdditionalColumnType::Key(_))
526            ) {
527                Some(catalog.name().to_owned())
528            } else {
529                None
530            }
531        })
532    };
533    let additional_column_names = columns
534        .iter()
535        .filter_map(|col| {
536            if col.column_desc.additional_column.column_type.is_some() {
537                Some(col.name().to_owned())
538            } else {
539                None
540            }
541        })
542        .collect_vec();
543
544    let res = match (&format_encode.format, &format_encode.row_encode) {
545        (Format::Native, Encode::Native) | (Format::None, Encode::None) | (Format::Plain, _) => {
546            sql_defined_pk_names
547        }
548
549        // For all Upsert formats, we accept one and only one key column as the primary key.
550        // An additional KEY column must be included in this case and must be the primary key.
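        // For instance, a sketch of the accepted shape:
        //     CREATE TABLE t (PRIMARY KEY (rw_key))
        //     INCLUDE KEY AS rw_key
        //     WITH (connector = 'kafka', ...)
        //     FORMAT UPSERT ENCODE AVRO (...);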
551        (Format::Upsert, Encode::Json | Encode::Avro | Encode::Protobuf) => {
552            if let Some(ref key_column_name) = include_key_column_name
553                && sql_defined_pk
554            {
555                // pk is set. check if it's valid
556
557                // the column name has been converted to its real value in `handle_addition_columns`,
558                // so we don't ignore ASCII case here
559                if sql_defined_pk_names.len() != 1
560                    || !key_column_name.eq(sql_defined_pk_names[0].as_str())
561                {
562                    return Err(RwError::from(ProtocolError(format!(
563                        "Only \"{}\" can be used as primary key\n\n{}",
564                        key_column_name,
565                        hint_format_encode(format_encode)
566                    ))));
567                }
568                sql_defined_pk_names
569            } else {
570                // pk not set, or the key column not even included
571                return if let Some(include_key_column_name) = include_key_column_name {
572                    Err(RwError::from(ProtocolError(format!(
573                        "Primary key must be specified to {}\n\n{}",
574                        include_key_column_name,
575                        hint_format_encode(format_encode)
576                    ))))
577                } else {
578                    Err(RwError::from(ProtocolError(format!(
579                        "INCLUDE KEY clause not set\n\n{}",
580                        hint_format_encode(format_encode)
581                    ))))
582                };
583            }
584        }
585
586        (Format::Debezium, Encode::Json) => {
587            if !additional_column_names.is_empty() {
588                return Err(RwError::from(ProtocolError(format!(
589                    "FORMAT DEBEZIUM forbids additional columns, but got {:?}",
590                    additional_column_names
591                ))));
592            }
593            if !sql_defined_pk {
594                return Err(RwError::from(ProtocolError(
595                    "Primary key must be specified when creating source with FORMAT DEBEZIUM."
596                        .to_owned(),
597                )));
598            }
599            sql_defined_pk_names
600        }
601        (Format::Debezium, Encode::Avro) => {
602            if !additional_column_names.is_empty() {
603                return Err(RwError::from(ProtocolError(format!(
604                    "FORMAT DEBEZIUM forbids additional columns, but got {:?}",
605                    additional_column_names
606                ))));
607            }
608            if sql_defined_pk {
609                sql_defined_pk_names
610            } else {
611                let pk_names =
612                    extract_debezium_avro_table_pk_columns(source_info, with_properties).await?;
613                // extract pk(s) from schema registry
614                for pk_name in &pk_names {
615                    columns
616                        .iter()
617                        .find(|c: &&ColumnCatalog| c.name().eq(pk_name))
618                        .ok_or_else(|| {
619                            RwError::from(ProtocolError(format!(
620                                "avro's key column {} not exists in avro's row schema",
621                                pk_name
622                            )))
623                        })?;
624                }
625                pk_names
626            }
627        }
628        (Format::DebeziumMongo, Encode::Json) => {
629            if sql_defined_pk {
630                sql_defined_pk_names
631            } else {
632                vec!["_id".to_owned()]
633            }
634        }
635
636        (Format::Maxwell, Encode::Json) => {
637            if !additional_column_names.is_empty() {
638                return Err(RwError::from(ProtocolError(format!(
639                    "FORMAT MAXWELL forbids additional columns, but got {:?}",
640                    additional_column_names
641                ))));
642            }
643            if !sql_defined_pk {
644                return Err(RwError::from(ProtocolError(
645    "Primary key must be specified when creating source with FORMAT MAXWELL ENCODE JSON.".to_owned(),
646    )));
647            }
648            sql_defined_pk_names
649        }
650
651        (Format::Canal, Encode::Json) => {
652            if !additional_column_names.is_empty() {
653                return Err(RwError::from(ProtocolError(format!(
654                    "FORMAT CANAL forbids additional columns, but got {:?}",
655                    additional_column_names
656                ))));
657            }
658            if !sql_defined_pk {
659                return Err(RwError::from(ProtocolError(
660    "Primary key must be specified when creating source with FORMAT CANAL ENCODE JSON.".to_owned(),
661    )));
662            }
663            sql_defined_pk_names
664        }
665        (format, encoding) => {
666            return Err(RwError::from(ProtocolError(format!(
667                "Unknown combination {:?} {:?}",
668                format, encoding
669            ))));
670        }
671    };
672    Ok(res)
673}
674
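/// Bind the `WATERMARK FOR <column> AS <expr>` clauses of a source against the bound columns and
/// build the corresponding `WatermarkDesc`s. The watermark expression must return the same data
/// type as the watermark column, e.g. (illustrative)
/// `CREATE SOURCE s (ts TIMESTAMPTZ, WATERMARK FOR ts AS ts - INTERVAL '5' SECOND) WITH (...) FORMAT PLAIN ENCODE JSON`.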
675pub(super) fn bind_source_watermark(
676    session: &SessionImpl,
677    name: String,
678    source_watermarks: Vec<SourceWatermark>,
679    column_catalogs: &[ColumnCatalog],
680) -> Result<Vec<WatermarkDesc>> {
681    let mut binder = Binder::new_for_ddl(session);
682    binder.bind_columns_to_context(name.clone(), column_catalogs)?;
683
684    let watermark_descs = source_watermarks
685        .into_iter()
686        .map(|source_watermark| {
687            let col_name = source_watermark.column.real_value();
688            let watermark_idx = binder.get_column_binding_index(name.clone(), &col_name)?;
689
690            let expr = binder.bind_expr(&source_watermark.expr)?;
691            let watermark_col_type = column_catalogs[watermark_idx].data_type();
692            let watermark_expr_type = &expr.return_type();
693            if watermark_col_type != watermark_expr_type {
694                Err(RwError::from(ErrorCode::BindError(
695                    format!("The return value type of the watermark expression must be identical to the watermark column data type. Current data type of watermark return value: `{}`, column `{}`",watermark_expr_type, watermark_col_type),
696                )))
697            } else {
698                let expr_proto = expr.to_expr_proto();
699                Ok::<_, RwError>(WatermarkDesc {
700                    watermark_idx: watermark_idx as u32,
701                    expr: Some(expr_proto),
702                    with_ttl: source_watermark.with_ttl,
703                })
704            }
705        })
706        .try_collect()?;
707    Ok(watermark_descs)
708}
709
710/// Performs early-stage checking in the frontend to see if the schema of the given `columns` is
711/// compatible with the connector extracted from the properties.
712///
713/// One should only call this function after all properties of all columns are resolved, like
714/// generated column descriptors.
715pub(super) fn check_format_encode(
716    props: &WithOptionsSecResolved,
717    row_id_index: Option<usize>,
718    columns: &[ColumnCatalog],
719) -> Result<()> {
720    let Some(connector) = props.get_connector() else {
721        return Ok(());
722    };
723
724    if connector == NEXMARK_CONNECTOR {
725        check_nexmark_schema(props, row_id_index, columns)
726    } else {
727        Ok(())
728    }
729}
730
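/// Bind and normalize the connector properties from the `WITH` clause: validate compatibility
/// with the given format/encode, extract the refresh mode (only allowed for `CREATE TABLE`),
/// fill in CDC snapshot/sharing defaults for shared CDC source jobs, and generate a random
/// `server.id` for MySQL CDC if the user didn't specify one.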
731pub fn bind_connector_props(
732    handler_args: &HandlerArgs,
733    format_encode: &FormatEncodeOptions,
734    is_create_source: bool,
735) -> Result<(WithOptions, SourceRefreshMode)> {
736    let mut with_properties = handler_args.with_options.clone().into_connector_props();
737    validate_compatibility(format_encode, &mut with_properties)?;
738    let refresh_mode = {
739        let refresh_mode = resolve_source_refresh_mode_in_with_option(&mut with_properties)?;
740        if is_create_source && refresh_mode.is_some() {
741            return Err(RwError::from(ProtocolError(
742                "`refresh_mode` only supported for CREATE TABLE".to_owned(),
743            )));
744        }
745
746        refresh_mode.unwrap_or(SourceRefreshMode {
747            refresh_mode: Some(RefreshMode::Streaming(SourceRefreshModeStreaming {})),
748        })
749    };
750
751    let create_cdc_source_job = with_properties.is_shareable_cdc_connector();
752
753    if !is_create_source && with_properties.is_shareable_only_cdc_connector() {
754        return Err(RwError::from(ProtocolError(format!(
755            "connector {} does not support `CREATE TABLE`, please use `CREATE SOURCE` instead",
756            with_properties.get_connector().unwrap(),
757        ))));
758    }
759    if is_create_source && create_cdc_source_job {
760        if let Some(value) = with_properties.get(AUTO_SCHEMA_CHANGE_KEY)
761            && value.parse::<bool>().map_err(|_| {
762                ErrorCode::InvalidInputSyntax(format!(
763                    "invalid value of '{}' option",
764                    AUTO_SCHEMA_CHANGE_KEY
765                ))
766            })?
767        {
768            Feature::CdcAutoSchemaChange.check_available()?;
769        }
770
771        // set connector to backfill mode
772        with_properties.insert(CDC_SNAPSHOT_MODE_KEY.into(), CDC_SNAPSHOT_BACKFILL.into());
773        // enable cdc sharing mode, which will capture all tables in the given `database.name`
774        with_properties.insert(CDC_SHARING_MODE_KEY.into(), "true".into());
775        // enable transactional cdc
776        if with_properties.enable_transaction_metadata() {
777            with_properties.insert(CDC_TRANSACTIONAL_KEY.into(), "true".into());
778        }
779        // Only set CDC_WAIT_FOR_STREAMING_START_TIMEOUT if not already specified by user.
780        if !with_properties.contains_key(CDC_WAIT_FOR_STREAMING_START_TIMEOUT) {
781            with_properties.insert(
782                CDC_WAIT_FOR_STREAMING_START_TIMEOUT.into(),
783                handler_args
784                    .session
785                    .config()
786                    .cdc_source_wait_streaming_start_timeout()
787                    .to_string(),
788            );
789        }
790    }
791    if with_properties.is_mysql_cdc_connector() {
792        // Generate a random server id for the MySQL CDC source if needed.
793        // `server.id` must be in the range from 1 to 2^32 - 1, and MUST be unique across the whole replication
794        // group (that is, different from any other server id being used by any master or slave).
795        with_properties
796            .entry("server.id".to_owned())
797            .or_insert(rand::rng().random_range(1..u32::MAX).to_string());
798    }
799    Ok((with_properties, refresh_mode))
800}
801
802/// When the schema can be inferred from an external system (like a schema registry),
803/// how should the regular (i.e., non-generated) columns defined in SQL be handled?
804pub enum SqlColumnStrategy {
805    /// Follow all columns defined in SQL, ignoring the columns from the external system.
806    /// This ensures that no accidental side effect will change the schema.
807    ///
808    /// This is the behavior when re-planning the target table of `SINK INTO`.
809    FollowUnchecked,
810
811    /// Follow all columns defined in SQL, checking them against the columns from the external system by name & type.
812    /// This ensures that no accidental side effect will change the schema.
813    ///
814    /// This is the behavior when creating a new table or source with a resolvable schema,
815    /// and when adding or dropping columns on that table.
816    // TODO(purify): `ALTER SOURCE` currently has its own code path and does not check.
817    FollowChecked,
818
819    /// Merge the generated columns defined in SQL with the columns from the external system. If there
820    /// are also regular columns defined in SQL, ignore them silently.
821    ///
822    /// This is the behavior of `REFRESH SCHEMA` on top of the purified SQL.
823    Ignore,
824}
825
826/// Entrypoint for binding source connector.
827/// Common logic shared by `CREATE SOURCE` and `CREATE TABLE`.
828#[allow(clippy::too_many_arguments)]
829pub async fn bind_create_source_or_table_with_connector(
830    handler_args: HandlerArgs,
831    full_name: ObjectName,
832    format_encode: FormatEncodeOptions,
833    with_properties: WithOptions,
834    sql_columns_defs: &[ColumnDef],
835    constraints: Vec<TableConstraint>,
836    wildcard_idx: Option<usize>,
837    source_watermarks: Vec<SourceWatermark>,
838    columns_from_resolve_source: Option<Vec<ColumnCatalog>>,
839    source_info: StreamSourceInfo,
840    include_column_options: IncludeOption,
841    col_id_gen: &mut ColumnIdGenerator,
842    create_source_type: CreateSourceType,
843    source_rate_limit: Option<u32>,
844    sql_column_strategy: SqlColumnStrategy,
845    refresh_mode: SourceRefreshMode,
846) -> Result<SourceCatalog> {
847    let session = &handler_args.session;
848    let db_name: &str = &session.database();
849    let (schema_name, source_name) = Binder::resolve_schema_qualified_name(db_name, &full_name)?;
850    let (database_id, schema_id) =
851        session.get_database_and_schema_id_for_create(schema_name.clone())?;
852
853    let is_create_source = create_source_type != CreateSourceType::Table;
854
855    if is_create_source {
856        // reject refreshable batch source
857        if with_properties.is_batch_connector() {
858            return Err(ErrorCode::BindError(
859            "can't CREATE SOURCE with refreshable batch connector\n\nHint: use CREATE TABLE instead"
860                .to_owned(),
861        )
862        .into());
863        }
864
865        match format_encode.format {
866            // reject unsupported formats for CREATE SOURCE
867            Format::Debezium | Format::DebeziumMongo | Format::Maxwell | Format::Canal => {
868                return Err(ErrorCode::BindError(format!(
869                    "can't CREATE SOURCE with FORMAT {}.\n\nHint: use CREATE TABLE instead\n\n{}",
870                    format_encode.format,
871                    hint_format_encode(&format_encode)
872                ))
873                .into());
874            }
875            // hint limitations for some other formats
876            Format::Upsert => {
877                notice_to_user(format!(
878                    "Streaming queries on sources with `FORMAT {}` may have limitations. If your query isn't supported, consider using `CREATE TABLE` instead.",
879                    format_encode.format
880                ));
881            }
882            _ => {}
883        }
884    }
885
886    let sql_pk_names = bind_sql_pk_names(sql_columns_defs, bind_table_constraints(&constraints)?)?;
887
888    // FIXME: ideally we could support it, but the current way of handling iceberg additional columns is problematic.
889    // They are treated as normal user columns, so they would be lost if we allowed the user to specify columns.
890    // See `extract_iceberg_columns`.
891    if with_properties.is_iceberg_connector() && !sql_columns_defs.is_empty() {
892        return Err(RwError::from(InvalidInputSyntax(
893            r#"Schema is automatically inferred for iceberg source and should not be specified
894
895HINT: use `CREATE SOURCE <name> WITH (...)` instead of `CREATE SOURCE <name> (<columns>) WITH (...)`."#.to_owned(),
896        )));
897    }
898
899    // Same for the ADBC Snowflake connector: the schema is automatically inferred.
900    if with_properties.is_batch_connector()
901        && with_properties
902            .get(UPSTREAM_SOURCE_KEY)
903            .is_some_and(|s| s.eq_ignore_ascii_case(ADBC_SNOWFLAKE_CONNECTOR))
904        && !sql_columns_defs.is_empty()
905    {
906        return Err(RwError::from(InvalidInputSyntax(
907            r#"Schema is automatically inferred for ADBC Snowflake source and should not be specified
908
909HINT: use `CREATE TABLE <name> WITH (...)` instead of `CREATE TABLE <name> (<columns>) WITH (...)`."#.to_owned(),
910        )));
911    }
912    let columns_from_sql = bind_sql_columns(sql_columns_defs, false)?;
913
914    let mut columns = bind_all_columns(
915        &format_encode,
916        columns_from_resolve_source,
917        columns_from_sql,
918        sql_columns_defs,
919        wildcard_idx,
920        sql_column_strategy,
921    )?;
922
923    // add additional columns before binding the pk, because `FORMAT UPSERT` requires the key column
924    handle_addition_columns(
925        Some(&format_encode),
926        &with_properties,
927        include_column_options,
928        &mut columns,
929        false,
930    )?;
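    // e.g., `INCLUDE key AS _rw_kafka_key` or `INCLUDE timestamp` on a Kafka source add the
    // corresponding additional columns here (see `test_source_addition_columns` below).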
931
932    if columns.is_empty() {
933        return Err(RwError::from(ProtocolError(
934            "Schema definition is required, either from SQL or schema registry.".to_owned(),
935        )));
936    }
937
938    // compatible with the legacy behavior of adding a hidden column `_rw_kafka_timestamp` to each message from a Kafka source
939    if is_create_source {
940        // must be called after `handle_addition_columns`
941        check_and_add_timestamp_column(&with_properties, &mut columns);
942
943        // For shared sources, we will include partition and offset cols in the SourceExecutor's *output*, to be used by the SourceBackfillExecutor.
944        // For shared CDC source, the schema is different. See ColumnCatalog::debezium_cdc_source_cols(), CDC_BACKFILL_TABLE_ADDITIONAL_COLUMNS
945        if create_source_type == CreateSourceType::SharedNonCdc {
946            let (columns_exist, additional_columns) = source_add_partition_offset_cols(
947                &columns,
948                &with_properties.get_connector().unwrap(),
949                true, // col_id filled below at col_id_gen.generate
950            );
951            for (existed, c) in columns_exist.into_iter().zip_eq_fast(additional_columns) {
952                if !existed {
953                    columns.push(ColumnCatalog::hidden(c));
954                }
955            }
956        }
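        // For a shared Kafka source, for example, this appends `_rw_kafka_partition` and
        // `_rw_kafka_offset` columns when they are not already included (see
        // `test_source_addition_columns` below).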
957    }
958
959    // resolve privatelink connection for Kafka
960    let mut with_properties = with_properties;
961    resolve_privatelink_in_with_option(&mut with_properties)?;
962
963    // check the system parameter `enforce_secret`
964    if session
965        .env()
966        .system_params_manager()
967        .get_params()
968        .load()
969        .enforce_secret()
970        && Feature::SecretManagement.check_available().is_ok()
971    {
972        // enforce that secrets are used for certain props on cloud
973        ConnectorProperties::enforce_secret_source(&with_properties)?;
974    }
975
976    let (with_properties, connection_type, connector_conn_ref) =
977        resolve_connection_ref_and_secret_ref(
978            with_properties,
979            session,
980            Some(TelemetryDatabaseObject::Source),
981        )?;
982    ensure_connection_type_allowed(connection_type, &SOURCE_ALLOWED_CONNECTION_CONNECTOR)?;
983
984    // if not using a connection, we don't need to check whether the connector matches the connection type
985    if !matches!(connection_type, PbConnectionType::Unspecified) {
986        let Some(connector) = with_properties.get_connector() else {
987            return Err(RwError::from(ProtocolError(format!(
988                "missing field '{}' in WITH clause",
989                UPSTREAM_SOURCE_KEY
990            ))));
991        };
992        check_connector_match_connection_type(connector.as_str(), &connection_type)?;
993    }
994
995    let pk_names = bind_source_pk(
996        &format_encode,
997        &source_info,
998        &mut columns,
999        sql_pk_names,
1000        &with_properties,
1001    )
1002    .await?;
1003
1004    // The user may specify a generated or additional column with the same name as one from the external schema,
1005    // so ensure duplicated column names are handled here.
1006    if let Some(duplicated_name) = columns.iter().map(|c| c.name()).duplicates().next() {
1007        return Err(ErrorCode::InvalidInputSyntax(format!(
1008            "column \"{}\" specified more than once",
1009            duplicated_name
1010        ))
1011        .into());
1012    }
1013
1014    // XXX: why do we use col_id_gen here? It doesn't seem to be very necessary.
1015    for c in &mut columns {
1016        let original_data_type = c.data_type().clone();
1017        col_id_gen.generate(c)?;
1018        // TODO: For now we restore the data type for `CREATE SOURCE`, so as to keep the nested field ids unset.
1019        //       This behavior is inconsistent with `CREATE TABLE`, and should be fixed once we refactor
1020        //       `ALTER SOURCE` to also use `ColumnIdGenerator` in the future.
1021        if is_create_source {
1022            c.column_desc.data_type = original_data_type;
1023        }
1024    }
1025    debug_assert_column_ids_distinct(&columns);
1026
1027    let (mut columns, pk_col_ids, row_id_index) =
1028        bind_pk_and_row_id_on_relation(columns, pk_names, true)?;
1029
1030    let watermark_descs =
1031        bind_source_watermark(session, source_name.clone(), source_watermarks, &columns)?;
1032    // TODO(yuhao): allow multiple watermarks on source.
1033    assert!(watermark_descs.len() <= 1);
1034    if is_create_source && watermark_descs.iter().any(|d| d.with_ttl) {
1035        return Err(ErrorCode::NotSupported(
1036            "WITH TTL is not supported in WATERMARK clause for CREATE SOURCE.".to_owned(),
1037            "Use `CREATE TABLE ... WATERMARK ... WITH TTL` instead.".to_owned(),
1038        )
1039        .into());
1040    }
1041
1042    let append_only = row_id_index.is_some();
1043    if is_create_source && !append_only && !watermark_descs.is_empty() {
1044        return Err(ErrorCode::NotSupported(
1045            "Defining watermarks on source requires the source connector to be append only."
1046                .to_owned(),
1047            "Use the key words `FORMAT PLAIN`".to_owned(),
1048        )
1049        .into());
1050    }
1051
1052    bind_sql_column_constraints(
1053        session,
1054        source_name.clone(),
1055        &mut columns,
1056        // TODO(st1page): pass the ref
1057        sql_columns_defs,
1058        &pk_col_ids,
1059    )?;
1060    check_format_encode(&with_properties, row_id_index, &columns)?;
1061
1062    let definition = handler_args.normalized_sql.clone();
1063
1064    let associated_table_id = if is_create_source {
1065        None
1066    } else {
1067        Some(TableId::placeholder())
1068    };
1069    let source = SourceCatalog {
1070        id: SourceId::placeholder(),
1071        name: source_name,
1072        schema_id,
1073        database_id,
1074        columns,
1075        pk_col_ids,
1076        append_only,
1077        owner: session.user_id(),
1078        info: source_info,
1079        row_id_index,
1080        with_properties,
1081        watermark_descs,
1082        associated_table_id,
1083        definition,
1084        connection_id: connector_conn_ref,
1085        created_at_epoch: None,
1086        initialized_at_epoch: None,
1087        version: INITIAL_SOURCE_VERSION_ID,
1088        created_at_cluster_version: None,
1089        initialized_at_cluster_version: None,
1090        rate_limit: source_rate_limit,
1091        refresh_mode: Some(refresh_mode),
1092    };
1093    Ok(source)
1094}
1095
1096pub async fn handle_create_source(
1097    mut handler_args: HandlerArgs,
1098    stmt: CreateSourceStatement,
1099) -> Result<RwPgResponse> {
1100    let session = handler_args.session.clone();
1101    let overwrite_options = OverwriteOptions::new(&mut handler_args);
1102
1103    if let Either::Right(resp) = session.check_relation_name_duplicated(
1104        stmt.source_name.clone(),
1105        StatementType::CREATE_SOURCE,
1106        stmt.if_not_exists,
1107    )? {
1108        return Ok(resp);
1109    }
1110
1111    if stmt
1112        .source_name
1113        .base_name()
1114        .starts_with(ICEBERG_SOURCE_PREFIX)
1115    {
1116        return Err(RwError::from(InvalidInputSyntax(format!(
1117            "Source name cannot start with reserved prefix '{}'",
1118            ICEBERG_SOURCE_PREFIX
1119        ))));
1120    }
1121
1122    if handler_args.with_options.is_empty() {
1123        return Err(RwError::from(InvalidInputSyntax(
1124            "missing WITH clause".to_owned(),
1125        )));
1126    }
1127
1128    let format_encode = stmt.format_encode.into_v2_with_warning();
1129    let (with_properties, refresh_mode) =
1130        bind_connector_props(&handler_args, &format_encode, true)?;
1131
1132    let create_source_type = CreateSourceType::for_newly_created(&session, &*with_properties);
1133    let (columns_from_resolve_source, source_info) = bind_columns_from_source(
1134        &session,
1135        &format_encode,
1136        Either::Left(&with_properties),
1137        create_source_type,
1138    )
1139    .await?;
1140    let mut col_id_gen = ColumnIdGenerator::new_initial();
1141
1142    if stmt.columns.iter().any(|col| {
1143        col.options
1144            .iter()
1145            .any(|def| matches!(def.option, ColumnOption::NotNull))
1146    }) {
1147        return Err(RwError::from(InvalidInputSyntax(
1148            "NOT NULL constraint is not supported in source schema".to_owned(),
1149        )));
1150    }
1151
1152    let source_catalog = bind_create_source_or_table_with_connector(
1153        handler_args.clone(),
1154        stmt.source_name,
1155        format_encode,
1156        with_properties,
1157        &stmt.columns,
1158        stmt.constraints,
1159        stmt.wildcard_idx,
1160        stmt.source_watermarks,
1161        columns_from_resolve_source,
1162        source_info,
1163        stmt.include_column_options,
1164        &mut col_id_gen,
1165        create_source_type,
1166        overwrite_options.source_rate_limit,
1167        SqlColumnStrategy::FollowChecked,
1168        refresh_mode,
1169    )
1170    .await?;
1171
1172    // If it is a temporary source, put it into SessionImpl.
1173    if stmt.temporary {
1174        if session.get_temporary_source(&source_catalog.name).is_some() {
1175            return Err(CatalogError::duplicated("source", source_catalog.name.clone()).into());
1176        }
1177        session.create_temporary_source(source_catalog);
1178        return Ok(PgResponse::empty_result(StatementType::CREATE_SOURCE));
1179    }
1180
1181    let source = source_catalog.to_prost();
1182
1183    let catalog_writer = session.catalog_writer()?;
1184
1185    if create_source_type.is_shared() {
1186        let graph = generate_stream_graph_for_source(handler_args, source_catalog)?;
1187        catalog_writer
1188            .create_source(source, Some(graph), stmt.if_not_exists)
1189            .await?;
1190    } else {
1191        // For other sources we don't create a streaming job
1192        catalog_writer
1193            .create_source(source, None, stmt.if_not_exists)
1194            .await?;
1195    }
1196
1197    Ok(PgResponse::empty_result(StatementType::CREATE_SOURCE))
1198}
1199
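/// Build the stream fragment graph for a shared source: plan a `LogicalSource` with
/// `SourceNodeKind::CreateSharedSource`, convert it to a stream plan, and build the graph.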
1200pub(super) fn generate_stream_graph_for_source(
1201    handler_args: HandlerArgs,
1202    source_catalog: SourceCatalog,
1203) -> Result<PbStreamFragmentGraph> {
1204    let context = OptimizerContext::from_handler_args(handler_args);
1205    let source_node = LogicalSource::with_catalog(
1206        Rc::new(source_catalog),
1207        SourceNodeKind::CreateSharedSource,
1208        context.into(),
1209        None,
1210    )?;
1211
1212    let stream_plan = source_node.to_stream(&mut ToStreamContext::new(false))?;
1213    let graph = build_graph(stream_plan, Some(GraphJobType::Source))?;
1214    Ok(graph)
1215}
1216
1217#[cfg(test)]
1218pub mod tests {
1219    use std::collections::HashMap;
1220    use std::sync::Arc;
1221
1222    use risingwave_common::catalog::{
1223        DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, ROW_ID_COLUMN_NAME,
1224    };
1225    use risingwave_common::types::{DataType, StructType};
1226
1227    use crate::catalog::root_catalog::SchemaPath;
1228    use crate::catalog::source_catalog::SourceCatalog;
1229    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};
1230
1231    const GET_COLUMN_FROM_CATALOG: fn(&Arc<SourceCatalog>) -> HashMap<&str, DataType> =
1232        |catalog: &Arc<SourceCatalog>| -> HashMap<&str, DataType> {
1233            catalog
1234                .columns
1235                .iter()
1236                .map(|col| (col.name(), col.data_type().clone()))
1237                .collect::<HashMap<&str, DataType>>()
1238        };
1239
1240    #[tokio::test]
1241    async fn test_create_source_handler() {
1242        let proto_file = create_proto_file(PROTO_FILE_DATA);
1243        let sql = format!(
1244            r#"CREATE SOURCE t
1245    WITH (connector = 'kinesis')
1246    FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
1247            proto_file.path().to_str().unwrap()
1248        );
1249        let frontend = LocalFrontend::new(Default::default()).await;
1250        frontend.run_sql(sql).await.unwrap();
1251
1252        let session = frontend.session_ref();
1253        let catalog_reader = session.env().catalog_reader().read_guard();
1254        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);
1255
1256        // Check source exists.
1257        let (source, _) = catalog_reader
1258            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
1259            .unwrap();
1260        assert_eq!(source.name, "t");
1261
1262        let columns = GET_COLUMN_FROM_CATALOG(source);
1263
1264        let city_type = StructType::new(vec![
1265            ("address", DataType::Varchar),
1266            ("zipcode", DataType::Varchar),
1267        ])
1268        // .with_ids([5, 6].map(ColumnId::new))
1269        .into();
1270        let expected_columns = maplit::hashmap! {
1271            ROW_ID_COLUMN_NAME => DataType::Serial,
1272            "id" => DataType::Int32,
1273            "zipcode" => DataType::Int64,
1274            "rate" => DataType::Float32,
1275            "country" => StructType::new(
1276                vec![("address", DataType::Varchar),("city", city_type),("zipcode", DataType::Varchar)],
1277            )
1278            // .with_ids([3, 4, 7].map(ColumnId::new))
1279            .into(),
1280        };
1281        assert_eq!(columns, expected_columns, "{columns:#?}");
1282    }
1283
1284    #[tokio::test]
1285    async fn test_duplicate_props_options() {
1286        let proto_file = create_proto_file(PROTO_FILE_DATA);
1287        let sql = format!(
1288            r#"CREATE SOURCE t
1289    WITH (
1290        connector = 'kinesis',
1291        aws.region='user_test_topic',
1292        endpoint='172.10.1.1:9090,172.10.1.2:9090',
1293        aws.credentials.access_key_id = 'your_access_key_1',
1294        aws.credentials.secret_access_key = 'your_secret_key_1'
1295    )
1296    FORMAT PLAIN ENCODE PROTOBUF (
1297        message = '.test.TestRecord',
1298        aws.credentials.access_key_id = 'your_access_key_2',
1299        aws.credentials.secret_access_key = 'your_secret_key_2',
1300        schema.location = 'file://{}',
1301    )"#,
1302            proto_file.path().to_str().unwrap()
1303        );
1304        let frontend = LocalFrontend::new(Default::default()).await;
1305        frontend.run_sql(sql).await.unwrap();
1306
1307        let session = frontend.session_ref();
1308        let catalog_reader = session.env().catalog_reader().read_guard();
1309        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);
1310
1311        // Check source exists.
1312        let (source, _) = catalog_reader
1313            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
1314            .unwrap();
1315        assert_eq!(source.name, "t");
1316
1317        // AwsAuth params exist in options.
1318        assert_eq!(
1319            source
1320                .info
1321                .format_encode_options
1322                .get("aws.credentials.access_key_id")
1323                .unwrap(),
1324            "your_access_key_2"
1325        );
1326        assert_eq!(
1327            source
1328                .info
1329                .format_encode_options
1330                .get("aws.credentials.secret_access_key")
1331                .unwrap(),
1332            "your_secret_key_2"
1333        );
1334
1335        // AwsAuth params exist in props.
1336        assert_eq!(
1337            source
1338                .with_properties
1339                .get("aws.credentials.access_key_id")
1340                .unwrap(),
1341            "your_access_key_1"
1342        );
1343        assert_eq!(
1344            source
1345                .with_properties
1346                .get("aws.credentials.secret_access_key")
1347                .unwrap(),
1348            "your_secret_key_1"
1349        );
1350
1351        // Options are not merged into props.
1352        assert!(!source.with_properties.contains_key("schema.location"));
1353    }
1354
1355    #[tokio::test]
1356    async fn test_multi_table_cdc_create_source_handler() {
1357        let sql =
1358            "CREATE SOURCE t2 WITH (connector = 'mysql-cdc') FORMAT PLAIN ENCODE JSON".to_owned();
1359        let frontend = LocalFrontend::new(Default::default()).await;
1360        let session = frontend.session_ref();
1361
1362        frontend
1363            .run_sql_with_session(session.clone(), sql)
1364            .await
1365            .unwrap();
1366        let catalog_reader = session.env().catalog_reader().read_guard();
1367        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);
1368
1369        // Check source exists.
1370        let (source, _) = catalog_reader
1371            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t2")
1372            .unwrap();
1373        assert_eq!(source.name, "t2");
1374
1375        let columns = source
1376            .columns
1377            .iter()
1378            .map(|col| (col.name(), col.data_type().clone()))
1379            .collect::<Vec<(&str, DataType)>>();
1380
1381        expect_test::expect![[r#"
1382            [
1383                (
1384                    "payload",
1385                    Jsonb,
1386                ),
1387                (
1388                    "_rw_offset",
1389                    Varchar,
1390                ),
1391                (
1392                    "_rw_table_name",
1393                    Varchar,
1394                ),
1395                (
1396                    "_row_id",
1397                    Serial,
1398                ),
1399            ]
1400        "#]]
1401        .assert_debug_eq(&columns);
1402    }
1403
1404    #[tokio::test]
1405    async fn test_source_addition_columns() {
1406        // test deriving INCLUDE columns for FORMAT PLAIN
1407        let sql =
1408            "CREATE SOURCE s (v1 int) include key as _rw_kafka_key with (connector = 'kafka') format plain encode json".to_owned();
1409        let frontend = LocalFrontend::new(Default::default()).await;
1410        frontend.run_sql(sql).await.unwrap();
1411        let session = frontend.session_ref();
1412        let catalog_reader = session.env().catalog_reader().read_guard();
1413        let (source, _) = catalog_reader
1414            .get_source_by_name(
1415                DEFAULT_DATABASE_NAME,
1416                SchemaPath::Name(DEFAULT_SCHEMA_NAME),
1417                "s",
1418            )
1419            .unwrap();
1420        assert_eq!(source.name, "s");
1421
1422        let columns = source
1423            .columns
1424            .iter()
1425            .map(|col| (col.name(), col.data_type().clone()))
1426            .collect::<Vec<(&str, DataType)>>();
1427
1428        expect_test::expect![[r#"
1429            [
1430                (
1431                    "v1",
1432                    Int32,
1433                ),
1434                (
1435                    "_rw_kafka_key",
1436                    Bytea,
1437                ),
1438                (
1439                    "_rw_kafka_timestamp",
1440                    Timestamptz,
1441                ),
1442                (
1443                    "_rw_kafka_partition",
1444                    Varchar,
1445                ),
1446                (
1447                    "_rw_kafka_offset",
1448                    Varchar,
1449                ),
1450                (
1451                    "_row_id",
1452                    Serial,
1453                ),
1454            ]
1455        "#]]
1456        .assert_debug_eq(&columns);
1457
1458        let sql =
1459            "CREATE SOURCE s3 (v1 int) include timestamp 'header1' as header_col with (connector = 'kafka') format plain encode json".to_owned();
1460        match frontend.run_sql(sql).await {
1461            Err(e) => {
1462                assert_eq!(
1463                    e.to_string(),
1464                    "Protocol error: Only header column can have inner field, but got \"timestamp\""
1465                )
1466            }
1467            _ => unreachable!(),
1468        }
1469    }
1470}