risingwave_frontend/handler/create_source.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::rc::Rc;
17use std::sync::LazyLock;
18
19use anyhow::{Context, anyhow};
20use either::Either;
21use external_schema::debezium::extract_debezium_avro_table_pk_columns;
22use external_schema::nexmark::check_nexmark_schema;
23use itertools::Itertools;
24use maplit::{convert_args, hashmap, hashset};
25use pgwire::pg_response::{PgResponse, StatementType};
26use rand::Rng;
27use risingwave_common::array::arrow::{IcebergArrowConvert, arrow_schema_iceberg};
28use risingwave_common::bail_not_implemented;
29use risingwave_common::catalog::{
30    ColumnCatalog, ColumnDesc, ColumnId, INITIAL_SOURCE_VERSION_ID, KAFKA_TIMESTAMP_COLUMN_NAME,
31    ROW_ID_COLUMN_NAME, TableId, debug_assert_column_ids_distinct,
32};
33use risingwave_common::license::Feature;
34use risingwave_common::secret::LocalSecretManager;
35use risingwave_common::types::DataType;
36use risingwave_common::util::iter_util::ZipEqFast;
37use risingwave_connector::WithPropertiesExt;
38use risingwave_connector::parser::additional_columns::{
39    build_additional_column_desc, get_supported_additional_columns,
40    source_add_partition_offset_cols,
41};
42use risingwave_connector::parser::{
43    AvroParserConfig, DEBEZIUM_IGNORE_KEY, DebeziumAvroParserConfig, ProtobufParserConfig,
44    SchemaLocation, SpecificParserConfig, TimestamptzHandling,
45    fetch_json_schema_and_map_to_columns,
46};
47use risingwave_connector::schema::AWS_GLUE_SCHEMA_ARN_KEY;
48use risingwave_connector::schema::schema_registry::{
49    SCHEMA_REGISTRY_PASSWORD, SCHEMA_REGISTRY_USERNAME, SchemaRegistryAuth, name_strategy_from_str,
50};
51use risingwave_connector::source::cdc::{
52    CDC_AUTO_SCHEMA_CHANGE_KEY, CDC_MONGODB_STRONG_SCHEMA_KEY, CDC_SHARING_MODE_KEY,
53    CDC_SNAPSHOT_BACKFILL, CDC_SNAPSHOT_MODE_KEY, CDC_TRANSACTIONAL_KEY,
54    CDC_WAIT_FOR_STREAMING_START_TIMEOUT, CITUS_CDC_CONNECTOR, MONGODB_CDC_CONNECTOR,
55    MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR, SQL_SERVER_CDC_CONNECTOR,
56};
57use risingwave_connector::source::datagen::DATAGEN_CONNECTOR;
58use risingwave_connector::source::iceberg::ICEBERG_CONNECTOR;
59use risingwave_connector::source::nexmark::source::{EventType, get_event_data_types_with_names};
60use risingwave_connector::source::test_source::TEST_CONNECTOR;
61use risingwave_connector::source::{
62    AZBLOB_CONNECTOR, ConnectorProperties, GCS_CONNECTOR, GOOGLE_PUBSUB_CONNECTOR, KAFKA_CONNECTOR,
63    KINESIS_CONNECTOR, LEGACY_S3_CONNECTOR, MQTT_CONNECTOR, NATS_CONNECTOR, NEXMARK_CONNECTOR,
64    OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR, PULSAR_CONNECTOR,
65};
66pub use risingwave_connector::source::{UPSTREAM_SOURCE_KEY, WEBHOOK_CONNECTOR};
67use risingwave_pb::catalog::connection_params::PbConnectionType;
68use risingwave_pb::catalog::{PbSchemaRegistryNameStrategy, StreamSourceInfo, WatermarkDesc};
69use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
70use risingwave_pb::plan_common::{EncodeType, FormatType};
71use risingwave_pb::stream_plan::PbStreamFragmentGraph;
72use risingwave_pb::telemetry::TelemetryDatabaseObject;
73use risingwave_sqlparser::ast::{
74    AstString, ColumnDef, ColumnOption, CreateSourceStatement, Encode, Format, FormatEncodeOptions,
75    ObjectName, SourceWatermark, SqlOptionValue, TableConstraint, Value, get_delimiter,
76};
77use risingwave_sqlparser::parser::{IncludeOption, IncludeOptionItem};
78use thiserror_ext::AsReport;
79
80use super::RwPgResponse;
81use crate::binder::Binder;
82use crate::catalog::CatalogError;
83use crate::catalog::source_catalog::SourceCatalog;
84use crate::error::ErrorCode::{self, Deprecated, InvalidInputSyntax, NotSupported, ProtocolError};
85use crate::error::{Result, RwError};
86use crate::expr::Expr;
87use crate::handler::HandlerArgs;
88use crate::handler::create_table::{
89    ColumnIdGenerator, bind_pk_and_row_id_on_relation, bind_sql_column_constraints,
90    bind_sql_columns, bind_sql_pk_names, bind_table_constraints,
91};
92use crate::handler::util::{
93    SourceSchemaCompatExt, check_connector_match_connection_type, ensure_connection_type_allowed,
94};
95use crate::optimizer::plan_node::generic::SourceNodeKind;
96use crate::optimizer::plan_node::{LogicalSource, ToStream, ToStreamContext};
97use crate::session::SessionImpl;
98use crate::session::current::notice_to_user;
99use crate::utils::{
100    OverwriteOptions, resolve_connection_ref_and_secret_ref, resolve_privatelink_in_with_option,
101    resolve_secret_ref_in_with_options,
102};
103use crate::{OptimizerContext, WithOptions, WithOptionsSecResolved, bind_data_type, build_graph};
104
105mod external_schema;
106pub use external_schema::{
107    bind_columns_from_source, get_schema_location, schema_has_schema_registry,
108};
109mod validate;
110pub use validate::validate_compatibility;
111use validate::{SOURCE_ALLOWED_CONNECTION_CONNECTOR, SOURCE_ALLOWED_CONNECTION_SCHEMA_REGISTRY};
112mod additional_column;
113use additional_column::check_and_add_timestamp_column;
114pub use additional_column::handle_addition_columns;
115
116fn non_generated_sql_columns(columns: &[ColumnDef]) -> Vec<ColumnDef> {
117    columns
118        .iter()
119        .filter(|c| !c.is_generated())
120        .cloned()
121        .collect()
122}
123
124fn try_consume_string_from_options(
125    format_encode_options: &mut BTreeMap<String, String>,
126    key: &str,
127) -> Option<AstString> {
128    format_encode_options.remove(key).map(AstString)
129}
130
131fn consume_string_from_options(
132    format_encode_options: &mut BTreeMap<String, String>,
133    key: &str,
134) -> Result<AstString> {
135    try_consume_string_from_options(format_encode_options, key).ok_or(RwError::from(ProtocolError(
136        format!("missing field {} in options", key),
137    )))
138}
139
140fn consume_aws_config_from_options(format_encode_options: &mut BTreeMap<String, String>) {
141    format_encode_options.retain(|key, _| !key.starts_with("aws."))
142}
143
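/// How a newly created source maps to a variant (a simplified, illustrative sketch;
/// connector options elided; see `for_newly_created` below for the exact rules):
///
/// ```sql
/// -- SharedCdc: a shareable CDC connector
/// CREATE SOURCE s1 WITH (connector = 'mysql-cdc', ...) FORMAT PLAIN ENCODE JSON;
/// -- SharedNonCdc: e.g. Kafka, when shared sources are enabled for the session
/// CREATE SOURCE s2 (v INT) WITH (connector = 'kafka', ...) FORMAT PLAIN ENCODE JSON;
/// -- Table: CREATE TABLE with a connector
/// CREATE TABLE t (v INT) WITH (connector = 'kafka', ...) FORMAT PLAIN ENCODE JSON;
/// ```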
144#[derive(Debug, Clone, Copy, PartialEq, Eq)]
145pub enum CreateSourceType {
146    SharedCdc,
147    /// e.g., shared Kafka source
148    SharedNonCdc,
149    NonShared,
150    /// `CREATE TABLE` with a connector
151    Table,
152}
153
154impl CreateSourceType {
155    /// Note: shouldn't be used for `ALTER SOURCE`, since session variables should not affect an existing source. We should respect the original type instead.
156    pub fn for_newly_created(
157        session: &SessionImpl,
158        with_properties: &impl WithPropertiesExt,
159    ) -> Self {
160        if with_properties.is_shareable_cdc_connector() {
161            CreateSourceType::SharedCdc
162        } else if with_properties.is_shareable_non_cdc_connector()
163            && session
164                .env()
165                .streaming_config()
166                .developer
167                .enable_shared_source
168            && session.config().streaming_use_shared_source()
169        {
170            CreateSourceType::SharedNonCdc
171        } else {
172            CreateSourceType::NonShared
173        }
174    }
175
176    pub fn for_replace(catalog: &SourceCatalog) -> Self {
177        if !catalog.info.is_shared() {
178            CreateSourceType::NonShared
179        } else if catalog.with_properties.is_shareable_cdc_connector() {
180            CreateSourceType::SharedCdc
181        } else {
182            CreateSourceType::SharedNonCdc
183        }
184    }
185
186    pub fn is_shared(&self) -> bool {
187        matches!(
188            self,
189            CreateSourceType::SharedCdc | CreateSourceType::SharedNonCdc
190        )
191    }
192}
193
194/// Bind columns from both the source and the SQL definition.
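/// A hedged sketch of the behavior when columns can be resolved from the external source
/// (e.g. a schema registry) under `SqlColumnStrategy::FollowChecked`; the column names and
/// types below are illustrative only:
///
/// ```sql
/// -- Suppose the external schema provides (id INT, name VARCHAR).
/// -- (*, gen_col INT AS id + 1)  => source columns are expanded at the wildcard, gen_col kept
/// -- (id INT)                    => follows the SQL columns after name & type checks
/// -- (id VARCHAR)                => rejected: data type mismatch against the source schema
/// ```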
195pub(crate) fn bind_all_columns(
196    format_encode: &FormatEncodeOptions,
197    cols_from_source: Option<Vec<ColumnCatalog>>,
198    cols_from_sql: Vec<ColumnCatalog>,
199    col_defs_from_sql: &[ColumnDef],
200    wildcard_idx: Option<usize>,
201    sql_column_strategy: SqlColumnStrategy,
202) -> Result<Vec<ColumnCatalog>> {
203    if let Some(cols_from_source) = cols_from_source {
204        // Need to check `col_defs` to see if a column is generated, as we haven't bound the
205        // `GeneratedColumnDesc` in `ColumnCatalog` yet.
206        let generated_cols_from_sql = cols_from_sql
207            .iter()
208            .filter(|c| {
209                col_defs_from_sql
210                    .iter()
211                    .find(|d| d.name.real_value() == c.name())
212                    .unwrap()
213                    .is_generated()
214            })
215            .cloned()
216            .collect_vec();
217
218        #[allow(clippy::collapsible_else_if)]
219        match sql_column_strategy {
220            // Ignore `cols_from_source`, follow `cols_from_sql` without checking.
221            SqlColumnStrategy::FollowUnchecked => {
222                assert!(
223                    wildcard_idx.is_none(),
224                    "wildcard still exists while strategy is Follows, not correctly purified?"
225                );
226                return Ok(cols_from_sql);
227            }
228
229            // Will merge `generated_cols_from_sql` into `cols_from_source`.
230            SqlColumnStrategy::Ignore => {}
231
232            SqlColumnStrategy::FollowChecked => {
233                let has_regular_cols_from_sql =
234                    generated_cols_from_sql.len() != cols_from_sql.len();
235
236                if has_regular_cols_from_sql {
237                    if wildcard_idx.is_some() {
238                        // (*, normal_column INT)
239                        return Err(RwError::from(NotSupported(
240                            "When there's a wildcard (\"*\"), \
241                             only generated columns are allowed in user-defined schema from SQL"
242                                .to_owned(),
243                            "Remove the non-generated columns".to_owned(),
244                        )));
245                    } else {
246                        // (normal_column INT)
247                        // Follow `cols_from_sql` with name & type checking.
248                        for col in &cols_from_sql {
249                            if generated_cols_from_sql.contains(col) {
250                                continue;
251                            }
252                            let Some(col_from_source) =
253                                cols_from_source.iter().find(|c| c.name() == col.name())
254                            else {
255                                return Err(RwError::from(ProtocolError(format!(
256                                    "Column \"{}\" is defined in SQL but not found in the source",
257                                    col.name()
258                                ))));
259                            };
260
261                            if col_from_source.data_type() != col.data_type() {
262                                return Err(RwError::from(ProtocolError(format!(
263                                    "Data type mismatch for column \"{}\". \
264                                     Defined in SQL as \"{}\", but found in the source as \"{}\"",
265                                    col.name(),
266                                    col.data_type(),
267                                    col_from_source.data_type()
268                                ))));
269                            }
270                        }
271                        return Ok(cols_from_sql);
272                    }
273                } else {
274                    if wildcard_idx.is_some() {
275                        // (*)
276                        // (*, generated_column INT)
277                        // Good, expand the wildcard later.
278                    } else {
279                        // ()
280                        // (generated_column INT)
281                        // Interpreted as if there's a wildcard for backward compatibility.
282                        // TODO: the behavior is not that consistent, making it impossible to ingest
283                        //       zero columns from the source (though not useful in practice). Currently
284                        //       we just notify the user, but we may want to interpret it as empty
285                        //       columns in the future.
286                        notice_to_user("\
287                            Neither wildcard (\"*\") nor regular (non-generated) columns appear in the user-defined schema from SQL. \
288                            For backward compatibility, all columns from the source will be included at the beginning. \
289                            For clarity, consider adding a wildcard (\"*\") to indicate where the columns from the source should be included, \
290                            or specifying the columns you want to include from the source.
291                        ");
292                    }
293                }
294            }
295        }
296
297        // In some cases the wildcard may be absent:
298        // - plan based on a purified SQL
299        // - interpret `()` as `(*)` for backward compatibility (see notice above)
300        // Default to 0 to expand the wildcard at the beginning.
301        let wildcard_idx = wildcard_idx.unwrap_or(0).min(generated_cols_from_sql.len());
302
303        // Merge `generated_cols_from_sql` with `cols_from_source`.
304        let mut merged_cols = generated_cols_from_sql;
305        let merged_cols_r = merged_cols.split_off(wildcard_idx);
306        merged_cols.extend(cols_from_source);
307        merged_cols.extend(merged_cols_r);
308
309        Ok(merged_cols)
310    } else {
311        if wildcard_idx.is_some() {
312            return Err(RwError::from(NotSupported(
313                "Wildcard in user-defined schema is only allowed when there exists columns from external schema".to_owned(),
314                "Remove the wildcard or use a source with external schema".to_owned(),
315            )));
316        }
317        let non_generated_sql_defined_columns = non_generated_sql_columns(col_defs_from_sql);
318
319        match (&format_encode.format, &format_encode.row_encode) {
320            (Format::DebeziumMongo, Encode::Json) => {
321                let strong_schema = format_encode
322                    .row_options
323                    .iter()
324                    .find(|k| k.name.real_value().to_lowercase() == CDC_MONGODB_STRONG_SCHEMA_KEY)
325                    .map(|k| matches!(k.value, SqlOptionValue::Value(Value::Boolean(true))))
326                    .unwrap_or(false);
327
328                // strong schema requires an `_id` column as the first column, with a specific type
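                // An illustrative strong-schema definition (options elided; the feature is
                // toggled by the `CDC_MONGODB_STRONG_SCHEMA_KEY` option):
                //   CREATE TABLE t (_id JSONB PRIMARY KEY, name VARCHAR, age INT)
                //   WITH (connector = 'mongodb-cdc', ...) FORMAT DEBEZIUM_MONGO ENCODE JSON;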
329                if strong_schema {
330                    let (_, id_column) = non_generated_sql_defined_columns
331                        .iter()
332                        .enumerate()
333                        .find(|(idx, col)| *idx == 0 && col.name.real_value() == "_id")
334                        .ok_or_else(|| {
335                            RwError::from(ProtocolError(
336                                "The `_id` column of the source with row format DebeziumMongoJson must be defined as the first column in SQL".to_owned(),
337                            ))
338                        })?;
339
340                    let id_data_type = bind_data_type(id_column.data_type.as_ref().unwrap())?;
341                    if !matches!(
342                        id_data_type,
343                        DataType::Varchar | DataType::Int32 | DataType::Int64 | DataType::Jsonb
344                    ) {
345                        return Err(RwError::from(ProtocolError(
346                            "the `_id` column of the source with row format DebeziumMongoJson must be [Jsonb | Varchar | Int32 | Int64]".to_owned(),
347                        )));
348                    }
349
350                    let mut columns = Vec::with_capacity(non_generated_sql_defined_columns.len());
351                    columns.push(
352                        // id column
353                        ColumnCatalog {
354                            column_desc: ColumnDesc::named("_id", 0.into(), id_data_type),
355                            is_hidden: false,
356                        },
357                    );
358
359                    // bind rest of the columns
360                    for (idx, col) in non_generated_sql_defined_columns
361                        .into_iter()
362                        // skip the first column
363                        .skip(1)
364                        .enumerate()
365                    {
366                        columns.push(ColumnCatalog {
367                            column_desc: ColumnDesc::named(
368                                col.name.real_value(),
369                                (idx as i32).into(),
370                                bind_data_type(col.data_type.as_ref().unwrap())?,
371                            ),
372                            is_hidden: false,
373                        });
374                    }
375
376                    return Ok(columns);
377                }
378
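                // Without the strong schema option, the expected definition is the fixed
                // (_id, payload) pair, e.g. (illustrative, options elided):
                //   CREATE TABLE t (_id JSONB PRIMARY KEY, payload JSONB)
                //   WITH (connector = 'mongodb-cdc', ...) FORMAT DEBEZIUM_MONGO ENCODE JSON;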
379                let mut columns = vec![
380                    ColumnCatalog {
381                        column_desc: ColumnDesc::named("_id", 0.into(), DataType::Varchar),
382                        is_hidden: false,
383                    },
384                    ColumnCatalog {
385                        column_desc: ColumnDesc::named("payload", 0.into(), DataType::Jsonb),
386                        is_hidden: false,
387                    },
388                ];
389
390                if non_generated_sql_defined_columns.len() != 2
391                    || non_generated_sql_defined_columns[0].name.real_value() != columns[0].name()
392                    || non_generated_sql_defined_columns[1].name.real_value() != columns[1].name()
393                {
394                    return Err(RwError::from(ProtocolError(
395                        "the not generated columns of the source with row format DebeziumMongoJson
396        must be (_id [Jsonb | Varchar | Int32 | Int64], payload jsonb)."
397                            .to_owned(),
398                    )));
399                }
400                // ok to unwrap since it was checked at `bind_sql_columns`
401                let key_data_type = bind_data_type(
402                    non_generated_sql_defined_columns[0]
403                        .data_type
404                        .as_ref()
405                        .unwrap(),
406                )?;
407                match key_data_type {
408                    DataType::Jsonb | DataType::Varchar | DataType::Int32 | DataType::Int64 => {
409                        columns[0].column_desc.data_type = key_data_type.clone();
410                    }
411                    _ => {
412                        return Err(RwError::from(ProtocolError(
413                            "the `_id` column of the source with row format DebeziumMongoJson
414        must be [Jsonb | Varchar | Int32 | Int64]"
415                                .to_owned(),
416                        )));
417                    }
418                }
419
420                // ok to unwrap since it was checked at `bind_sql_columns`
421                let value_data_type = bind_data_type(
422                    non_generated_sql_defined_columns[1]
423                        .data_type
424                        .as_ref()
425                        .unwrap(),
426                )?;
427                if !matches!(value_data_type, DataType::Jsonb) {
428                    return Err(RwError::from(ProtocolError(
429                        "the `payload` column of the source with row format DebeziumMongoJson
430        must be Jsonb datatype"
431                            .to_owned(),
432                    )));
433                }
434                Ok(columns)
435            }
436            (Format::Plain, Encode::Bytes) => {
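                // e.g. (illustrative, options elided):
                //   CREATE SOURCE s (raw BYTEA) WITH (connector = 'kafka', ...)
                //   FORMAT PLAIN ENCODE BYTES;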
437                let err = Err(RwError::from(ProtocolError(
438                    "ENCODE BYTES only accepts one BYTEA type column".to_owned(),
439                )));
440                if non_generated_sql_defined_columns.len() == 1 {
441                    // ok to unwrap `data_type` since it was checked at `bind_sql_columns`
442                    let col_data_type = bind_data_type(
443                        non_generated_sql_defined_columns[0]
444                            .data_type
445                            .as_ref()
446                            .unwrap(),
447                    )?;
448                    if col_data_type == DataType::Bytea {
449                        Ok(cols_from_sql)
450                    } else {
451                        err
452                    }
453                } else {
454                    err
455                }
456            }
457            (_, _) => Ok(cols_from_sql),
458        }
459    }
460}
461
462/// TODO: perhaps putting the hint in a notice would be better. The error message format might not be that reliable.
463fn hint_format_encode(format_encode: &FormatEncodeOptions) -> String {
464    format!(
465        r#"Hint: For FORMAT {0} ENCODE {1}, INCLUDE KEY must be specified and the key column must be used as primary key.
466example:
467    CREATE TABLE <table_name> ( PRIMARY KEY ([rw_key | <key_name>]) )
468    INCLUDE KEY [AS <key_name>]
469    WITH (...)
470    FORMAT {0} ENCODE {1}{2}
471"#,
472        format_encode.format,
473        format_encode.row_encode,
474        if format_encode.row_encode == Encode::Json || format_encode.row_encode == Encode::Bytes {
475            "".to_owned()
476        } else {
477            " (...)".to_owned()
478        }
479    )
480}
481
482/// Bind the primary key for the source, adding the key column to the table columns if necessary.
483/// Returns `pk_names`.
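/// For example (illustrative; compare `hint_format_encode`), with `FORMAT UPSERT` the column
/// introduced by `INCLUDE KEY` must be the one and only primary key:
///
/// ```sql
/// CREATE TABLE t (PRIMARY KEY (rw_key))
/// INCLUDE KEY AS rw_key
/// WITH (connector = 'kafka', ...)
/// FORMAT UPSERT ENCODE JSON;
/// ```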
484pub(crate) async fn bind_source_pk(
485    format_encode: &FormatEncodeOptions,
486    source_info: &StreamSourceInfo,
487    columns: &mut [ColumnCatalog],
488    sql_defined_pk_names: Vec<String>,
489    with_properties: &WithOptionsSecResolved,
490) -> Result<Vec<String>> {
491    let sql_defined_pk = !sql_defined_pk_names.is_empty();
492    let include_key_column_name: Option<String> = {
493        // iterate over the columns to check whether any is an additional column from the key part;
494        // return the key column name if one exists
495        columns.iter().find_map(|catalog| {
496            if matches!(
497                catalog.column_desc.additional_column.column_type,
498                Some(AdditionalColumnType::Key(_))
499            ) {
500                Some(catalog.name().to_owned())
501            } else {
502                None
503            }
504        })
505    };
506    let additional_column_names = columns
507        .iter()
508        .filter_map(|col| {
509            if col.column_desc.additional_column.column_type.is_some() {
510                Some(col.name().to_owned())
511            } else {
512                None
513            }
514        })
515        .collect_vec();
516
517    let res = match (&format_encode.format, &format_encode.row_encode) {
518        (Format::Native, Encode::Native) | (Format::None, Encode::None) | (Format::Plain, _) => {
519            sql_defined_pk_names
520        }
521
522        // For all Upsert formats, we accept one and only one key column as the primary key.
523        // The additional KEY column must be included in this case and must be the primary key.
524        (Format::Upsert, Encode::Json | Encode::Avro | Encode::Protobuf) => {
525            if let Some(ref key_column_name) = include_key_column_name
526                && sql_defined_pk
527            {
528                // pk is set. check if it's valid
529
530                // the column names have been converted to their real values in `handle_addition_columns`,
531                // so we don't ignore ASCII case here
532                if sql_defined_pk_names.len() != 1
533                    || !key_column_name.eq(sql_defined_pk_names[0].as_str())
534                {
535                    return Err(RwError::from(ProtocolError(format!(
536                        "Only \"{}\" can be used as primary key\n\n{}",
537                        key_column_name,
538                        hint_format_encode(format_encode)
539                    ))));
540                }
541                sql_defined_pk_names
542            } else {
543                // pk not set, or even key not included
544                return if let Some(include_key_column_name) = include_key_column_name {
545                    Err(RwError::from(ProtocolError(format!(
546                        "Primary key must be specified to {}\n\n{}",
547                        include_key_column_name,
548                        hint_format_encode(format_encode)
549                    ))))
550                } else {
551                    Err(RwError::from(ProtocolError(format!(
552                        "INCLUDE KEY clause not set\n\n{}",
553                        hint_format_encode(format_encode)
554                    ))))
555                };
556            }
557        }
558
559        (Format::Debezium, Encode::Json) => {
560            if !additional_column_names.is_empty() {
561                return Err(RwError::from(ProtocolError(format!(
562                    "FORMAT DEBEZIUM forbids additional columns, but got {:?}",
563                    additional_column_names
564                ))));
565            }
566            if !sql_defined_pk {
567                return Err(RwError::from(ProtocolError(
568                    "Primary key must be specified when creating source with FORMAT DEBEZIUM."
569                        .to_owned(),
570                )));
571            }
572            sql_defined_pk_names
573        }
574        (Format::Debezium, Encode::Avro) => {
575            if !additional_column_names.is_empty() {
576                return Err(RwError::from(ProtocolError(format!(
577                    "FORMAT DEBEZIUM forbids additional columns, but got {:?}",
578                    additional_column_names
579                ))));
580            }
581            if sql_defined_pk {
582                sql_defined_pk_names
583            } else {
584                let pk_names =
585                    extract_debezium_avro_table_pk_columns(source_info, with_properties).await?;
586                // ensure the pk(s) extracted from the schema registry exist in the columns
587                for pk_name in &pk_names {
588                    columns
589                        .iter()
590                        .find(|c: &&ColumnCatalog| c.name().eq(pk_name))
591                        .ok_or_else(|| {
592                            RwError::from(ProtocolError(format!(
593                                "avro's key column {} not exists in avro's row schema",
594                                pk_name
595                            )))
596                        })?;
597                }
598                pk_names
599            }
600        }
601        (Format::DebeziumMongo, Encode::Json) => {
602            if sql_defined_pk {
603                sql_defined_pk_names
604            } else {
605                vec!["_id".to_owned()]
606            }
607        }
608
609        (Format::Maxwell, Encode::Json) => {
610            if !additional_column_names.is_empty() {
611                return Err(RwError::from(ProtocolError(format!(
612                    "FORMAT MAXWELL forbids additional columns, but got {:?}",
613                    additional_column_names
614                ))));
615            }
616            if !sql_defined_pk {
617                return Err(RwError::from(ProtocolError(
618    "Primary key must be specified when creating source with FORMAT MAXWELL ENCODE JSON.".to_owned(),
619    )));
620            }
621            sql_defined_pk_names
622        }
623
624        (Format::Canal, Encode::Json) => {
625            if !additional_column_names.is_empty() {
626                return Err(RwError::from(ProtocolError(format!(
627                    "FORMAT CANAL forbids additional columns, but got {:?}",
628                    additional_column_names
629                ))));
630            }
631            if !sql_defined_pk {
632                return Err(RwError::from(ProtocolError(
633    "Primary key must be specified when creating source with FORMAT CANAL ENCODE JSON.".to_owned(),
634    )));
635            }
636            sql_defined_pk_names
637        }
638        (format, encoding) => {
639            return Err(RwError::from(ProtocolError(format!(
640                "Unknown combination {:?} {:?}",
641                format, encoding
642            ))));
643        }
644    };
645    Ok(res)
646}
647
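/// Binds the `WATERMARK FOR <column> AS <expr>` clauses against the bound columns.
/// A hedged usage sketch (connector options elided, column names illustrative):
///
/// ```sql
/// CREATE SOURCE s (
///     ts TIMESTAMPTZ,
///     v INT,
///     WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
/// ) WITH (connector = 'kafka', ...) FORMAT PLAIN ENCODE JSON;
/// ```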
648pub(super) fn bind_source_watermark(
649    session: &SessionImpl,
650    name: String,
651    source_watermarks: Vec<SourceWatermark>,
652    column_catalogs: &[ColumnCatalog],
653) -> Result<Vec<WatermarkDesc>> {
654    let mut binder = Binder::new_for_ddl(session);
655    binder.bind_columns_to_context(name.clone(), column_catalogs)?;
656
657    let watermark_descs = source_watermarks
658        .into_iter()
659        .map(|source_watermark| {
660            let col_name = source_watermark.column.real_value();
661            let watermark_idx = binder.get_column_binding_index(name.clone(), &col_name)?;
662
663            let expr = binder.bind_expr(source_watermark.expr)?;
664            let watermark_col_type = column_catalogs[watermark_idx].data_type();
665            let watermark_expr_type = &expr.return_type();
666            if watermark_col_type != watermark_expr_type {
667                Err(RwError::from(ErrorCode::BindError(
668                    format!("The return value type of the watermark expression must be identical to the watermark column data type. Current data type of watermark return value: `{}`, column `{}`",watermark_expr_type, watermark_col_type),
669                )))
670            } else {
671                let expr_proto = expr.to_expr_proto();
672                Ok::<_, RwError>(WatermarkDesc {
673                    watermark_idx: watermark_idx as u32,
674                    expr: Some(expr_proto),
675                })
676            }
677        })
678        .try_collect()?;
679    Ok(watermark_descs)
680}
681
682/// Performs early-stage checking in the frontend to see if the schema of the given `columns` is
683/// compatible with the connector extracted from the properties.
684///
685/// One should only call this function after all properties of all columns are resolved, like
686/// generated column descriptors.
687pub(super) fn check_format_encode(
688    props: &WithOptionsSecResolved,
689    row_id_index: Option<usize>,
690    columns: &[ColumnCatalog],
691) -> Result<()> {
692    let Some(connector) = props.get_connector() else {
693        return Ok(());
694    };
695
696    if connector == NEXMARK_CONNECTOR {
697        check_nexmark_schema(props, row_id_index, columns)
698    } else {
699        Ok(())
700    }
701}
702
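/// Normalizes the connector properties from the `WITH` clause.
///
/// For a shared CDC source job, a few derived options are injected (sketched here by
/// constant name rather than literal key): the snapshot mode is set to
/// `CDC_SNAPSHOT_BACKFILL`, `CDC_SHARING_MODE_KEY` is enabled, `CDC_TRANSACTIONAL_KEY`
/// is enabled when the connector supports transaction metadata, and a wait-for-streaming
/// start timeout is taken from the session config. For MySQL CDC, a random `server.id`
/// is generated if the user did not provide one.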
703pub fn bind_connector_props(
704    handler_args: &HandlerArgs,
705    format_encode: &FormatEncodeOptions,
706    is_create_source: bool,
707) -> Result<WithOptions> {
708    let mut with_properties = handler_args.with_options.clone().into_connector_props();
709    validate_compatibility(format_encode, &mut with_properties)?;
710    let create_cdc_source_job = with_properties.is_shareable_cdc_connector();
711
712    if !is_create_source && with_properties.is_shareable_only_cdc_connector() {
713        return Err(RwError::from(ProtocolError(format!(
714            "connector {} does not support `CREATE TABLE`, please use `CREATE SOURCE` instead",
715            with_properties.get_connector().unwrap(),
716        ))));
717    }
718    if is_create_source && create_cdc_source_job {
719        if let Some(value) = with_properties.get(CDC_AUTO_SCHEMA_CHANGE_KEY)
720            && value
721                .parse::<bool>()
722                .map_err(|_| anyhow!("invalid value of '{}' option", CDC_AUTO_SCHEMA_CHANGE_KEY))?
723        {
724            Feature::CdcAutoSchemaChange
725                .check_available()
726                .map_err(|e| anyhow::anyhow!(e))?;
727        }
728
729        // set connector to backfill mode
730        with_properties.insert(CDC_SNAPSHOT_MODE_KEY.into(), CDC_SNAPSHOT_BACKFILL.into());
731        // enable cdc sharing mode, which will capture all tables in the given `database.name`
732        with_properties.insert(CDC_SHARING_MODE_KEY.into(), "true".into());
733        // enable transactional cdc
734        if with_properties.enable_transaction_metadata() {
735            with_properties.insert(CDC_TRANSACTIONAL_KEY.into(), "true".into());
736        }
737        with_properties.insert(
738            CDC_WAIT_FOR_STREAMING_START_TIMEOUT.into(),
739            handler_args
740                .session
741                .config()
742                .cdc_source_wait_streaming_start_timeout()
743                .to_string(),
744        );
745    }
746    if with_properties.is_mysql_cdc_connector() {
747        // Generate a random server id for the MySQL CDC source if needed.
748        // `server.id` (in the range from 1 to 2^32 - 1) MUST be unique across the whole replication
749        // group (that is, different from any other server id being used by any master or slave).
750        with_properties
751            .entry("server.id".to_owned())
752            .or_insert(rand::rng().random_range(1..u32::MAX).to_string());
753    }
754    Ok(with_properties)
755}
756
757/// When the schema can be inferred from an external system (like a schema registry),
758/// how should the regular (i.e., non-generated) columns defined in SQL be handled?
759pub enum SqlColumnStrategy {
760    /// Follow all columns defined in SQL, ignoring the columns from the external system.
761    /// This ensures that no accidental side effect will change the schema.
762    ///
763    /// This is the behavior when re-planning the target table of `SINK INTO`.
764    FollowUnchecked,
765
766    /// Follow all columns defined in SQL, checking them against the columns from the external system by name & type.
767    /// This ensures that no accidental side effect will change the schema.
768    ///
769    /// This is the behavior when creating a new table or source with a resolvable schema;
770    /// adding, or dropping columns on that table.
771    // TODO(purify): `ALTER SOURCE` currently has its own code path and does not check.
772    FollowChecked,
773
774    /// Merge the generated columns defined in SQL with the columns from the external system. If there
775    /// are also regular columns defined in SQL, they are silently ignored.
776    ///
777    /// This is the behavior when `REFRESH SCHEMA` atop the purified SQL.
778    Ignore,
779}
780
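/// Binds a `CREATE SOURCE` or `CREATE TABLE ... WITH (connector = ...)` statement into a
/// `SourceCatalog`. A simplified sketch of the steps performed below:
///
/// 1. Bind columns from SQL and/or the resolved external schema (`bind_all_columns`).
/// 2. Handle `INCLUDE ...` additional columns and hidden columns for shared sources.
/// 3. Resolve privatelink, connection refs and secret refs in the `WITH` options.
/// 4. Bind the primary key (`bind_source_pk`), watermarks and column constraints.
/// 5. Assemble and return the `SourceCatalog`.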
781#[allow(clippy::too_many_arguments)]
782pub async fn bind_create_source_or_table_with_connector(
783    handler_args: HandlerArgs,
784    full_name: ObjectName,
785    format_encode: FormatEncodeOptions,
786    with_properties: WithOptions,
787    sql_columns_defs: &[ColumnDef],
788    constraints: Vec<TableConstraint>,
789    wildcard_idx: Option<usize>,
790    source_watermarks: Vec<SourceWatermark>,
791    columns_from_resolve_source: Option<Vec<ColumnCatalog>>,
792    source_info: StreamSourceInfo,
793    include_column_options: IncludeOption,
794    col_id_gen: &mut ColumnIdGenerator,
795    create_source_type: CreateSourceType,
796    source_rate_limit: Option<u32>,
797    sql_column_strategy: SqlColumnStrategy,
798) -> Result<SourceCatalog> {
799    let session = &handler_args.session;
800    let db_name: &str = &session.database();
801    let (schema_name, source_name) = Binder::resolve_schema_qualified_name(db_name, full_name)?;
802    let (database_id, schema_id) =
803        session.get_database_and_schema_id_for_create(schema_name.clone())?;
804
805    let is_create_source = create_source_type != CreateSourceType::Table;
806    if !is_create_source && with_properties.is_iceberg_connector() {
807        return Err(ErrorCode::BindError(
808            "can't CREATE TABLE with iceberg connector\n\nHint: use CREATE SOURCE instead"
809                .to_owned(),
810        )
811        .into());
812    }
813    if is_create_source {
814        match format_encode.format {
815            Format::Upsert
816            | Format::Debezium
817            | Format::DebeziumMongo
818            | Format::Maxwell
819            | Format::Canal => {
820                return Err(ErrorCode::BindError(format!(
821                    "can't CREATE SOURCE with FORMAT {}.\n\nHint: use CREATE TABLE instead\n\n{}",
822                    format_encode.format,
823                    hint_format_encode(&format_encode)
824                ))
825                .into());
826            }
827            _ => {
828                // TODO: enhance error message for other formats
829            }
830        }
831    }
832
833    let sql_pk_names = bind_sql_pk_names(sql_columns_defs, bind_table_constraints(&constraints)?)?;
834
835    let columns_from_sql = bind_sql_columns(sql_columns_defs, false)?;
836
837    let mut columns = bind_all_columns(
838        &format_encode,
839        columns_from_resolve_source,
840        columns_from_sql,
841        sql_columns_defs,
842        wildcard_idx,
843        sql_column_strategy,
844    )?;
845
846    // add additional columns before bind pk, because `format upsert` requires the key column
847    handle_addition_columns(
848        Some(&format_encode),
849        &with_properties,
850        include_column_options,
851        &mut columns,
852        false,
853    )?;
854
855    if columns.is_empty() {
856        return Err(RwError::from(ProtocolError(
857            "Schema definition is required, either from SQL or schema registry.".to_owned(),
858        )));
859    }
860
861    // compatible with the behavior of adding a hidden column `_rw_kafka_timestamp` to each message from a Kafka source
862    if is_create_source {
863        // must come after `handle_addition_columns`
864        check_and_add_timestamp_column(&with_properties, &mut columns);
865
866        // For shared sources, we will include partition and offset cols in the SourceExecutor's *output*, to be used by the SourceBackfillExecutor.
867        // For shared CDC source, the schema is different. See ColumnCatalog::debezium_cdc_source_cols(), CDC_BACKFILL_TABLE_ADDITIONAL_COLUMNS
868        if create_source_type == CreateSourceType::SharedNonCdc {
869            let (columns_exist, additional_columns) = source_add_partition_offset_cols(
870                &columns,
871                &with_properties.get_connector().unwrap(),
872                true, // col_id filled below at col_id_gen.generate
873            );
874            for (existed, c) in columns_exist.into_iter().zip_eq_fast(additional_columns) {
875                if !existed {
876                    columns.push(ColumnCatalog::hidden(c));
877                }
878            }
879        }
880    }
881
882    // resolve privatelink connection for Kafka
883    let mut with_properties = with_properties;
884    resolve_privatelink_in_with_option(&mut with_properties)?;
885
886    let (with_properties, connection_type, connector_conn_ref) =
887        resolve_connection_ref_and_secret_ref(
888            with_properties,
889            session,
890            TelemetryDatabaseObject::Source,
891        )?;
892    ensure_connection_type_allowed(connection_type, &SOURCE_ALLOWED_CONNECTION_CONNECTOR)?;
893
894    // if not using connection, we don't need to check connector match connection type
895    if !matches!(connection_type, PbConnectionType::Unspecified) {
896        let connector = with_properties.get_connector().unwrap();
897        check_connector_match_connection_type(connector.as_str(), &connection_type)?;
898    }
899
900    let pk_names = bind_source_pk(
901        &format_encode,
902        &source_info,
903        &mut columns,
904        sql_pk_names,
905        &with_properties,
906    )
907    .await?;
908
909    if is_create_source && !pk_names.is_empty() {
910        return Err(ErrorCode::InvalidInputSyntax(
911            "Source does not support PRIMARY KEY constraint, please use \"CREATE TABLE\" instead"
912                .to_owned(),
913        )
914        .into());
915    }
916
917    // User may specify a generated or additional column with the same name as one from the external schema.
918    // Ensure duplicated column names are handled here.
919    if let Some(duplicated_name) = columns.iter().map(|c| c.name()).duplicates().next() {
920        return Err(ErrorCode::InvalidInputSyntax(format!(
921            "column \"{}\" specified more than once",
922            duplicated_name
923        ))
924        .into());
925    }
926
927    // XXX: why do we use col_id_gen here? It doesn't seem to be very necessary.
928    for c in &mut columns {
929        let original_data_type = c.data_type().clone();
930        col_id_gen.generate(c)?;
931        // TODO: For now we restore the data type for `CREATE SOURCE`, so that the nested field ids stay unset.
932        //       This behavior is inconsistent with `CREATE TABLE`, and should be fixed once we refactor
933        //       `ALTER SOURCE` to also use `ColumnIdGenerator` in the future.
934        if is_create_source {
935            c.column_desc.data_type = original_data_type;
936        }
937    }
938    debug_assert_column_ids_distinct(&columns);
939
940    let (mut columns, pk_col_ids, row_id_index) =
941        bind_pk_and_row_id_on_relation(columns, pk_names, true)?;
942
943    let watermark_descs =
944        bind_source_watermark(session, source_name.clone(), source_watermarks, &columns)?;
945    // TODO(yuhao): allow multiple watermarks on source.
946    assert!(watermark_descs.len() <= 1);
947
948    bind_sql_column_constraints(
949        session,
950        source_name.clone(),
951        &mut columns,
952        // TODO(st1page): pass the ref
953        sql_columns_defs.to_vec(),
954        &pk_col_ids,
955    )?;
956    check_format_encode(&with_properties, row_id_index, &columns)?;
957
958    let definition = handler_args.normalized_sql.clone();
959
960    let associated_table_id = if is_create_source {
961        None
962    } else {
963        Some(TableId::placeholder())
964    };
965    let source = SourceCatalog {
966        id: TableId::placeholder().table_id,
967        name: source_name,
968        schema_id,
969        database_id,
970        columns,
971        pk_col_ids,
972        append_only: row_id_index.is_some(),
973        owner: session.user_id(),
974        info: source_info,
975        row_id_index,
976        with_properties,
977        watermark_descs,
978        associated_table_id,
979        definition,
980        connection_id: connector_conn_ref,
981        created_at_epoch: None,
982        initialized_at_epoch: None,
983        version: INITIAL_SOURCE_VERSION_ID,
984        created_at_cluster_version: None,
985        initialized_at_cluster_version: None,
986        rate_limit: source_rate_limit,
987    };
988    Ok(source)
989}
990
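/// Handles `CREATE SOURCE`. A hedged end-to-end example (the connector options shown are
/// illustrative and not exhaustive):
///
/// ```sql
/// CREATE SOURCE s (v1 INT, v2 VARCHAR)
/// WITH (
///     connector = 'kafka',
///     topic = 'my_topic',
///     properties.bootstrap.server = 'broker:9092'
/// ) FORMAT PLAIN ENCODE JSON;
/// ```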
991pub async fn handle_create_source(
992    mut handler_args: HandlerArgs,
993    stmt: CreateSourceStatement,
994) -> Result<RwPgResponse> {
995    let session = handler_args.session.clone();
996    let overwrite_options = OverwriteOptions::new(&mut handler_args);
997
998    if let Either::Right(resp) = session.check_relation_name_duplicated(
999        stmt.source_name.clone(),
1000        StatementType::CREATE_SOURCE,
1001        stmt.if_not_exists,
1002    )? {
1003        return Ok(resp);
1004    }
1005
1006    if handler_args.with_options.is_empty() {
1007        return Err(RwError::from(InvalidInputSyntax(
1008            "missing WITH clause".to_owned(),
1009        )));
1010    }
1011
1012    let format_encode = stmt.format_encode.into_v2_with_warning();
1013    let with_properties = bind_connector_props(&handler_args, &format_encode, true)?;
1014
1015    let create_source_type = CreateSourceType::for_newly_created(&session, &*with_properties);
1016    let (columns_from_resolve_source, source_info) = bind_columns_from_source(
1017        &session,
1018        &format_encode,
1019        Either::Left(&with_properties),
1020        create_source_type,
1021    )
1022    .await?;
1023    let mut col_id_gen = ColumnIdGenerator::new_initial();
1024
1025    if stmt.columns.iter().any(|col| {
1026        col.options
1027            .iter()
1028            .any(|def| matches!(def.option, ColumnOption::NotNull))
1029    }) {
1030        return Err(RwError::from(InvalidInputSyntax(
1031            "NOT NULL constraint is not supported in source schema".to_owned(),
1032        )));
1033    }
1034
1035    let source_catalog = bind_create_source_or_table_with_connector(
1036        handler_args.clone(),
1037        stmt.source_name,
1038        format_encode,
1039        with_properties,
1040        &stmt.columns,
1041        stmt.constraints,
1042        stmt.wildcard_idx,
1043        stmt.source_watermarks,
1044        columns_from_resolve_source,
1045        source_info,
1046        stmt.include_column_options,
1047        &mut col_id_gen,
1048        create_source_type,
1049        overwrite_options.source_rate_limit,
1050        SqlColumnStrategy::FollowChecked,
1051    )
1052    .await?;
1053
1054    // If it is a temporary source, put it into SessionImpl.
1055    if stmt.temporary {
1056        if session.get_temporary_source(&source_catalog.name).is_some() {
1057            return Err(CatalogError::duplicated("source", source_catalog.name.clone()).into());
1058        }
1059        session.create_temporary_source(source_catalog);
1060        return Ok(PgResponse::empty_result(StatementType::CREATE_SOURCE));
1061    }
1062
1063    let source = source_catalog.to_prost();
1064
1065    let catalog_writer = session.catalog_writer()?;
1066
1067    if create_source_type.is_shared() {
1068        let graph = generate_stream_graph_for_source(handler_args, source_catalog)?;
1069        catalog_writer.create_source(source, Some(graph)).await?;
1070    } else {
1071        // For other sources we don't create a streaming job
1072        catalog_writer.create_source(source, None).await?;
1073    }
1074
1075    Ok(PgResponse::empty_result(StatementType::CREATE_SOURCE))
1076}
1077
1078pub(super) fn generate_stream_graph_for_source(
1079    handler_args: HandlerArgs,
1080    source_catalog: SourceCatalog,
1081) -> Result<PbStreamFragmentGraph> {
1082    let context = OptimizerContext::from_handler_args(handler_args);
1083    let source_node = LogicalSource::with_catalog(
1084        Rc::new(source_catalog),
1085        SourceNodeKind::CreateSharedSource,
1086        context.into(),
1087        None,
1088    )?;
1089
1090    let stream_plan = source_node.to_stream(&mut ToStreamContext::new(false))?;
1091    let graph = build_graph(stream_plan)?;
1092    Ok(graph)
1093}
1094
1095#[cfg(test)]
1096pub mod tests {
1097    use std::collections::HashMap;
1098    use std::sync::Arc;
1099
1100    use risingwave_common::catalog::{
1101        DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, ROW_ID_COLUMN_NAME,
1102    };
1103    use risingwave_common::types::{DataType, StructType};
1104
1105    use crate::catalog::root_catalog::SchemaPath;
1106    use crate::catalog::source_catalog::SourceCatalog;
1107    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};
1108
1109    const GET_COLUMN_FROM_CATALOG: fn(&Arc<SourceCatalog>) -> HashMap<&str, DataType> =
1110        |catalog: &Arc<SourceCatalog>| -> HashMap<&str, DataType> {
1111            catalog
1112                .columns
1113                .iter()
1114                .map(|col| (col.name(), col.data_type().clone()))
1115                .collect::<HashMap<&str, DataType>>()
1116        };
1117
1118    #[tokio::test]
1119    async fn test_create_source_handler() {
1120        let proto_file = create_proto_file(PROTO_FILE_DATA);
1121        let sql = format!(
1122            r#"CREATE SOURCE t
1123    WITH (connector = 'kinesis')
1124    FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
1125            proto_file.path().to_str().unwrap()
1126        );
1127        let frontend = LocalFrontend::new(Default::default()).await;
1128        frontend.run_sql(sql).await.unwrap();
1129
1130        let session = frontend.session_ref();
1131        let catalog_reader = session.env().catalog_reader().read_guard();
1132        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);
1133
1134        // Check source exists.
1135        let (source, _) = catalog_reader
1136            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
1137            .unwrap();
1138        assert_eq!(source.name, "t");
1139
1140        let columns = GET_COLUMN_FROM_CATALOG(source);
1141
1142        let city_type = StructType::new(vec![
1143            ("address", DataType::Varchar),
1144            ("zipcode", DataType::Varchar),
1145        ])
1146        // .with_ids([5, 6].map(ColumnId::new))
1147        .into();
1148        let expected_columns = maplit::hashmap! {
1149            ROW_ID_COLUMN_NAME => DataType::Serial,
1150            "id" => DataType::Int32,
1151            "zipcode" => DataType::Int64,
1152            "rate" => DataType::Float32,
1153            "country" => StructType::new(
1154                vec![("address", DataType::Varchar),("city", city_type),("zipcode", DataType::Varchar)],
1155            )
1156            // .with_ids([3, 4, 7].map(ColumnId::new))
1157            .into(),
1158        };
1159        assert_eq!(columns, expected_columns, "{columns:#?}");
1160    }
1161
1162    #[tokio::test]
1163    async fn test_duplicate_props_options() {
1164        let proto_file = create_proto_file(PROTO_FILE_DATA);
1165        let sql = format!(
1166            r#"CREATE SOURCE t
1167    WITH (
1168        connector = 'kinesis',
1169        aws.region='user_test_topic',
1170        endpoint='172.10.1.1:9090,172.10.1.2:9090',
1171        aws.credentials.access_key_id = 'your_access_key_1',
1172        aws.credentials.secret_access_key = 'your_secret_key_1'
1173    )
1174    FORMAT PLAIN ENCODE PROTOBUF (
1175        message = '.test.TestRecord',
1176        aws.credentials.access_key_id = 'your_access_key_2',
1177        aws.credentials.secret_access_key = 'your_secret_key_2',
1178        schema.location = 'file://{}',
1179    )"#,
1180            proto_file.path().to_str().unwrap()
1181        );
1182        let frontend = LocalFrontend::new(Default::default()).await;
1183        frontend.run_sql(sql).await.unwrap();
1184
1185        let session = frontend.session_ref();
1186        let catalog_reader = session.env().catalog_reader().read_guard();
1187        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);
1188
1189        // Check source exists.
1190        let (source, _) = catalog_reader
1191            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
1192            .unwrap();
1193        assert_eq!(source.name, "t");
1194
1195        // AwsAuth params exist in options.
1196        assert_eq!(
1197            source
1198                .info
1199                .format_encode_options
1200                .get("aws.credentials.access_key_id")
1201                .unwrap(),
1202            "your_access_key_2"
1203        );
1204        assert_eq!(
1205            source
1206                .info
1207                .format_encode_options
1208                .get("aws.credentials.secret_access_key")
1209                .unwrap(),
1210            "your_secret_key_2"
1211        );
1212
1213        // AwsAuth params exist in props.
1214        assert_eq!(
1215            source
1216                .with_properties
1217                .get("aws.credentials.access_key_id")
1218                .unwrap(),
1219            "your_access_key_1"
1220        );
1221        assert_eq!(
1222            source
1223                .with_properties
1224                .get("aws.credentials.secret_access_key")
1225                .unwrap(),
1226            "your_secret_key_1"
1227        );
1228
1229        // Options are not merged into props.
1230        assert!(!source.with_properties.contains_key("schema.location"));
1231    }
1232
1233    #[tokio::test]
1234    async fn test_multi_table_cdc_create_source_handler() {
1235        let sql =
1236            "CREATE SOURCE t2 WITH (connector = 'mysql-cdc') FORMAT PLAIN ENCODE JSON".to_owned();
1237        let frontend = LocalFrontend::new(Default::default()).await;
1238        let session = frontend.session_ref();
1239
1240        frontend
1241            .run_sql_with_session(session.clone(), sql)
1242            .await
1243            .unwrap();
1244        let catalog_reader = session.env().catalog_reader().read_guard();
1245        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);
1246
1247        // Check source exists.
1248        let (source, _) = catalog_reader
1249            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t2")
1250            .unwrap();
1251        assert_eq!(source.name, "t2");
1252
1253        let columns = source
1254            .columns
1255            .iter()
1256            .map(|col| (col.name(), col.data_type().clone()))
1257            .collect::<Vec<(&str, DataType)>>();
1258
1259        expect_test::expect![[r#"
1260            [
1261                (
1262                    "payload",
1263                    Jsonb,
1264                ),
1265                (
1266                    "_rw_offset",
1267                    Varchar,
1268                ),
1269                (
1270                    "_rw_table_name",
1271                    Varchar,
1272                ),
1273                (
1274                    "_row_id",
1275                    Serial,
1276                ),
1277            ]
1278        "#]]
1279        .assert_debug_eq(&columns);
1280    }
1281
1282    #[tokio::test]
1283    async fn test_source_addition_columns() {
1284        // test derive include column for format plain
1285        let sql =
1286            "CREATE SOURCE s (v1 int) include key as _rw_kafka_key with (connector = 'kafka') format plain encode json".to_owned();
1287        let frontend = LocalFrontend::new(Default::default()).await;
1288        frontend.run_sql(sql).await.unwrap();
1289        let session = frontend.session_ref();
1290        let catalog_reader = session.env().catalog_reader().read_guard();
1291        let (source, _) = catalog_reader
1292            .get_source_by_name(
1293                DEFAULT_DATABASE_NAME,
1294                SchemaPath::Name(DEFAULT_SCHEMA_NAME),
1295                "s",
1296            )
1297            .unwrap();
1298        assert_eq!(source.name, "s");
1299
1300        let columns = source
1301            .columns
1302            .iter()
1303            .map(|col| (col.name(), col.data_type().clone()))
1304            .collect::<Vec<(&str, DataType)>>();
1305
1306        expect_test::expect![[r#"
1307            [
1308                (
1309                    "v1",
1310                    Int32,
1311                ),
1312                (
1313                    "_rw_kafka_key",
1314                    Bytea,
1315                ),
1316                (
1317                    "_rw_kafka_timestamp",
1318                    Timestamptz,
1319                ),
1320                (
1321                    "_rw_kafka_partition",
1322                    Varchar,
1323                ),
1324                (
1325                    "_rw_kafka_offset",
1326                    Varchar,
1327                ),
1328                (
1329                    "_row_id",
1330                    Serial,
1331                ),
1332            ]
1333        "#]]
1334        .assert_debug_eq(&columns);
1335
1336        let sql =
1337            "CREATE SOURCE s3 (v1 int) include timestamp 'header1' as header_col with (connector = 'kafka') format plain encode json".to_owned();
1338        match frontend.run_sql(sql).await {
1339            Err(e) => {
1340                assert_eq!(
1341                    e.to_string(),
1342                    "Protocol error: Only header column can have inner field, but got \"timestamp\""
1343                )
1344            }
1345            _ => unreachable!(),
1346        }
1347    }
1348}