risingwave_frontend/handler/
create_table.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::rc::Rc;
17use std::sync::Arc;
18
19use anyhow::{Context, anyhow};
20use clap::ValueEnum;
21use either::Either;
22use fixedbitset::FixedBitSet;
23use itertools::Itertools;
24use percent_encoding::percent_decode_str;
25use pgwire::pg_response::{PgResponse, StatementType};
26use prost::Message as _;
27use risingwave_common::catalog::{
28    CdcTableDesc, ColumnCatalog, ColumnDesc, ConflictBehavior, DEFAULT_SCHEMA_NAME, Engine,
29    ObjectId, RISINGWAVE_ICEBERG_ROW_ID, ROW_ID_COLUMN_NAME, TableId,
30};
31use risingwave_common::config::MetaBackend;
32use risingwave_common::global_jvm::Jvm;
33use risingwave_common::session_config::sink_decouple::SinkDecouple;
34use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
35use risingwave_common::util::value_encoding::DatumToProtoExt;
36use risingwave_common::{bail, bail_not_implemented};
37use risingwave_connector::sink::decouple_checkpoint_log_sink::COMMIT_CHECKPOINT_INTERVAL;
38use risingwave_connector::source::cdc::build_cdc_table_id;
39use risingwave_connector::source::cdc::external::{
40    DATABASE_NAME_KEY, ExternalCdcTableType, ExternalTableConfig, ExternalTableImpl,
41    SCHEMA_NAME_KEY, TABLE_NAME_KEY,
42};
43use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt, source};
44use risingwave_pb::catalog::connection::Info as ConnectionInfo;
45use risingwave_pb::catalog::connection_params::ConnectionType;
46use risingwave_pb::catalog::{PbSource, PbWebhookSourceInfo, WatermarkDesc};
47use risingwave_pb::ddl_service::{PbTableJobType, TableJobType};
48use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
49use risingwave_pb::plan_common::{
50    AdditionalColumn, ColumnDescVersion, DefaultColumnDesc, GeneratedColumnDesc,
51};
52use risingwave_pb::secret::PbSecretRef;
53use risingwave_pb::secret::secret_ref::PbRefAsType;
54use risingwave_pb::stream_plan::StreamFragmentGraph;
55use risingwave_sqlparser::ast::{
56    CdcTableInfo, ColumnDef, ColumnOption, CompatibleFormatEncode, ConnectionRefValue, CreateSink,
57    CreateSinkStatement, CreateSourceStatement, DataType as AstDataType, ExplainOptions, Format,
58    FormatEncodeOptions, Ident, ObjectName, OnConflict, SecretRefAsType, SourceWatermark,
59    Statement, TableConstraint, WebhookSourceInfo, WithProperties,
60};
61use risingwave_sqlparser::parser::{IncludeOption, Parser};
62use thiserror_ext::AsReport;
63
64use super::create_source::{CreateSourceType, SqlColumnStrategy, bind_columns_from_source};
65use super::{RwPgResponse, alter_streaming_rate_limit};
66use crate::binder::{Clause, SecureCompareContext, bind_data_type};
67use crate::catalog::root_catalog::SchemaPath;
68use crate::catalog::source_catalog::SourceCatalog;
69use crate::catalog::table_catalog::{ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX, TableVersion};
70use crate::catalog::{ColumnId, DatabaseId, SchemaId, SourceId, check_column_name_not_reserved};
71use crate::error::{ErrorCode, Result, RwError, bail_bind_error};
72use crate::expr::{Expr, ExprImpl, ExprRewriter};
73use crate::handler::HandlerArgs;
74use crate::handler::create_source::{
75    UPSTREAM_SOURCE_KEY, bind_connector_props, bind_create_source_or_table_with_connector,
76    bind_source_watermark, handle_addition_columns,
77};
78use crate::handler::util::SourceSchemaCompatExt;
79use crate::optimizer::plan_node::generic::{SourceNodeKind, build_cdc_scan_options_with_options};
80use crate::optimizer::plan_node::{
81    LogicalCdcScan, LogicalPlanRef, LogicalSource, StreamPlanRef as PlanRef,
82};
83use crate::optimizer::property::{Order, RequiredDist};
84use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRoot};
85use crate::session::SessionImpl;
86use crate::session::current::notice_to_user;
87use crate::stream_fragmenter::{GraphJobType, build_graph};
88use crate::utils::OverwriteOptions;
89use crate::{Binder, Explain, TableCatalog, WithOptions};
90
91mod col_id_gen;
92pub use col_id_gen::*;
93use risingwave_connector::sink::SinkParam;
94use risingwave_connector::sink::iceberg::{
95    COMPACTION_INTERVAL_SEC, ENABLE_COMPACTION, ENABLE_SNAPSHOT_EXPIRATION,
96    ICEBERG_WRITE_MODE_COPY_ON_WRITE, ICEBERG_WRITE_MODE_MERGE_ON_READ, IcebergSink,
97    MAX_SNAPSHOTS_NUM, SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES,
98    SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA, SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS,
99    SNAPSHOT_EXPIRATION_RETAIN_LAST, WRITE_MODE, parse_partition_by_exprs,
100};
101use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
102use risingwave_pb::meta::PbThrottleTarget;
103
104use crate::handler::create_sink::{SinkPlanContext, gen_sink_plan};
105
106fn ensure_column_options_supported(c: &ColumnDef) -> Result<()> {
107    for option_def in &c.options {
108        match option_def.option {
109            ColumnOption::GeneratedColumns(_) => {}
110            ColumnOption::DefaultValue(_) => {}
111            ColumnOption::DefaultValueInternal { .. } => {}
112            ColumnOption::Unique { is_primary: true } => {}
113            ColumnOption::Null => {}
114            ColumnOption::NotNull => {}
115            _ => bail_not_implemented!("column constraints \"{}\"", option_def),
116        }
117    }
118    Ok(())
119}
120
121/// Binds the column schemas declared in CREATE statement into `ColumnDesc`.
122/// If a column is marked as `primary key`, its `ColumnId` is also returned.
123/// This primary key is not combined with table constraints yet.
124pub fn bind_sql_columns(
125    column_defs: &[ColumnDef],
126    is_for_drop_table_connector: bool,
127) -> Result<Vec<ColumnCatalog>> {
128    let mut columns = Vec::with_capacity(column_defs.len());
129
130    for column in column_defs {
131        ensure_column_options_supported(column)?;
132        // Destruct to make sure all fields are properly handled rather than ignored.
133        // Do NOT use `..` to ignore fields you do not want to deal with.
134        // Reject them with a clear NotImplemented error.
135        let ColumnDef {
136            name,
137            data_type,
138            collation,
139            options,
140            ..
141        } = column;
142
143        let data_type = data_type
144            .clone()
145            .ok_or_else(|| ErrorCode::InvalidInputSyntax("data type is not specified".into()))?;
146        if let Some(collation) = collation {
147            // PostgreSQL will limit the datatypes that collate can work on.
148            // https://www.postgresql.org/docs/16/collation.html#COLLATION-CONCEPTS
149            //   > The built-in collatable data types are `text`, `varchar`, and `char`.
150            //
151            // But we don't support real collation, we simply ignore it here.
152            if !["C", "POSIX"].contains(&collation.real_value().as_str()) {
153                bail_not_implemented!(
154                    "Collate collation other than `C` or `POSIX` is not implemented"
155                );
156            }
157
158            match data_type {
159                AstDataType::Text | AstDataType::Varchar | AstDataType::Char(_) => {}
160                _ => {
161                    return Err(ErrorCode::NotSupported(
162                        format!("{} is not a collatable data type", data_type),
163                        "The only built-in collatable data types are `varchar`, please check your type".into(),
164                    ).into());
165                }
166            }
167        }
168
169        if !is_for_drop_table_connector {
170            // additional column name may have prefix _rw
171            // When converting dropping the connector from table, the additional columns are converted to normal columns and keep the original name.
172            // Under this case, we loosen the check for _rw prefix.
173            check_column_name_not_reserved(&name.real_value())?;
174        }
175
176        let nullable: bool = !options
177            .iter()
178            .any(|def| matches!(def.option, ColumnOption::NotNull));
179
180        columns.push(ColumnCatalog {
181            column_desc: ColumnDesc {
182                data_type: bind_data_type(&data_type)?,
183                column_id: ColumnId::placeholder(),
184                name: name.real_value(),
185                generated_or_default_column: None,
186                description: None,
187                additional_column: AdditionalColumn { column_type: None },
188                version: ColumnDescVersion::LATEST,
189                system_column: None,
190                nullable,
191            },
192            is_hidden: false,
193        });
194    }
195
196    Ok(columns)
197}
198
199fn check_generated_column_constraints(
200    column_name: &String,
201    column_id: ColumnId,
202    expr: &ExprImpl,
203    column_catalogs: &[ColumnCatalog],
204    generated_column_names: &[String],
205    pk_column_ids: &[ColumnId],
206) -> Result<()> {
207    let input_refs = expr.collect_input_refs(column_catalogs.len());
208    for idx in input_refs.ones() {
209        let referred_generated_column = &column_catalogs[idx].column_desc.name;
210        if generated_column_names
211            .iter()
212            .any(|c| c == referred_generated_column)
213        {
214            return Err(ErrorCode::BindError(format!(
215                "Generated can not reference another generated column. \
216                But here generated column \"{}\" referenced another generated column \"{}\"",
217                column_name, referred_generated_column
218            ))
219            .into());
220        }
221    }
222
223    if pk_column_ids.contains(&column_id) && expr.is_impure() {
224        return Err(ErrorCode::BindError(format!(
225            "Generated columns with impure expressions should not be part of the primary key. \
226            Here column \"{}\" is defined as part of the primary key.",
227            column_name
228        ))
229        .into());
230    }
231
232    Ok(())
233}
234
235/// Binds constraints that can be only specified in column definitions,
236/// currently generated columns and default columns.
237pub fn bind_sql_column_constraints(
238    session: &SessionImpl,
239    table_name: String,
240    column_catalogs: &mut [ColumnCatalog],
241    columns: &[ColumnDef],
242    pk_column_ids: &[ColumnId],
243) -> Result<()> {
244    let generated_column_names = {
245        let mut names = vec![];
246        for column in columns {
247            for option_def in &column.options {
248                if let ColumnOption::GeneratedColumns(_) = option_def.option {
249                    names.push(column.name.real_value());
250                    break;
251                }
252            }
253        }
254        names
255    };
256
257    let mut binder = Binder::new_for_ddl(session);
258    binder.bind_columns_to_context(table_name, column_catalogs)?;
259
260    for column in columns {
261        let Some(idx) = column_catalogs
262            .iter()
263            .position(|c| c.name() == column.name.real_value())
264        else {
265            // It's possible that we don't follow the user defined columns in SQL but take the
266            // ones resolved from the source, thus missing some columns. Simply ignore them.
267            continue;
268        };
269
270        for option_def in &column.options {
271            match &option_def.option {
272                ColumnOption::GeneratedColumns(expr) => {
273                    binder.set_clause(Some(Clause::GeneratedColumn));
274
275                    let expr_impl = binder.bind_expr(expr).with_context(|| {
276                        format!(
277                            "fail to bind expression in generated column \"{}\"",
278                            column.name.real_value()
279                        )
280                    })?;
281
282                    check_generated_column_constraints(
283                        &column.name.real_value(),
284                        column_catalogs[idx].column_id(),
285                        &expr_impl,
286                        column_catalogs,
287                        &generated_column_names,
288                        pk_column_ids,
289                    )?;
290
291                    column_catalogs[idx].column_desc.generated_or_default_column = Some(
292                        GeneratedOrDefaultColumn::GeneratedColumn(GeneratedColumnDesc {
293                            expr: Some(expr_impl.to_expr_proto()),
294                        }),
295                    );
296                    binder.set_clause(None);
297                }
298                ColumnOption::DefaultValue(expr) => {
299                    let expr_impl = binder
300                        .bind_expr(expr)?
301                        .cast_assign(column_catalogs[idx].data_type())?;
302
303                    // Rewrite expressions to evaluate a snapshot value, used for missing values in the case of
304                    // schema change.
305                    //
306                    // TODO: Currently we don't support impure expressions other than `now()` (like `random()`),
307                    // so the rewritten expression should almost always be pure and we directly call `fold_const`
308                    // here. Actually we do not require purity of the expression here since we're only to get a
309                    // snapshot value.
310                    let rewritten_expr_impl = session
311                        .pinned_snapshot()
312                        .inline_now_proc_time()
313                        .rewrite_expr(expr_impl.clone());
314
315                    if let Some(snapshot_value) = rewritten_expr_impl.try_fold_const() {
316                        let snapshot_value = snapshot_value?;
317
318                        column_catalogs[idx].column_desc.generated_or_default_column =
319                            Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
320                                snapshot_value: Some(snapshot_value.to_protobuf()),
321                                expr: Some(expr_impl.to_expr_proto()),
322                                // persist the original expression
323                            }));
324                    } else {
325                        return Err(ErrorCode::BindError(format!(
326                            "Default expression used in column `{}` cannot be evaluated. \
327                            Use generated columns instead if you mean to reference other columns.",
328                            column.name
329                        ))
330                        .into());
331                    }
332                }
333                ColumnOption::DefaultValueInternal { persisted, expr: _ } => {
334                    // When a `DEFAULT INTERNAL` is used internally for schema change, the persisted value
335                    // should already be set during purifcation. So if we encounter an empty value here, it
336                    // means the user has specified it explicitly in the SQL statement, typically by
337                    // directly copying the result of `SHOW CREATE TABLE` and executing it.
338                    if persisted.is_empty() {
339                        bail_bind_error!(
340                            "DEFAULT INTERNAL is only used for internal purposes, \
341                             please specify a concrete default value"
342                        );
343                    }
344
345                    let desc = DefaultColumnDesc::decode(&**persisted)
346                        .expect("failed to decode persisted `DefaultColumnDesc`");
347
348                    column_catalogs[idx].column_desc.generated_or_default_column =
349                        Some(GeneratedOrDefaultColumn::DefaultColumn(desc));
350                }
351                _ => {}
352            }
353        }
354    }
355    Ok(())
356}
357
358/// Currently we only support Primary key table constraint, so just return pk names if it exists
359pub fn bind_table_constraints(table_constraints: &[TableConstraint]) -> Result<Vec<String>> {
360    let mut pk_column_names = vec![];
361
362    for constraint in table_constraints {
363        match constraint {
364            TableConstraint::Unique {
365                name: _,
366                columns,
367                is_primary: true,
368            } => {
369                if !pk_column_names.is_empty() {
370                    return Err(multiple_pk_definition_err());
371                }
372                pk_column_names = columns.iter().map(|c| c.real_value()).collect_vec();
373            }
374            _ => bail_not_implemented!("table constraint \"{}\"", constraint),
375        }
376    }
377    Ok(pk_column_names)
378}
379
380pub fn bind_sql_pk_names(
381    columns_defs: &[ColumnDef],
382    pk_names_from_table_constraints: Vec<String>,
383) -> Result<Vec<String>> {
384    let mut pk_column_names = pk_names_from_table_constraints;
385
386    for column in columns_defs {
387        for option_def in &column.options {
388            if let ColumnOption::Unique { is_primary: true } = option_def.option {
389                if !pk_column_names.is_empty() {
390                    return Err(multiple_pk_definition_err());
391                }
392                pk_column_names.push(column.name.real_value());
393            };
394        }
395    }
396
397    Ok(pk_column_names)
398}
399
400fn multiple_pk_definition_err() -> RwError {
401    ErrorCode::BindError("multiple primary keys are not allowed".into()).into()
402}
403
404/// Binds primary keys defined in SQL.
405///
406/// It returns the columns together with `pk_column_ids`, and an optional row id column index if
407/// added.
408pub fn bind_pk_and_row_id_on_relation(
409    mut columns: Vec<ColumnCatalog>,
410    pk_names: Vec<String>,
411    must_need_pk: bool,
412) -> Result<(Vec<ColumnCatalog>, Vec<ColumnId>, Option<usize>)> {
413    for c in &columns {
414        assert!(c.column_id() != ColumnId::placeholder());
415    }
416
417    // Mapping from column name to column id.
418    let name_to_id = columns
419        .iter()
420        .map(|c| (c.name(), c.column_id()))
421        .collect::<HashMap<_, _>>();
422
423    let mut pk_column_ids: Vec<_> = pk_names
424        .iter()
425        .map(|name| {
426            name_to_id.get(name.as_str()).copied().ok_or_else(|| {
427                ErrorCode::BindError(format!("column \"{name}\" named in key does not exist"))
428            })
429        })
430        .try_collect()?;
431
432    // Add `_row_id` column if `pk_column_ids` is empty and must_need_pk
433    let need_row_id = pk_column_ids.is_empty() && must_need_pk;
434
435    let row_id_index = need_row_id.then(|| {
436        let column = ColumnCatalog::row_id_column();
437        let index = columns.len();
438        pk_column_ids = vec![column.column_id()];
439        columns.push(column);
440        index
441    });
442
443    if let Some(col) = columns.iter().map(|c| c.name()).duplicates().next() {
444        Err(ErrorCode::InvalidInputSyntax(format!(
445            "column \"{col}\" specified more than once"
446        )))?;
447    }
448
449    Ok((columns, pk_column_ids, row_id_index))
450}
451
452/// `gen_create_table_plan_with_source` generates the plan for creating a table with an external
453/// stream source.
454#[allow(clippy::too_many_arguments)]
455pub(crate) async fn gen_create_table_plan_with_source(
456    mut handler_args: HandlerArgs,
457    explain_options: ExplainOptions,
458    table_name: ObjectName,
459    column_defs: Vec<ColumnDef>,
460    wildcard_idx: Option<usize>,
461    constraints: Vec<TableConstraint>,
462    format_encode: FormatEncodeOptions,
463    source_watermarks: Vec<SourceWatermark>,
464    mut col_id_gen: ColumnIdGenerator,
465    include_column_options: IncludeOption,
466    props: CreateTableProps,
467    sql_column_strategy: SqlColumnStrategy,
468) -> Result<(PlanRef, Option<SourceCatalog>, TableCatalog)> {
469    if props.append_only
470        && format_encode.format != Format::Plain
471        && format_encode.format != Format::Native
472    {
473        return Err(ErrorCode::BindError(format!(
474            "Append only table does not support format {}.",
475            format_encode.format
476        ))
477        .into());
478    }
479
480    let session = &handler_args.session;
481    let with_properties = bind_connector_props(&handler_args, &format_encode, false)?;
482    if with_properties.is_shareable_cdc_connector() {
483        generated_columns_check_for_cdc_table(&column_defs)?;
484        not_null_check_for_cdc_table(&wildcard_idx, &column_defs)?;
485    } else if column_defs.iter().any(|col| {
486        col.options
487            .iter()
488            .any(|def| matches!(def.option, ColumnOption::NotNull))
489    }) {
490        // if non-cdc source
491        notice_to_user(
492            "The table contains columns with NOT NULL constraints. Any rows from upstream violating the constraints will be ignored silently.",
493        );
494    }
495
496    let db_name: &str = &session.database();
497    let (schema_name, _) = Binder::resolve_schema_qualified_name(db_name, &table_name)?;
498
499    // TODO: omit this step if `sql_column_strategy` is `Follow`.
500    let (columns_from_resolve_source, source_info) = bind_columns_from_source(
501        session,
502        &format_encode,
503        Either::Left(&with_properties),
504        CreateSourceType::Table,
505    )
506    .await?;
507
508    let overwrite_options = OverwriteOptions::new(&mut handler_args);
509    let rate_limit = overwrite_options.source_rate_limit;
510    let source = bind_create_source_or_table_with_connector(
511        handler_args.clone(),
512        table_name,
513        format_encode,
514        with_properties,
515        &column_defs,
516        constraints,
517        wildcard_idx,
518        source_watermarks,
519        columns_from_resolve_source,
520        source_info,
521        include_column_options,
522        &mut col_id_gen,
523        CreateSourceType::Table,
524        rate_limit,
525        sql_column_strategy,
526    )
527    .await?;
528
529    let context = OptimizerContext::new(handler_args, explain_options);
530
531    let (plan, table) = gen_table_plan_with_source(
532        context.into(),
533        schema_name,
534        source.clone(),
535        col_id_gen.into_version(),
536        props,
537    )?;
538
539    Ok((plan, Some(source), table))
540}
541
542/// `gen_create_table_plan` generates the plan for creating a table without an external stream
543/// source.
544#[allow(clippy::too_many_arguments)]
545pub(crate) fn gen_create_table_plan(
546    context: OptimizerContext,
547    table_name: ObjectName,
548    column_defs: Vec<ColumnDef>,
549    constraints: Vec<TableConstraint>,
550    mut col_id_gen: ColumnIdGenerator,
551    source_watermarks: Vec<SourceWatermark>,
552    props: CreateTableProps,
553    is_for_replace_plan: bool,
554) -> Result<(PlanRef, TableCatalog)> {
555    let mut columns = bind_sql_columns(&column_defs, is_for_replace_plan)?;
556    for c in &mut columns {
557        col_id_gen.generate(c)?;
558    }
559
560    let (_, secret_refs, connection_refs) = context.with_options().clone().into_parts();
561    if !secret_refs.is_empty() || !connection_refs.is_empty() {
562        return Err(crate::error::ErrorCode::InvalidParameterValue("Secret reference and Connection reference are not allowed in options when creating table without external source".to_owned()).into());
563    }
564
565    gen_create_table_plan_without_source(
566        context,
567        table_name,
568        columns,
569        column_defs,
570        constraints,
571        source_watermarks,
572        col_id_gen.into_version(),
573        props,
574    )
575}
576
577#[allow(clippy::too_many_arguments)]
578pub(crate) fn gen_create_table_plan_without_source(
579    context: OptimizerContext,
580    table_name: ObjectName,
581    columns: Vec<ColumnCatalog>,
582    column_defs: Vec<ColumnDef>,
583    constraints: Vec<TableConstraint>,
584    source_watermarks: Vec<SourceWatermark>,
585    version: TableVersion,
586    props: CreateTableProps,
587) -> Result<(PlanRef, TableCatalog)> {
588    // XXX: Why not bind outside?
589    let pk_names = bind_sql_pk_names(&column_defs, bind_table_constraints(&constraints)?)?;
590    let (mut columns, pk_column_ids, row_id_index) =
591        bind_pk_and_row_id_on_relation(columns, pk_names, true)?;
592
593    let watermark_descs: Vec<WatermarkDesc> = bind_source_watermark(
594        context.session_ctx(),
595        table_name.real_value(),
596        source_watermarks,
597        &columns,
598    )?;
599
600    bind_sql_column_constraints(
601        context.session_ctx(),
602        table_name.real_value(),
603        &mut columns,
604        &column_defs,
605        &pk_column_ids,
606    )?;
607    let session = context.session_ctx().clone();
608
609    let db_name = &session.database();
610    let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, &table_name)?;
611
612    let info = CreateTableInfo {
613        columns,
614        pk_column_ids,
615        row_id_index,
616        watermark_descs,
617        source_catalog: None,
618        version,
619    };
620
621    gen_table_plan_inner(context.into(), schema_name, table_name, info, props)
622}
623
624fn gen_table_plan_with_source(
625    context: OptimizerContextRef,
626    schema_name: Option<String>,
627    source_catalog: SourceCatalog,
628    version: TableVersion,
629    props: CreateTableProps,
630) -> Result<(PlanRef, TableCatalog)> {
631    let table_name = source_catalog.name.clone();
632
633    let info = CreateTableInfo {
634        columns: source_catalog.columns.clone(),
635        pk_column_ids: source_catalog.pk_col_ids.clone(),
636        row_id_index: source_catalog.row_id_index,
637        watermark_descs: source_catalog.watermark_descs.clone(),
638        source_catalog: Some(source_catalog),
639        version,
640    };
641
642    gen_table_plan_inner(context, schema_name, table_name, info, props)
643}
644
645/// On-conflict behavior either from user input or existing table catalog.
646#[derive(Clone, Copy)]
647pub enum EitherOnConflict {
648    Ast(Option<OnConflict>),
649    Resolved(ConflictBehavior),
650}
651
652impl From<Option<OnConflict>> for EitherOnConflict {
653    fn from(v: Option<OnConflict>) -> Self {
654        Self::Ast(v)
655    }
656}
657
658impl From<ConflictBehavior> for EitherOnConflict {
659    fn from(v: ConflictBehavior) -> Self {
660        Self::Resolved(v)
661    }
662}
663
664impl EitherOnConflict {
665    /// Resolves the conflict behavior based on the given information.
666    pub fn to_behavior(self, append_only: bool, row_id_as_pk: bool) -> Result<ConflictBehavior> {
667        let conflict_behavior = match self {
668            EitherOnConflict::Ast(on_conflict) => {
669                if append_only {
670                    if row_id_as_pk {
671                        // Primary key will be generated, no conflict check needed.
672                        ConflictBehavior::NoCheck
673                    } else {
674                        // User defined PK on append-only table, enforce `DO NOTHING`.
675                        if let Some(on_conflict) = on_conflict
676                            && on_conflict != OnConflict::Nothing
677                        {
678                            return Err(ErrorCode::InvalidInputSyntax(
679                                "When PRIMARY KEY constraint applied to an APPEND ONLY table, \
680                                     the ON CONFLICT behavior must be DO NOTHING."
681                                    .to_owned(),
682                            )
683                            .into());
684                        }
685                        ConflictBehavior::IgnoreConflict
686                    }
687                } else {
688                    // Default to `UPDATE FULL` for non-append-only tables.
689                    match on_conflict.unwrap_or(OnConflict::UpdateFull) {
690                        OnConflict::UpdateFull => ConflictBehavior::Overwrite,
691                        OnConflict::Nothing => ConflictBehavior::IgnoreConflict,
692                        OnConflict::UpdateIfNotNull => ConflictBehavior::DoUpdateIfNotNull,
693                    }
694                }
695            }
696            EitherOnConflict::Resolved(b) => b,
697        };
698
699        Ok(conflict_behavior)
700    }
701}
702
703/// Arguments of the functions that generate a table plan, part 1.
704///
705/// Compared to [`CreateTableProps`], this struct contains fields that need some work of binding
706/// or resolving based on the user input.
707pub struct CreateTableInfo {
708    pub columns: Vec<ColumnCatalog>,
709    pub pk_column_ids: Vec<ColumnId>,
710    pub row_id_index: Option<usize>,
711    pub watermark_descs: Vec<WatermarkDesc>,
712    pub source_catalog: Option<SourceCatalog>,
713    pub version: TableVersion,
714}
715
716/// Arguments of the functions that generate a table plan, part 2.
717///
718/// Compared to [`CreateTableInfo`], this struct contains fields that can be (relatively) simply
719/// obtained from the input or the context.
720pub struct CreateTableProps {
721    pub definition: String,
722    pub append_only: bool,
723    pub on_conflict: EitherOnConflict,
724    pub with_version_columns: Vec<String>,
725    pub webhook_info: Option<PbWebhookSourceInfo>,
726    pub engine: Engine,
727}
728
729#[allow(clippy::too_many_arguments)]
730fn gen_table_plan_inner(
731    context: OptimizerContextRef,
732    schema_name: Option<String>,
733    table_name: String,
734    info: CreateTableInfo,
735    props: CreateTableProps,
736) -> Result<(PlanRef, TableCatalog)> {
737    let CreateTableInfo {
738        ref columns,
739        row_id_index,
740        ref watermark_descs,
741        ref source_catalog,
742        ..
743    } = info;
744    let CreateTableProps { append_only, .. } = props;
745
746    let (database_id, schema_id) = context
747        .session_ctx()
748        .get_database_and_schema_id_for_create(schema_name)?;
749
750    let session = context.session_ctx().clone();
751    let retention_seconds = context.with_options().retention_seconds();
752
753    let source_node: LogicalPlanRef = LogicalSource::new(
754        source_catalog.clone().map(Rc::new),
755        columns.clone(),
756        row_id_index,
757        SourceNodeKind::CreateTable,
758        context.clone(),
759        None,
760    )?
761    .into();
762
763    let required_cols = FixedBitSet::with_capacity(columns.len());
764    let plan_root = PlanRoot::new_with_logical_plan(
765        source_node,
766        RequiredDist::Any,
767        Order::any(),
768        required_cols,
769        vec![],
770    );
771
772    if !append_only && !watermark_descs.is_empty() {
773        return Err(ErrorCode::NotSupported(
774            "Defining watermarks on table requires the table to be append only.".to_owned(),
775            "Use the key words `APPEND ONLY`".to_owned(),
776        )
777        .into());
778    }
779
780    if !append_only && retention_seconds.is_some() {
781        if session
782            .config()
783            .unsafe_enable_storage_retention_for_non_append_only_tables()
784        {
785            tracing::warn!(
786                "Storage retention is enabled for non-append-only table {}. This may lead to stream inconsistency.",
787                table_name
788            );
789            const NOTICE: &str = "Storage retention is enabled for non-append-only table. \
790                                  This may lead to stream inconsistency and unrecoverable \
791                                  node failure if there is any row INSERT/UPDATE/DELETE operation \
792                                  corresponding to the TTLed primary keys";
793            session.notice_to_user(NOTICE);
794        } else {
795            return Err(ErrorCode::NotSupported(
796                "Defining retention seconds on table requires the table to be append only."
797                    .to_owned(),
798                "Use the key words `APPEND ONLY`".to_owned(),
799            )
800            .into());
801        }
802    }
803
804    let materialize =
805        plan_root.gen_table_plan(context, table_name, database_id, schema_id, info, props)?;
806
807    let mut table = materialize.table().clone();
808    table.owner = session.user_id();
809
810    Ok((materialize.into(), table))
811}
812
813/// Generate stream plan for cdc table based on shared source.
814/// In replace workflow, the `table_id` is the id of the table to be replaced
815/// in create table workflow, the `table_id` is a placeholder will be filled in the Meta
816#[allow(clippy::too_many_arguments)]
817pub(crate) fn gen_create_table_plan_for_cdc_table(
818    context: OptimizerContextRef,
819    source: Arc<SourceCatalog>,
820    external_table_name: String,
821    column_defs: Vec<ColumnDef>,
822    mut columns: Vec<ColumnCatalog>,
823    pk_names: Vec<String>,
824    cdc_with_options: WithOptionsSecResolved,
825    mut col_id_gen: ColumnIdGenerator,
826    on_conflict: Option<OnConflict>,
827    with_version_columns: Vec<String>,
828    include_column_options: IncludeOption,
829    table_name: ObjectName,
830    resolved_table_name: String, // table name without schema prefix
831    database_id: DatabaseId,
832    schema_id: SchemaId,
833    table_id: TableId,
834    engine: Engine,
835) -> Result<(PlanRef, TableCatalog)> {
836    let session = context.session_ctx().clone();
837
838    // append additional columns to the end
839    handle_addition_columns(
840        None,
841        &cdc_with_options,
842        include_column_options,
843        &mut columns,
844        true,
845    )?;
846
847    for c in &mut columns {
848        col_id_gen.generate(c)?;
849    }
850
851    let (mut columns, pk_column_ids, _row_id_index) =
852        bind_pk_and_row_id_on_relation(columns, pk_names, true)?;
853
854    // NOTES: In auto schema change, default value is not provided in column definition.
855    bind_sql_column_constraints(
856        context.session_ctx(),
857        table_name.real_value(),
858        &mut columns,
859        &column_defs,
860        &pk_column_ids,
861    )?;
862
863    let definition = context.normalized_sql().to_owned();
864
865    let pk_column_indices = {
866        let mut id_to_idx = HashMap::new();
867        columns.iter().enumerate().for_each(|(idx, c)| {
868            id_to_idx.insert(c.column_id(), idx);
869        });
870        // pk column id must exist in table columns.
871        pk_column_ids
872            .iter()
873            .map(|c| id_to_idx.get(c).copied().unwrap())
874            .collect_vec()
875    };
876    let table_pk = pk_column_indices
877        .iter()
878        .map(|idx| ColumnOrder::new(*idx, OrderType::ascending()))
879        .collect();
880
881    let (options, secret_refs) = cdc_with_options.into_parts();
882
883    let non_generated_column_descs = columns
884        .iter()
885        .filter(|&c| !c.is_generated())
886        .map(|c| c.column_desc.clone())
887        .collect_vec();
888    let non_generated_column_num = non_generated_column_descs.len();
889    let cdc_table_type = ExternalCdcTableType::from_properties(&options);
890    let cdc_table_desc = CdcTableDesc {
891        table_id,
892        source_id: source.id.into(), // id of cdc source streaming job
893        external_table_name: external_table_name.clone(),
894        pk: table_pk,
895        columns: non_generated_column_descs,
896        stream_key: pk_column_indices,
897        connect_properties: options,
898        secret_refs,
899    };
900
901    tracing::debug!(?cdc_table_desc, "create cdc table");
902    let options = build_cdc_scan_options_with_options(context.with_options(), &cdc_table_type)?;
903
904    let logical_scan = LogicalCdcScan::create(
905        external_table_name.clone(),
906        Rc::new(cdc_table_desc),
907        context.clone(),
908        options,
909    );
910
911    let scan_node: LogicalPlanRef = logical_scan.into();
912    let required_cols = FixedBitSet::with_capacity(non_generated_column_num);
913    let plan_root = PlanRoot::new_with_logical_plan(
914        scan_node,
915        RequiredDist::Any,
916        Order::any(),
917        required_cols,
918        vec![],
919    );
920
921    let cdc_table_id = build_cdc_table_id(source.id, &external_table_name);
922    let materialize = plan_root.gen_table_plan(
923        context,
924        resolved_table_name,
925        database_id,
926        schema_id,
927        CreateTableInfo {
928            columns,
929            pk_column_ids,
930            row_id_index: None,
931            watermark_descs: vec![],
932            source_catalog: Some((*source).clone()),
933            version: col_id_gen.into_version(),
934        },
935        CreateTableProps {
936            definition,
937            append_only: false,
938            on_conflict: on_conflict.into(),
939            with_version_columns,
940            webhook_info: None,
941            engine,
942        },
943    )?;
944
945    let mut table = materialize.table().clone();
946    table.owner = session.user_id();
947    table.cdc_table_id = Some(cdc_table_id);
948    table.cdc_table_type = Some(cdc_table_type);
949    Ok((materialize.into(), table))
950}
951
952fn derive_with_options_for_cdc_table(
953    source_with_properties: &WithOptionsSecResolved,
954    external_table_name: String,
955) -> Result<WithOptionsSecResolved> {
956    use source::cdc::{MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR, SQL_SERVER_CDC_CONNECTOR};
957    // we should remove the prefix from `full_table_name`
958    let source_database_name: &str = source_with_properties
959        .get("database.name")
960        .ok_or_else(|| anyhow!("The source with properties does not contain 'database.name'"))?
961        .as_str();
962    let mut with_options = source_with_properties.clone();
963    if let Some(connector) = source_with_properties.get(UPSTREAM_SOURCE_KEY) {
964        match connector.as_str() {
965            MYSQL_CDC_CONNECTOR => {
966                // MySQL doesn't allow '.' in database name and table name, so we can split the
967                // external table name by '.' to get the table name
968                let (db_name, table_name) = external_table_name.split_once('.').ok_or_else(|| {
969                    anyhow!("The upstream table name must contain database name prefix, e.g. 'database.table'")
970                })?;
971                // We allow multiple database names in the source definition
972                if !source_database_name
973                    .split(',')
974                    .map(|s| s.trim())
975                    .any(|name| name == db_name)
976                {
977                    return Err(anyhow!(
978                        "The database name `{}` in the FROM clause is not included in the database name `{}` in source definition",
979                        db_name,
980                        source_database_name
981                    ).into());
982                }
983                with_options.insert(DATABASE_NAME_KEY.into(), db_name.into());
984                with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
985            }
986            POSTGRES_CDC_CONNECTOR => {
987                let (schema_name, table_name) = external_table_name
988                    .split_once('.')
989                    .ok_or_else(|| anyhow!("The upstream table name must contain schema name prefix, e.g. 'public.table'"))?;
990
991                // insert 'schema.name' into connect properties
992                with_options.insert(SCHEMA_NAME_KEY.into(), schema_name.into());
993                with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
994            }
995            SQL_SERVER_CDC_CONNECTOR => {
996                // SQL Server external table name can be in different formats:
997                // 1. 'databaseName.schemaName.tableName' (full format)
998                // 2. 'schemaName.tableName' (schema and table only)
999                // 3. 'tableName' (table only, will use default schema 'dbo')
1000                // We will auto-fill missing parts from source configuration
1001                let parts: Vec<&str> = external_table_name.split('.').collect();
1002                let (_, schema_name, table_name) = match parts.len() {
1003                    3 => {
1004                        // Full format: database.schema.table
1005                        let db_name = parts[0];
1006                        let schema_name = parts[1];
1007                        let table_name = parts[2];
1008
1009                        // Verify database name matches source configuration
1010                        if db_name != source_database_name {
1011                            return Err(anyhow!(
1012                                "The database name `{}` in the FROM clause is not the same as the database name `{}` in source definition",
1013                                db_name,
1014                                source_database_name
1015                            ).into());
1016                        }
1017                        (db_name, schema_name, table_name)
1018                    }
1019                    2 => {
1020                        // Schema and table only: schema.table
1021                        let schema_name = parts[0];
1022                        let table_name = parts[1];
1023                        (source_database_name, schema_name, table_name)
1024                    }
1025                    1 => {
1026                        // Table only: table (use default schema 'dbo')
1027                        let table_name = parts[0];
1028                        (source_database_name, "dbo", table_name)
1029                    }
1030                    _ => {
1031                        return Err(anyhow!(
1032                            "The upstream table name must be in one of these formats: 'database.schema.table', 'schema.table', or 'table'"
1033                        ).into());
1034                    }
1035                };
1036
1037                // insert 'schema.name' into connect properties
1038                with_options.insert(SCHEMA_NAME_KEY.into(), schema_name.into());
1039                with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
1040            }
1041            _ => {
1042                return Err(RwError::from(anyhow!(
1043                    "connector {} is not supported for cdc table",
1044                    connector
1045                )));
1046            }
1047        };
1048    }
1049    Ok(with_options)
1050}
1051
1052#[allow(clippy::too_many_arguments)]
1053pub(super) async fn handle_create_table_plan(
1054    handler_args: HandlerArgs,
1055    explain_options: ExplainOptions,
1056    format_encode: Option<FormatEncodeOptions>,
1057    cdc_table_info: Option<CdcTableInfo>,
1058    table_name: &ObjectName,
1059    column_defs: Vec<ColumnDef>,
1060    wildcard_idx: Option<usize>,
1061    constraints: Vec<TableConstraint>,
1062    source_watermarks: Vec<SourceWatermark>,
1063    append_only: bool,
1064    on_conflict: Option<OnConflict>,
1065    with_version_columns: Vec<String>,
1066    include_column_options: IncludeOption,
1067    webhook_info: Option<WebhookSourceInfo>,
1068    engine: Engine,
1069) -> Result<(
1070    PlanRef,
1071    Option<SourceCatalog>,
1072    TableCatalog,
1073    TableJobType,
1074    Option<SourceId>,
1075)> {
1076    let col_id_gen = ColumnIdGenerator::new_initial();
1077    let format_encode = check_create_table_with_source(
1078        &handler_args.with_options,
1079        format_encode,
1080        &include_column_options,
1081        &cdc_table_info,
1082    )?;
1083    let webhook_info = webhook_info
1084        .map(|info| bind_webhook_info(&handler_args.session, &column_defs, info))
1085        .transpose()?;
1086
1087    let props = CreateTableProps {
1088        definition: handler_args.normalized_sql.clone(),
1089        append_only,
1090        on_conflict: on_conflict.into(),
1091        with_version_columns: with_version_columns.clone(),
1092        webhook_info,
1093        engine,
1094    };
1095
1096    let ((plan, source, table), job_type, shared_shource_id) = match (
1097        format_encode,
1098        cdc_table_info.as_ref(),
1099    ) {
1100        (Some(format_encode), None) => (
1101            gen_create_table_plan_with_source(
1102                handler_args,
1103                explain_options,
1104                table_name.clone(),
1105                column_defs,
1106                wildcard_idx,
1107                constraints,
1108                format_encode,
1109                source_watermarks,
1110                col_id_gen,
1111                include_column_options,
1112                props,
1113                SqlColumnStrategy::FollowChecked,
1114            )
1115            .await?,
1116            TableJobType::General,
1117            None,
1118        ),
1119        (None, None) => {
1120            let context = OptimizerContext::new(handler_args, explain_options);
1121            let (plan, table) = gen_create_table_plan(
1122                context,
1123                table_name.clone(),
1124                column_defs,
1125                constraints,
1126                col_id_gen,
1127                source_watermarks,
1128                props,
1129                false,
1130            )?;
1131
1132            ((plan, None, table), TableJobType::General, None)
1133        }
1134
1135        (None, Some(cdc_table)) => {
1136            sanity_check_for_table_on_cdc_source(
1137                append_only,
1138                &column_defs,
1139                &wildcard_idx,
1140                &constraints,
1141                &source_watermarks,
1142            )?;
1143
1144            generated_columns_check_for_cdc_table(&column_defs)?;
1145            not_null_check_for_cdc_table(&wildcard_idx, &column_defs)?;
1146
1147            let session = &handler_args.session;
1148            let db_name = &session.database();
1149            let user_name = &session.user_name();
1150            let search_path = session.config().search_path();
1151            let (schema_name, resolved_table_name) =
1152                Binder::resolve_schema_qualified_name(db_name, table_name)?;
1153            let (database_id, schema_id) =
1154                session.get_database_and_schema_id_for_create(schema_name.clone())?;
1155
1156            // cdc table cannot be append-only
1157            let (format_encode, source_name) =
1158                Binder::resolve_schema_qualified_name(db_name, &cdc_table.source_name)?;
1159
1160            let source = {
1161                let catalog_reader = session.env().catalog_reader().read_guard();
1162                let schema_path =
1163                    SchemaPath::new(format_encode.as_deref(), &search_path, user_name);
1164
1165                let (source, _) = catalog_reader.get_source_by_name(
1166                    db_name,
1167                    schema_path,
1168                    source_name.as_str(),
1169                )?;
1170                source.clone()
1171            };
1172            let cdc_with_options: WithOptionsSecResolved = derive_with_options_for_cdc_table(
1173                &source.with_properties,
1174                cdc_table.external_table_name.clone(),
1175            )?;
1176
1177            let (columns, pk_names) = match wildcard_idx {
1178                Some(_) => bind_cdc_table_schema_externally(cdc_with_options.clone()).await?,
1179                None => {
1180                    for column_def in &column_defs {
1181                        for option_def in &column_def.options {
1182                            if let ColumnOption::DefaultValue(_)
1183                            | ColumnOption::DefaultValueInternal { .. } = option_def.option
1184                            {
1185                                return Err(ErrorCode::NotSupported(
1186                                            "Default value for columns defined on the table created from a CDC source".into(),
1187                                            "Remove the default value expression in the column definitions".into(),
1188                                        )
1189                                            .into());
1190                            }
1191                        }
1192                    }
1193
1194                    let (columns, pk_names) =
1195                        bind_cdc_table_schema(&column_defs, &constraints, false)?;
1196                    // read default value definition from external db
1197                    let (options, secret_refs) = cdc_with_options.clone().into_parts();
1198                    let _config = ExternalTableConfig::try_from_btreemap(options, secret_refs)
1199                        .context("failed to extract external table config")?;
1200
1201                    (columns, pk_names)
1202                }
1203            };
1204
1205            let context: OptimizerContextRef =
1206                OptimizerContext::new(handler_args, explain_options).into();
1207            let shared_source_id = source.id;
1208            let (plan, table) = gen_create_table_plan_for_cdc_table(
1209                context,
1210                source,
1211                cdc_table.external_table_name.clone(),
1212                column_defs,
1213                columns,
1214                pk_names,
1215                cdc_with_options,
1216                col_id_gen,
1217                on_conflict,
1218                with_version_columns,
1219                include_column_options,
1220                table_name.clone(),
1221                resolved_table_name,
1222                database_id,
1223                schema_id,
1224                TableId::placeholder(),
1225                engine,
1226            )?;
1227
1228            (
1229                (plan, None, table),
1230                TableJobType::SharedCdcSource,
1231                Some(shared_source_id),
1232            )
1233        }
1234        (Some(_), Some(_)) => {
1235            return Err(ErrorCode::NotSupported(
1236                "Data format and encoding format doesn't apply to table created from a CDC source"
1237                    .into(),
1238                "Remove the FORMAT and ENCODE specification".into(),
1239            )
1240            .into());
1241        }
1242    };
1243    Ok((plan, source, table, job_type, shared_shource_id))
1244}
1245
1246// For both table from cdc source and table with cdc connector
1247fn generated_columns_check_for_cdc_table(columns: &Vec<ColumnDef>) -> Result<()> {
1248    let mut found_generated_column = false;
1249    for column in columns {
1250        let mut is_generated = false;
1251
1252        for option_def in &column.options {
1253            if let ColumnOption::GeneratedColumns(_) = option_def.option {
1254                is_generated = true;
1255                break;
1256            }
1257        }
1258
1259        if is_generated {
1260            found_generated_column = true;
1261        } else if found_generated_column {
1262            return Err(ErrorCode::NotSupported(
1263                "Non-generated column found after a generated column.".into(),
1264                "Ensure that all generated columns appear at the end of the cdc table definition."
1265                    .into(),
1266            )
1267            .into());
1268        }
1269    }
1270    Ok(())
1271}
1272
1273// For both table from cdc source and table with cdc connector
1274fn not_null_check_for_cdc_table(
1275    wildcard_idx: &Option<usize>,
1276    column_defs: &Vec<ColumnDef>,
1277) -> Result<()> {
1278    if !wildcard_idx.is_some()
1279        && column_defs.iter().any(|col| {
1280            col.options
1281                .iter()
1282                .any(|opt| matches!(opt.option, ColumnOption::NotNull))
1283        })
1284    {
1285        return Err(ErrorCode::NotSupported(
1286            "CDC table with NOT NULL constraint is not supported".to_owned(),
1287            "Please remove the NOT NULL constraint for columns".to_owned(),
1288        )
1289        .into());
1290    }
1291    Ok(())
1292}
1293
1294// Only for table from cdc source
1295fn sanity_check_for_table_on_cdc_source(
1296    append_only: bool,
1297    column_defs: &Vec<ColumnDef>,
1298    wildcard_idx: &Option<usize>,
1299    constraints: &Vec<TableConstraint>,
1300    source_watermarks: &Vec<SourceWatermark>,
1301) -> Result<()> {
1302    // wildcard cannot be used with column definitions
1303    if wildcard_idx.is_some() && !column_defs.is_empty() {
1304        return Err(ErrorCode::NotSupported(
1305            "wildcard(*) and column definitions cannot be used together".to_owned(),
1306            "Remove the wildcard or column definitions".to_owned(),
1307        )
1308        .into());
1309    }
1310
1311    // cdc table must have primary key constraint or primary key column
1312    if !wildcard_idx.is_some()
1313        && !constraints.iter().any(|c| {
1314            matches!(
1315                c,
1316                TableConstraint::Unique {
1317                    is_primary: true,
1318                    ..
1319                }
1320            )
1321        })
1322        && !column_defs.iter().any(|col| {
1323            col.options
1324                .iter()
1325                .any(|opt| matches!(opt.option, ColumnOption::Unique { is_primary: true }))
1326        })
1327    {
1328        return Err(ErrorCode::NotSupported(
1329            "CDC table without primary key constraint is not supported".to_owned(),
1330            "Please define a primary key".to_owned(),
1331        )
1332        .into());
1333    }
1334
1335    if append_only {
1336        return Err(ErrorCode::NotSupported(
1337            "append only modifier on the table created from a CDC source".into(),
1338            "Remove the APPEND ONLY clause".into(),
1339        )
1340        .into());
1341    }
1342
1343    if !source_watermarks.is_empty() {
1344        return Err(ErrorCode::NotSupported(
1345            "watermark defined on the table created from a CDC source".into(),
1346            "Remove the Watermark definitions".into(),
1347        )
1348        .into());
1349    }
1350
1351    Ok(())
1352}
1353
1354/// Derive schema for cdc table when create a new Table or alter an existing Table
1355async fn bind_cdc_table_schema_externally(
1356    cdc_with_options: WithOptionsSecResolved,
1357) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
1358    // read cdc table schema from external db or parsing the schema from SQL definitions
1359    let (options, secret_refs) = cdc_with_options.into_parts();
1360    let config = ExternalTableConfig::try_from_btreemap(options, secret_refs)
1361        .context("failed to extract external table config")?;
1362
1363    let table = ExternalTableImpl::connect(config)
1364        .await
1365        .context("failed to auto derive table schema")?;
1366
1367    Ok((
1368        table
1369            .column_descs()
1370            .iter()
1371            .cloned()
1372            .map(|column_desc| ColumnCatalog {
1373                column_desc,
1374                is_hidden: false,
1375            })
1376            .collect(),
1377        table.pk_names().clone(),
1378    ))
1379}
1380
1381/// Derive schema for cdc table when create a new Table or alter an existing Table
1382fn bind_cdc_table_schema(
1383    column_defs: &Vec<ColumnDef>,
1384    constraints: &Vec<TableConstraint>,
1385    is_for_replace_plan: bool,
1386) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
1387    let columns = bind_sql_columns(column_defs, is_for_replace_plan)?;
1388
1389    let pk_names = bind_sql_pk_names(column_defs, bind_table_constraints(constraints)?)?;
1390    Ok((columns, pk_names))
1391}
1392
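/// Handles `CREATE TABLE`: plans the table (optionally with a connector, a CDC upstream, or a
/// webhook source), builds the stream fragment graph, and submits it to the catalog writer through
/// the engine-specific path (hummock or iceberg).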
1393#[allow(clippy::too_many_arguments)]
1394pub async fn handle_create_table(
1395    mut handler_args: HandlerArgs,
1396    table_name: ObjectName,
1397    column_defs: Vec<ColumnDef>,
1398    wildcard_idx: Option<usize>,
1399    constraints: Vec<TableConstraint>,
1400    if_not_exists: bool,
1401    format_encode: Option<FormatEncodeOptions>,
1402    source_watermarks: Vec<SourceWatermark>,
1403    append_only: bool,
1404    on_conflict: Option<OnConflict>,
1405    with_version_columns: Vec<String>,
1406    cdc_table_info: Option<CdcTableInfo>,
1407    include_column_options: IncludeOption,
1408    webhook_info: Option<WebhookSourceInfo>,
1409    ast_engine: risingwave_sqlparser::ast::Engine,
1410) -> Result<RwPgResponse> {
1411    let session = handler_args.session.clone();
1412
1413    if append_only {
1414        session.notice_to_user("APPEND ONLY TABLE is currently an experimental feature.");
1415    }
1416
1417    session.check_cluster_limits().await?;
1418
1419    let engine = match ast_engine {
1420        risingwave_sqlparser::ast::Engine::Hummock => Engine::Hummock,
1421        risingwave_sqlparser::ast::Engine::Iceberg => Engine::Iceberg,
1422    };
1423    if engine == Engine::Iceberg && handler_args.with_options.get_connector().is_some() {
1424        // HACK: since we don't have atomic DDL, a table with a connector may lose data.
1425        // FIXME: remove this after https://github.com/risingwavelabs/risingwave/issues/21863
1426        if let Some(_rate_limit) = handler_args.with_options.insert(
1427            OverwriteOptions::SOURCE_RATE_LIMIT_KEY.to_owned(),
1428            "0".to_owned(),
1429        ) {
1430            // The user specified a rate limit themselves; reject it, since we force it to 0 here.
1431            return Err(ErrorCode::NotSupported(
1432                "source_rate_limit for iceberg table engine during table creation".to_owned(),
1433                "Please remove source_rate_limit from WITH options.".to_owned(),
1434            )
1435            .into());
1436        }
1437    }
1438
1439    if let Either::Right(resp) = session.check_relation_name_duplicated(
1440        table_name.clone(),
1441        StatementType::CREATE_TABLE,
1442        if_not_exists,
1443    )? {
1444        return Ok(resp);
1445    }
1446
1447    let (graph, source, hummock_table, job_type, shared_source_id) = {
1448        let (plan, source, table, job_type, shared_source_id) = handle_create_table_plan(
1449            handler_args.clone(),
1450            ExplainOptions::default(),
1451            format_encode,
1452            cdc_table_info,
1453            &table_name,
1454            column_defs.clone(),
1455            wildcard_idx,
1456            constraints.clone(),
1457            source_watermarks,
1458            append_only,
1459            on_conflict,
1460            with_version_columns,
1461            include_column_options,
1462            webhook_info,
1463            engine,
1464        )
1465        .await?;
1466        tracing::trace!("table_plan: {:?}", plan.explain_to_string());
1467
1468        let graph = build_graph(plan, Some(GraphJobType::Table))?;
1469
1470        (graph, source, table, job_type, shared_source_id)
1471    };
1472
1473    tracing::trace!(
1474        "name={}, graph=\n{}",
1475        table_name,
1476        serde_json::to_string_pretty(&graph).unwrap()
1477    );
1478
1479    let dependencies = shared_source_id
1480        .map(|id| HashSet::from([id as ObjectId]))
1481        .unwrap_or_default();
1482
1483    // Handle engine
1484    match engine {
1485        Engine::Hummock => {
1486            let catalog_writer = session.catalog_writer()?;
1487            catalog_writer
1488                .create_table(
1489                    source.map(|s| s.to_prost()),
1490                    hummock_table.to_prost(),
1491                    graph,
1492                    job_type,
1493                    if_not_exists,
1494                    dependencies,
1495                )
1496                .await?;
1497        }
1498        Engine::Iceberg => {
1499            let hummock_table_name = hummock_table.name.clone();
1500            session.create_staging_table(hummock_table.clone());
1501            let res = create_iceberg_engine_table(
1502                session.clone(),
1503                handler_args,
1504                source.map(|s| s.to_prost()),
1505                hummock_table,
1506                graph,
1507                table_name,
1508                job_type,
1509                if_not_exists,
1510            )
1511            .await;
1512            session.drop_staging_table(&hummock_table_name);
1513            res?
1514        }
1515    }
1516
1517    Ok(PgResponse::empty_result(StatementType::CREATE_TABLE))
1518}
1519
1520/// An iceberg engine table is composed of a hummock table, an iceberg sink, and an iceberg source.
1521///
1522/// 1. Fetch the iceberg engine options from the meta node, or use the iceberg engine connection provided by the user.
1523/// 2. Create a hummock table.
1524/// 3. Create an iceberg sink.
1525/// 4. Create an iceberg source.
1526///
1527/// See <https://github.com/risingwavelabs/risingwave/issues/21586> for an architecture diagram.
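///
/// For illustration only (a hedged sketch; the table definition and option values are
/// hypothetical, while the option keys are the ones consumed below), a statement handled by this
/// path might look like:
///
/// ```sql
/// CREATE TABLE orders (id INT PRIMARY KEY, amount INT)
/// WITH (commit_checkpoint_interval = 120, partition_by = 'id')
/// ENGINE = iceberg;
/// ```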
1528#[allow(clippy::too_many_arguments)]
1529pub async fn create_iceberg_engine_table(
1530    session: Arc<SessionImpl>,
1531    handler_args: HandlerArgs,
1532    mut source: Option<PbSource>,
1533    table: TableCatalog,
1534    graph: StreamFragmentGraph,
1535    table_name: ObjectName,
1536    job_type: PbTableJobType,
1537    if_not_exists: bool,
1538) -> Result<()> {
1539    let meta_client = session.env().meta_client();
1540    let meta_store_endpoint = meta_client.get_meta_store_endpoint().await?;
1541
1542    let meta_store_endpoint = url::Url::parse(&meta_store_endpoint).map_err(|_| {
1543        ErrorCode::InternalError("failed to parse the meta store endpoint".to_owned())
1544    })?;
1545    let meta_store_backend = meta_store_endpoint.scheme().to_owned();
1546    let meta_store_user = meta_store_endpoint.username().to_owned();
1547    let meta_store_password = match meta_store_endpoint.password() {
1548        Some(password) => percent_decode_str(password)
1549            .decode_utf8()
1550            .map_err(|_| {
1551                ErrorCode::InternalError(
1552                    "failed to parse password from meta store endpoint".to_owned(),
1553                )
1554            })?
1555            .into_owned(),
1556        None => "".to_owned(),
1557    };
1558    let meta_store_host = meta_store_endpoint
1559        .host_str()
1560        .ok_or_else(|| {
1561            ErrorCode::InternalError("failed to parse host from meta store endpoint".to_owned())
1562        })?
1563        .to_owned();
1564    let meta_store_port = meta_store_endpoint.port().ok_or_else(|| {
1565        ErrorCode::InternalError("failed to parse port from meta store endpoint".to_owned())
1566    })?;
1567    let meta_store_database = meta_store_endpoint
1568        .path()
1569        .trim_start_matches('/')
1570        .to_owned();
1571
1572    let Ok(meta_backend) = MetaBackend::from_str(&meta_store_backend, true) else {
1573        bail!("failed to parse meta backend: {}", meta_store_backend);
1574    };
1575
1576    let catalog_uri = match meta_backend {
1577        MetaBackend::Postgres => {
1578            format!(
1579                "jdbc:postgresql://{}:{}/{}",
1580                meta_store_host.clone(),
1581                meta_store_port.clone(),
1582                meta_store_database.clone()
1583            )
1584        }
1585        MetaBackend::Mysql => {
1586            format!(
1587                "jdbc:mysql://{}:{}/{}",
1588                meta_store_host.clone(),
1589                meta_store_port.clone(),
1590                meta_store_database.clone()
1591            )
1592        }
1593        MetaBackend::Sqlite | MetaBackend::Sql | MetaBackend::Mem => {
1594            bail!(
1595                "Unsupported meta backend for iceberg engine table: {}",
1596                meta_store_backend
1597            );
1598        }
1599    };
1600
1601    let rw_db_name = session
1602        .env()
1603        .catalog_reader()
1604        .read_guard()
1605        .get_database_by_id(&table.database_id)?
1606        .name()
1607        .to_owned();
1608    let rw_schema_name = session
1609        .env()
1610        .catalog_reader()
1611        .read_guard()
1612        .get_schema_by_id(&table.database_id, &table.schema_id)?
1613        .name()
1614        .clone();
1615    let iceberg_catalog_name = rw_db_name.clone();
1616    let iceberg_database_name = rw_schema_name.clone();
1617    let iceberg_table_name = table_name.0.last().unwrap().real_value();
1618
1619    let iceberg_engine_connection: String = session.config().iceberg_engine_connection();
1620    let sink_decouple = session.config().sink_decouple();
1621    if matches!(sink_decouple, SinkDecouple::Disable) {
1622        bail!(
1623            "Iceberg engine table only supports with sink decouple, try `set sink_decouple = true` to resolve it"
1624        );
1625    }
1626
1627    let mut connection_ref = BTreeMap::new();
1628    let with_common = if iceberg_engine_connection.is_empty() {
1629        bail!("to use iceberg engine table, the variable `iceberg_engine_connection` must be set.");
1630    } else {
1631        let parts: Vec<&str> = iceberg_engine_connection.split('.').collect();
1632        assert_eq!(parts.len(), 2);
1633        let connection_catalog =
1634            session.get_connection_by_name(Some(parts[0].to_owned()), parts[1])?;
1635        if let ConnectionInfo::ConnectionParams(params) = &connection_catalog.info {
1636            if params.connection_type == ConnectionType::Iceberg as i32 {
1637                // With iceberg engine connection:
1638                connection_ref.insert(
1639                    "connection".to_owned(),
1640                    ConnectionRefValue {
1641                        connection_name: ObjectName::from(vec![
1642                            Ident::from(parts[0]),
1643                            Ident::from(parts[1]),
1644                        ]),
1645                    },
1646                );
1647
1648                let mut with_common = BTreeMap::new();
1649                with_common.insert("connector".to_owned(), "iceberg".to_owned());
1650                with_common.insert("database.name".to_owned(), iceberg_database_name);
1651                with_common.insert("table.name".to_owned(), iceberg_table_name);
1652
1653                if let Some(s) = params.properties.get("hosted_catalog")
1654                    && s.eq_ignore_ascii_case("true")
1655                {
1656                    with_common.insert("catalog.type".to_owned(), "jdbc".to_owned());
1657                    with_common.insert("catalog.uri".to_owned(), catalog_uri);
1658                    with_common.insert("catalog.jdbc.user".to_owned(), meta_store_user);
1659                    with_common.insert(
1660                        "catalog.jdbc.password".to_owned(),
1661                        meta_store_password.clone(),
1662                    );
1663                    with_common.insert("catalog.name".to_owned(), iceberg_catalog_name);
1664                }
1665
1666                with_common
1667            } else {
1668                return Err(RwError::from(ErrorCode::InvalidParameterValue(
1669                    "Only iceberg connection could be used in iceberg engine".to_owned(),
1670                )));
1671            }
1672        } else {
1673            return Err(RwError::from(ErrorCode::InvalidParameterValue(
1674                "Private Link Service has been deprecated. Please create a new connection instead."
1675                    .to_owned(),
1676            )));
1677        }
1678    };
1679
1680    // Iceberg sinks require a primary key; if none is provided, we will use the `_row_id` column.
1681    // Fetch the primary key from the table columns.
1682    let mut pks = table
1683        .pk_column_names()
1684        .iter()
1685        .map(|c| c.to_string())
1686        .collect::<Vec<String>>();
1687
1688    // For a table without a primary key, use the hidden `_row_id` column as the primary key.
1689    let sink_from = if pks.len() == 1 && pks[0].eq(ROW_ID_COLUMN_NAME) {
1690        pks = vec![RISINGWAVE_ICEBERG_ROW_ID.to_owned()];
1691        let [stmt]: [_; 1] = Parser::parse_sql(&format!(
1692            "select {} as {}, * from {}",
1693            ROW_ID_COLUMN_NAME, RISINGWAVE_ICEBERG_ROW_ID, table_name
1694        ))
1695        .context("unable to parse query")?
1696        .try_into()
1697        .unwrap();
1698
1699        let Statement::Query(query) = &stmt else {
1700            panic!("unexpected statement: {:?}", stmt);
1701        };
1702        CreateSink::AsQuery(query.clone())
1703    } else {
1704        CreateSink::From(table_name.clone())
1705    };
1706
1707    let mut sink_name = table_name.clone();
1708    *sink_name.0.last_mut().unwrap() = Ident::from(
1709        (ICEBERG_SINK_PREFIX.to_owned() + &sink_name.0.last().unwrap().real_value()).as_str(),
1710    );
1711    let create_sink_stmt = CreateSinkStatement {
1712        if_not_exists: false,
1713        sink_name,
1714        with_properties: WithProperties(vec![]),
1715        sink_from,
1716        columns: vec![],
1717        emit_mode: None,
1718        sink_schema: None,
1719        into_table_name: None,
1720    };
1721
1722    let mut sink_handler_args = handler_args.clone();
1723
1724    let mut sink_with = with_common.clone();
1725
1726    if table.append_only {
1727        sink_with.insert("type".to_owned(), "append-only".to_owned());
1728    } else {
1729        sink_with.insert("primary_key".to_owned(), pks.join(","));
1730        sink_with.insert("type".to_owned(), "upsert".to_owned());
1731    }
1732    // sink_with.insert(SINK_SNAPSHOT_OPTION.to_owned(), "false".to_owned());
1733    //
1734    // Note: in theory, we don't need to backfill from the table to the sink,
1735    // but we don't have atomic DDL yet (https://github.com/risingwavelabs/risingwave/issues/21863),
1736    // so there is a potential data-loss problem on the first barrier.
1737    //
1738    // For a non-append-only table, we can always solve it by having the initial sink backfill from
1739    // the table, since the data will be present in the hummock table.
1740    //
1741    // For an append-only table, we need to be more careful.
1742    //
1743    // The possible cases for a table:
1744    // - Table without a connector: it doesn't matter, since there's no data before the table is created.
1745    // - Table with a connector: we worked around it by setting SOURCE_RATE_LIMIT to 0.
1746    //   + If we support blocking DDL for tables with connectors, we need to be careful.
1747    // - Table with an upstream job, specifically a CDC table from a shared CDC source:
1748    //   + Data may come from both the upstream connector and the CDC table backfill, so we need to pause both of them.
1749    //   + For now we don't support APPEND ONLY CDC tables, so it's safe.
1750    let commit_checkpoint_interval = handler_args
1751        .with_options
1752        .get(COMMIT_CHECKPOINT_INTERVAL)
1753        .map(|v| v.to_owned())
1754        .unwrap_or_else(|| "60".to_owned());
1755    let commit_checkpoint_interval = commit_checkpoint_interval.parse::<u32>().map_err(|_| {
1756        ErrorCode::InvalidInputSyntax(format!(
1757            "commit_checkpoint_interval must be a positive integer: {}",
1758            commit_checkpoint_interval
1759        ))
1760    })?;
1761
1762    if commit_checkpoint_interval == 0 {
1763        bail!("commit_checkpoint_interval must be a positive integer: 0");
1764    }
1765
1766    // remove commit_checkpoint_interval from source options, otherwise it will be considered as an unknown field.
1767    source
1768        .as_mut()
1769        .map(|x| x.with_properties.remove(COMMIT_CHECKPOINT_INTERVAL));
1770
1771    let sink_decouple = session.config().sink_decouple();
1772    if matches!(sink_decouple, SinkDecouple::Disable) && commit_checkpoint_interval > 1 {
1773        bail!(
1774            "config conflict: `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
1775        )
1776    }
1777
1778    sink_with.insert(
1779        COMMIT_CHECKPOINT_INTERVAL.to_owned(),
1780        commit_checkpoint_interval.to_string(),
1781    );
1782    sink_with.insert("create_table_if_not_exists".to_owned(), "true".to_owned());
1783
1784    sink_with.insert("is_exactly_once".to_owned(), "true".to_owned());
1785
1786    if let Some(enable_compaction) = handler_args.with_options.get(ENABLE_COMPACTION) {
1787        match enable_compaction.to_lowercase().as_str() {
1788            "true" => {
1789                sink_with.insert(ENABLE_COMPACTION.to_owned(), "true".to_owned());
1790            }
1791            "false" => {
1792                sink_with.insert(ENABLE_COMPACTION.to_owned(), "false".to_owned());
1793            }
1794            _ => {
1795                return Err(ErrorCode::InvalidInputSyntax(format!(
1796                    "enable_compaction must be true or false: {}",
1797                    enable_compaction
1798                ))
1799                .into());
1800            }
1801        }
1802
1803        // remove enable_compaction from source options, otherwise it will be considered as an unknown field.
1804        source
1805            .as_mut()
1806            .map(|x| x.with_properties.remove("enable_compaction"));
1807    } else {
1808        sink_with.insert(ENABLE_COMPACTION.to_owned(), "true".to_owned());
1809    }
1810
1811    if let Some(compaction_interval_sec) = handler_args.with_options.get(COMPACTION_INTERVAL_SEC) {
1812        let compaction_interval_sec = compaction_interval_sec.parse::<u64>().map_err(|_| {
1813            ErrorCode::InvalidInputSyntax(format!(
1814                "compaction_interval_sec must be a positive integer: {}",
1815                compaction_interval_sec
1816            ))
1817        })?;
1818        if compaction_interval_sec == 0 {
1819            bail!("compaction_interval_sec must be a positive integer: 0");
1820        }
1821        sink_with.insert(
1822            "compaction_interval_sec".to_owned(),
1823            compaction_interval_sec.to_string(),
1824        );
1825        // remove compaction_interval_sec from source options, otherwise it will be considered as an unknown field.
1826        source
1827            .as_mut()
1828            .map(|x| x.with_properties.remove("compaction_interval_sec"));
1829    }
1830
1831    let has_enabled_snapshot_expiration = if let Some(enable_snapshot_expiration) =
1832        handler_args.with_options.get(ENABLE_SNAPSHOT_EXPIRATION)
1833    {
1834        // remove enable_snapshot_expiration from source options, otherwise it will be considered as an unknown field.
1835        source
1836            .as_mut()
1837            .map(|x| x.with_properties.remove(ENABLE_SNAPSHOT_EXPIRATION));
1838        match enable_snapshot_expiration.to_lowercase().as_str() {
1839            "true" => {
1840                sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "true".to_owned());
1841                true
1842            }
1843            "false" => {
1844                sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "false".to_owned());
1845                false
1846            }
1847            _ => {
1848                return Err(ErrorCode::InvalidInputSyntax(format!(
1849                    "enable_snapshot_expiration must be true or false: {}",
1850                    enable_snapshot_expiration
1851                ))
1852                .into());
1853            }
1854        }
1855    } else {
1856        sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "true".to_owned());
1857        true
1858    };
1859
1860    if has_enabled_snapshot_expiration {
1861        // configuration for snapshot expiration
1862        if let Some(snapshot_expiration_retain_last) = handler_args
1863            .with_options
1864            .get(SNAPSHOT_EXPIRATION_RETAIN_LAST)
1865        {
1866            sink_with.insert(
1867                SNAPSHOT_EXPIRATION_RETAIN_LAST.to_owned(),
1868                snapshot_expiration_retain_last.to_owned(),
1869            );
1870            // remove snapshot_expiration_retain_last from source options, otherwise it will be considered as an unknown field.
1871            source
1872                .as_mut()
1873                .map(|x| x.with_properties.remove(SNAPSHOT_EXPIRATION_RETAIN_LAST));
1874        }
1875
1876        if let Some(snapshot_expiration_max_age) = handler_args
1877            .with_options
1878            .get(SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS)
1879        {
1880            sink_with.insert(
1881                SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS.to_owned(),
1882                snapshot_expiration_max_age.to_owned(),
1883            );
1884            // remove snapshot_expiration_max_age from source options, otherwise it will be considered as an unknown field.
1885            source
1886                .as_mut()
1887                .map(|x| x.with_properties.remove(SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS));
1888        }
1889
1890        if let Some(snapshot_expiration_clear_expired_files) = handler_args
1891            .with_options
1892            .get(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES)
1893        {
1894            sink_with.insert(
1895                SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES.to_owned(),
1896                snapshot_expiration_clear_expired_files.to_owned(),
1897            );
1898            // remove snapshot_expiration_clear_expired_files from source options, otherwise it will be considered as an unknown field.
1899            source.as_mut().map(|x| {
1900                x.with_properties
1901                    .remove(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES)
1902            });
1903        }
1904
1905        if let Some(snapshot_expiration_clear_expired_meta_data) = handler_args
1906            .with_options
1907            .get(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA)
1908        {
1909            sink_with.insert(
1910                SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA.to_owned(),
1911                snapshot_expiration_clear_expired_meta_data.to_owned(),
1912            );
1913            // remove snapshot_expiration_clear_expired_meta_data from source options, otherwise it will be considered as an unknown field.
1914            source.as_mut().map(|x| {
1915                x.with_properties
1916                    .remove(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA)
1917            });
1918        }
1919    }
1920
1921    if let Some(write_mode) = handler_args.with_options.get(WRITE_MODE) {
1922        match write_mode.to_lowercase().as_str() {
1923            ICEBERG_WRITE_MODE_MERGE_ON_READ => {
1924                sink_with.insert(
1925                    WRITE_MODE.to_owned(),
1926                    ICEBERG_WRITE_MODE_MERGE_ON_READ.to_owned(),
1927                );
1928            }
1929
1930            ICEBERG_WRITE_MODE_COPY_ON_WRITE => {
1931                if table.append_only {
1932                    return Err(ErrorCode::NotSupported(
1933                        "COPY ON WRITE is not supported for append-only iceberg table".to_owned(),
1934                        "Please use MERGE ON READ instead".to_owned(),
1935                    )
1936                    .into());
1937                }
1938
1939                sink_with.insert(
1940                    WRITE_MODE.to_owned(),
1941                    ICEBERG_WRITE_MODE_COPY_ON_WRITE.to_owned(),
1942                );
1943            }
1944
1945            _ => {
1946                return Err(ErrorCode::InvalidInputSyntax(format!(
1947                    "write_mode must be one of: {}, {}",
1948                    ICEBERG_WRITE_MODE_MERGE_ON_READ, ICEBERG_WRITE_MODE_COPY_ON_WRITE
1949                ))
1950                .into());
1951            }
1952        }
1953
1954        // remove write_mode from source options, otherwise it will be considered as an unknown field.
1955        source
1956            .as_mut()
1957            .map(|x| x.with_properties.remove("write_mode"));
1958    } else {
1959        sink_with.insert(
1960            WRITE_MODE.to_owned(),
1961            ICEBERG_WRITE_MODE_MERGE_ON_READ.to_owned(),
1962        );
1963    }
1964
1965    if let Some(max_snapshots_num_before_compaction) =
1966        handler_args.with_options.get(MAX_SNAPSHOTS_NUM)
1967    {
1968        let max_snapshots_num_before_compaction = max_snapshots_num_before_compaction
1969            .parse::<u32>()
1970            .map_err(|_| {
1971                ErrorCode::InvalidInputSyntax(format!(
1972                    "max_snapshots_num_before_compaction must be a positive integer: {}",
1973                    max_snapshots_num_before_compaction
1974                ))
1975            })?;
1976        if max_snapshots_num_before_compaction == 0 {
1977            bail!("max_snapshots_num_before_compaction must be a positive integer: 0");
1978        }
1979        sink_with.insert(
1980            MAX_SNAPSHOTS_NUM.to_owned(),
1981            max_snapshots_num_before_compaction.to_string(),
1982        );
1983    }
1984
1985    let partition_by = handler_args
1986        .with_options
1987        .get("partition_by")
1988        .map(|v| v.to_owned());
1989
1990    if let Some(partition_by) = &partition_by {
1991        let mut partition_columns = vec![];
1992        for (column, _) in parse_partition_by_exprs(partition_by.clone())? {
1993            table
1994                .columns()
1995                .iter()
1996                .find(|col| col.name().eq_ignore_ascii_case(&column))
1997                .ok_or_else(|| {
1998                    ErrorCode::InvalidInputSyntax(format!(
1999                        "Partition source column does not exist in schema: {}",
2000                        column
2001                    ))
2002                })?;
2003
2004            partition_columns.push(column);
2005        }
2006
2007        ensure_partition_columns_are_prefix_of_primary_key(&partition_columns, &pks).map_err(
2008            |_| {
2009                ErrorCode::InvalidInputSyntax(
2010                    "The partition columns should be the prefix of the primary key".to_owned(),
2011                )
2012            },
2013        )?;
2014
2015        sink_with.insert("partition_by".to_owned(), partition_by.to_owned());
2016
2017        // remove partition_by from source options, otherwise it will be considered as an unknown field.
2018        source
2019            .as_mut()
2020            .map(|x| x.with_properties.remove("partition_by"));
2021    }
2022
2023    sink_handler_args.with_options =
2024        WithOptions::new(sink_with, Default::default(), connection_ref.clone());
2025    let SinkPlanContext {
2026        sink_plan,
2027        sink_catalog,
2028        ..
2029    } = gen_sink_plan(sink_handler_args, create_sink_stmt, None, true).await?;
2030    let sink_graph = build_graph(sink_plan, Some(GraphJobType::Sink))?;
2031
2032    let mut source_name = table_name.clone();
2033    *source_name.0.last_mut().unwrap() = Ident::from(
2034        (ICEBERG_SOURCE_PREFIX.to_owned() + &source_name.0.last().unwrap().real_value()).as_str(),
2035    );
2036    let create_source_stmt = CreateSourceStatement {
2037        temporary: false,
2038        if_not_exists: false,
2039        columns: vec![],
2040        source_name,
2041        wildcard_idx: Some(0),
2042        constraints: vec![],
2043        with_properties: WithProperties(vec![]),
2044        format_encode: CompatibleFormatEncode::V2(FormatEncodeOptions::none()),
2045        source_watermarks: vec![],
2046        include_column_options: vec![],
2047    };
2048
2049    let mut source_handler_args = handler_args.clone();
2050    let source_with = with_common;
2051    source_handler_args.with_options =
2052        WithOptions::new(source_with, Default::default(), connection_ref);
2053
2054    let overwrite_options = OverwriteOptions::new(&mut source_handler_args);
2055    let format_encode = create_source_stmt.format_encode.into_v2_with_warning();
2056    let with_properties = bind_connector_props(&source_handler_args, &format_encode, true)?;
2057
2058    // Create iceberg sink table, used for iceberg source column binding. See `bind_columns_from_source_for_non_cdc` for more details.
2059    // TODO: We can derive the columns directly from table definition in the future, so that we don't need to pre-create the table catalog.
2060    let (iceberg_catalog, table_identifier) = {
2061        let sink_param = SinkParam::try_from_sink_catalog(sink_catalog.clone())?;
2062        let iceberg_sink = IcebergSink::try_from(sink_param)?;
2063        iceberg_sink.create_table_if_not_exists().await?;
2064
2065        let iceberg_catalog = iceberg_sink.config.create_catalog().await?;
2066        let table_identifier = iceberg_sink.config.full_table_name()?;
2067        (iceberg_catalog, table_identifier)
2068    };
2069
2070    let create_source_type = CreateSourceType::for_newly_created(&session, &*with_properties);
2071    let (columns_from_resolve_source, source_info) = bind_columns_from_source(
2072        &session,
2073        &format_encode,
2074        Either::Left(&with_properties),
2075        create_source_type,
2076    )
2077    .await?;
2078    let mut col_id_gen = ColumnIdGenerator::new_initial();
2079
2080    let iceberg_source_catalog = bind_create_source_or_table_with_connector(
2081        source_handler_args,
2082        create_source_stmt.source_name,
2083        format_encode,
2084        with_properties,
2085        &create_source_stmt.columns,
2086        create_source_stmt.constraints,
2087        create_source_stmt.wildcard_idx,
2088        create_source_stmt.source_watermarks,
2089        columns_from_resolve_source,
2090        source_info,
2091        create_source_stmt.include_column_options,
2092        &mut col_id_gen,
2093        create_source_type,
2094        overwrite_options.source_rate_limit,
2095        SqlColumnStrategy::FollowChecked,
2096    )
2097    .await?;
2098
2099    let has_connector = source.is_some();
2100
2101    // Before creating the table, ensure the JVM is initialized, as we use the jdbc catalog right now.
2102    // If the JVM fails to initialize, the current non-atomic DDL would otherwise leave a partially created iceberg engine table behind.
2103    let _ = Jvm::get_or_init()?;
2104
2105    let catalog_writer = session.catalog_writer()?;
2106    let res = catalog_writer
2107        .create_iceberg_table(
2108            PbTableJobInfo {
2109                source,
2110                table: Some(table.to_prost()),
2111                fragment_graph: Some(graph),
2112                job_type: job_type as _,
2113            },
2114            PbSinkJobInfo {
2115                sink: Some(sink_catalog.to_proto()),
2116                fragment_graph: Some(sink_graph),
2117            },
2118            iceberg_source_catalog.to_prost(),
2119            if_not_exists,
2120        )
2121        .await;
2122    if res.is_err() {
2123        let _ = iceberg_catalog
2124            .drop_table(&table_identifier)
2125            .await
2126            .inspect_err(|err| {
2127                tracing::error!(
2128                    "failed to drop iceberg table {} after create iceberg engine table failed: {}",
2129                    table_identifier,
2130                    err.as_report()
2131                );
2132            });
2133        res?
2134    }
2135
2136    // TODO: remove this, together with the rate limit rewrite, after we support atomic DDL on the meta side.
2137    if has_connector {
2138        alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
2139            handler_args,
2140            PbThrottleTarget::TableWithSource,
2141            table_name,
2142            -1,
2143        )
2144        .await?;
2145    }
2146
2147    Ok(())
2148}
2149
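/// Checks the connector-related clauses of `CREATE TABLE`: a CDC table skips the check,
/// `INCLUDE ...` requires a connector, and a connector requires `FORMAT ... ENCODE ...`.
///
/// For illustration only (a hedged sketch with hypothetical connector options), a statement that
/// passes this check might look like:
///
/// ```sql
/// CREATE TABLE t (v1 INT)
/// INCLUDE timestamp AS event_time
/// WITH (connector = 'kafka', topic = 't', properties.bootstrap.server = 'broker:9092')
/// FORMAT PLAIN ENCODE JSON;
/// ```
///
/// while `INCLUDE` without a connector, or a connector without `FORMAT ... ENCODE ...`, is rejected.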
2150pub fn check_create_table_with_source(
2151    with_options: &WithOptions,
2152    format_encode: Option<FormatEncodeOptions>,
2153    include_column_options: &IncludeOption,
2154    cdc_table_info: &Option<CdcTableInfo>,
2155) -> Result<Option<FormatEncodeOptions>> {
2156    // skip check for cdc table
2157    if cdc_table_info.is_some() {
2158        return Ok(format_encode);
2159    }
2160    let defined_source = with_options.is_source_connector();
2161
2162    if !include_column_options.is_empty() && !defined_source {
2163        return Err(ErrorCode::InvalidInputSyntax(
2164            "INCLUDE should be used with a connector".to_owned(),
2165        )
2166        .into());
2167    }
2168    if defined_source {
2169        format_encode.as_ref().ok_or_else(|| {
2170            ErrorCode::InvalidInputSyntax("Please specify a source schema using FORMAT".to_owned())
2171        })?;
2172    }
2173    Ok(format_encode)
2174}
2175
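/// Checks that `partition_columns` is a prefix of `primary_key_columns`: e.g. `["a"]` and
/// `["a", "b"]` are valid prefixes of `["a", "b", "c"]`, while `["b"]` is not. A unit-test sketch
/// at the end of this file exercises this helper.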
2176fn ensure_partition_columns_are_prefix_of_primary_key(
2177    partition_columns: &[String],
2178    primary_key_columns: &[String],
2179) -> std::result::Result<(), String> {
2180    if partition_columns.len() > primary_key_columns.len() {
2181        return Err("Partition columns cannot be longer than primary key columns.".to_owned());
2182    }
2183
2184    for (i, partition_col) in partition_columns.iter().enumerate() {
2185        if primary_key_columns.get(i) != Some(partition_col) {
2186            return Err(format!(
2187                "Partition column '{}' is not a prefix of the primary key.",
2188                partition_col
2189            ));
2190        }
2191    }
2192
2193    Ok(())
2194}
2195
2196#[allow(clippy::too_many_arguments)]
2197pub async fn generate_stream_graph_for_replace_table(
2198    _session: &Arc<SessionImpl>,
2199    table_name: ObjectName,
2200    original_catalog: &Arc<TableCatalog>,
2201    handler_args: HandlerArgs,
2202    statement: Statement,
2203    col_id_gen: ColumnIdGenerator,
2204    sql_column_strategy: SqlColumnStrategy,
2205) -> Result<(
2206    StreamFragmentGraph,
2207    TableCatalog,
2208    Option<SourceCatalog>,
2209    TableJobType,
2210)> {
2211    let Statement::CreateTable {
2212        columns,
2213        constraints,
2214        source_watermarks,
2215        append_only,
2216        on_conflict,
2217        with_version_columns,
2218        wildcard_idx,
2219        cdc_table_info,
2220        format_encode,
2221        include_column_options,
2222        engine,
2223        with_options,
2224        ..
2225    } = statement
2226    else {
2227        panic!("unexpected statement type: {:?}", statement);
2228    };
2229
2230    let format_encode = format_encode
2231        .clone()
2232        .map(|format_encode| format_encode.into_v2_with_warning());
2233
2234    let engine = match engine {
2235        risingwave_sqlparser::ast::Engine::Hummock => Engine::Hummock,
2236        risingwave_sqlparser::ast::Engine::Iceberg => Engine::Iceberg,
2237    };
2238
2239    let is_drop_connector =
2240        original_catalog.associated_source_id().is_some() && format_encode.is_none();
2241    if is_drop_connector {
2242        debug_assert!(
2243            source_watermarks.is_empty()
2244                && include_column_options.is_empty()
2245                && with_options
2246                    .iter()
2247                    .all(|opt| opt.name.real_value().to_lowercase() != "connector")
2248        );
2249    }
2250
2251    let props = CreateTableProps {
2252        definition: handler_args.normalized_sql.clone(),
2253        append_only,
2254        on_conflict: on_conflict.into(),
2255        with_version_columns: with_version_columns
2256            .iter()
2257            .map(|col| col.real_value())
2258            .collect(),
2259        webhook_info: original_catalog.webhook_info.clone(),
2260        engine,
2261    };
2262
2263    let ((plan, mut source, mut table), job_type) = match (format_encode, cdc_table_info.as_ref()) {
2264        (Some(format_encode), None) => (
2265            gen_create_table_plan_with_source(
2266                handler_args,
2267                ExplainOptions::default(),
2268                table_name,
2269                columns,
2270                wildcard_idx,
2271                constraints,
2272                format_encode,
2273                source_watermarks,
2274                col_id_gen,
2275                include_column_options,
2276                props,
2277                sql_column_strategy,
2278            )
2279            .await?,
2280            TableJobType::General,
2281        ),
2282        (None, None) => {
2283            let context = OptimizerContext::from_handler_args(handler_args);
2284            let (plan, table) = gen_create_table_plan(
2285                context,
2286                table_name,
2287                columns,
2288                constraints,
2289                col_id_gen,
2290                source_watermarks,
2291                props,
2292                true,
2293            )?;
2294            ((plan, None, table), TableJobType::General)
2295        }
2296        (None, Some(cdc_table)) => {
2297            let session = &handler_args.session;
2298            let (source, resolved_table_name, database_id, schema_id) =
2299                get_source_and_resolved_table_name(session, cdc_table.clone(), table_name.clone())?;
2300
2301            let cdc_with_options = derive_with_options_for_cdc_table(
2302                &source.with_properties,
2303                cdc_table.external_table_name.clone(),
2304            )?;
2305
2306            let (column_catalogs, pk_names) = bind_cdc_table_schema(&columns, &constraints, true)?;
2307
2308            let context: OptimizerContextRef =
2309                OptimizerContext::new(handler_args, ExplainOptions::default()).into();
2310            let (plan, table) = gen_create_table_plan_for_cdc_table(
2311                context,
2312                source,
2313                cdc_table.external_table_name.clone(),
2314                columns,
2315                column_catalogs,
2316                pk_names,
2317                cdc_with_options,
2318                col_id_gen,
2319                on_conflict,
2320                with_version_columns
2321                    .iter()
2322                    .map(|col| col.real_value())
2323                    .collect(),
2324                include_column_options,
2325                table_name,
2326                resolved_table_name,
2327                database_id,
2328                schema_id,
2329                original_catalog.id(),
2330                engine,
2331            )?;
2332
2333            ((plan, None, table), TableJobType::SharedCdcSource)
2334        }
2335        (Some(_), Some(_)) => {
2336            return Err(ErrorCode::NotSupported(
2337                "Data format and encoding format doesn't apply to table created from a CDC source"
2338                    .into(),
2339                "Remove the FORMAT and ENCODE specification".into(),
2340            )
2341            .into());
2342        }
2343    };
2344
2345    if table.pk_column_ids() != original_catalog.pk_column_ids() {
2346        Err(ErrorCode::InvalidInputSyntax(
2347            "alter primary key of table is not supported".to_owned(),
2348        ))?
2349    }
2350
2351    let graph = build_graph(plan, Some(GraphJobType::Table))?;
2352
2353    // Fill the original table ID.
2354    table.id = original_catalog.id();
2355    if !is_drop_connector && let Some(source_id) = original_catalog.associated_source_id() {
2356        table.associated_source_id = Some(source_id);
2357
2358        let source = source.as_mut().unwrap();
2359        source.id = source_id.as_raw_id();
2360        source.associated_table_id = Some(table.id());
2361    }
2362
2363    Ok((graph, table, source, job_type))
2364}
2365
2366fn get_source_and_resolved_table_name(
2367    session: &Arc<SessionImpl>,
2368    cdc_table: CdcTableInfo,
2369    table_name: ObjectName,
2370) -> Result<(Arc<SourceCatalog>, String, DatabaseId, SchemaId)> {
2371    let db_name = &session.database();
2372    let (schema_name, resolved_table_name) =
2373        Binder::resolve_schema_qualified_name(db_name, &table_name)?;
2374    let (database_id, schema_id) = session.get_database_and_schema_id_for_create(schema_name)?;
2375
2376    let (source_schema_name, source_name) =
2377        Binder::resolve_schema_qualified_name(db_name, &cdc_table.source_name)?;
2378
2379    let source = {
2380        let catalog_reader = session.env().catalog_reader().read_guard();
2381        let schema_name = source_schema_name.unwrap_or(DEFAULT_SCHEMA_NAME.to_owned());
2382        let (source, _) = catalog_reader.get_source_by_name(
2383            db_name,
2384            SchemaPath::Name(schema_name.as_str()),
2385            source_name.as_str(),
2386        )?;
2387        source.clone()
2388    };
2389
2390    Ok((source, resolved_table_name, database_id, schema_id))
2391}
2392
2393// Validate the `webhook_info` and bind it to its protobuf representation.
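//
// For illustration only (a hedged sketch; the exact clause syntax is defined by the parser, and the
// secret and signature expressions below are hypothetical): the table must have exactly one JSONB
// column and the signature expression must be a `secure_compare(...)` call, e.g.
//
//     CREATE TABLE hook (payload JSONB)
//     WITH (connector = 'webhook')
//     VALIDATE SECRET my_secret AS secure_compare(headers->>'x-signature', my_secret);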
2394fn bind_webhook_info(
2395    session: &Arc<SessionImpl>,
2396    columns_defs: &[ColumnDef],
2397    webhook_info: WebhookSourceInfo,
2398) -> Result<PbWebhookSourceInfo> {
2399    // validate columns
2400    if columns_defs.len() != 1 || columns_defs[0].data_type.as_ref().unwrap() != &AstDataType::Jsonb
2401    {
2402        return Err(ErrorCode::InvalidInputSyntax(
2403            "Table with webhook source should have exactly one JSONB column".to_owned(),
2404        )
2405        .into());
2406    }
2407
2408    let WebhookSourceInfo {
2409        secret_ref,
2410        signature_expr,
2411        wait_for_persistence,
2412        is_batched,
2413    } = webhook_info;
2414
2415    // validate secret_ref
2416    let (pb_secret_ref, secret_name) = if let Some(secret_ref) = secret_ref {
2417        let db_name = &session.database();
2418        let (schema_name, secret_name) =
2419            Binder::resolve_schema_qualified_name(db_name, &secret_ref.secret_name)?;
2420        let secret_catalog = session.get_secret_by_name(schema_name, &secret_name)?;
2421        (
2422            Some(PbSecretRef {
2423                secret_id: secret_catalog.id.secret_id(),
2424                ref_as: match secret_ref.ref_as {
2425                    SecretRefAsType::Text => PbRefAsType::Text,
2426                    SecretRefAsType::File => PbRefAsType::File,
2427                }
2428                .into(),
2429            }),
2430            Some(secret_name),
2431        )
2432    } else {
2433        (None, None)
2434    };
2435
2436    let secure_compare_context = SecureCompareContext {
2437        column_name: columns_defs[0].name.real_value(),
2438        secret_name,
2439    };
2440    let mut binder = Binder::new_for_ddl(session).with_secure_compare(secure_compare_context);
2441    let expr = binder.bind_expr(&signature_expr)?;
2442
2443    // validate expr, ensuring it is SECURE_COMPARE()
2444    if expr.as_function_call().is_none()
2445        || expr.as_function_call().unwrap().func_type()
2446            != crate::optimizer::plan_node::generic::ExprType::SecureCompare
2447    {
2448        return Err(ErrorCode::InvalidInputSyntax(
2449            "The signature verification function must be SECURE_COMPARE()".to_owned(),
2450        )
2451        .into());
2452    }
2453
2454    let pb_webhook_info = PbWebhookSourceInfo {
2455        secret_ref: pb_secret_ref,
2456        signature_expr: Some(expr.to_expr_proto()),
2457        wait_for_persistence,
2458        is_batched,
2459    };
2460
2461    Ok(pb_webhook_info)
2462}
2463
2464#[cfg(test)]
2465mod tests {
2466    use risingwave_common::catalog::{
2467        DEFAULT_DATABASE_NAME, ROW_ID_COLUMN_NAME, RW_TIMESTAMP_COLUMN_NAME,
2468    };
2469    use risingwave_common::types::{DataType, StructType};
2470
2471    use super::*;
2472    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};
2473
2474    #[tokio::test]
2475    async fn test_create_table_handler() {
2476        let sql =
2477            "create table t (v1 smallint, v2 struct<v3 bigint, v4 float, v5 double>) append only;";
2478        let frontend = LocalFrontend::new(Default::default()).await;
2479        frontend.run_sql(sql).await.unwrap();
2480
2481        let session = frontend.session_ref();
2482        let catalog_reader = session.env().catalog_reader().read_guard();
2483        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);
2484
2485        // Check table exists.
2486        let (table, _) = catalog_reader
2487            .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
2488            .unwrap();
2489        assert_eq!(table.name(), "t");
2490
2491        let columns = table
2492            .columns
2493            .iter()
2494            .map(|col| (col.name(), col.data_type().clone()))
2495            .collect::<HashMap<&str, DataType>>();
2496
2497        let expected_columns = maplit::hashmap! {
2498            ROW_ID_COLUMN_NAME => DataType::Serial,
2499            "v1" => DataType::Int16,
2500            "v2" => StructType::new(
2501                vec![("v3", DataType::Int64),("v4", DataType::Float64),("v5", DataType::Float64)],
2502            )
2503            .with_ids([3, 4, 5].map(ColumnId::new))
2504            .into(),
2505            RW_TIMESTAMP_COLUMN_NAME => DataType::Timestamptz,
2506        };
2507
2508        assert_eq!(columns, expected_columns, "{columns:#?}");
2509    }
2510
2511    #[test]
2512    fn test_bind_primary_key() {
2513        // Note: Column ID 0 is reserved for row ID column.
2514
2515        for (sql, expected) in [
2516            ("create table t (v1 int, v2 int)", Ok(&[0] as &[_])),
2517            ("create table t (v1 int primary key, v2 int)", Ok(&[1])),
2518            ("create table t (v1 int, v2 int primary key)", Ok(&[2])),
2519            (
2520                "create table t (v1 int primary key, v2 int primary key)",
2521                Err("multiple primary keys are not allowed"),
2522            ),
2523            (
2524                "create table t (v1 int primary key primary key, v2 int)",
2525                Err("multiple primary keys are not allowed"),
2526            ),
2527            (
2528                "create table t (v1 int, v2 int, primary key (v1))",
2529                Ok(&[1]),
2530            ),
2531            (
2532                "create table t (v1 int, primary key (v2), v2 int)",
2533                Ok(&[2]),
2534            ),
2535            (
2536                "create table t (primary key (v2, v1), v1 int, v2 int)",
2537                Ok(&[2, 1]),
2538            ),
2539            (
2540                "create table t (v1 int, primary key (v1), v2 int, primary key (v1))",
2541                Err("multiple primary keys are not allowed"),
2542            ),
2543            (
2544                "create table t (v1 int primary key, primary key (v1), v2 int)",
2545                Err("multiple primary keys are not allowed"),
2546            ),
2547            (
2548                "create table t (v1 int, primary key (V3), v2 int)",
2549                Err("column \"v3\" named in key does not exist"),
2550            ),
2551        ] {
2552            let mut ast = risingwave_sqlparser::parser::Parser::parse_sql(sql).unwrap();
2553            let risingwave_sqlparser::ast::Statement::CreateTable {
2554                columns: column_defs,
2555                constraints,
2556                ..
2557            } = ast.remove(0)
2558            else {
2559                panic!("test case should be create table")
2560            };
2561            let actual: Result<_> = (|| {
2562                let mut columns = bind_sql_columns(&column_defs, false)?;
2563                let mut col_id_gen = ColumnIdGenerator::new_initial();
2564                for c in &mut columns {
2565                    col_id_gen.generate(c)?;
2566                }
2567
2568                let pk_names =
2569                    bind_sql_pk_names(&column_defs, bind_table_constraints(&constraints)?)?;
2570                let (_, pk_column_ids, _) =
2571                    bind_pk_and_row_id_on_relation(columns, pk_names, true)?;
2572                Ok(pk_column_ids)
2573            })();
2574            match (expected, actual) {
2575                (Ok(expected), Ok(actual)) => assert_eq!(
2576                    expected.iter().copied().map(ColumnId::new).collect_vec(),
2577                    actual,
2578                    "sql: {sql}"
2579                ),
2580                (Ok(_), Err(actual)) => panic!("sql: {sql}\nunexpected error: {actual:?}"),
2581                (Err(_), Ok(actual)) => panic!("sql: {sql}\nexpects error but got: {actual:?}"),
2582                (Err(expected), Err(actual)) => assert!(
2583                    actual.to_string().contains(expected),
2584                    "sql: {sql}\nexpected: {expected:?}\nactual: {actual:?}"
2585                ),
2586            }
2587        }
2588    }
2589
2590    #[tokio::test]
2591    async fn test_duplicate_props_options() {
2592        let proto_file = create_proto_file(PROTO_FILE_DATA);
2593        let sql = format!(
2594            r#"CREATE TABLE t
2595    WITH (
2596        connector = 'kinesis',
2597        aws.region='user_test_topic',
2598        endpoint='172.10.1.1:9090,172.10.1.2:9090',
2599        aws.credentials.access_key_id = 'your_access_key_1',
2600        aws.credentials.secret_access_key = 'your_secret_key_1'
2601    )
2602    FORMAT PLAIN ENCODE PROTOBUF (
2603        message = '.test.TestRecord',
2604        aws.credentials.access_key_id = 'your_access_key_2',
2605        aws.credentials.secret_access_key = 'your_secret_key_2',
2606        schema.location = 'file://{}',
2607    )"#,
2608            proto_file.path().to_str().unwrap()
2609        );
2610        let frontend = LocalFrontend::new(Default::default()).await;
2611        frontend.run_sql(sql).await.unwrap();
2612
2613        let session = frontend.session_ref();
2614        let catalog_reader = session.env().catalog_reader().read_guard();
2615        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);
2616
2617        // Check source exists.
2618        let (source, _) = catalog_reader
2619            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
2620            .unwrap();
2621        assert_eq!(source.name, "t");
2622
2623        // AwsAuth params exist in options.
2624        assert_eq!(
2625            source
2626                .info
2627                .format_encode_options
2628                .get("aws.credentials.access_key_id")
2629                .unwrap(),
2630            "your_access_key_2"
2631        );
2632        assert_eq!(
2633            source
2634                .info
2635                .format_encode_options
2636                .get("aws.credentials.secret_access_key")
2637                .unwrap(),
2638            "your_secret_key_2"
2639        );
2640
2641        // AwsAuth params exist in props.
2642        assert_eq!(
2643            source
2644                .with_properties
2645                .get("aws.credentials.access_key_id")
2646                .unwrap(),
2647            "your_access_key_1"
2648        );
2649        assert_eq!(
2650            source
2651                .with_properties
2652                .get("aws.credentials.secret_access_key")
2653                .unwrap(),
2654            "your_secret_key_1"
2655        );
2656
2657        // Options are not merged into props.
2658        assert!(!source.with_properties.contains_key("schema.location"));
2659    }
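
    // The following tests are illustrative sketches added for documentation: they exercise two
    // pure helpers in this file with minimal, self-contained inputs.

    #[test]
    fn test_partition_columns_must_be_prefix_of_pk() {
        let pk = vec!["a".to_owned(), "b".to_owned(), "c".to_owned()];

        // A prefix of the primary key is accepted.
        assert!(ensure_partition_columns_are_prefix_of_primary_key(&["a".to_owned()], &pk).is_ok());
        assert!(
            ensure_partition_columns_are_prefix_of_primary_key(
                &["a".to_owned(), "b".to_owned()],
                &pk
            )
            .is_ok()
        );

        // A non-prefix column, or more partition columns than primary key columns, is rejected.
        assert!(ensure_partition_columns_are_prefix_of_primary_key(&["b".to_owned()], &pk).is_err());
        assert!(
            ensure_partition_columns_are_prefix_of_primary_key(
                &["a".to_owned(), "b".to_owned(), "c".to_owned(), "d".to_owned()],
                &pk
            )
            .is_err()
        );
    }

    #[test]
    fn test_sanity_check_for_table_on_cdc_source() {
        // Parse a plain CREATE TABLE statement and feed its pieces to the CDC sanity check; the
        // check only inspects columns, constraints and modifiers, so the CDC `FROM ... TABLE`
        // clause is not needed here.
        let parse = |sql: &str| {
            let mut ast = risingwave_sqlparser::parser::Parser::parse_sql(sql).unwrap();
            let Statement::CreateTable {
                columns,
                constraints,
                ..
            } = ast.remove(0)
            else {
                panic!("test case should be create table")
            };
            (columns, constraints)
        };
        let watermarks: Vec<SourceWatermark> = vec![];

        // Without a primary key (and without a wildcard), the definition is rejected.
        let (columns, constraints) = parse("create table t (v1 int, v2 int)");
        assert!(
            sanity_check_for_table_on_cdc_source(false, &columns, &None, &constraints, &watermarks)
                .is_err()
        );

        // With a primary key, the definition is accepted.
        let (columns, constraints) = parse("create table t (v1 int primary key, v2 int)");
        assert!(
            sanity_check_for_table_on_cdc_source(false, &columns, &None, &constraints, &watermarks)
                .is_ok()
        );
    }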
2660}