risingwave_frontend/handler/
create_source.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::rc::Rc;
17use std::sync::LazyLock;
18
19use anyhow::{Context, anyhow};
20use either::Either;
21use external_schema::debezium::extract_debezium_avro_table_pk_columns;
22use external_schema::nexmark::check_nexmark_schema;
23use itertools::Itertools;
24use maplit::{convert_args, hashmap, hashset};
25use pgwire::pg_response::{PgResponse, StatementType};
26use rand::Rng;
27use risingwave_common::array::arrow::{IcebergArrowConvert, arrow_schema_iceberg};
28use risingwave_common::bail_not_implemented;
29use risingwave_common::catalog::{
30    ColumnCatalog, ColumnDesc, ColumnId, INITIAL_SOURCE_VERSION_ID, KAFKA_TIMESTAMP_COLUMN_NAME,
31    ROW_ID_COLUMN_NAME, TableId, debug_assert_column_ids_distinct,
32};
33use risingwave_common::license::Feature;
34use risingwave_common::secret::LocalSecretManager;
35use risingwave_common::system_param::reader::SystemParamsRead;
36use risingwave_common::types::DataType;
37use risingwave_common::util::iter_util::ZipEqFast;
38use risingwave_connector::parser::additional_columns::{
39    build_additional_column_desc, get_supported_additional_columns,
40    source_add_partition_offset_cols,
41};
42use risingwave_connector::parser::{
43    AvroParserConfig, DEBEZIUM_IGNORE_KEY, DebeziumAvroParserConfig, ProtobufParserConfig,
44    SchemaLocation, SpecificParserConfig, TimestamptzHandling,
45    fetch_json_schema_and_map_to_columns,
46};
47use risingwave_connector::schema::AWS_GLUE_SCHEMA_ARN_KEY;
48use risingwave_connector::schema::schema_registry::{
49    SCHEMA_REGISTRY_BACKOFF_DURATION_KEY, SCHEMA_REGISTRY_BACKOFF_FACTOR_KEY,
50    SCHEMA_REGISTRY_CA_PEM_PATH, SCHEMA_REGISTRY_MAX_DELAY_KEY, SCHEMA_REGISTRY_PASSWORD,
51    SCHEMA_REGISTRY_RETRIES_MAX_KEY, SCHEMA_REGISTRY_USERNAME, SchemaRegistryConfig,
52    name_strategy_from_str,
53};
54use risingwave_connector::source::cdc::{
55    CDC_MONGODB_STRONG_SCHEMA_KEY, CDC_SHARING_MODE_KEY, CDC_SNAPSHOT_BACKFILL,
56    CDC_SNAPSHOT_MODE_KEY, CDC_TRANSACTIONAL_KEY, CDC_WAIT_FOR_STREAMING_START_TIMEOUT,
57    CITUS_CDC_CONNECTOR, MONGODB_CDC_CONNECTOR, MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR,
58    SQL_SERVER_CDC_CONNECTOR,
59};
60use risingwave_connector::source::datagen::DATAGEN_CONNECTOR;
61use risingwave_connector::source::iceberg::ICEBERG_CONNECTOR;
62use risingwave_connector::source::nexmark::source::{EventType, get_event_data_types_with_names};
63use risingwave_connector::source::test_source::TEST_CONNECTOR;
64pub use risingwave_connector::source::{
65    ADBC_SNOWFLAKE_CONNECTOR, UPSTREAM_SOURCE_KEY, WEBHOOK_CONNECTOR,
66};
67use risingwave_connector::source::{
68    AZBLOB_CONNECTOR, ConnectorProperties, GCS_CONNECTOR, GOOGLE_PUBSUB_CONNECTOR, KAFKA_CONNECTOR,
69    KINESIS_CONNECTOR, LEGACY_S3_CONNECTOR, MQTT_CONNECTOR, NATS_CONNECTOR, NEXMARK_CONNECTOR,
70    OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR, PULSAR_CONNECTOR,
71};
72use risingwave_connector::{AUTO_SCHEMA_CHANGE_KEY, WithPropertiesExt};
73use risingwave_pb::catalog::connection_params::PbConnectionType;
74use risingwave_pb::catalog::{PbSchemaRegistryNameStrategy, StreamSourceInfo, WatermarkDesc};
75use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
76use risingwave_pb::plan_common::source_refresh_mode::{RefreshMode, SourceRefreshModeStreaming};
77use risingwave_pb::plan_common::{EncodeType, FormatType, SourceRefreshMode};
78use risingwave_pb::stream_plan::PbStreamFragmentGraph;
79use risingwave_pb::telemetry::TelemetryDatabaseObject;
80use risingwave_sqlparser::ast::{
81    AstString, ColumnDef, ColumnOption, CreateSourceStatement, Encode, Format, FormatEncodeOptions,
82    ObjectName, SourceWatermark, SqlOptionValue, TableConstraint, Value, get_delimiter,
83};
84use risingwave_sqlparser::parser::{IncludeOption, IncludeOptionItem};
85use thiserror_ext::AsReport;
86
87use super::RwPgResponse;
88use crate::binder::Binder;
89use crate::catalog::CatalogError;
90use crate::catalog::source_catalog::SourceCatalog;
91use crate::error::ErrorCode::{self, Deprecated, InvalidInputSyntax, NotSupported, ProtocolError};
92use crate::error::{Result, RwError};
93use crate::expr::{Expr, ExprRewriter, SessionTimezone};
94use crate::handler::HandlerArgs;
95use crate::handler::create_table::{
96    ColumnIdGenerator, bind_pk_and_row_id_on_relation, bind_sql_column_constraints,
97    bind_sql_columns, bind_sql_pk_names, bind_table_constraints,
98};
99use crate::handler::util::{
100    SourceSchemaCompatExt, check_connector_match_connection_type, ensure_connection_type_allowed,
101};
102use crate::optimizer::plan_node::generic::SourceNodeKind;
103use crate::optimizer::plan_node::{LogicalSource, ToStream, ToStreamContext};
104use crate::session::SessionImpl;
105use crate::session::current::notice_to_user;
106use crate::utils::{
107    OverwriteOptions, resolve_connection_ref_and_secret_ref, resolve_privatelink_in_with_option,
108    resolve_secret_ref_in_with_options, resolve_source_refresh_mode_in_with_option,
109};
110use crate::{OptimizerContext, WithOptions, WithOptionsSecResolved, bind_data_type, build_graph};
111
112mod external_schema;
113pub use external_schema::{
114    bind_columns_from_source, get_schema_location, schema_has_schema_registry,
115};
116mod validate;
117pub use validate::validate_compatibility;
118use validate::{SOURCE_ALLOWED_CONNECTION_CONNECTOR, SOURCE_ALLOWED_CONNECTION_SCHEMA_REGISTRY};
119mod additional_column;
120use additional_column::check_and_add_timestamp_column;
121pub use additional_column::handle_addition_columns;
122use risingwave_common::catalog::ICEBERG_SOURCE_PREFIX;
123use risingwave_common::id::SourceId;
124
125use crate::stream_fragmenter::GraphJobType;
126
127fn non_generated_sql_columns(columns: &[ColumnDef]) -> Vec<ColumnDef> {
128    columns
129        .iter()
130        .filter(|c| !c.is_generated())
131        .cloned()
132        .collect()
133}
134
135fn try_consume_string_from_options(
136    format_encode_options: &mut BTreeMap<String, String>,
137    key: &str,
138) -> Option<AstString> {
139    format_encode_options.remove(key).map(AstString)
140}
141
142fn try_consume_schema_registry_config_from_options(
143    format_encode_options: &mut BTreeMap<String, String>,
144) {
145    [
146        SCHEMA_REGISTRY_USERNAME,
147        SCHEMA_REGISTRY_PASSWORD,
148        SCHEMA_REGISTRY_CA_PEM_PATH,
149        SCHEMA_REGISTRY_MAX_DELAY_KEY,
150        SCHEMA_REGISTRY_BACKOFF_DURATION_KEY,
151        SCHEMA_REGISTRY_BACKOFF_FACTOR_KEY,
152        SCHEMA_REGISTRY_RETRIES_MAX_KEY,
153    ]
154    .iter()
155    .for_each(|key| {
156        try_consume_string_from_options(format_encode_options, key);
157    });
158}
159
160fn consume_string_from_options(
161    format_encode_options: &mut BTreeMap<String, String>,
162    key: &str,
163) -> Result<AstString> {
164    try_consume_string_from_options(format_encode_options, key).ok_or(RwError::from(ProtocolError(
165        format!("missing field {} in options", key),
166    )))
167}
168
/// Drops every AWS-related option, i.e. every entry whose key lives in the
/// `aws.` namespace, from the map.
fn consume_aws_config_from_options(format_encode_options: &mut BTreeMap<String, String>) {
    // Keep only the entries that are not AWS configuration.
    format_encode_options.retain(|key, _value| {
        let is_aws_option = key.starts_with("aws.");
        !is_aws_option
    })
}
172
/// How a newly created (or replaced) source is materialized with respect to sharing.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CreateSourceType {
    /// Shared CDC source (connector is a shareable CDC connector).
    SharedCdc,
    /// e.g., shared Kafka source
    SharedNonCdc,
    /// Neither a shared CDC source nor a shared non-CDC source.
    NonShared,
    /// create table with connector
    Table,
}
182
183impl CreateSourceType {
184    /// Note: shouldn't be used for `ALTER SOURCE`, since session variables should not affect existing source. We should respect the original type instead.
185    pub fn for_newly_created(
186        session: &SessionImpl,
187        with_properties: &impl WithPropertiesExt,
188    ) -> Self {
189        if with_properties.is_shareable_cdc_connector() {
190            CreateSourceType::SharedCdc
191        } else if with_properties.is_shareable_non_cdc_connector()
192            && session
193                .env()
194                .streaming_config()
195                .developer
196                .enable_shared_source
197            && session.config().streaming_use_shared_source()
198        {
199            CreateSourceType::SharedNonCdc
200        } else {
201            CreateSourceType::NonShared
202        }
203    }
204
205    pub fn for_replace(catalog: &SourceCatalog) -> Self {
206        if !catalog.info.is_shared() {
207            CreateSourceType::NonShared
208        } else if catalog.with_properties.is_shareable_cdc_connector() {
209            CreateSourceType::SharedCdc
210        } else {
211            CreateSourceType::SharedNonCdc
212        }
213    }
214
215    pub fn is_shared(&self) -> bool {
216        matches!(
217            self,
218            CreateSourceType::SharedCdc | CreateSourceType::SharedNonCdc
219        )
220    }
221}
222
/// Bind columns from both source and sql defined.
///
/// - `cols_from_source`: columns resolved from the external schema (e.g. schema
///   registry), if any.
/// - `cols_from_sql` / `col_defs_from_sql`: the user-declared columns from SQL.
/// - `wildcard_idx`: position of `*` in the user-declared column list, if present.
/// - `sql_column_strategy`: how the SQL columns are reconciled with the columns
///   from the external schema.
///
/// Returns the final column list, or a protocol/not-supported error when the
/// user-declared schema conflicts with the source schema or the format/encode
/// requirements.
pub(crate) fn bind_all_columns(
    format_encode: &FormatEncodeOptions,
    cols_from_source: Option<Vec<ColumnCatalog>>,
    cols_from_sql: Vec<ColumnCatalog>,
    col_defs_from_sql: &[ColumnDef],
    wildcard_idx: Option<usize>,
    sql_column_strategy: SqlColumnStrategy,
) -> Result<Vec<ColumnCatalog>> {
    if let Some(cols_from_source) = cols_from_source {
        // Need to check `col_defs` to see if a column is generated, as we haven't bind the
        // `GeneratedColumnDesc` in `ColumnCatalog` yet.
        let generated_cols_from_sql = cols_from_sql
            .iter()
            .filter(|c| {
                col_defs_from_sql
                    .iter()
                    .find(|d| d.name.real_value() == c.name())
                    .unwrap()
                    .is_generated()
            })
            .cloned()
            .collect_vec();

        #[allow(clippy::collapsible_else_if)]
        match sql_column_strategy {
            // Ignore `cols_from_source`, follow `cols_from_sql` without checking.
            SqlColumnStrategy::FollowUnchecked => {
                assert!(
                    wildcard_idx.is_none(),
                    "wildcard still exists while strategy is Follows, not correctly purified?"
                );
                return Ok(cols_from_sql);
            }

            // Will merge `generated_cols_from_sql` into `cols_from_source`.
            SqlColumnStrategy::Ignore => {}

            SqlColumnStrategy::FollowChecked => {
                // If any non-generated column is declared in SQL, the SQL schema is
                // taken as authoritative and checked column-by-column below.
                let has_regular_cols_from_sql =
                    generated_cols_from_sql.len() != cols_from_sql.len();

                if has_regular_cols_from_sql {
                    if wildcard_idx.is_some() {
                        // (*, normal_column INT)
                        return Err(RwError::from(NotSupported(
                            "When there's a wildcard (\"*\"), \
                             only generated columns are allowed in user-defined schema from SQL"
                                .to_owned(),
                            "Remove the non-generated columns".to_owned(),
                        )));
                    } else {
                        // (normal_column INT)
                        // Follow `cols_from_sql` with name & type checking.
                        for col in &cols_from_sql {
                            if generated_cols_from_sql.contains(col) {
                                continue;
                            }
                            let Some(col_from_source) =
                                cols_from_source.iter().find(|c| c.name() == col.name())
                            else {
                                return Err(RwError::from(ProtocolError(format!(
                                    "Column \"{}\" is defined in SQL but not found in the source",
                                    col.name()
                                ))));
                            };

                            if col_from_source.data_type() != col.data_type() {
                                return Err(RwError::from(ProtocolError(format!(
                                    "Data type mismatch for column \"{}\". \
                                     Defined in SQL as \"{}\", but found in the source as \"{}\"",
                                    col.name(),
                                    col.data_type(),
                                    col_from_source.data_type()
                                ))));
                            }
                        }
                        return Ok(cols_from_sql);
                    }
                } else {
                    if wildcard_idx.is_some() {
                        // (*)
                        // (*, generated_column INT)
                        // Good, expand the wildcard later.
                    } else {
                        // ()
                        // (generated_column INT)
                        // Interpreted as if there's a wildcard for backward compatibility.
                        // TODO: the behavior is not that consistent, making it impossible to ingest no
                        //       columns from the source (though not useful in practice). Currently we
                        //       just notice the user but we may want to interpret it as empty columns
                        //       in the future.
                        notice_to_user("\
                            Neither wildcard (\"*\") nor regular (non-generated) columns appear in the user-defined schema from SQL. \
                            For backward compatibility, all columns from the source will be included at the beginning. \
                            For clarity, consider adding a wildcard (\"*\") to indicate where the columns from the source should be included, \
                            or specifying the columns you want to include from the source.
                        ");
                    }
                }
            }
        }

        // In some cases the wildcard may be absent:
        // - plan based on a purified SQL
        // - interpret `()` as `(*)` for backward compatibility (see notice above)
        // Default to 0 to expand the wildcard at the beginning.
        let wildcard_idx = wildcard_idx.unwrap_or(0).min(generated_cols_from_sql.len());

        // Merge `generated_cols_from_sql` with `cols_from_source`.
        // Generated columns before the wildcard stay first, then all source
        // columns, then the remaining generated columns.
        let mut merged_cols = generated_cols_from_sql;
        let merged_cols_r = merged_cols.split_off(wildcard_idx);
        merged_cols.extend(cols_from_source);
        merged_cols.extend(merged_cols_r);

        Ok(merged_cols)
    } else {
        // No columns from the external schema: the SQL-defined columns are used
        // directly, subject to per-(format, encode) shape requirements below.
        if wildcard_idx.is_some() {
            return Err(RwError::from(NotSupported(
                "Wildcard in user-defined schema is only allowed when there exists columns from external schema".to_owned(),
                "Remove the wildcard or use a source with external schema".to_owned(),
            )));
        }
        let non_generated_sql_defined_columns = non_generated_sql_columns(col_defs_from_sql);

        match (&format_encode.format, &format_encode.row_encode) {
            (Format::DebeziumMongo, Encode::Json) => {
                let strong_schema = format_encode
                    .row_options
                    .iter()
                    .find(|k| k.name.real_value().to_lowercase() == CDC_MONGODB_STRONG_SCHEMA_KEY)
                    .map(|k| matches!(k.value, SqlOptionValue::Value(Value::Boolean(true))))
                    .unwrap_or(false);

                // strong schema requires a '_id' column at the first position with a specific type
                if strong_schema {
                    let (_, id_column) = non_generated_sql_defined_columns
                        .iter()
                        .enumerate()
                        .find(|(idx, col)| *idx == 0 && col.name.real_value() == "_id")
                        .ok_or_else(|| {
                            RwError::from(ProtocolError(
                                "The `_id` column of the source with row format DebeziumMongoJson must be defined as the first column in SQL".to_owned(),
                            ))
                        })?;

                    let id_data_type = bind_data_type(id_column.data_type.as_ref().unwrap())?;
                    if !matches!(
                        id_data_type,
                        DataType::Varchar | DataType::Int32 | DataType::Int64 | DataType::Jsonb
                    ) {
                        return Err(RwError::from(ProtocolError(
                            "the `_id` column of the source with row format DebeziumMongoJson must be [Jsonb | Varchar | Int32 | Int64]".to_owned(),
                        )));
                    }

                    let mut columns = Vec::with_capacity(non_generated_sql_defined_columns.len());
                    columns.push(
                        // id column
                        ColumnCatalog {
                            column_desc: ColumnDesc::named("_id", 0.into(), id_data_type),
                            is_hidden: false,
                        },
                    );

                    // bind rest of the columns
                    for (idx, col) in non_generated_sql_defined_columns
                        .into_iter()
                        // skip the first column
                        .skip(1)
                        .enumerate()
                    {
                        columns.push(ColumnCatalog {
                            column_desc: ColumnDesc::named(
                                col.name.real_value(),
                                (idx as i32).into(),
                                bind_data_type(col.data_type.as_ref().unwrap())?,
                            ),
                            is_hidden: false,
                        });
                    }

                    return Ok(columns);
                }

                // Non-strong schema: exactly (_id, payload) is expected; start from
                // the default shapes and patch `_id`'s type from the SQL definition.
                let mut columns = vec![
                    ColumnCatalog {
                        column_desc: ColumnDesc::named("_id", 0.into(), DataType::Varchar),
                        is_hidden: false,
                    },
                    ColumnCatalog {
                        column_desc: ColumnDesc::named("payload", 0.into(), DataType::Jsonb),
                        is_hidden: false,
                    },
                ];

                if non_generated_sql_defined_columns.len() != 2
                    || non_generated_sql_defined_columns[0].name.real_value() != columns[0].name()
                    || non_generated_sql_defined_columns[1].name.real_value() != columns[1].name()
                {
                    return Err(RwError::from(ProtocolError(
                        "the not generated columns of the source with row format DebeziumMongoJson
        must be (_id [Jsonb | Varchar | Int32 | Int64], payload jsonb)."
                            .to_owned(),
                    )));
                }
                // ok to unwrap since it was checked at `bind_sql_columns`
                let key_data_type = bind_data_type(
                    non_generated_sql_defined_columns[0]
                        .data_type
                        .as_ref()
                        .unwrap(),
                )?;
                match key_data_type {
                    DataType::Jsonb | DataType::Varchar | DataType::Int32 | DataType::Int64 => {
                        columns[0].column_desc.data_type = key_data_type;
                    }
                    _ => {
                        return Err(RwError::from(ProtocolError(
                            "the `_id` column of the source with row format DebeziumMongoJson
        must be [Jsonb | Varchar | Int32 | Int64]"
                                .to_owned(),
                        )));
                    }
                }

                // ok to unwrap since it was checked at `bind_sql_columns`
                let value_data_type = bind_data_type(
                    non_generated_sql_defined_columns[1]
                        .data_type
                        .as_ref()
                        .unwrap(),
                )?;
                if !matches!(value_data_type, DataType::Jsonb) {
                    return Err(RwError::from(ProtocolError(
                        "the `payload` column of the source with row format DebeziumMongoJson
        must be Jsonb datatype"
                            .to_owned(),
                    )));
                }
                Ok(columns)
            }
            (Format::Plain, Encode::Bytes) => {
                let err = Err(RwError::from(ProtocolError(
                    "ENCODE BYTES only accepts one BYTEA type column".to_owned(),
                )));
                if non_generated_sql_defined_columns.len() == 1 {
                    // ok to unwrap `data_type`` since it was checked at `bind_sql_columns`
                    let col_data_type = bind_data_type(
                        non_generated_sql_defined_columns[0]
                            .data_type
                            .as_ref()
                            .unwrap(),
                    )?;
                    if col_data_type == DataType::Bytea {
                        Ok(cols_from_sql)
                    } else {
                        err
                    }
                } else {
                    err
                }
            }
            (_, _) => Ok(cols_from_sql),
        }
    }
}
490
491/// TODO: perhaps put the hint in notice is better. The error message format might be not that reliable.
492fn hint_format_encode(format_encode: &FormatEncodeOptions) -> String {
493    format!(
494        r#"Hint: For FORMAT {0} ENCODE {1}, INCLUDE KEY must be specified and the key column must be used as primary key.
495example:
496    CREATE TABLE <table_name> ( PRIMARY KEY ([rw_key | <key_name>]) )
497    INCLUDE KEY [AS <key_name>]
498    WITH (...)
499    FORMAT {0} ENCODE {1}{2}
500"#,
501        format_encode.format,
502        format_encode.row_encode,
503        if format_encode.row_encode == Encode::Json || format_encode.row_encode == Encode::Bytes {
504            "".to_owned()
505        } else {
506            " (...)".to_owned()
507        }
508    )
509}
510
/// Bind column from source. Add key column to table columns if necessary.
/// Return `pk_names`.
///
/// The acceptable primary-key definition depends on the (format, encode) pair:
/// - Native/None/Plain: any user-defined primary key is accepted as-is.
/// - Upsert (Json/Avro/Protobuf): the `INCLUDE KEY` column must exist and be
///   the one and only primary key.
/// - Debezium (Json/Avro), Maxwell, Canal: additional (INCLUDE) columns are
///   forbidden; a primary key must be declared in SQL (for Debezium Avro it may
///   alternatively be derived from the schema registry).
/// - DebeziumMongo Json: defaults to `_id` when no primary key is declared.
pub(crate) async fn bind_source_pk(
    format_encode: &FormatEncodeOptions,
    source_info: &StreamSourceInfo,
    columns: &mut [ColumnCatalog],
    sql_defined_pk_names: Vec<String>,
    with_properties: &WithOptionsSecResolved,
) -> Result<Vec<String>> {
    let sql_defined_pk = !sql_defined_pk_names.is_empty();
    let include_key_column_name: Option<String> = {
        // iter columns to check if contains additional columns from key part
        // return the key column names if exists
        columns.iter().find_map(|catalog| {
            if matches!(
                catalog.column_desc.additional_column.column_type,
                Some(AdditionalColumnType::Key(_))
            ) {
                Some(catalog.name().to_owned())
            } else {
                None
            }
        })
    };
    // Names of all `INCLUDE ...` (additional) columns, key or otherwise.
    let additional_column_names = columns
        .iter()
        .filter_map(|col| {
            if col.column_desc.additional_column.column_type.is_some() {
                Some(col.name().to_owned())
            } else {
                None
            }
        })
        .collect_vec();

    let res = match (&format_encode.format, &format_encode.row_encode) {
        (Format::Native, Encode::Native) | (Format::None, Encode::None) | (Format::Plain, _) => {
            sql_defined_pk_names
        }

        // For all Upsert formats, we only accept one and only key column as primary key.
        // Additional KEY columns must be set in this case and must be primary key.
        (Format::Upsert, Encode::Json | Encode::Avro | Encode::Protobuf) => {
            if let Some(ref key_column_name) = include_key_column_name
                && sql_defined_pk
            {
                // pk is set. check if it's valid

                // the column name have been converted to real value in `handle_addition_columns`
                // so we don't ignore ascii case here
                if sql_defined_pk_names.len() != 1
                    || !key_column_name.eq(sql_defined_pk_names[0].as_str())
                {
                    return Err(RwError::from(ProtocolError(format!(
                        "Only \"{}\" can be used as primary key\n\n{}",
                        key_column_name,
                        hint_format_encode(format_encode)
                    ))));
                }
                sql_defined_pk_names
            } else {
                // pk not set, or even key not included
                return if let Some(include_key_column_name) = include_key_column_name {
                    Err(RwError::from(ProtocolError(format!(
                        "Primary key must be specified to {}\n\n{}",
                        include_key_column_name,
                        hint_format_encode(format_encode)
                    ))))
                } else {
                    Err(RwError::from(ProtocolError(format!(
                        "INCLUDE KEY clause not set\n\n{}",
                        hint_format_encode(format_encode)
                    ))))
                };
            }
        }

        (Format::Debezium, Encode::Json) => {
            if !additional_column_names.is_empty() {
                return Err(RwError::from(ProtocolError(format!(
                    "FORMAT DEBEZIUM forbids additional columns, but got {:?}",
                    additional_column_names
                ))));
            }
            if !sql_defined_pk {
                return Err(RwError::from(ProtocolError(
                    "Primary key must be specified when creating source with FORMAT DEBEZIUM."
                        .to_owned(),
                )));
            }
            sql_defined_pk_names
        }
        (Format::Debezium, Encode::Avro) => {
            if !additional_column_names.is_empty() {
                return Err(RwError::from(ProtocolError(format!(
                    "FORMAT DEBEZIUM forbids additional columns, but got {:?}",
                    additional_column_names
                ))));
            }
            if sql_defined_pk {
                sql_defined_pk_names
            } else {
                // No SQL-defined pk: fall back to the key schema from the registry,
                // and verify each key column exists in the value schema's columns.
                let pk_names =
                    extract_debezium_avro_table_pk_columns(source_info, with_properties).await?;
                // extract pk(s) from schema registry
                for pk_name in &pk_names {
                    columns
                        .iter()
                        .find(|c: &&ColumnCatalog| c.name().eq(pk_name))
                        .ok_or_else(|| {
                            RwError::from(ProtocolError(format!(
                                "avro's key column {} not exists in avro's row schema",
                                pk_name
                            )))
                        })?;
                }
                pk_names
            }
        }
        (Format::DebeziumMongo, Encode::Json) => {
            if sql_defined_pk {
                sql_defined_pk_names
            } else {
                vec!["_id".to_owned()]
            }
        }

        (Format::Maxwell, Encode::Json) => {
            if !additional_column_names.is_empty() {
                return Err(RwError::from(ProtocolError(format!(
                    "FORMAT MAXWELL forbids additional columns, but got {:?}",
                    additional_column_names
                ))));
            }
            if !sql_defined_pk {
                return Err(RwError::from(ProtocolError(
    "Primary key must be specified when creating source with FORMAT MAXWELL ENCODE JSON.".to_owned(),
    )));
            }
            sql_defined_pk_names
        }

        (Format::Canal, Encode::Json) => {
            if !additional_column_names.is_empty() {
                return Err(RwError::from(ProtocolError(format!(
                    "FORMAT CANAL forbids additional columns, but got {:?}",
                    additional_column_names
                ))));
            }
            if !sql_defined_pk {
                return Err(RwError::from(ProtocolError(
    "Primary key must be specified when creating source with FORMAT CANAL ENCODE JSON.".to_owned(),
    )));
            }
            sql_defined_pk_names
        }
        (format, encoding) => {
            return Err(RwError::from(ProtocolError(format!(
                "Unknown combination {:?} {:?}",
                format, encoding
            ))));
        }
    };
    Ok(res)
}
676
/// Binds each `WATERMARK FOR <col> AS <expr>` clause against `column_catalogs`
/// and returns the corresponding `WatermarkDesc` for every clause.
///
/// `name` is the relation name the columns are bound under.
/// Errors if a watermark column cannot be resolved, the expression fails to
/// bind, or the expression's return type differs from the column's data type.
pub(super) fn bind_source_watermark(
    session: &SessionImpl,
    name: String,
    source_watermarks: Vec<SourceWatermark>,
    column_catalogs: &[ColumnCatalog],
) -> Result<Vec<WatermarkDesc>> {
    let mut binder = Binder::new_for_ddl(session);
    binder.bind_columns_to_context(name.clone(), column_catalogs)?;

    let mut session_tz = SessionTimezone::new(session.config().timezone());

    let watermark_descs = source_watermarks
        .into_iter()
        .map(|source_watermark| {
            let col_name = source_watermark.column.real_value();
            let watermark_idx = binder.get_column_binding_index(name.clone(), &col_name)?;

            let expr = binder.bind_expr(&source_watermark.expr)?;
            // Apply session timezone rewrite so that timestamptz +/- interval operations
            // are transformed into the timezone-aware variant (e.g. `subtract_with_time_zone`).
            // Without this, intervals containing days/months would fail at runtime.
            let expr = session_tz.rewrite_expr(expr);
            let watermark_col_type = column_catalogs[watermark_idx].data_type();
            let watermark_expr_type = &expr.return_type();
            if watermark_col_type != watermark_expr_type {
                Err(RwError::from(ErrorCode::BindError(
                    format!("The return value type of the watermark expression must be identical to the watermark column data type. Current data type of watermark return value: `{}`, column `{}`",watermark_expr_type, watermark_col_type),
                )))
            } else {
                let expr_proto = expr.to_expr_proto();
                Ok::<_, RwError>(WatermarkDesc {
                    watermark_idx: watermark_idx as u32,
                    expr: Some(expr_proto),
                    with_ttl: source_watermark.with_ttl,
                })
            }
        })
        .try_collect()?;
    Ok(watermark_descs)
}
717
718/// Performs early stage checking in frontend to see if the schema of the given `columns` is
719/// compatible with the connector extracted from the properties.
720///
721/// One should only call this function after all properties of all columns are resolved, like
722/// generated column descriptors.
723pub(super) fn check_format_encode(
724    props: &WithOptionsSecResolved,
725    row_id_index: Option<usize>,
726    columns: &[ColumnCatalog],
727) -> Result<()> {
728    let Some(connector) = props.get_connector() else {
729        return Ok(());
730    };
731
732    if connector == NEXMARK_CONNECTOR {
733        check_nexmark_schema(props, row_id_index, columns)
734    } else {
735        Ok(())
736    }
737}
738
739pub fn bind_connector_props(
740    handler_args: &HandlerArgs,
741    format_encode: &FormatEncodeOptions,
742    is_create_source: bool,
743) -> Result<(WithOptions, SourceRefreshMode)> {
744    let mut with_properties = handler_args.with_options.clone().into_connector_props();
745    validate_compatibility(format_encode, &mut with_properties)?;
746    let refresh_mode = {
747        let refresh_mode = resolve_source_refresh_mode_in_with_option(&mut with_properties)?;
748        if is_create_source && refresh_mode.is_some() {
749            return Err(RwError::from(ProtocolError(
750                "`refresh_mode` only supported for CREATE TABLE".to_owned(),
751            )));
752        }
753
754        refresh_mode.unwrap_or(SourceRefreshMode {
755            refresh_mode: Some(RefreshMode::Streaming(SourceRefreshModeStreaming {})),
756        })
757    };
758
759    let create_cdc_source_job = with_properties.is_shareable_cdc_connector();
760
761    if !is_create_source && with_properties.is_shareable_only_cdc_connector() {
762        return Err(RwError::from(ProtocolError(format!(
763            "connector {} does not support `CREATE TABLE`, please use `CREATE SOURCE` instead",
764            with_properties.get_connector().unwrap(),
765        ))));
766    }
767    if is_create_source && create_cdc_source_job {
768        if let Some(value) = with_properties.get(AUTO_SCHEMA_CHANGE_KEY)
769            && value.parse::<bool>().map_err(|_| {
770                ErrorCode::InvalidInputSyntax(format!(
771                    "invalid value of '{}' option",
772                    AUTO_SCHEMA_CHANGE_KEY
773                ))
774            })?
775        {
776            Feature::CdcAutoSchemaChange.check_available()?;
777        }
778
779        // set connector to backfill mode
780        with_properties.insert(CDC_SNAPSHOT_MODE_KEY.into(), CDC_SNAPSHOT_BACKFILL.into());
781        // enable cdc sharing mode, which will capture all tables in the given `database.name`
782        with_properties.insert(CDC_SHARING_MODE_KEY.into(), "true".into());
783        // enable transactional cdc
784        if with_properties.enable_transaction_metadata() {
785            with_properties.insert(CDC_TRANSACTIONAL_KEY.into(), "true".into());
786        }
787        // Only set CDC_WAIT_FOR_STREAMING_START_TIMEOUT if not already specified by user.
788        if !with_properties.contains_key(CDC_WAIT_FOR_STREAMING_START_TIMEOUT) {
789            with_properties.insert(
790                CDC_WAIT_FOR_STREAMING_START_TIMEOUT.into(),
791                handler_args
792                    .session
793                    .config()
794                    .cdc_source_wait_streaming_start_timeout()
795                    .to_string(),
796            );
797        }
798    }
799    if with_properties.is_mysql_cdc_connector() {
800        // Generate a random server id for mysql cdc source if needed
801        // `server.id` (in the range from 1 to 2^32 - 1). This value MUST be unique across whole replication
802        // group (that is, different from any other server id being used by any master or slave)
803        with_properties
804            .entry("server.id".to_owned())
805            .or_insert(rand::rng().random_range(1..u32::MAX).to_string());
806    }
807    Ok((with_properties, refresh_mode))
808}
809
/// When the schema can be inferred from external system (like schema registry),
/// how to handle the regular columns (i.e., non-generated) defined in SQL?
///
/// Consumed by `bind_all_columns` when merging SQL-defined columns with the
/// columns resolved from the external source.
pub enum SqlColumnStrategy {
    /// Follow all columns defined in SQL, ignore the columns from external system.
    /// This ensures that no accidental side effect will change the schema.
    ///
    /// This is the behavior when re-planning the target table of `SINK INTO`.
    FollowUnchecked,

    /// Follow all column defined in SQL, check the columns from external system with name & type.
    /// This ensures that no accidental side effect will change the schema.
    ///
    /// This is the behavior when creating a new table or source with a resolvable schema;
    /// adding, or dropping columns on that table.
    // TODO(purify): `ALTER SOURCE` currently has its own code path and does not check.
    FollowChecked,

    /// Merge the generated columns defined in SQL and columns from external system. If there
    /// are also regular columns defined in SQL, ignore silently.
    ///
    /// This is the behavior when `REFRESH SCHEMA` atop the purified SQL.
    Ignore,
}
833
/// Entrypoint for binding source connector.
/// Common logic shared by `CREATE SOURCE` and `CREATE TABLE`.
///
/// Resolves the target schema, merges SQL-defined columns with columns resolved from the
/// external source (per `sql_column_strategy`), adds additional/hidden columns, binds the
/// primary key and watermarks, and assembles a [`SourceCatalog`] with a placeholder id.
///
/// NOTE: the order of the steps below matters; see the inline comments before reordering.
#[allow(clippy::too_many_arguments)]
pub async fn bind_create_source_or_table_with_connector(
    handler_args: HandlerArgs,
    full_name: ObjectName,
    format_encode: FormatEncodeOptions,
    with_properties: WithOptions,
    sql_columns_defs: &[ColumnDef],
    constraints: Vec<TableConstraint>,
    wildcard_idx: Option<usize>,
    source_watermarks: Vec<SourceWatermark>,
    columns_from_resolve_source: Option<Vec<ColumnCatalog>>,
    source_info: StreamSourceInfo,
    include_column_options: IncludeOption,
    col_id_gen: &mut ColumnIdGenerator,
    create_source_type: CreateSourceType,
    source_rate_limit: Option<u32>,
    sql_column_strategy: SqlColumnStrategy,
    refresh_mode: SourceRefreshMode,
) -> Result<SourceCatalog> {
    let session = &handler_args.session;
    let db_name: &str = &session.database();
    let (schema_name, source_name) = Binder::resolve_schema_qualified_name(db_name, &full_name)?;
    let (database_id, schema_id) =
        session.get_database_and_schema_id_for_create(schema_name.clone())?;

    // Everything except `CREATE TABLE` is treated as a source here.
    let is_create_source = create_source_type != CreateSourceType::Table;

    if is_create_source {
        // reject refreshable batch source
        if with_properties.is_batch_connector() {
            return Err(ErrorCode::BindError(
            "can't CREATE SOURCE with refreshable batch connector\n\nHint: use CREATE TABLE instead"
                .to_owned(),
        )
        .into());
        }

        match format_encode.format {
            // reject unsupported formats for CREATE SOURCE
            Format::Debezium | Format::DebeziumMongo | Format::Maxwell | Format::Canal => {
                return Err(ErrorCode::BindError(format!(
                    "can't CREATE SOURCE with FORMAT {}.\n\nHint: use CREATE TABLE instead\n\n{}",
                    format_encode.format,
                    hint_format_encode(&format_encode)
                ))
                .into());
            }
            // hint limitations for some other formats
            Format::Upsert => {
                notice_to_user(format!(
                    "Streaming queries on sources with `FORMAT {}` may have limitations. If your query isn't supported, consider using `CREATE TABLE` instead.",
                    format_encode.format
                ));
            }
            _ => {}
        }
    }

    let sql_pk_names = bind_sql_pk_names(sql_columns_defs, bind_table_constraints(&constraints)?)?;

    // FIXME: ideally we can support it, but current way of handling iceberg additional columns are problematic.
    // They are treated as normal user columns, so they will be lost if we allow user to specify columns.
    // See `extract_iceberg_columns`
    if with_properties.is_iceberg_connector() && !sql_columns_defs.is_empty() {
        return Err(RwError::from(InvalidInputSyntax(
            r#"Schema is automatically inferred for iceberg source and should not be specified

HINT: use `CREATE SOURCE <name> WITH (...)` instead of `CREATE SOURCE <name> (<columns>) WITH (...)`."#.to_owned(),
        )));
    }

    // Same for ADBC Snowflake connector - schema is automatically inferred
    if with_properties.is_batch_connector()
        && with_properties
            .get(UPSTREAM_SOURCE_KEY)
            .is_some_and(|s| s.eq_ignore_ascii_case(ADBC_SNOWFLAKE_CONNECTOR))
        && !sql_columns_defs.is_empty()
    {
        return Err(RwError::from(InvalidInputSyntax(
            r#"Schema is automatically inferred for ADBC Snowflake source and should not be specified

HINT: use `CREATE TABLE <name> WITH (...)` instead of `CREATE TABLE <name> (<columns>) WITH (...)`."#.to_owned(),
        )));
    }
    let columns_from_sql = bind_sql_columns(sql_columns_defs, false)?;

    // Merge SQL-defined columns with columns resolved from the external source,
    // according to `sql_column_strategy`.
    let mut columns = bind_all_columns(
        &format_encode,
        columns_from_resolve_source,
        columns_from_sql,
        sql_columns_defs,
        wildcard_idx,
        sql_column_strategy,
    )?;

    // add additional columns before bind pk, because `format upsert` requires the key column
    handle_addition_columns(
        Some(&format_encode),
        &with_properties,
        include_column_options,
        &mut columns,
        false,
    )?;

    if columns.is_empty() {
        return Err(RwError::from(ProtocolError(
            "Schema definition is required, either from SQL or schema registry.".to_owned(),
        )));
    }

    // compatible with the behavior that add a hidden column `_rw_kafka_timestamp` to each message from Kafka source
    if is_create_source {
        // must behind `handle_addition_columns`
        check_and_add_timestamp_column(&with_properties, &mut columns);

        // For shared sources, we will include partition and offset cols in the SourceExecutor's *output*, to be used by the SourceBackfillExecutor.
        // For shared CDC source, the schema is different. See ColumnCatalog::debezium_cdc_source_cols(), CDC_BACKFILL_TABLE_ADDITIONAL_COLUMNS
        if create_source_type == CreateSourceType::SharedNonCdc {
            let (columns_exist, additional_columns) = source_add_partition_offset_cols(
                &columns,
                &with_properties.get_connector().unwrap(),
                true, // col_id filled below at col_id_gen.generate
            );
            // Only append the partition/offset columns that are not already present.
            for (existed, c) in columns_exist.into_iter().zip_eq_fast(additional_columns) {
                if !existed {
                    columns.push(ColumnCatalog::hidden(c));
                }
            }
        }
    }

    // resolve privatelink connection for Kafka
    let mut with_properties = with_properties;
    resolve_privatelink_in_with_option(&mut with_properties)?;

    // check the system parameter `enforce_secret`
    if session
        .env()
        .system_params_manager()
        .get_params()
        .load()
        .enforce_secret()
        && Feature::SecretManagement.check_available().is_ok()
    {
        // check enforce using secret for some props on cloud
        ConnectorProperties::enforce_secret_source(&with_properties)?;
    }

    let (with_properties, connection_type, connector_conn_ref) =
        resolve_connection_ref_and_secret_ref(
            with_properties,
            session,
            Some(TelemetryDatabaseObject::Source),
        )?;
    ensure_connection_type_allowed(connection_type, &SOURCE_ALLOWED_CONNECTION_CONNECTOR)?;

    // if not using connection, we don't need to check connector match connection type
    if !matches!(connection_type, PbConnectionType::Unspecified) {
        let Some(connector) = with_properties.get_connector() else {
            return Err(RwError::from(ProtocolError(format!(
                "missing field '{}' in WITH clause",
                UPSTREAM_SOURCE_KEY
            ))));
        };
        check_connector_match_connection_type(connector.as_str(), &connection_type)?;
    }

    let pk_names = bind_source_pk(
        &format_encode,
        &source_info,
        &mut columns,
        sql_pk_names,
        &with_properties,
    )
    .await?;

    // User may specify a generated or additional column with the same name as one from the external schema.
    // Ensure duplicated column names are handled here.
    if let Some(duplicated_name) = columns.iter().map(|c| c.name()).duplicates().next() {
        return Err(ErrorCode::InvalidInputSyntax(format!(
            "column \"{}\" specified more than once",
            duplicated_name
        ))
        .into());
    }

    // XXX: why do we use col_id_gen here? It doesn't seem to be very necessary.
    for c in &mut columns {
        let original_data_type = c.data_type().clone();
        col_id_gen.generate(c)?;
        // TODO: Now we restore the data type for `CREATE SOURCE`, so that keep the nested field id unset.
        //       This behavior is inconsistent with `CREATE TABLE`, and should be fixed once we refactor
        //       `ALTER SOURCE` to also use `ColumnIdGenerator` in the future.
        if is_create_source {
            c.column_desc.data_type = original_data_type;
        }
    }
    debug_assert_column_ids_distinct(&columns);

    let (mut columns, pk_col_ids, row_id_index) =
        bind_pk_and_row_id_on_relation(columns, pk_names, true)?;

    let watermark_descs =
        bind_source_watermark(session, source_name.clone(), source_watermarks, &columns)?;
    // TODO(yuhao): allow multiple watermark on source.
    assert!(watermark_descs.len() <= 1);
    if is_create_source && watermark_descs.iter().any(|d| d.with_ttl) {
        return Err(ErrorCode::NotSupported(
            "WITH TTL is not supported in WATERMARK clause for CREATE SOURCE.".to_owned(),
            "Use `CREATE TABLE ... WATERMARK ... WITH TTL` instead.".to_owned(),
        )
        .into());
    }

    // A row-id column implies no user-defined pk, hence append-only.
    let append_only = row_id_index.is_some();
    if is_create_source && !append_only && !watermark_descs.is_empty() {
        return Err(ErrorCode::NotSupported(
            "Defining watermarks on source requires the source connector to be append only."
                .to_owned(),
            "Use the key words `FORMAT PLAIN`".to_owned(),
        )
        .into());
    }

    bind_sql_column_constraints(
        session,
        source_name.clone(),
        &mut columns,
        // TODO(st1page): pass the ref
        sql_columns_defs,
        &pk_col_ids,
    )?;
    check_format_encode(&with_properties, row_id_index, &columns)?;

    let definition = handler_args.normalized_sql.clone();

    // Only tables have an associated (placeholder) table id.
    let associated_table_id = if is_create_source {
        None
    } else {
        Some(TableId::placeholder())
    };
    let source = SourceCatalog {
        id: SourceId::placeholder(),
        name: source_name,
        schema_id,
        database_id,
        columns,
        pk_col_ids,
        append_only,
        owner: session.user_id(),
        info: source_info,
        row_id_index,
        with_properties,
        watermark_descs,
        associated_table_id,
        definition,
        connection_id: connector_conn_ref,
        created_at_epoch: None,
        initialized_at_epoch: None,
        version: INITIAL_SOURCE_VERSION_ID,
        created_at_cluster_version: None,
        initialized_at_cluster_version: None,
        rate_limit: source_rate_limit,
        refresh_mode: Some(refresh_mode),
    };
    Ok(source)
}
1103
1104pub async fn handle_create_source(
1105    mut handler_args: HandlerArgs,
1106    stmt: CreateSourceStatement,
1107) -> Result<RwPgResponse> {
1108    let session = handler_args.session.clone();
1109    let overwrite_options = OverwriteOptions::new(&mut handler_args);
1110
1111    if let Either::Right(resp) = session.check_relation_name_duplicated(
1112        stmt.source_name.clone(),
1113        StatementType::CREATE_SOURCE,
1114        stmt.if_not_exists,
1115    )? {
1116        return Ok(resp);
1117    }
1118
1119    if stmt
1120        .source_name
1121        .base_name()
1122        .starts_with(ICEBERG_SOURCE_PREFIX)
1123    {
1124        return Err(RwError::from(InvalidInputSyntax(format!(
1125            "Source name cannot start with reserved prefix '{}'",
1126            ICEBERG_SOURCE_PREFIX
1127        ))));
1128    }
1129
1130    if handler_args.with_options.is_empty() {
1131        return Err(RwError::from(InvalidInputSyntax(
1132            "missing WITH clause".to_owned(),
1133        )));
1134    }
1135
1136    let format_encode = stmt.format_encode.into_v2_with_warning();
1137    let (with_properties, refresh_mode) =
1138        bind_connector_props(&handler_args, &format_encode, true)?;
1139
1140    let create_source_type = CreateSourceType::for_newly_created(&session, &*with_properties);
1141    let (columns_from_resolve_source, source_info) = bind_columns_from_source(
1142        &session,
1143        &format_encode,
1144        Either::Left(&with_properties),
1145        create_source_type,
1146    )
1147    .await?;
1148    let mut col_id_gen = ColumnIdGenerator::new_initial();
1149
1150    if stmt.columns.iter().any(|col| {
1151        col.options
1152            .iter()
1153            .any(|def| matches!(def.option, ColumnOption::NotNull))
1154    }) {
1155        return Err(RwError::from(InvalidInputSyntax(
1156            "NOT NULL constraint is not supported in source schema".to_owned(),
1157        )));
1158    }
1159
1160    let source_catalog = bind_create_source_or_table_with_connector(
1161        handler_args.clone(),
1162        stmt.source_name,
1163        format_encode,
1164        with_properties,
1165        &stmt.columns,
1166        stmt.constraints,
1167        stmt.wildcard_idx,
1168        stmt.source_watermarks,
1169        columns_from_resolve_source,
1170        source_info,
1171        stmt.include_column_options,
1172        &mut col_id_gen,
1173        create_source_type,
1174        overwrite_options.source_rate_limit,
1175        SqlColumnStrategy::FollowChecked,
1176        refresh_mode,
1177    )
1178    .await?;
1179
1180    // If it is a temporary source, put it into SessionImpl.
1181    if stmt.temporary {
1182        if session.get_temporary_source(&source_catalog.name).is_some() {
1183            return Err(CatalogError::duplicated("source", source_catalog.name.clone()).into());
1184        }
1185        session.create_temporary_source(source_catalog);
1186        return Ok(PgResponse::empty_result(StatementType::CREATE_SOURCE));
1187    }
1188
1189    let source = source_catalog.to_prost();
1190
1191    let catalog_writer = session.catalog_writer()?;
1192
1193    if create_source_type.is_shared() {
1194        let graph = generate_stream_graph_for_source(handler_args, source_catalog)?;
1195        catalog_writer
1196            .create_source(source, Some(graph), stmt.if_not_exists)
1197            .await?;
1198    } else {
1199        // For other sources we don't create a streaming job
1200        catalog_writer
1201            .create_source(source, None, stmt.if_not_exists)
1202            .await?;
1203    }
1204
1205    Ok(PgResponse::empty_result(StatementType::CREATE_SOURCE))
1206}
1207
1208pub(super) fn generate_stream_graph_for_source(
1209    handler_args: HandlerArgs,
1210    source_catalog: SourceCatalog,
1211) -> Result<PbStreamFragmentGraph> {
1212    let context = OptimizerContext::from_handler_args(handler_args);
1213    let source_node = LogicalSource::with_catalog(
1214        Rc::new(source_catalog),
1215        SourceNodeKind::CreateSharedSource,
1216        context.into(),
1217        None,
1218    )?;
1219
1220    let stream_plan = source_node.to_stream(&mut ToStreamContext::new(false))?;
1221    let graph = build_graph(stream_plan, Some(GraphJobType::Source))?;
1222    Ok(graph)
1223}
1224
#[cfg(test)]
pub mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use risingwave_common::catalog::{
        DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, ROW_ID_COLUMN_NAME,
    };
    use risingwave_common::types::{DataType, StructType};

    use crate::catalog::root_catalog::SchemaPath;
    use crate::catalog::source_catalog::SourceCatalog;
    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    // Helper: flatten a source catalog into a name -> data-type map for comparison.
    const GET_COLUMN_FROM_CATALOG: fn(&Arc<SourceCatalog>) -> HashMap<&str, DataType> =
        |catalog: &Arc<SourceCatalog>| -> HashMap<&str, DataType> {
            catalog
                .columns
                .iter()
                .map(|col| (col.name(), col.data_type().clone()))
                .collect::<HashMap<&str, DataType>>()
        };

    // Creates a protobuf-encoded source and checks the inferred column schema.
    #[tokio::test]
    async fn test_create_source_handler() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let sql = format!(
            r#"CREATE SOURCE t
    WITH (connector = 'kinesis')
    FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        // Check source exists.
        let (source, _) = catalog_reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
            .unwrap();
        assert_eq!(source.name, "t");

        let columns = GET_COLUMN_FROM_CATALOG(source);

        let city_type = StructType::new(vec![
            ("address", DataType::Varchar),
            ("zipcode", DataType::Varchar),
        ])
        // .with_ids([5, 6].map(ColumnId::new))
        .into();
        let expected_columns = maplit::hashmap! {
            ROW_ID_COLUMN_NAME => DataType::Serial,
            "id" => DataType::Int32,
            "zipcode" => DataType::Int64,
            "rate" => DataType::Float32,
            "country" => StructType::new(
                vec![("address", DataType::Varchar),("city", city_type),("zipcode", DataType::Varchar)],
            )
            // .with_ids([3, 4, 7].map(ColumnId::new))
            .into(),
        };
        assert_eq!(columns, expected_columns, "{columns:#?}");
    }

    // The same option key may appear in both WITH props and FORMAT/ENCODE options;
    // verifies the two are kept separate and not merged.
    #[tokio::test]
    async fn test_duplicate_props_options() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let sql = format!(
            r#"CREATE SOURCE t
    WITH (
        connector = 'kinesis',
        aws.region='user_test_topic',
        endpoint='172.10.1.1:9090,172.10.1.2:9090',
        aws.credentials.access_key_id = 'your_access_key_1',
        aws.credentials.secret_access_key = 'your_secret_key_1'
    )
    FORMAT PLAIN ENCODE PROTOBUF (
        message = '.test.TestRecord',
        aws.credentials.access_key_id = 'your_access_key_2',
        aws.credentials.secret_access_key = 'your_secret_key_2',
        schema.location = 'file://{}',
    )"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        // Check source exists.
        let (source, _) = catalog_reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
            .unwrap();
        assert_eq!(source.name, "t");

        // AwsAuth params exist in options.
        assert_eq!(
            source
                .info
                .format_encode_options
                .get("aws.credentials.access_key_id")
                .unwrap(),
            "your_access_key_2"
        );
        assert_eq!(
            source
                .info
                .format_encode_options
                .get("aws.credentials.secret_access_key")
                .unwrap(),
            "your_secret_key_2"
        );

        // AwsAuth params exist in props.
        assert_eq!(
            source
                .with_properties
                .get("aws.credentials.access_key_id")
                .unwrap(),
            "your_access_key_1"
        );
        assert_eq!(
            source
                .with_properties
                .get("aws.credentials.secret_access_key")
                .unwrap(),
            "your_secret_key_1"
        );

        // Options are not merged into props.
        assert!(!source.with_properties.contains_key("schema.location"));
    }

    // Shared CDC source: verifies the fixed payload/offset/table-name/row-id schema.
    #[tokio::test]
    async fn test_multi_table_cdc_create_source_handler() {
        let sql =
            "CREATE SOURCE t2 WITH (connector = 'mysql-cdc') FORMAT PLAIN ENCODE JSON".to_owned();
        let frontend = LocalFrontend::new(Default::default()).await;
        let session = frontend.session_ref();

        frontend
            .run_sql_with_session(session.clone(), sql)
            .await
            .unwrap();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        // Check source exists.
        let (source, _) = catalog_reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t2")
            .unwrap();
        assert_eq!(source.name, "t2");

        let columns = source
            .columns
            .iter()
            .map(|col| (col.name(), col.data_type().clone()))
            .collect::<Vec<(&str, DataType)>>();

        expect_test::expect![[r#"
            [
                (
                    "payload",
                    Jsonb,
                ),
                (
                    "_rw_offset",
                    Varchar,
                ),
                (
                    "_rw_table_name",
                    Varchar,
                ),
                (
                    "_row_id",
                    Serial,
                ),
            ]
        "#]]
        .assert_debug_eq(&columns);
    }

    // `INCLUDE` clause: checks derived additional columns, and that only `header`
    // columns may carry an inner field.
    #[tokio::test]
    async fn test_source_addition_columns() {
        // test derive include column for format plain
        let sql =
            "CREATE SOURCE s (v1 int) include key as _rw_kafka_key with (connector = 'kafka') format plain encode json".to_owned();
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();
        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let (source, _) = catalog_reader
            .get_source_by_name(
                DEFAULT_DATABASE_NAME,
                SchemaPath::Name(DEFAULT_SCHEMA_NAME),
                "s",
            )
            .unwrap();
        assert_eq!(source.name, "s");

        let columns = source
            .columns
            .iter()
            .map(|col| (col.name(), col.data_type().clone()))
            .collect::<Vec<(&str, DataType)>>();

        expect_test::expect![[r#"
            [
                (
                    "v1",
                    Int32,
                ),
                (
                    "_rw_kafka_key",
                    Bytea,
                ),
                (
                    "_rw_kafka_timestamp",
                    Timestamptz,
                ),
                (
                    "_rw_kafka_partition",
                    Varchar,
                ),
                (
                    "_rw_kafka_offset",
                    Varchar,
                ),
                (
                    "_row_id",
                    Serial,
                ),
            ]
        "#]]
        .assert_debug_eq(&columns);

        let sql =
            "CREATE SOURCE s3 (v1 int) include timestamp 'header1' as header_col with (connector = 'kafka') format plain encode json".to_owned();
        match frontend.run_sql(sql).await {
            Err(e) => {
                assert_eq!(
                    e.to_string(),
                    "Protocol error: Only header column can have inner field, but got \"timestamp\""
                )
            }
            _ => unreachable!(),
        }
    }
}