risingwave_frontend/handler/
create_table.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::rc::Rc;
17use std::sync::Arc;
18
19use anyhow::{Context, anyhow};
20use clap::ValueEnum;
21use either::Either;
22use fixedbitset::FixedBitSet;
23use itertools::Itertools;
24use percent_encoding::percent_decode_str;
25use pgwire::pg_response::{PgResponse, StatementType};
26use prost::Message as _;
27use risingwave_common::catalog::{
28    CdcTableDesc, ColumnCatalog, ColumnDesc, ConflictBehavior, DEFAULT_SCHEMA_NAME, Engine,
29    ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX, RISINGWAVE_ICEBERG_ROW_ID, ROW_ID_COLUMN_NAME,
30    TableId,
31};
32use risingwave_common::config::MetaBackend;
33use risingwave_common::global_jvm::Jvm;
34use risingwave_common::session_config::sink_decouple::SinkDecouple;
35use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
36use risingwave_common::util::value_encoding::DatumToProtoExt;
37use risingwave_common::{bail, bail_not_implemented};
38use risingwave_connector::sink::decouple_checkpoint_log_sink::COMMIT_CHECKPOINT_INTERVAL;
39use risingwave_connector::source::cdc::build_cdc_table_id;
40use risingwave_connector::source::cdc::external::{
41    DATABASE_NAME_KEY, ExternalCdcTableType, ExternalTableConfig, ExternalTableImpl,
42    SCHEMA_NAME_KEY, TABLE_NAME_KEY,
43};
44use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt, source};
45use risingwave_pb::catalog::connection::Info as ConnectionInfo;
46use risingwave_pb::catalog::connection_params::ConnectionType;
47use risingwave_pb::catalog::{PbSource, PbWebhookSourceInfo, WatermarkDesc};
48use risingwave_pb::ddl_service::{PbTableJobType, TableJobType};
49use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
50use risingwave_pb::plan_common::{
51    AdditionalColumn, ColumnDescVersion, DefaultColumnDesc, GeneratedColumnDesc,
52};
53use risingwave_pb::secret::PbSecretRef;
54use risingwave_pb::secret::secret_ref::PbRefAsType;
55use risingwave_pb::stream_plan::StreamFragmentGraph;
56use risingwave_sqlparser::ast::{
57    CdcTableInfo, ColumnDef, ColumnOption, CompatibleFormatEncode, ConnectionRefValue, CreateSink,
58    CreateSinkStatement, CreateSourceStatement, DataType as AstDataType, ExplainOptions, Format,
59    FormatEncodeOptions, Ident, ObjectName, OnConflict, SecretRefAsType, SourceWatermark,
60    Statement, TableConstraint, WebhookSourceInfo, WithProperties,
61};
62use risingwave_sqlparser::parser::{IncludeOption, Parser};
63use thiserror_ext::AsReport;
64
65use super::RwPgResponse;
66use super::create_source::{CreateSourceType, SqlColumnStrategy, bind_columns_from_source};
67use crate::binder::{Clause, SecureCompareContext, bind_data_type};
68use crate::catalog::root_catalog::SchemaPath;
69use crate::catalog::source_catalog::SourceCatalog;
70use crate::catalog::table_catalog::TableVersion;
71use crate::catalog::{ColumnId, DatabaseId, SchemaId, SourceId, check_column_name_not_reserved};
72use crate::error::{ErrorCode, Result, RwError, bail_bind_error};
73use crate::expr::{Expr, ExprImpl, ExprRewriter};
74use crate::handler::HandlerArgs;
75use crate::handler::create_source::{
76    UPSTREAM_SOURCE_KEY, bind_connector_props, bind_create_source_or_table_with_connector,
77    bind_source_watermark, handle_addition_columns,
78};
79use crate::handler::util::{
80    LongRunningNotificationAction, SourceSchemaCompatExt, execute_with_long_running_notification,
81};
82use crate::optimizer::plan_node::generic::{SourceNodeKind, build_cdc_scan_options_with_options};
83use crate::optimizer::plan_node::{
84    LogicalCdcScan, LogicalPlanRef, LogicalSource, StreamPlanRef as PlanRef,
85};
86use crate::optimizer::property::{Order, RequiredDist};
87use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRoot};
88use crate::session::SessionImpl;
89use crate::session::current::notice_to_user;
90use crate::stream_fragmenter::{GraphJobType, build_graph};
91use crate::utils::OverwriteOptions;
92use crate::{Binder, Explain, TableCatalog, WithOptions};
93
94mod col_id_gen;
95pub use col_id_gen::*;
96use risingwave_connector::sink::SinkParam;
97use risingwave_connector::sink::iceberg::{
98    COMPACTION_DELETE_FILES_COUNT_THRESHOLD, COMPACTION_INTERVAL_SEC, COMPACTION_MAX_SNAPSHOTS_NUM,
99    COMPACTION_SMALL_FILES_THRESHOLD_MB, COMPACTION_TARGET_FILE_SIZE_MB,
100    COMPACTION_TRIGGER_SNAPSHOT_COUNT, COMPACTION_TYPE, COMPACTION_WRITE_PARQUET_COMPRESSION,
101    COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS, CompactionType, ENABLE_COMPACTION,
102    ENABLE_SNAPSHOT_EXPIRATION, FORMAT_VERSION, ICEBERG_WRITE_MODE_COPY_ON_WRITE,
103    ICEBERG_WRITE_MODE_MERGE_ON_READ, IcebergSink, IcebergWriteMode,
104    SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES, SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA,
105    SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS, SNAPSHOT_EXPIRATION_RETAIN_LAST, WRITE_MODE,
106    parse_partition_by_exprs,
107};
108use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
109
110use crate::handler::create_sink::{SinkPlanContext, gen_sink_plan};
111
112fn ensure_column_options_supported(c: &ColumnDef) -> Result<()> {
113    for option_def in &c.options {
114        match option_def.option {
115            ColumnOption::GeneratedColumns(_) => {}
116            ColumnOption::DefaultValue(_) => {}
117            ColumnOption::DefaultValueInternal { .. } => {}
118            ColumnOption::Unique { is_primary: true } => {}
119            ColumnOption::Null => {}
120            ColumnOption::NotNull => {}
121            _ => bail_not_implemented!("column constraints \"{}\"", option_def),
122        }
123    }
124    Ok(())
125}
126
127/// Binds the column schemas declared in CREATE statement into `ColumnDesc`.
128/// If a column is marked as `primary key`, its `ColumnId` is also returned.
129/// This primary key is not combined with table constraints yet.
130pub fn bind_sql_columns(
131    column_defs: &[ColumnDef],
132    is_for_drop_table_connector: bool,
133) -> Result<Vec<ColumnCatalog>> {
134    let mut columns = Vec::with_capacity(column_defs.len());
135
136    for column in column_defs {
137        ensure_column_options_supported(column)?;
138        // Destruct to make sure all fields are properly handled rather than ignored.
139        // Do NOT use `..` to ignore fields you do not want to deal with.
140        // Reject them with a clear NotImplemented error.
141        let ColumnDef {
142            name,
143            data_type,
144            collation,
145            options,
146            ..
147        } = column;
148
149        let data_type = data_type
150            .clone()
151            .ok_or_else(|| ErrorCode::InvalidInputSyntax("data type is not specified".into()))?;
152        if let Some(collation) = collation {
153            // PostgreSQL will limit the datatypes that collate can work on.
154            // https://www.postgresql.org/docs/16/collation.html#COLLATION-CONCEPTS
155            //   > The built-in collatable data types are `text`, `varchar`, and `char`.
156            //
157            // But we don't support real collation, we simply ignore it here.
158            if !["C", "POSIX"].contains(&collation.real_value().as_str()) {
159                bail_not_implemented!(
160                    "Collate collation other than `C` or `POSIX` is not implemented"
161                );
162            }
163
164            match data_type {
165                AstDataType::Text | AstDataType::Varchar | AstDataType::Char(_) => {}
166                _ => {
167                    return Err(ErrorCode::NotSupported(
168                        format!("{} is not a collatable data type", data_type),
169                        "The only built-in collatable data types are `varchar`, please check your type".into(),
170                    ).into());
171                }
172            }
173        }
174
175        if !is_for_drop_table_connector {
176            // additional column name may have prefix _rw
177            // When converting dropping the connector from table, the additional columns are converted to normal columns and keep the original name.
178            // Under this case, we loosen the check for _rw prefix.
179            check_column_name_not_reserved(&name.real_value())?;
180        }
181
182        let nullable: bool = !options
183            .iter()
184            .any(|def| matches!(def.option, ColumnOption::NotNull));
185
186        columns.push(ColumnCatalog {
187            column_desc: ColumnDesc {
188                data_type: bind_data_type(&data_type)?,
189                column_id: ColumnId::placeholder(),
190                name: name.real_value(),
191                generated_or_default_column: None,
192                description: None,
193                additional_column: AdditionalColumn { column_type: None },
194                version: ColumnDescVersion::LATEST,
195                system_column: None,
196                nullable,
197            },
198            is_hidden: false,
199        });
200    }
201
202    Ok(columns)
203}
204
205fn check_generated_column_constraints(
206    column_name: &String,
207    column_id: ColumnId,
208    expr: &ExprImpl,
209    column_catalogs: &[ColumnCatalog],
210    generated_column_names: &[String],
211    pk_column_ids: &[ColumnId],
212) -> Result<()> {
213    let input_refs = expr.collect_input_refs(column_catalogs.len());
214    for idx in input_refs.ones() {
215        let referred_generated_column = &column_catalogs[idx].column_desc.name;
216        if generated_column_names
217            .iter()
218            .any(|c| c == referred_generated_column)
219        {
220            return Err(ErrorCode::BindError(format!(
221                "Generated can not reference another generated column. \
222                But here generated column \"{}\" referenced another generated column \"{}\"",
223                column_name, referred_generated_column
224            ))
225            .into());
226        }
227    }
228
229    if pk_column_ids.contains(&column_id) && expr.is_impure() {
230        return Err(ErrorCode::BindError(format!(
231            "Generated columns with impure expressions should not be part of the primary key. \
232            Here column \"{}\" is defined as part of the primary key.",
233            column_name
234        ))
235        .into());
236    }
237
238    Ok(())
239}
240
/// Binds constraints that can be only specified in column definitions,
/// currently generated columns and default columns.
///
/// Fills in `generated_or_default_column` on the matching entries of
/// `column_catalogs` in place. `pk_column_ids` is used to reject impure
/// generated expressions on primary-key columns.
pub fn bind_sql_column_constraints(
    session: &SessionImpl,
    table_name: String,
    column_catalogs: &mut [ColumnCatalog],
    columns: &[ColumnDef],
    pk_column_ids: &[ColumnId],
) -> Result<()> {
    // Collect the names of all generated columns up front, so that each generated
    // expression can be checked against references to its peers.
    let generated_column_names = {
        let mut names = vec![];
        for column in columns {
            for option_def in &column.options {
                if let ColumnOption::GeneratedColumns(_) = option_def.option {
                    names.push(column.name.real_value());
                    break;
                }
            }
        }
        names
    };

    let mut binder = Binder::new_for_ddl(session);
    binder.bind_columns_to_context(table_name, column_catalogs)?;

    for column in columns {
        let Some(idx) = column_catalogs
            .iter()
            .position(|c| c.name() == column.name.real_value())
        else {
            // It's possible that we don't follow the user defined columns in SQL but take the
            // ones resolved from the source, thus missing some columns. Simply ignore them.
            continue;
        };

        for option_def in &column.options {
            match &option_def.option {
                ColumnOption::GeneratedColumns(expr) => {
                    // The `GeneratedColumn` clause restricts what the binder accepts
                    // (e.g. no subqueries); reset it after binding below.
                    binder.set_clause(Some(Clause::GeneratedColumn));

                    let expr_impl = binder.bind_expr(expr).with_context(|| {
                        format!(
                            "fail to bind expression in generated column \"{}\"",
                            column.name.real_value()
                        )
                    })?;

                    check_generated_column_constraints(
                        &column.name.real_value(),
                        column_catalogs[idx].column_id(),
                        &expr_impl,
                        column_catalogs,
                        &generated_column_names,
                        pk_column_ids,
                    )?;

                    column_catalogs[idx].column_desc.generated_or_default_column = Some(
                        GeneratedOrDefaultColumn::GeneratedColumn(GeneratedColumnDesc {
                            expr: Some(expr_impl.to_expr_proto()),
                        }),
                    );
                    binder.set_clause(None);
                }
                ColumnOption::DefaultValue(expr) => {
                    // Default expressions must be castable to the column type.
                    let expr_impl = binder
                        .bind_expr(expr)?
                        .cast_assign(column_catalogs[idx].data_type())?;

                    // Rewrite expressions to evaluate a snapshot value, used for missing values in the case of
                    // schema change.
                    //
                    // TODO: Currently we don't support impure expressions other than `now()` (like `random()`),
                    // so the rewritten expression should almost always be pure and we directly call `fold_const`
                    // here. Actually we do not require purity of the expression here since we're only to get a
                    // snapshot value.
                    let rewritten_expr_impl = session
                        .pinned_snapshot()
                        .inline_now_proc_time()
                        .rewrite_expr(expr_impl.clone());

                    if let Some(snapshot_value) = rewritten_expr_impl.try_fold_const() {
                        let snapshot_value = snapshot_value?;

                        column_catalogs[idx].column_desc.generated_or_default_column =
                            Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
                                snapshot_value: Some(snapshot_value.to_protobuf()),
                                expr: Some(expr_impl.to_expr_proto()),
                                // persist the original expression
                            }));
                    } else {
                        return Err(ErrorCode::BindError(format!(
                            "Default expression used in column `{}` cannot be evaluated. \
                            Use generated columns instead if you mean to reference other columns.",
                            column.name
                        ))
                        .into());
                    }
                }
                ColumnOption::DefaultValueInternal { persisted, expr: _ } => {
                    // When a `DEFAULT INTERNAL` is used internally for schema change, the persisted value
                    // should already be set during purification. So if we encounter an empty value here, it
                    // means the user has specified it explicitly in the SQL statement, typically by
                    // directly copying the result of `SHOW CREATE TABLE` and executing it.
                    if persisted.is_empty() {
                        bail_bind_error!(
                            "DEFAULT INTERNAL is only used for internal purposes, \
                             please specify a concrete default value"
                        );
                    }

                    let desc = DefaultColumnDesc::decode(&**persisted)
                        .expect("failed to decode persisted `DefaultColumnDesc`");

                    column_catalogs[idx].column_desc.generated_or_default_column =
                        Some(GeneratedOrDefaultColumn::DefaultColumn(desc));
                }
                // Other options (NULL / NOT NULL / PRIMARY KEY) are handled elsewhere.
                _ => {}
            }
        }
    }
    Ok(())
}
363
364/// Currently we only support Primary key table constraint, so just return pk names if it exists
365pub fn bind_table_constraints(table_constraints: &[TableConstraint]) -> Result<Vec<String>> {
366    let mut pk_column_names = vec![];
367
368    for constraint in table_constraints {
369        match constraint {
370            TableConstraint::Unique {
371                name: _,
372                columns,
373                is_primary: true,
374            } => {
375                if !pk_column_names.is_empty() {
376                    return Err(multiple_pk_definition_err());
377                }
378                pk_column_names = columns.iter().map(|c| c.real_value()).collect_vec();
379            }
380            _ => bail_not_implemented!("table constraint \"{}\"", constraint),
381        }
382    }
383    Ok(pk_column_names)
384}
385
386pub fn bind_sql_pk_names(
387    columns_defs: &[ColumnDef],
388    pk_names_from_table_constraints: Vec<String>,
389) -> Result<Vec<String>> {
390    let mut pk_column_names = pk_names_from_table_constraints;
391
392    for column in columns_defs {
393        for option_def in &column.options {
394            if let ColumnOption::Unique { is_primary: true } = option_def.option {
395                if !pk_column_names.is_empty() {
396                    return Err(multiple_pk_definition_err());
397                }
398                pk_column_names.push(column.name.real_value());
399            };
400        }
401    }
402
403    Ok(pk_column_names)
404}
405
406fn multiple_pk_definition_err() -> RwError {
407    ErrorCode::BindError("multiple primary keys are not allowed".into()).into()
408}
409
410/// Binds primary keys defined in SQL.
411///
412/// It returns the columns together with `pk_column_ids`, and an optional row id column index if
413/// added.
414pub fn bind_pk_and_row_id_on_relation(
415    mut columns: Vec<ColumnCatalog>,
416    pk_names: Vec<String>,
417    must_need_pk: bool,
418) -> Result<(Vec<ColumnCatalog>, Vec<ColumnId>, Option<usize>)> {
419    for c in &columns {
420        assert!(c.column_id() != ColumnId::placeholder());
421    }
422
423    // Mapping from column name to column id.
424    let name_to_id = columns
425        .iter()
426        .map(|c| (c.name(), c.column_id()))
427        .collect::<HashMap<_, _>>();
428
429    let mut pk_column_ids: Vec<_> = pk_names
430        .iter()
431        .map(|name| {
432            name_to_id.get(name.as_str()).copied().ok_or_else(|| {
433                ErrorCode::BindError(format!("column \"{name}\" named in key does not exist"))
434            })
435        })
436        .try_collect()?;
437
438    // Add `_row_id` column if `pk_column_ids` is empty and must_need_pk
439    let need_row_id = pk_column_ids.is_empty() && must_need_pk;
440
441    let row_id_index = need_row_id.then(|| {
442        let column = ColumnCatalog::row_id_column();
443        let index = columns.len();
444        pk_column_ids = vec![column.column_id()];
445        columns.push(column);
446        index
447    });
448
449    if let Some(col) = columns.iter().map(|c| c.name()).duplicates().next() {
450        Err(ErrorCode::InvalidInputSyntax(format!(
451            "column \"{col}\" specified more than once"
452        )))?;
453    }
454
455    Ok((columns, pk_column_ids, row_id_index))
456}
457
/// `gen_create_table_plan_with_source` generates the plan for creating a table with an external
/// stream source.
///
/// Binds the connector properties and the source columns, then delegates plan
/// generation to [`gen_table_plan_with_source`]. Returns the plan, the bound
/// source catalog, and the table catalog.
#[allow(clippy::too_many_arguments)]
pub(crate) async fn gen_create_table_plan_with_source(
    mut handler_args: HandlerArgs,
    explain_options: ExplainOptions,
    table_name: ObjectName,
    column_defs: Vec<ColumnDef>,
    wildcard_idx: Option<usize>,
    constraints: Vec<TableConstraint>,
    format_encode: FormatEncodeOptions,
    source_watermarks: Vec<SourceWatermark>,
    mut col_id_gen: ColumnIdGenerator,
    include_column_options: IncludeOption,
    props: CreateTableProps,
    sql_column_strategy: SqlColumnStrategy,
) -> Result<(PlanRef, Option<SourceCatalog>, TableCatalog)> {
    // Append-only tables only accept PLAIN/NATIVE formats.
    if props.append_only
        && format_encode.format != Format::Plain
        && format_encode.format != Format::Native
    {
        return Err(ErrorCode::BindError(format!(
            "Append only table does not support format {}.",
            format_encode.format
        ))
        .into());
    }

    let session = &handler_args.session;
    let (with_properties, refresh_mode) =
        bind_connector_props(&handler_args, &format_encode, false)?;
    if with_properties.is_shareable_cdc_connector() {
        // CDC tables have extra restrictions on generated / NOT NULL columns.
        generated_columns_check_for_cdc_table(&column_defs)?;
        not_null_check_for_cdc_table(&wildcard_idx, &column_defs)?;
    } else if column_defs.iter().any(|col| {
        col.options
            .iter()
            .any(|def| matches!(def.option, ColumnOption::NotNull))
    }) {
        // if non-cdc source
        notice_to_user(
            "The table contains columns with NOT NULL constraints. Any rows from upstream violating the constraints will be ignored silently.",
        );
    }

    let db_name: &str = &session.database();
    let (schema_name, _) = Binder::resolve_schema_qualified_name(db_name, &table_name)?;

    // TODO: omit this step if `sql_column_strategy` is `Follow`.
    let (columns_from_resolve_source, source_info) = bind_columns_from_source(
        session,
        &format_encode,
        Either::Left(&with_properties),
        CreateSourceType::Table,
    )
    .await?;

    let overwrite_options = OverwriteOptions::new(&mut handler_args);
    let rate_limit = overwrite_options.source_rate_limit;
    // Bind the connector-backed source half of the table.
    let source = bind_create_source_or_table_with_connector(
        handler_args.clone(),
        table_name,
        format_encode,
        with_properties,
        &column_defs,
        constraints,
        wildcard_idx,
        source_watermarks,
        columns_from_resolve_source,
        source_info,
        include_column_options,
        &mut col_id_gen,
        CreateSourceType::Table,
        rate_limit,
        sql_column_strategy,
        refresh_mode,
    )
    .await?;

    let context = OptimizerContext::new(handler_args, explain_options);

    // Generate the table plan from the bound source catalog.
    let (plan, table) = gen_table_plan_with_source(
        context.into(),
        schema_name,
        source.clone(),
        col_id_gen.into_version(),
        props,
    )?;

    Ok((plan, Some(source), table))
}
549
550/// `gen_create_table_plan` generates the plan for creating a table without an external stream
551/// source.
552#[allow(clippy::too_many_arguments)]
553pub(crate) fn gen_create_table_plan(
554    context: OptimizerContext,
555    table_name: ObjectName,
556    column_defs: Vec<ColumnDef>,
557    constraints: Vec<TableConstraint>,
558    mut col_id_gen: ColumnIdGenerator,
559    source_watermarks: Vec<SourceWatermark>,
560    props: CreateTableProps,
561    is_for_replace_plan: bool,
562) -> Result<(PlanRef, TableCatalog)> {
563    let mut columns = bind_sql_columns(&column_defs, is_for_replace_plan)?;
564    for c in &mut columns {
565        col_id_gen.generate(c)?;
566    }
567
568    let (_, secret_refs, connection_refs) = context.with_options().clone().into_parts();
569    if !secret_refs.is_empty() || !connection_refs.is_empty() {
570        return Err(crate::error::ErrorCode::InvalidParameterValue("Secret reference and Connection reference are not allowed in options when creating table without external source".to_owned()).into());
571    }
572
573    gen_create_table_plan_without_source(
574        context,
575        table_name,
576        columns,
577        column_defs,
578        constraints,
579        source_watermarks,
580        col_id_gen.into_version(),
581        props,
582    )
583}
584
/// Generates the plan for creating a table from already-bound columns, without
/// an external stream source: binds the primary key (adding a hidden row-id
/// column if needed), watermarks, and column constraints, then builds the plan.
#[allow(clippy::too_many_arguments)]
pub(crate) fn gen_create_table_plan_without_source(
    context: OptimizerContext,
    table_name: ObjectName,
    columns: Vec<ColumnCatalog>,
    column_defs: Vec<ColumnDef>,
    constraints: Vec<TableConstraint>,
    source_watermarks: Vec<SourceWatermark>,
    version: TableVersion,
    props: CreateTableProps,
) -> Result<(PlanRef, TableCatalog)> {
    // XXX: Why not bind outside?
    let pk_names = bind_sql_pk_names(&column_defs, bind_table_constraints(&constraints)?)?;
    // Resolve PK column ids; `must_need_pk = true` adds `_row_id` when no PK is given.
    let (mut columns, pk_column_ids, row_id_index) =
        bind_pk_and_row_id_on_relation(columns, pk_names, true)?;

    let watermark_descs = bind_source_watermark(
        context.session_ctx(),
        table_name.real_value(),
        source_watermarks,
        &columns,
    )?;

    // Bind generated-column and default-value expressions in place.
    bind_sql_column_constraints(
        context.session_ctx(),
        table_name.real_value(),
        &mut columns,
        &column_defs,
        &pk_column_ids,
    )?;
    let session = context.session_ctx().clone();

    let db_name = &session.database();
    let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, &table_name)?;

    let info = CreateTableInfo {
        columns,
        pk_column_ids,
        row_id_index,
        watermark_descs,
        source_catalog: None,
        version,
    };

    gen_table_plan_inner(context.into(), schema_name, table_name, info, props)
}
631
/// Generates a table plan from a bound source catalog, reusing the columns,
/// primary key, row-id index and watermarks already resolved on the source.
fn gen_table_plan_with_source(
    context: OptimizerContextRef,
    schema_name: Option<String>,
    source_catalog: SourceCatalog,
    version: TableVersion,
    props: CreateTableProps,
) -> Result<(PlanRef, TableCatalog)> {
    let table_name = source_catalog.name.clone();

    let info = CreateTableInfo {
        columns: source_catalog.columns.clone(),
        pk_column_ids: source_catalog.pk_col_ids.clone(),
        row_id_index: source_catalog.row_id_index,
        watermark_descs: source_catalog.watermark_descs.clone(),
        source_catalog: Some(source_catalog),
        version,
    };

    gen_table_plan_inner(context, schema_name, table_name, info, props)
}
652
/// On-conflict behavior either from user input or existing table catalog.
#[derive(Clone, Copy)]
pub enum EitherOnConflict {
    /// Raw `ON CONFLICT` clause from the AST; `None` if unspecified.
    Ast(Option<OnConflict>),
    /// Behavior already resolved, e.g. taken from an existing table catalog.
    Resolved(ConflictBehavior),
}
659
/// Wraps a raw (possibly absent) AST `ON CONFLICT` clause as unresolved input.
impl From<Option<OnConflict>> for EitherOnConflict {
    fn from(v: Option<OnConflict>) -> Self {
        Self::Ast(v)
    }
}
665
/// Wraps an already-resolved conflict behavior.
impl From<ConflictBehavior> for EitherOnConflict {
    fn from(v: ConflictBehavior) -> Self {
        Self::Resolved(v)
    }
}
671
672impl EitherOnConflict {
673    /// Resolves the conflict behavior based on the given information.
674    pub fn to_behavior(self, append_only: bool, row_id_as_pk: bool) -> Result<ConflictBehavior> {
675        let conflict_behavior = match self {
676            EitherOnConflict::Ast(on_conflict) => {
677                if append_only {
678                    if row_id_as_pk {
679                        // Primary key will be generated, no conflict check needed.
680                        ConflictBehavior::NoCheck
681                    } else {
682                        // User defined PK on append-only table, enforce `DO NOTHING`.
683                        if let Some(on_conflict) = on_conflict
684                            && on_conflict != OnConflict::Nothing
685                        {
686                            return Err(ErrorCode::InvalidInputSyntax(
687                                "When PRIMARY KEY constraint applied to an APPEND ONLY table, \
688                                     the ON CONFLICT behavior must be DO NOTHING."
689                                    .to_owned(),
690                            )
691                            .into());
692                        }
693                        ConflictBehavior::IgnoreConflict
694                    }
695                } else {
696                    // Default to `UPDATE FULL` for non-append-only tables.
697                    match on_conflict.unwrap_or(OnConflict::UpdateFull) {
698                        OnConflict::UpdateFull => ConflictBehavior::Overwrite,
699                        OnConflict::Nothing => ConflictBehavior::IgnoreConflict,
700                        OnConflict::UpdateIfNotNull => ConflictBehavior::DoUpdateIfNotNull,
701                    }
702                }
703            }
704            EitherOnConflict::Resolved(b) => b,
705        };
706
707        Ok(conflict_behavior)
708    }
709}
710
/// Arguments of the functions that generate a table plan, part 1.
///
/// Compared to [`CreateTableProps`], this struct contains fields that need some work of binding
/// or resolving based on the user input.
pub struct CreateTableInfo {
    /// All columns of the table, with real column ids assigned.
    pub columns: Vec<ColumnCatalog>,
    /// Ids of the primary-key columns (possibly the generated row-id column).
    pub pk_column_ids: Vec<ColumnId>,
    /// Index of the generated `_row_id` column in `columns`, if one was added.
    pub row_id_index: Option<usize>,
    /// Bound watermark definitions for the table.
    pub watermark_descs: Vec<WatermarkDesc>,
    /// The bound source catalog, when the table is backed by a connector.
    pub source_catalog: Option<SourceCatalog>,
    /// Table schema version, produced by the column-id generator.
    pub version: TableVersion,
}
723
/// Arguments of the functions that generate a table plan, part 2.
///
/// Compared to [`CreateTableInfo`], this struct contains fields that can be (relatively) simply
/// obtained from the input or the context.
pub struct CreateTableProps {
    /// Original SQL definition of the table.
    pub definition: String,
    /// Whether the table is declared `APPEND ONLY`.
    pub append_only: bool,
    /// On-conflict behavior, either from user input or an existing catalog.
    pub on_conflict: EitherOnConflict,
    /// Names of the version columns, if any.
    pub with_version_columns: Vec<String>,
    /// Webhook source info, when the table is created as a webhook source.
    pub webhook_info: Option<PbWebhookSourceInfo>,
    /// Storage engine backing the table.
    pub engine: Engine,
}
736
/// Shared tail of table-plan generation: builds the logical source node,
/// validates watermark / retention settings against the append-only flag,
/// and materializes the plan into a [`TableCatalog`].
#[allow(clippy::too_many_arguments)]
fn gen_table_plan_inner(
    context: OptimizerContextRef,
    schema_name: Option<String>,
    table_name: String,
    info: CreateTableInfo,
    props: CreateTableProps,
) -> Result<(PlanRef, TableCatalog)> {
    let CreateTableInfo {
        ref columns,
        row_id_index,
        ref watermark_descs,
        ref source_catalog,
        ..
    } = info;
    let CreateTableProps { append_only, .. } = props;

    let (database_id, schema_id) = context
        .session_ctx()
        .get_database_and_schema_id_for_create(schema_name)?;

    let session = context.session_ctx().clone();
    let retention_seconds = context.with_options().retention_seconds();

    // The plan starts from a logical source node; for connector-less tables the
    // source catalog is `None`.
    let source_node: LogicalPlanRef = LogicalSource::new(
        source_catalog.clone().map(Rc::new),
        columns.clone(),
        row_id_index,
        SourceNodeKind::CreateTable,
        context.clone(),
        None,
    )?
    .into();

    // No distribution/order/column requirements for a bare table plan root.
    let required_cols = FixedBitSet::with_capacity(columns.len());
    let plan_root = PlanRoot::new_with_logical_plan(
        source_node,
        RequiredDist::Any,
        Order::any(),
        required_cols,
        vec![],
    );

    let has_non_ttl_watermark = watermark_descs.iter().any(|d| !d.with_ttl);

    // Non-TTL watermarks are only supported on append-only tables.
    if !append_only && has_non_ttl_watermark {
        return Err(ErrorCode::NotSupported(
            "Defining watermarks on table requires the table to be append only.".to_owned(),
            "Use the key words `APPEND ONLY`".to_owned(),
        )
        .into());
    }

    // Retention on a non-append-only table is unsafe; it is allowed only behind
    // an explicit session-config escape hatch, with a warning and user notice.
    if !append_only && retention_seconds.is_some() {
        if session
            .config()
            .unsafe_enable_storage_retention_for_non_append_only_tables()
        {
            tracing::warn!(
                "Storage retention is enabled for non-append-only table {}. This may lead to stream inconsistency.",
                table_name
            );
            const NOTICE: &str = "Storage retention is enabled for non-append-only table. \
                                  This may lead to stream inconsistency and unrecoverable \
                                  node failure if there is any row INSERT/UPDATE/DELETE operation \
                                  corresponding to the TTLed primary keys";
            session.notice_to_user(NOTICE);
        } else {
            return Err(ErrorCode::NotSupported(
                "Defining retention seconds on table requires the table to be append only."
                    .to_owned(),
                "Use the key words `APPEND ONLY`".to_owned(),
            )
            .into());
        }
    }

    let materialize =
        plan_root.gen_table_plan(context, table_name, database_id, schema_id, info, props)?;

    let mut table = materialize.table().clone();
    // The creating session's user becomes the table owner.
    table.owner = session.user_id();

    Ok((materialize.into(), table))
}
822
/// Generate stream plan for cdc table based on shared source.
/// In replace workflow, the `table_id` is the id of the table to be replaced
/// in create table workflow, the `table_id` is a placeholder will be filled in the Meta
#[allow(clippy::too_many_arguments)]
pub(crate) fn gen_create_table_plan_for_cdc_table(
    context: OptimizerContextRef,
    source: Arc<SourceCatalog>,
    external_table_name: String,
    column_defs: Vec<ColumnDef>,
    mut columns: Vec<ColumnCatalog>,
    pk_names: Vec<String>,
    cdc_with_options: WithOptionsSecResolved,
    mut col_id_gen: ColumnIdGenerator,
    on_conflict: Option<OnConflict>,
    with_version_columns: Vec<String>,
    include_column_options: IncludeOption,
    table_name: ObjectName,
    resolved_table_name: String, // table name without schema prefix
    database_id: DatabaseId,
    schema_id: SchemaId,
    table_id: TableId,
    engine: Engine,
) -> Result<(PlanRef, TableCatalog)> {
    let session = context.session_ctx().clone();

    // append additional columns to the end
    handle_addition_columns(
        None,
        &cdc_with_options,
        include_column_options,
        &mut columns,
        true,
    )?;

    // Assign a column id to every column (including the appended ones).
    for c in &mut columns {
        col_id_gen.generate(c)?;
    }

    let (mut columns, pk_column_ids, _row_id_index) =
        bind_pk_and_row_id_on_relation(columns, pk_names, true)?;

    // NOTES: In auto schema change, default value is not provided in column definition.
    bind_sql_column_constraints(
        context.session_ctx(),
        table_name.real_value(),
        &mut columns,
        &column_defs,
        &pk_column_ids,
    )?;

    let definition = context.normalized_sql().to_owned();

    // Map the pk column ids back to positional indices within `columns`.
    let pk_column_indices = {
        let mut id_to_idx = HashMap::new();
        columns.iter().enumerate().for_each(|(idx, c)| {
            id_to_idx.insert(c.column_id(), idx);
        });
        // pk column id must exist in table columns.
        pk_column_ids
            .iter()
            .map(|c| id_to_idx.get(c).copied().unwrap())
            .collect_vec()
    };
    let table_pk = pk_column_indices
        .iter()
        .map(|idx| ColumnOrder::new(*idx, OrderType::ascending()))
        .collect();

    let (options, secret_refs) = cdc_with_options.into_parts();

    // Generated columns are excluded from the external table descriptor:
    // they don't exist upstream and are computed on our side.
    let non_generated_column_descs = columns
        .iter()
        .filter(|&c| !c.is_generated())
        .map(|c| c.column_desc.clone())
        .collect_vec();
    let non_generated_column_num = non_generated_column_descs.len();
    let cdc_table_type = ExternalCdcTableType::from_properties(&options);
    let cdc_table_desc = CdcTableDesc {
        table_id,
        source_id: source.id, // id of cdc source streaming job
        external_table_name: external_table_name.clone(),
        pk: table_pk,
        columns: non_generated_column_descs,
        stream_key: pk_column_indices,
        connect_properties: options,
        secret_refs,
    };

    tracing::debug!(?cdc_table_desc, "create cdc table");
    let options = build_cdc_scan_options_with_options(context.with_options(), &cdc_table_type)?;

    // Root the plan at a CDC scan over the external table.
    let logical_scan = LogicalCdcScan::create(
        external_table_name.clone(),
        Rc::new(cdc_table_desc),
        context.clone(),
        options,
    );

    let scan_node: LogicalPlanRef = logical_scan.into();
    let required_cols = FixedBitSet::with_capacity(non_generated_column_num);
    let plan_root = PlanRoot::new_with_logical_plan(
        scan_node,
        RequiredDist::Any,
        Order::any(),
        required_cols,
        vec![],
    );

    let cdc_table_id = build_cdc_table_id(source.id, &external_table_name);
    let materialize = plan_root.gen_table_plan(
        context,
        resolved_table_name,
        database_id,
        schema_id,
        CreateTableInfo {
            columns,
            pk_column_ids,
            row_id_index: None,
            watermark_descs: vec![],
            source_catalog: Some((*source).clone()),
            version: col_id_gen.into_version(),
        },
        CreateTableProps {
            definition,
            append_only: false,
            on_conflict: on_conflict.into(),
            with_version_columns,
            webhook_info: None,
            engine,
        },
    )?;

    // Fill owner and CDC metadata on the generated catalog.
    let mut table = materialize.table().clone();
    table.owner = session.user_id();
    table.cdc_table_id = Some(cdc_table_id);
    table.cdc_table_type = Some(cdc_table_type);
    Ok((materialize.into(), table))
}
961
962/// Derive connector properties and normalize `external_table_name` for CDC tables.
963///
964/// Returns (`connector_properties`, `normalized_external_table_name`) where:
965/// - For SQL Server: Normalizes 'db.schema.table' (3 parts) to 'schema.table' (2 parts),
966///   because users can optionally include database name for verification, but it needs to be
967///   stripped to match the format returned by Debezium's `extract_table_name()`.
968/// - For MySQL/Postgres: Returns the original `external_table_name` unchanged.
969fn derive_with_options_for_cdc_table(
970    source_with_properties: &WithOptionsSecResolved,
971    external_table_name: String,
972) -> Result<(WithOptionsSecResolved, String)> {
973    use source::cdc::{MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR, SQL_SERVER_CDC_CONNECTOR};
974    // we should remove the prefix from `full_table_name`
975    let source_database_name: &str = source_with_properties
976        .get("database.name")
977        .ok_or_else(|| anyhow!("The source with properties does not contain 'database.name'"))?
978        .as_str();
979    let mut with_options = source_with_properties.clone();
980    if let Some(connector) = source_with_properties.get(UPSTREAM_SOURCE_KEY) {
981        match connector.as_str() {
982            MYSQL_CDC_CONNECTOR => {
983                // MySQL doesn't allow '.' in database name and table name, so we can split the
984                // external table name by '.' to get the table name
985                let (db_name, table_name) = external_table_name.split_once('.').ok_or_else(|| {
986                    anyhow!("The upstream table name must contain database name prefix, e.g. 'database.table'")
987                })?;
988                // We allow multiple database names in the source definition
989                if !source_database_name
990                    .split(',')
991                    .map(|s| s.trim())
992                    .any(|name| name == db_name)
993                {
994                    return Err(anyhow!(
995                        "The database name `{}` in the FROM clause is not included in the database name `{}` in source definition",
996                        db_name,
997                        source_database_name
998                    ).into());
999                }
1000                with_options.insert(DATABASE_NAME_KEY.into(), db_name.into());
1001                with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
1002                // Return original external_table_name unchanged for MySQL
1003                return Ok((with_options, external_table_name));
1004            }
1005            POSTGRES_CDC_CONNECTOR => {
1006                let (schema_name, table_name) = external_table_name
1007                    .split_once('.')
1008                    .ok_or_else(|| anyhow!("The upstream table name must contain schema name prefix, e.g. 'public.table'"))?;
1009
1010                // insert 'schema.name' into connect properties
1011                with_options.insert(SCHEMA_NAME_KEY.into(), schema_name.into());
1012                with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
1013                // Return original external_table_name unchanged for Postgres
1014                return Ok((with_options, external_table_name));
1015            }
1016            SQL_SERVER_CDC_CONNECTOR => {
1017                // SQL Server external table name must be in one of two formats:
1018                // 1. 'schemaName.tableName' (2 parts) - database is already specified in source
1019                // 2. 'databaseName.schemaName.tableName' (3 parts) - for explicit verification
1020                //
1021                // We do NOT allow single table name (e.g., 't') because:
1022                // - Unlike database name (already in source), schema name is NOT pre-specified
1023                // - User must explicitly provide schema (even if it's 'dbo')
1024                let parts: Vec<&str> = external_table_name.split('.').collect();
1025                let (schema_name, table_name) = match parts.len() {
1026                    3 => {
1027                        // Format: database.schema.table
1028                        // Verify that the database name matches the one in source definition
1029                        let db_name = parts[0];
1030                        let schema_name = parts[1];
1031                        let table_name = parts[2];
1032
1033                        if db_name != source_database_name {
1034                            return Err(anyhow!(
1035                                "The database name '{}' in FROM clause does not match the database name '{}' specified in source definition. \
1036                                 You can either use 'schema.table' format (recommended) or ensure the database name matches.",
1037                                db_name,
1038                                source_database_name
1039                            ).into());
1040                        }
1041                        (schema_name, table_name)
1042                    }
1043                    2 => {
1044                        // Format: schema.table (recommended)
1045                        // Database name is taken from source definition
1046                        let schema_name = parts[0];
1047                        let table_name = parts[1];
1048                        (schema_name, table_name)
1049                    }
1050                    1 => {
1051                        // Format: table only
1052                        // Reject with clear error message
1053                        return Err(anyhow!(
1054                            "Invalid table name format '{}'. For SQL Server CDC, you must specify the schema name. \
1055                             Use 'schema.table' format (e.g., 'dbo.{}') or 'database.schema.table' format (e.g., '{}.dbo.{}').",
1056                            external_table_name,
1057                            external_table_name,
1058                            source_database_name,
1059                            external_table_name
1060                        ).into());
1061                    }
1062                    _ => {
1063                        // Invalid format (4+ parts or empty)
1064                        return Err(anyhow!(
1065                            "Invalid table name format '{}'. Expected 'schema.table' or 'database.schema.table'.",
1066                            external_table_name
1067                        ).into());
1068                    }
1069                };
1070
1071                // Insert schema and table names into connector properties
1072                with_options.insert(SCHEMA_NAME_KEY.into(), schema_name.into());
1073                with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
1074
1075                // Normalize external_table_name to 'schema.table' format
1076                // This ensures consistency with extract_table_name() in message.rs
1077                let normalized_external_table_name = format!("{}.{}", schema_name, table_name);
1078                return Ok((with_options, normalized_external_table_name));
1079            }
1080            _ => {
1081                return Err(RwError::from(anyhow!(
1082                    "connector {} is not supported for cdc table",
1083                    connector
1084                )));
1085            }
1086        };
1087    }
1088    unreachable!("All valid CDC connectors should have returned by now")
1089}
1090
1091#[allow(clippy::too_many_arguments)]
1092pub(super) async fn handle_create_table_plan(
1093    handler_args: HandlerArgs,
1094    explain_options: ExplainOptions,
1095    format_encode: Option<FormatEncodeOptions>,
1096    cdc_table_info: Option<CdcTableInfo>,
1097    table_name: &ObjectName,
1098    column_defs: Vec<ColumnDef>,
1099    wildcard_idx: Option<usize>,
1100    constraints: Vec<TableConstraint>,
1101    source_watermarks: Vec<SourceWatermark>,
1102    append_only: bool,
1103    on_conflict: Option<OnConflict>,
1104    with_version_columns: Vec<String>,
1105    include_column_options: IncludeOption,
1106    webhook_info: Option<WebhookSourceInfo>,
1107    engine: Engine,
1108) -> Result<(
1109    PlanRef,
1110    Option<SourceCatalog>,
1111    TableCatalog,
1112    TableJobType,
1113    Option<SourceId>,
1114)> {
1115    let col_id_gen = ColumnIdGenerator::new_initial();
1116    let format_encode = check_create_table_with_source(
1117        &handler_args.with_options,
1118        format_encode,
1119        &include_column_options,
1120        &cdc_table_info,
1121    )?;
1122    let webhook_info = webhook_info
1123        .map(|info| bind_webhook_info(&handler_args.session, &column_defs, info))
1124        .transpose()?;
1125
1126    let props = CreateTableProps {
1127        definition: handler_args.normalized_sql.clone(),
1128        append_only,
1129        on_conflict: on_conflict.into(),
1130        with_version_columns: with_version_columns.clone(),
1131        webhook_info,
1132        engine,
1133    };
1134
1135    let ((plan, source, table), job_type, shared_shource_id) = match (
1136        format_encode,
1137        cdc_table_info.as_ref(),
1138    ) {
1139        (Some(format_encode), None) => (
1140            gen_create_table_plan_with_source(
1141                handler_args,
1142                explain_options,
1143                table_name.clone(),
1144                column_defs,
1145                wildcard_idx,
1146                constraints,
1147                format_encode,
1148                source_watermarks,
1149                col_id_gen,
1150                include_column_options,
1151                props,
1152                SqlColumnStrategy::FollowChecked,
1153            )
1154            .await?,
1155            TableJobType::General,
1156            None,
1157        ),
1158        (None, None) => {
1159            let context = OptimizerContext::new(handler_args, explain_options);
1160            let (plan, table) = gen_create_table_plan(
1161                context,
1162                table_name.clone(),
1163                column_defs,
1164                constraints,
1165                col_id_gen,
1166                source_watermarks,
1167                props,
1168                false,
1169            )?;
1170
1171            ((plan, None, table), TableJobType::General, None)
1172        }
1173
1174        (None, Some(cdc_table)) => {
1175            sanity_check_for_table_on_cdc_source(
1176                append_only,
1177                &column_defs,
1178                &wildcard_idx,
1179                &constraints,
1180                &source_watermarks,
1181            )?;
1182
1183            generated_columns_check_for_cdc_table(&column_defs)?;
1184            not_null_check_for_cdc_table(&wildcard_idx, &column_defs)?;
1185
1186            let session = &handler_args.session;
1187            let db_name = &session.database();
1188            let user_name = &session.user_name();
1189            let search_path = session.config().search_path();
1190            let (schema_name, resolved_table_name) =
1191                Binder::resolve_schema_qualified_name(db_name, table_name)?;
1192            let (database_id, schema_id) =
1193                session.get_database_and_schema_id_for_create(schema_name.clone())?;
1194
1195            // cdc table cannot be append-only
1196            let (source_schema, source_name) =
1197                Binder::resolve_schema_qualified_name(db_name, &cdc_table.source_name)?;
1198
1199            let source = {
1200                let catalog_reader = session.env().catalog_reader().read_guard();
1201                let schema_path =
1202                    SchemaPath::new(source_schema.as_deref(), &search_path, user_name);
1203
1204                let (source, _) = catalog_reader.get_source_by_name(
1205                    db_name,
1206                    schema_path,
1207                    source_name.as_str(),
1208                )?;
1209                source.clone()
1210            };
1211            let (cdc_with_options, normalized_external_table_name) =
1212                derive_with_options_for_cdc_table(
1213                    &source.with_properties,
1214                    cdc_table.external_table_name.clone(),
1215                )?;
1216
1217            let (columns, pk_names) = match wildcard_idx {
1218                Some(_) => bind_cdc_table_schema_externally(cdc_with_options.clone()).await?,
1219                None => {
1220                    for column_def in &column_defs {
1221                        for option_def in &column_def.options {
1222                            if let ColumnOption::DefaultValue(_)
1223                            | ColumnOption::DefaultValueInternal { .. } = option_def.option
1224                            {
1225                                return Err(ErrorCode::NotSupported(
1226                                            "Default value for columns defined on the table created from a CDC source".into(),
1227                                            "Remove the default value expression in the column definitions".into(),
1228                                        )
1229                                            .into());
1230                            }
1231                        }
1232                    }
1233
1234                    let (columns, pk_names) =
1235                        bind_cdc_table_schema(&column_defs, &constraints, false)?;
1236                    // read default value definition from external db
1237                    let (options, secret_refs) = cdc_with_options.clone().into_parts();
1238                    let _config = ExternalTableConfig::try_from_btreemap(options, secret_refs)
1239                        .context("failed to extract external table config")?;
1240
1241                    (columns, pk_names)
1242                }
1243            };
1244
1245            let context: OptimizerContextRef =
1246                OptimizerContext::new(handler_args, explain_options).into();
1247            let shared_source_id = source.id;
1248            let (plan, table) = gen_create_table_plan_for_cdc_table(
1249                context,
1250                source,
1251                normalized_external_table_name,
1252                column_defs,
1253                columns,
1254                pk_names,
1255                cdc_with_options,
1256                col_id_gen,
1257                on_conflict,
1258                with_version_columns,
1259                include_column_options,
1260                table_name.clone(),
1261                resolved_table_name,
1262                database_id,
1263                schema_id,
1264                TableId::placeholder(),
1265                engine,
1266            )?;
1267
1268            (
1269                (plan, None, table),
1270                TableJobType::SharedCdcSource,
1271                Some(shared_source_id),
1272            )
1273        }
1274        (Some(_), Some(_)) => {
1275            return Err(ErrorCode::NotSupported(
1276                "Data format and encoding format doesn't apply to table created from a CDC source"
1277                    .into(),
1278                "Remove the FORMAT and ENCODE specification".into(),
1279            )
1280            .into());
1281        }
1282    };
1283    Ok((plan, source, table, job_type, shared_shource_id))
1284}
1285
1286// For both table from cdc source and table with cdc connector
1287fn generated_columns_check_for_cdc_table(columns: &Vec<ColumnDef>) -> Result<()> {
1288    let mut found_generated_column = false;
1289    for column in columns {
1290        let mut is_generated = false;
1291
1292        for option_def in &column.options {
1293            if let ColumnOption::GeneratedColumns(_) = option_def.option {
1294                is_generated = true;
1295                break;
1296            }
1297        }
1298
1299        if is_generated {
1300            found_generated_column = true;
1301        } else if found_generated_column {
1302            return Err(ErrorCode::NotSupported(
1303                "Non-generated column found after a generated column.".into(),
1304                "Ensure that all generated columns appear at the end of the cdc table definition."
1305                    .into(),
1306            )
1307            .into());
1308        }
1309    }
1310    Ok(())
1311}
1312
1313// For both table from cdc source and table with cdc connector
1314fn not_null_check_for_cdc_table(
1315    wildcard_idx: &Option<usize>,
1316    column_defs: &Vec<ColumnDef>,
1317) -> Result<()> {
1318    if !wildcard_idx.is_some()
1319        && column_defs.iter().any(|col| {
1320            col.options
1321                .iter()
1322                .any(|opt| matches!(opt.option, ColumnOption::NotNull))
1323        })
1324    {
1325        return Err(ErrorCode::NotSupported(
1326            "CDC table with NOT NULL constraint is not supported".to_owned(),
1327            "Please remove the NOT NULL constraint for columns".to_owned(),
1328        )
1329        .into());
1330    }
1331    Ok(())
1332}
1333
1334// Only for table from cdc source
1335fn sanity_check_for_table_on_cdc_source(
1336    append_only: bool,
1337    column_defs: &Vec<ColumnDef>,
1338    wildcard_idx: &Option<usize>,
1339    constraints: &Vec<TableConstraint>,
1340    source_watermarks: &Vec<SourceWatermark>,
1341) -> Result<()> {
1342    // wildcard cannot be used with column definitions
1343    if wildcard_idx.is_some() && !column_defs.is_empty() {
1344        return Err(ErrorCode::NotSupported(
1345            "wildcard(*) and column definitions cannot be used together".to_owned(),
1346            "Remove the wildcard or column definitions".to_owned(),
1347        )
1348        .into());
1349    }
1350
1351    // cdc table must have primary key constraint or primary key column
1352    if !wildcard_idx.is_some()
1353        && !constraints.iter().any(|c| {
1354            matches!(
1355                c,
1356                TableConstraint::Unique {
1357                    is_primary: true,
1358                    ..
1359                }
1360            )
1361        })
1362        && !column_defs.iter().any(|col| {
1363            col.options
1364                .iter()
1365                .any(|opt| matches!(opt.option, ColumnOption::Unique { is_primary: true }))
1366        })
1367    {
1368        return Err(ErrorCode::NotSupported(
1369            "CDC table without primary key constraint is not supported".to_owned(),
1370            "Please define a primary key".to_owned(),
1371        )
1372        .into());
1373    }
1374
1375    if append_only {
1376        return Err(ErrorCode::NotSupported(
1377            "append only modifier on the table created from a CDC source".into(),
1378            "Remove the APPEND ONLY clause".into(),
1379        )
1380        .into());
1381    }
1382
1383    if !source_watermarks.is_empty() {
1384        return Err(ErrorCode::NotSupported(
1385            "watermark defined on the table created from a CDC source".into(),
1386            "Remove the Watermark definitions".into(),
1387        )
1388        .into());
1389    }
1390
1391    Ok(())
1392}
1393
1394/// Derive schema for cdc table when create a new Table or alter an existing Table
1395async fn bind_cdc_table_schema_externally(
1396    cdc_with_options: WithOptionsSecResolved,
1397) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
1398    // read cdc table schema from external db or parsing the schema from SQL definitions
1399    let (options, secret_refs) = cdc_with_options.into_parts();
1400    let config = ExternalTableConfig::try_from_btreemap(options, secret_refs)
1401        .context("failed to extract external table config")?;
1402
1403    let table = ExternalTableImpl::connect(config)
1404        .await
1405        .context("failed to auto derive table schema")?;
1406
1407    Ok((
1408        table
1409            .column_descs()
1410            .iter()
1411            .cloned()
1412            .map(|column_desc| ColumnCatalog {
1413                column_desc,
1414                is_hidden: false,
1415            })
1416            .collect(),
1417        table.pk_names().clone(),
1418    ))
1419}
1420
1421/// Derive schema for cdc table when create a new Table or alter an existing Table
1422fn bind_cdc_table_schema(
1423    column_defs: &Vec<ColumnDef>,
1424    constraints: &Vec<TableConstraint>,
1425    is_for_replace_plan: bool,
1426) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
1427    let columns = bind_sql_columns(column_defs, is_for_replace_plan)?;
1428
1429    let pk_names = bind_sql_pk_names(column_defs, bind_table_constraints(constraints)?)?;
1430    Ok((columns, pk_names))
1431}
1432
/// Handle the `CREATE TABLE` statement end-to-end: generate the plan, build the stream
/// fragment graph, and submit the creation job to the catalog writer (for Hummock) or
/// run the composite Iceberg-engine creation flow.
#[allow(clippy::too_many_arguments)]
pub async fn handle_create_table(
    handler_args: HandlerArgs,
    table_name: ObjectName,
    column_defs: Vec<ColumnDef>,
    wildcard_idx: Option<usize>,
    constraints: Vec<TableConstraint>,
    if_not_exists: bool,
    format_encode: Option<FormatEncodeOptions>,
    source_watermarks: Vec<SourceWatermark>,
    append_only: bool,
    on_conflict: Option<OnConflict>,
    with_version_columns: Vec<String>,
    cdc_table_info: Option<CdcTableInfo>,
    include_column_options: IncludeOption,
    webhook_info: Option<WebhookSourceInfo>,
    ast_engine: risingwave_sqlparser::ast::Engine,
) -> Result<RwPgResponse> {
    let session = handler_args.session.clone();

    if append_only {
        session.notice_to_user("APPEND ONLY TABLE is currently an experimental feature.");
    }

    session.check_cluster_limits().await?;

    // Map the parser-level engine enum to the catalog-level one.
    let engine = match ast_engine {
        risingwave_sqlparser::ast::Engine::Hummock => Engine::Hummock,
        risingwave_sqlparser::ast::Engine::Iceberg => Engine::Iceberg,
    };

    // Short-circuit (possibly with a notice) if the relation already exists.
    if let Either::Right(resp) = session.check_relation_name_duplicated(
        table_name.clone(),
        StatementType::CREATE_TABLE,
        if_not_exists,
    )? {
        return Ok(resp);
    }

    // Plan the table and build the stream graph in an inner scope so intermediate
    // plan structures are dropped before the (long) creation RPC below.
    let (graph, source, hummock_table, job_type, shared_source_id) = {
        let (plan, source, table, job_type, shared_source_id) = handle_create_table_plan(
            handler_args.clone(),
            ExplainOptions::default(),
            format_encode,
            cdc_table_info,
            &table_name,
            column_defs.clone(),
            wildcard_idx,
            constraints.clone(),
            source_watermarks,
            append_only,
            on_conflict,
            with_version_columns,
            include_column_options,
            webhook_info,
            engine,
        )
        .await?;
        tracing::trace!("table_plan: {:?}", plan.explain_to_string());

        let graph = build_graph(plan, Some(GraphJobType::Table))?;

        (graph, source, table, job_type, shared_source_id)
    };

    tracing::trace!(
        "name={}, graph=\n{}",
        table_name,
        serde_json::to_string_pretty(&graph).unwrap()
    );

    // A table on a shared CDC source depends on that source object.
    let dependencies = shared_source_id
        .map(|id| HashSet::from([id.as_object_id()]))
        .unwrap_or_default();

    // Handle engine
    match engine {
        Engine::Hummock => {
            let catalog_writer = session.catalog_writer()?;
            // CDC-backed tables backfill from the external database, so monitor the
            // backfill job; otherwise diagnose barrier latency while waiting.
            let action = match job_type {
                TableJobType::SharedCdcSource => LongRunningNotificationAction::MonitorBackfillJob,
                _ => LongRunningNotificationAction::DiagnoseBarrierLatency,
            };
            execute_with_long_running_notification(
                catalog_writer.create_table(
                    source.map(|s| s.to_prost()),
                    hummock_table.to_prost(),
                    graph,
                    job_type,
                    if_not_exists,
                    dependencies,
                ),
                &session,
                "CREATE TABLE",
                action,
            )
            .await?;
        }
        Engine::Iceberg => {
            // Stage the hummock table so it is visible during the multi-step Iceberg
            // creation, and drop the staging entry regardless of success or failure.
            let hummock_table_name = hummock_table.name.clone();
            session.create_staging_table(hummock_table.clone());
            let res = create_iceberg_engine_table(
                session.clone(),
                handler_args,
                source.map(|s| s.to_prost()),
                hummock_table,
                graph,
                table_name,
                job_type,
                if_not_exists,
            )
            .await;
            session.drop_staging_table(&hummock_table_name);
            res?
        }
    }

    Ok(PgResponse::empty_result(StatementType::CREATE_TABLE))
}
1552
1553/// The Iceberg table engine is composed of a hummock table, an iceberg sink, and an iceberg source.
1554///
1555/// 1. Fetch the iceberg engine options from the meta node, or use the iceberg engine connection provided by the user.
1556/// 2. Create a hummock table.
1557/// 3. Create an iceberg sink.
1558/// 4. Create an iceberg source.
1559///
1560/// See <https://github.com/risingwavelabs/risingwave/issues/21586> for an architecture diagram.
1561#[allow(clippy::too_many_arguments)]
1562pub async fn create_iceberg_engine_table(
1563    session: Arc<SessionImpl>,
1564    handler_args: HandlerArgs,
1565    mut source: Option<PbSource>,
1566    table: TableCatalog,
1567    graph: StreamFragmentGraph,
1568    table_name: ObjectName,
1569    job_type: PbTableJobType,
1570    if_not_exists: bool,
1571) -> Result<()> {
1572    let rw_db_name = session
1573        .env()
1574        .catalog_reader()
1575        .read_guard()
1576        .get_database_by_id(table.database_id)?
1577        .name()
1578        .to_owned();
1579    let rw_schema_name = session
1580        .env()
1581        .catalog_reader()
1582        .read_guard()
1583        .get_schema_by_id(table.database_id, table.schema_id)?
1584        .name()
1585        .clone();
1586    let iceberg_catalog_name = rw_db_name.clone();
1587    let iceberg_database_name = rw_schema_name.clone();
1588    let iceberg_table_name = table_name.0.last().unwrap().real_value();
1589
1590    let iceberg_engine_connection: String = session.config().iceberg_engine_connection();
1591    let sink_decouple = session.config().sink_decouple();
1592    if matches!(sink_decouple, SinkDecouple::Disable) {
1593        bail!(
1594            "Iceberg engine table only supports with sink decouple, try `set sink_decouple = true` to resolve it"
1595        );
1596    }
1597
1598    let mut connection_ref = BTreeMap::new();
1599    let with_common = if iceberg_engine_connection.is_empty() {
1600        bail!("to use iceberg engine table, the variable `iceberg_engine_connection` must be set.");
1601    } else {
1602        let parts: Vec<&str> = iceberg_engine_connection.split('.').collect();
1603        assert_eq!(parts.len(), 2);
1604        let connection_catalog =
1605            session.get_connection_by_name(Some(parts[0].to_owned()), parts[1])?;
1606        if let ConnectionInfo::ConnectionParams(params) = &connection_catalog.info {
1607            if params.connection_type == ConnectionType::Iceberg as i32 {
1608                // With iceberg engine connection:
1609                connection_ref.insert(
1610                    "connection".to_owned(),
1611                    ConnectionRefValue {
1612                        connection_name: ObjectName::from(vec![
1613                            Ident::from(parts[0]),
1614                            Ident::from(parts[1]),
1615                        ]),
1616                    },
1617                );
1618
1619                let mut with_common = BTreeMap::new();
1620                with_common.insert("connector".to_owned(), "iceberg".to_owned());
1621                with_common.insert("database.name".to_owned(), iceberg_database_name);
1622                with_common.insert("table.name".to_owned(), iceberg_table_name);
1623
1624                let hosted_catalog = params
1625                    .properties
1626                    .get("hosted_catalog")
1627                    .map(|s| s.eq_ignore_ascii_case("true"))
1628                    .unwrap_or(false);
1629                if hosted_catalog {
1630                    let meta_client = session.env().meta_client();
1631                    let meta_store_endpoint = meta_client.get_meta_store_endpoint().await?;
1632
1633                    let meta_store_endpoint =
1634                        url::Url::parse(&meta_store_endpoint).map_err(|_| {
1635                            ErrorCode::InternalError(
1636                                "failed to parse the meta store endpoint".to_owned(),
1637                            )
1638                        })?;
1639                    let meta_store_backend = meta_store_endpoint.scheme().to_owned();
1640                    let meta_store_user = meta_store_endpoint.username().to_owned();
1641                    let meta_store_password = match meta_store_endpoint.password() {
1642                        Some(password) => percent_decode_str(password)
1643                            .decode_utf8()
1644                            .map_err(|_| {
1645                                ErrorCode::InternalError(
1646                                    "failed to parse password from meta store endpoint".to_owned(),
1647                                )
1648                            })?
1649                            .into_owned(),
1650                        None => "".to_owned(),
1651                    };
1652                    let meta_store_host = meta_store_endpoint
1653                        .host_str()
1654                        .ok_or_else(|| {
1655                            ErrorCode::InternalError(
1656                                "failed to parse host from meta store endpoint".to_owned(),
1657                            )
1658                        })?
1659                        .to_owned();
1660                    let meta_store_port = meta_store_endpoint.port().ok_or_else(|| {
1661                        ErrorCode::InternalError(
1662                            "failed to parse port from meta store endpoint".to_owned(),
1663                        )
1664                    })?;
1665                    let meta_store_database = meta_store_endpoint
1666                        .path()
1667                        .trim_start_matches('/')
1668                        .to_owned();
1669
1670                    let Ok(meta_backend) = MetaBackend::from_str(&meta_store_backend, true) else {
1671                        bail!("failed to parse meta backend: {}", meta_store_backend);
1672                    };
1673
1674                    let catalog_uri = match meta_backend {
1675                        MetaBackend::Postgres => {
1676                            format!(
1677                                "jdbc:postgresql://{}:{}/{}",
1678                                meta_store_host, meta_store_port, meta_store_database
1679                            )
1680                        }
1681                        MetaBackend::Mysql => {
1682                            format!(
1683                                "jdbc:mysql://{}:{}/{}",
1684                                meta_store_host, meta_store_port, meta_store_database
1685                            )
1686                        }
1687                        MetaBackend::Sqlite | MetaBackend::Sql | MetaBackend::Mem => {
1688                            bail!(
1689                                "Unsupported meta backend for iceberg engine table: {}",
1690                                meta_store_backend
1691                            );
1692                        }
1693                    };
1694
1695                    with_common.insert("catalog.type".to_owned(), "jdbc".to_owned());
1696                    with_common.insert("catalog.uri".to_owned(), catalog_uri);
1697                    with_common.insert("catalog.jdbc.user".to_owned(), meta_store_user);
1698                    with_common.insert("catalog.jdbc.password".to_owned(), meta_store_password);
1699                    with_common.insert("catalog.name".to_owned(), iceberg_catalog_name);
1700                }
1701
1702                with_common
1703            } else {
1704                return Err(RwError::from(ErrorCode::InvalidParameterValue(
1705                    "Only iceberg connection could be used in iceberg engine".to_owned(),
1706                )));
1707            }
1708        } else {
1709            return Err(RwError::from(ErrorCode::InvalidParameterValue(
1710                "Private Link Service has been deprecated. Please create a new connection instead."
1711                    .to_owned(),
1712            )));
1713        }
1714    };
1715
1716    // Iceberg sinks require a primary key; if none is provided, we will use the `_row_id` column.
1717    // Fetch the primary key from the table's columns.
1718    let mut pks = table
1719        .pk_column_names()
1720        .iter()
1721        .map(|c| c.to_string())
1722        .collect::<Vec<String>>();
1723
1724    // For a table without a primary key, we will use `_row_id` as the primary key.
1725    let sink_from = if pks.len() == 1 && pks[0].eq(ROW_ID_COLUMN_NAME) {
1726        pks = vec![RISINGWAVE_ICEBERG_ROW_ID.to_owned()];
1727        let [stmt]: [_; 1] = Parser::parse_sql(&format!(
1728            "select {} as {}, * from {}",
1729            ROW_ID_COLUMN_NAME, RISINGWAVE_ICEBERG_ROW_ID, table_name
1730        ))
1731        .context("unable to parse query")?
1732        .try_into()
1733        .unwrap();
1734
1735        let Statement::Query(query) = &stmt else {
1736            panic!("unexpected statement: {:?}", stmt);
1737        };
1738        CreateSink::AsQuery(query.clone())
1739    } else {
1740        CreateSink::From(table_name.clone())
1741    };
1742
1743    let mut sink_name = table_name.clone();
1744    *sink_name.0.last_mut().unwrap() = Ident::from(
1745        (ICEBERG_SINK_PREFIX.to_owned() + &sink_name.0.last().unwrap().real_value()).as_str(),
1746    );
1747    let create_sink_stmt = CreateSinkStatement {
1748        if_not_exists: false,
1749        sink_name,
1750        with_properties: WithProperties(vec![]),
1751        sink_from,
1752        columns: vec![],
1753        emit_mode: None,
1754        sink_schema: None,
1755        into_table_name: None,
1756    };
1757
1758    let mut sink_handler_args = handler_args.clone();
1759
1760    let mut sink_with = with_common.clone();
1761
1762    if table.append_only {
1763        sink_with.insert("type".to_owned(), "append-only".to_owned());
1764    } else {
1765        sink_with.insert("primary_key".to_owned(), pks.join(","));
1766        sink_with.insert("type".to_owned(), "upsert".to_owned());
1767    }
1768    // sink_with.insert(SINK_SNAPSHOT_OPTION.to_owned(), "false".to_owned());
1769    //
1770    // Note: in theory, we don't need to backfill from the table to the sink,
1771    // but we don't have atomic DDL yet (https://github.com/risingwavelabs/risingwave/issues/21863),
1772    // so there may be a potential data loss problem on the first barrier.
1773    //
1774    // For non-append-only table, we can always solve it by the initial sink with backfill, since
1775    // data will be present in hummock table.
1776    //
1777    // For append-only table, we need to be more careful.
1778    //
1779    // The possible cases for a table:
1780    // - For table without connector: it doesn't matter, since there's no data before the table is created
1781    // - For table with connector: we worked around it by setting SOURCE_RATE_LIMIT to 0
1782    //   + If we support blocking DDL for table with connector, we need to be careful.
1783    // - For table with an upstream job: Specifically, CDC table from shared CDC source.
1784    //   + Data may come from both upstream connector, and CDC table backfill, so we need to pause both of them.
1785    //   + For now we don't support APPEND ONLY CDC table, so it's safe.
1786    let commit_checkpoint_interval = handler_args
1787        .with_options
1788        .get(COMMIT_CHECKPOINT_INTERVAL)
1789        .map(|v| v.to_owned())
1790        .unwrap_or_else(|| "60".to_owned());
1791    let commit_checkpoint_interval = commit_checkpoint_interval.parse::<u32>().map_err(|_| {
1792        ErrorCode::InvalidInputSyntax(format!(
1793            "commit_checkpoint_interval must be greater than 0: {}",
1794            commit_checkpoint_interval
1795        ))
1796    })?;
1797
1798    if commit_checkpoint_interval == 0 {
1799        bail!("commit_checkpoint_interval must be greater than 0");
1800    }
1801
1802    // remove commit_checkpoint_interval from source options, otherwise it will be considered as an unknown field.
1803    source
1804        .as_mut()
1805        .map(|x| x.with_properties.remove(COMMIT_CHECKPOINT_INTERVAL));
1806
1807    let sink_decouple = session.config().sink_decouple();
1808    if matches!(sink_decouple, SinkDecouple::Disable) && commit_checkpoint_interval > 1 {
1809        bail!(
1810            "config conflict: `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
1811        )
1812    }
1813
1814    sink_with.insert(
1815        COMMIT_CHECKPOINT_INTERVAL.to_owned(),
1816        commit_checkpoint_interval.to_string(),
1817    );
1818    sink_with.insert("create_table_if_not_exists".to_owned(), "true".to_owned());
1819
1820    sink_with.insert("is_exactly_once".to_owned(), "true".to_owned());
1821
1822    if let Some(enable_compaction) = handler_args.with_options.get(ENABLE_COMPACTION) {
1823        match enable_compaction.to_lowercase().as_str() {
1824            "true" => {
1825                sink_with.insert(ENABLE_COMPACTION.to_owned(), "true".to_owned());
1826            }
1827            "false" => {
1828                sink_with.insert(ENABLE_COMPACTION.to_owned(), "false".to_owned());
1829            }
1830            _ => {
1831                return Err(ErrorCode::InvalidInputSyntax(format!(
1832                    "enable_compaction must be true or false: {}",
1833                    enable_compaction
1834                ))
1835                .into());
1836            }
1837        }
1838
1839        // remove enable_compaction from source options, otherwise it will be considered as an unknown field.
1840        source
1841            .as_mut()
1842            .map(|x| x.with_properties.remove("enable_compaction"));
1843    } else {
1844        sink_with.insert(ENABLE_COMPACTION.to_owned(), "true".to_owned());
1845    }
1846
1847    if let Some(compaction_interval_sec) = handler_args.with_options.get(COMPACTION_INTERVAL_SEC) {
1848        let compaction_interval_sec = compaction_interval_sec.parse::<u64>().map_err(|_| {
1849            ErrorCode::InvalidInputSyntax(format!(
1850                "compaction_interval_sec must be greater than 0: {}",
1851                commit_checkpoint_interval
1852            ))
1853        })?;
1854        if compaction_interval_sec == 0 {
1855            bail!("compaction_interval_sec must be greater than 0");
1856        }
1857        sink_with.insert(
1858            "compaction_interval_sec".to_owned(),
1859            compaction_interval_sec.to_string(),
1860        );
1861        // remove compaction_interval_sec from source options, otherwise it will be considered as an unknown field.
1862        source
1863            .as_mut()
1864            .map(|x| x.with_properties.remove("compaction_interval_sec"));
1865    }
1866
1867    let has_enabled_snapshot_expiration = if let Some(enable_snapshot_expiration) =
1868        handler_args.with_options.get(ENABLE_SNAPSHOT_EXPIRATION)
1869    {
1870        // remove enable_snapshot_expiration from source options, otherwise it will be considered as an unknown field.
1871        source
1872            .as_mut()
1873            .map(|x| x.with_properties.remove(ENABLE_SNAPSHOT_EXPIRATION));
1874        match enable_snapshot_expiration.to_lowercase().as_str() {
1875            "true" => {
1876                sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "true".to_owned());
1877                true
1878            }
1879            "false" => {
1880                sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "false".to_owned());
1881                false
1882            }
1883            _ => {
1884                return Err(ErrorCode::InvalidInputSyntax(format!(
1885                    "enable_snapshot_expiration must be true or false: {}",
1886                    enable_snapshot_expiration
1887                ))
1888                .into());
1889            }
1890        }
1891    } else {
1892        sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "true".to_owned());
1893        true
1894    };
1895
1896    if has_enabled_snapshot_expiration {
1897        // configuration for snapshot expiration
1898        if let Some(snapshot_expiration_retain_last) = handler_args
1899            .with_options
1900            .get(SNAPSHOT_EXPIRATION_RETAIN_LAST)
1901        {
1902            sink_with.insert(
1903                SNAPSHOT_EXPIRATION_RETAIN_LAST.to_owned(),
1904                snapshot_expiration_retain_last.to_owned(),
1905            );
1906            // remove snapshot_expiration_retain_last from source options, otherwise it will be considered as an unknown field.
1907            source
1908                .as_mut()
1909                .map(|x| x.with_properties.remove(SNAPSHOT_EXPIRATION_RETAIN_LAST));
1910        }
1911
1912        if let Some(snapshot_expiration_max_age) = handler_args
1913            .with_options
1914            .get(SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS)
1915        {
1916            sink_with.insert(
1917                SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS.to_owned(),
1918                snapshot_expiration_max_age.to_owned(),
1919            );
1920            // remove snapshot_expiration_max_age from source options, otherwise it will be considered as an unknown field.
1921            source
1922                .as_mut()
1923                .map(|x| x.with_properties.remove(SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS));
1924        }
1925
1926        if let Some(snapshot_expiration_clear_expired_files) = handler_args
1927            .with_options
1928            .get(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES)
1929        {
1930            sink_with.insert(
1931                SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES.to_owned(),
1932                snapshot_expiration_clear_expired_files.to_owned(),
1933            );
1934            // remove snapshot_expiration_clear_expired_files from source options, otherwise it will be considered as an unknown field.
1935            source.as_mut().map(|x| {
1936                x.with_properties
1937                    .remove(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES)
1938            });
1939        }
1940
1941        if let Some(snapshot_expiration_clear_expired_meta_data) = handler_args
1942            .with_options
1943            .get(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA)
1944        {
1945            sink_with.insert(
1946                SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA.to_owned(),
1947                snapshot_expiration_clear_expired_meta_data.to_owned(),
1948            );
1949            // remove snapshot_expiration_clear_expired_meta_data from source options, otherwise it will be considered as an unknown field.
1950            source.as_mut().map(|x| {
1951                x.with_properties
1952                    .remove(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA)
1953            });
1954        }
1955    }
1956
1957    if let Some(format_version) = handler_args.with_options.get(FORMAT_VERSION) {
1958        let format_version = format_version.parse::<u8>().map_err(|_| {
1959            ErrorCode::InvalidInputSyntax(format!(
1960                "format_version must be 1, 2 or 3: {}",
1961                format_version
1962            ))
1963        })?;
1964        if format_version != 1 && format_version != 2 && format_version != 3 {
1965            bail!("format_version must be 1, 2 or 3");
1966        }
1967        sink_with.insert(FORMAT_VERSION.to_owned(), format_version.to_string());
1968
1969        // remove format_version from source options, otherwise it will be considered as an unknown field.
1970        source
1971            .as_mut()
1972            .map(|x| x.with_properties.remove(FORMAT_VERSION));
1973    }
1974
1975    if let Some(write_mode) = handler_args.with_options.get(WRITE_MODE) {
1976        let write_mode = IcebergWriteMode::try_from(write_mode.as_str()).map_err(|_| {
1977            ErrorCode::InvalidInputSyntax(format!(
1978                "invalid write_mode: {}, must be one of: {}, {}",
1979                write_mode, ICEBERG_WRITE_MODE_MERGE_ON_READ, ICEBERG_WRITE_MODE_COPY_ON_WRITE
1980            ))
1981        })?;
1982
1983        match write_mode {
1984            IcebergWriteMode::MergeOnRead => {
1985                sink_with.insert(WRITE_MODE.to_owned(), write_mode.as_str().to_owned());
1986            }
1987
1988            IcebergWriteMode::CopyOnWrite => {
1989                if table.append_only {
1990                    return Err(ErrorCode::NotSupported(
1991                        "COPY ON WRITE is not supported for append-only iceberg table".to_owned(),
1992                        "Please use MERGE ON READ instead".to_owned(),
1993                    )
1994                    .into());
1995                }
1996
1997                sink_with.insert(WRITE_MODE.to_owned(), write_mode.as_str().to_owned());
1998            }
1999        }
2000
2001        // remove write_mode from source options, otherwise it will be considered as an unknown field.
2002        source
2003            .as_mut()
2004            .map(|x| x.with_properties.remove("write_mode"));
2005    } else {
2006        sink_with.insert(
2007            WRITE_MODE.to_owned(),
2008            ICEBERG_WRITE_MODE_MERGE_ON_READ.to_owned(),
2009        );
2010    }
2011
2012    if let Some(max_snapshots_num_before_compaction) =
2013        handler_args.with_options.get(COMPACTION_MAX_SNAPSHOTS_NUM)
2014    {
2015        let max_snapshots_num_before_compaction = max_snapshots_num_before_compaction
2016            .parse::<u32>()
2017            .map_err(|_| {
2018                ErrorCode::InvalidInputSyntax(format!(
2019                    "{} must be greater than 0: {}",
2020                    COMPACTION_MAX_SNAPSHOTS_NUM, max_snapshots_num_before_compaction
2021                ))
2022            })?;
2023
2024        if max_snapshots_num_before_compaction == 0 {
2025            bail!(format!(
2026                "{} must be greater than 0",
2027                COMPACTION_MAX_SNAPSHOTS_NUM
2028            ));
2029        }
2030
2031        sink_with.insert(
2032            COMPACTION_MAX_SNAPSHOTS_NUM.to_owned(),
2033            max_snapshots_num_before_compaction.to_string(),
2034        );
2035
2036        // remove from source options, otherwise it will be considered as an unknown field.
2037        source
2038            .as_mut()
2039            .map(|x| x.with_properties.remove(COMPACTION_MAX_SNAPSHOTS_NUM));
2040    }
2041
2042    if let Some(small_files_threshold_mb) = handler_args
2043        .with_options
2044        .get(COMPACTION_SMALL_FILES_THRESHOLD_MB)
2045    {
2046        let small_files_threshold_mb = small_files_threshold_mb.parse::<u64>().map_err(|_| {
2047            ErrorCode::InvalidInputSyntax(format!(
2048                "{} must be greater than 0: {}",
2049                COMPACTION_SMALL_FILES_THRESHOLD_MB, small_files_threshold_mb
2050            ))
2051        })?;
2052        if small_files_threshold_mb == 0 {
2053            bail!(format!(
2054                "{} must be a greater than 0",
2055                COMPACTION_SMALL_FILES_THRESHOLD_MB
2056            ));
2057        }
2058        sink_with.insert(
2059            COMPACTION_SMALL_FILES_THRESHOLD_MB.to_owned(),
2060            small_files_threshold_mb.to_string(),
2061        );
2062
2063        // remove from source options, otherwise it will be considered as an unknown field.
2064        source.as_mut().map(|x| {
2065            x.with_properties
2066                .remove(COMPACTION_SMALL_FILES_THRESHOLD_MB)
2067        });
2068    }
2069
2070    if let Some(delete_files_count_threshold) = handler_args
2071        .with_options
2072        .get(COMPACTION_DELETE_FILES_COUNT_THRESHOLD)
2073    {
2074        let delete_files_count_threshold =
2075            delete_files_count_threshold.parse::<usize>().map_err(|_| {
2076                ErrorCode::InvalidInputSyntax(format!(
2077                    "{} must be greater than 0: {}",
2078                    COMPACTION_DELETE_FILES_COUNT_THRESHOLD, delete_files_count_threshold
2079                ))
2080            })?;
2081        if delete_files_count_threshold == 0 {
2082            bail!(format!(
2083                "{} must be greater than 0",
2084                COMPACTION_DELETE_FILES_COUNT_THRESHOLD
2085            ));
2086        }
2087        sink_with.insert(
2088            COMPACTION_DELETE_FILES_COUNT_THRESHOLD.to_owned(),
2089            delete_files_count_threshold.to_string(),
2090        );
2091
2092        // remove from source options, otherwise it will be considered as an unknown field.
2093        source.as_mut().map(|x| {
2094            x.with_properties
2095                .remove(COMPACTION_DELETE_FILES_COUNT_THRESHOLD)
2096        });
2097    }
2098
2099    if let Some(trigger_snapshot_count) = handler_args
2100        .with_options
2101        .get(COMPACTION_TRIGGER_SNAPSHOT_COUNT)
2102    {
2103        let trigger_snapshot_count = trigger_snapshot_count.parse::<usize>().map_err(|_| {
2104            ErrorCode::InvalidInputSyntax(format!(
2105                "{} must be greater than 0: {}",
2106                COMPACTION_TRIGGER_SNAPSHOT_COUNT, trigger_snapshot_count
2107            ))
2108        })?;
2109        if trigger_snapshot_count == 0 {
2110            bail!(format!(
2111                "{} must be greater than 0",
2112                COMPACTION_TRIGGER_SNAPSHOT_COUNT
2113            ));
2114        }
2115        sink_with.insert(
2116            COMPACTION_TRIGGER_SNAPSHOT_COUNT.to_owned(),
2117            trigger_snapshot_count.to_string(),
2118        );
2119
2120        // remove from source options, otherwise it will be considered as an unknown field.
2121        source
2122            .as_mut()
2123            .map(|x| x.with_properties.remove(COMPACTION_TRIGGER_SNAPSHOT_COUNT));
2124    }
2125
2126    if let Some(target_file_size_mb) = handler_args
2127        .with_options
2128        .get(COMPACTION_TARGET_FILE_SIZE_MB)
2129    {
2130        let target_file_size_mb = target_file_size_mb.parse::<u64>().map_err(|_| {
2131            ErrorCode::InvalidInputSyntax(format!(
2132                "{} must be greater than 0: {}",
2133                COMPACTION_TARGET_FILE_SIZE_MB, target_file_size_mb
2134            ))
2135        })?;
2136        if target_file_size_mb == 0 {
2137            bail!(format!(
2138                "{} must be greater than 0",
2139                COMPACTION_TARGET_FILE_SIZE_MB
2140            ));
2141        }
2142        sink_with.insert(
2143            COMPACTION_TARGET_FILE_SIZE_MB.to_owned(),
2144            target_file_size_mb.to_string(),
2145        );
2146        // remove from source options, otherwise it will be considered as an unknown field.
2147        source
2148            .as_mut()
2149            .map(|x| x.with_properties.remove(COMPACTION_TARGET_FILE_SIZE_MB));
2150    }
2151
2152    if let Some(compaction_type) = handler_args.with_options.get(COMPACTION_TYPE) {
2153        let compaction_type = CompactionType::try_from(compaction_type.as_str()).map_err(|_| {
2154            ErrorCode::InvalidInputSyntax(format!(
2155                "invalid compaction_type: {}, must be one of {:?}",
2156                compaction_type,
2157                &[
2158                    CompactionType::Full,
2159                    CompactionType::SmallFiles,
2160                    CompactionType::FilesWithDelete
2161                ]
2162            ))
2163        })?;
2164
2165        sink_with.insert(
2166            COMPACTION_TYPE.to_owned(),
2167            compaction_type.as_str().to_owned(),
2168        );
2169
2170        // remove from source options, otherwise it will be considered as an unknown field.
2171        source
2172            .as_mut()
2173            .map(|x| x.with_properties.remove(COMPACTION_TYPE));
2174    }
2175
2176    if let Some(write_parquet_compression) = handler_args
2177        .with_options
2178        .get(COMPACTION_WRITE_PARQUET_COMPRESSION)
2179    {
2180        sink_with.insert(
2181            COMPACTION_WRITE_PARQUET_COMPRESSION.to_owned(),
2182            write_parquet_compression.to_owned(),
2183        );
2184        // remove from source options, otherwise it will be considered as an unknown field.
2185        source.as_mut().map(|x| {
2186            x.with_properties
2187                .remove(COMPACTION_WRITE_PARQUET_COMPRESSION)
2188        });
2189    }
2190
2191    if let Some(write_parquet_max_row_group_rows) = handler_args
2192        .with_options
2193        .get(COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS)
2194    {
2195        let write_parquet_max_row_group_rows = write_parquet_max_row_group_rows
2196            .parse::<usize>()
2197            .map_err(|_| {
2198                ErrorCode::InvalidInputSyntax(format!(
2199                    "{} must be a positive integer: {}",
2200                    COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS, write_parquet_max_row_group_rows
2201                ))
2202            })?;
2203        if write_parquet_max_row_group_rows == 0 {
2204            bail!(format!(
2205                "{} must be greater than 0",
2206                COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS
2207            ));
2208        }
2209        sink_with.insert(
2210            COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS.to_owned(),
2211            write_parquet_max_row_group_rows.to_string(),
2212        );
2213        // remove from source options, otherwise it will be considered as an unknown field.
2214        source.as_mut().map(|x| {
2215            x.with_properties
2216                .remove(COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS)
2217        });
2218    }
2219
2220    let partition_by = handler_args
2221        .with_options
2222        .get("partition_by")
2223        .map(|v| v.to_owned());
2224
2225    if let Some(partition_by) = &partition_by {
2226        let mut partition_columns = vec![];
2227        for (column, _) in parse_partition_by_exprs(partition_by.clone())? {
2228            table
2229                .columns()
2230                .iter()
2231                .find(|col| col.name().eq_ignore_ascii_case(&column))
2232                .ok_or_else(|| {
2233                    ErrorCode::InvalidInputSyntax(format!(
2234                        "Partition source column does not exist in schema: {}",
2235                        column
2236                    ))
2237                })?;
2238
2239            partition_columns.push(column);
2240        }
2241
2242        ensure_partition_columns_are_prefix_of_primary_key(&partition_columns, &pks).map_err(
2243            |_| {
2244                ErrorCode::InvalidInputSyntax(
2245                    "The partition columns should be the prefix of the primary key".to_owned(),
2246                )
2247            },
2248        )?;
2249
2250        sink_with.insert("partition_by".to_owned(), partition_by.to_owned());
2251
2252        // remove partition_by from source options, otherwise it will be considered as an unknown field.
2253        source
2254            .as_mut()
2255            .map(|x| x.with_properties.remove("partition_by"));
2256    }
2257
2258    sink_handler_args.with_options =
2259        WithOptions::new(sink_with, Default::default(), connection_ref.clone());
2260    let SinkPlanContext {
2261        sink_plan,
2262        sink_catalog,
2263        ..
2264    } = gen_sink_plan(sink_handler_args, create_sink_stmt, None, true).await?;
2265    let sink_graph = build_graph(sink_plan, Some(GraphJobType::Sink))?;
2266
2267    let mut source_name = table_name.clone();
2268    *source_name.0.last_mut().unwrap() = Ident::from(
2269        (ICEBERG_SOURCE_PREFIX.to_owned() + &source_name.0.last().unwrap().real_value()).as_str(),
2270    );
2271    let create_source_stmt = CreateSourceStatement {
2272        temporary: false,
2273        if_not_exists: false,
2274        columns: vec![],
2275        source_name,
2276        wildcard_idx: Some(0),
2277        constraints: vec![],
2278        with_properties: WithProperties(vec![]),
2279        format_encode: CompatibleFormatEncode::V2(FormatEncodeOptions::none()),
2280        source_watermarks: vec![],
2281        include_column_options: vec![],
2282    };
2283
2284    let mut source_handler_args = handler_args.clone();
2285    let source_with = with_common;
2286    source_handler_args.with_options =
2287        WithOptions::new(source_with, Default::default(), connection_ref);
2288
2289    let overwrite_options = OverwriteOptions::new(&mut source_handler_args);
2290    let format_encode = create_source_stmt.format_encode.into_v2_with_warning();
2291    let (with_properties, refresh_mode) =
2292        bind_connector_props(&source_handler_args, &format_encode, true)?;
2293
2294    // Create iceberg sink table, used for iceberg source column binding. See `bind_columns_from_source_for_non_cdc` for more details.
2295    // TODO: We can derive the columns directly from table definition in the future, so that we don't need to pre-create the table catalog.
2296    let (iceberg_catalog, table_identifier) = {
2297        let sink_param = SinkParam::try_from_sink_catalog(sink_catalog.clone())?;
2298        let iceberg_sink = IcebergSink::try_from(sink_param)?;
2299        iceberg_sink.create_table_if_not_exists().await?;
2300
2301        let iceberg_catalog = iceberg_sink.config.create_catalog().await?;
2302        let table_identifier = iceberg_sink.config.full_table_name()?;
2303        (iceberg_catalog, table_identifier)
2304    };
2305
2306    let create_source_type = CreateSourceType::for_newly_created(&session, &*with_properties);
2307    let (columns_from_resolve_source, source_info) = bind_columns_from_source(
2308        &session,
2309        &format_encode,
2310        Either::Left(&with_properties),
2311        create_source_type,
2312    )
2313    .await?;
2314    let mut col_id_gen = ColumnIdGenerator::new_initial();
2315
2316    let iceberg_source_catalog = bind_create_source_or_table_with_connector(
2317        source_handler_args,
2318        create_source_stmt.source_name,
2319        format_encode,
2320        with_properties,
2321        &create_source_stmt.columns,
2322        create_source_stmt.constraints,
2323        create_source_stmt.wildcard_idx,
2324        create_source_stmt.source_watermarks,
2325        columns_from_resolve_source,
2326        source_info,
2327        create_source_stmt.include_column_options,
2328        &mut col_id_gen,
2329        create_source_type,
2330        overwrite_options.source_rate_limit,
2331        SqlColumnStrategy::FollowChecked,
2332        refresh_mode,
2333    )
2334    .await?;
2335
2336    // before we create the table, ensure the JVM is initialized as we use jdbc catalog right now.
2337    // If JVM isn't initialized successfully, current not atomic ddl will result in a partially created iceberg engine table.
2338    let _ = Jvm::get_or_init()?;
2339
2340    let catalog_writer = session.catalog_writer()?;
2341    let action = match job_type {
2342        TableJobType::SharedCdcSource => LongRunningNotificationAction::MonitorBackfillJob,
2343        _ => LongRunningNotificationAction::DiagnoseBarrierLatency,
2344    };
2345    let res = execute_with_long_running_notification(
2346        catalog_writer.create_iceberg_table(
2347            PbTableJobInfo {
2348                source,
2349                table: Some(table.to_prost()),
2350                fragment_graph: Some(graph),
2351                job_type: job_type as _,
2352            },
2353            PbSinkJobInfo {
2354                sink: Some(sink_catalog.to_proto()),
2355                fragment_graph: Some(sink_graph),
2356            },
2357            iceberg_source_catalog.to_prost(),
2358            if_not_exists,
2359        ),
2360        &session,
2361        "CREATE TABLE",
2362        action,
2363    )
2364    .await;
2365
2366    if res.is_err() {
2367        let _ = iceberg_catalog
2368            .drop_table(&table_identifier)
2369            .await
2370            .inspect_err(|err| {
2371                tracing::error!(
2372                    "failed to drop iceberg table {} after create iceberg engine table failed: {}",
2373                    table_identifier,
2374                    err.as_report()
2375                );
2376            });
2377        res?
2378    }
2379
2380    Ok(())
2381}
2382
2383pub fn check_create_table_with_source(
2384    with_options: &WithOptions,
2385    format_encode: Option<FormatEncodeOptions>,
2386    include_column_options: &IncludeOption,
2387    cdc_table_info: &Option<CdcTableInfo>,
2388) -> Result<Option<FormatEncodeOptions>> {
2389    // skip check for cdc table
2390    if cdc_table_info.is_some() {
2391        return Ok(format_encode);
2392    }
2393    let defined_source = with_options.is_source_connector();
2394
2395    if !include_column_options.is_empty() && !defined_source {
2396        return Err(ErrorCode::InvalidInputSyntax(
2397            "INCLUDE should be used with a connector".to_owned(),
2398        )
2399        .into());
2400    }
2401    if defined_source {
2402        format_encode.as_ref().ok_or_else(|| {
2403            ErrorCode::InvalidInputSyntax("Please specify a source schema using FORMAT".to_owned())
2404        })?;
2405    }
2406    Ok(format_encode)
2407}
2408
/// Checks that `partition_columns` matches, element for element, the leading
/// columns of `primary_key_columns`.
///
/// Returns a human-readable error string when the partition columns are longer
/// than the primary key, or when they diverge from it at some position.
fn ensure_partition_columns_are_prefix_of_primary_key(
    partition_columns: &[String],
    primary_key_columns: &[String],
) -> std::result::Result<(), String> {
    if partition_columns.len() > primary_key_columns.len() {
        return Err("Partition columns cannot be longer than primary key columns.".to_owned());
    }

    // The length check above guarantees `zip` visits every partition column.
    match partition_columns
        .iter()
        .zip(primary_key_columns.iter())
        .find(|(partition_col, pk_col)| partition_col != pk_col)
    {
        Some((mismatch, _)) => Err(format!(
            "Partition column '{}' is not a prefix of the primary key.",
            mismatch
        )),
        None => Ok(()),
    }
}
2428
/// Generates the stream fragment graph and new catalogs used to *replace* an
/// existing table (`original_catalog`), e.g. during `ALTER TABLE`-style schema
/// changes.
///
/// `statement` must be a `Statement::CreateTable` re-parsed from the table
/// definition; any other variant panics.
///
/// Returns `(fragment_graph, new_table_catalog, optional_source_catalog, job_type)`.
/// The returned table keeps the original table's ID, and — unless the connector
/// is being dropped — the original associated source ID is carried over.
///
/// Errors if the new definition changes the primary key, or mixes
/// FORMAT/ENCODE with a CDC table definition.
#[allow(clippy::too_many_arguments)]
pub async fn generate_stream_graph_for_replace_table(
    _session: &Arc<SessionImpl>,
    table_name: ObjectName,
    original_catalog: &Arc<TableCatalog>,
    handler_args: HandlerArgs,
    statement: Statement,
    col_id_gen: ColumnIdGenerator,
    sql_column_strategy: SqlColumnStrategy,
) -> Result<(
    StreamFragmentGraph,
    TableCatalog,
    Option<SourceCatalog>,
    TableJobType,
)> {
    let Statement::CreateTable {
        columns,
        constraints,
        source_watermarks,
        append_only,
        on_conflict,
        with_version_columns,
        wildcard_idx,
        cdc_table_info,
        format_encode,
        include_column_options,
        engine,
        with_options,
        ..
    } = statement
    else {
        panic!("unexpected statement type: {:?}", statement);
    };

    let format_encode = format_encode
        .clone()
        .map(|format_encode| format_encode.into_v2_with_warning());

    // Map the AST engine variant to the catalog engine type.
    let engine = match engine {
        risingwave_sqlparser::ast::Engine::Hummock => Engine::Hummock,
        risingwave_sqlparser::ast::Engine::Iceberg => Engine::Iceberg,
    };

    // The connector is being dropped when the table previously had an
    // associated source but the new definition carries no FORMAT/ENCODE clause.
    let is_drop_connector =
        original_catalog.associated_source_id().is_some() && format_encode.is_none();
    if is_drop_connector {
        // Sanity check: a drop-connector definition must not carry any
        // connector-related clauses or options.
        debug_assert!(
            source_watermarks.is_empty()
                && include_column_options.is_empty()
                && with_options
                    .iter()
                    .all(|opt| opt.name.real_value().to_lowercase() != "connector")
        );
    }

    let props = CreateTableProps {
        definition: handler_args.normalized_sql.clone(),
        append_only,
        on_conflict: on_conflict.into(),
        with_version_columns: with_version_columns
            .iter()
            .map(|col| col.real_value())
            .collect(),
        // The webhook configuration cannot be changed by replacement; reuse it.
        webhook_info: original_catalog.webhook_info.clone(),
        engine,
    };

    // Plan the new table. The (FORMAT/ENCODE, CDC info) combination decides
    // which of the three planning paths applies.
    let ((plan, mut source, mut table), job_type) = match (format_encode, cdc_table_info.as_ref()) {
        // Connector-backed table (has FORMAT ... ENCODE ...).
        (Some(format_encode), None) => (
            gen_create_table_plan_with_source(
                handler_args,
                ExplainOptions::default(),
                table_name,
                columns,
                wildcard_idx,
                constraints,
                format_encode,
                source_watermarks,
                col_id_gen,
                include_column_options,
                props,
                sql_column_strategy,
            )
            .await?,
            TableJobType::General,
        ),
        // Plain table without a connector.
        (None, None) => {
            let context = OptimizerContext::from_handler_args(handler_args);
            let (plan, table) = gen_create_table_plan(
                context,
                table_name,
                columns,
                constraints,
                col_id_gen,
                source_watermarks,
                props,
                true,
            )?;
            ((plan, None, table), TableJobType::General)
        }
        // Table backed by a shared CDC source.
        (None, Some(cdc_table)) => {
            let session = &handler_args.session;
            let (source, resolved_table_name) =
                get_source_and_resolved_table_name(session, cdc_table.clone(), table_name.clone())?;

            let (cdc_with_options, normalized_external_table_name) =
                derive_with_options_for_cdc_table(
                    &source.with_properties,
                    cdc_table.external_table_name.clone(),
                )?;

            let (column_catalogs, pk_names) = bind_cdc_table_schema(&columns, &constraints, true)?;

            let context: OptimizerContextRef =
                OptimizerContext::new(handler_args, ExplainOptions::default()).into();
            let (plan, table) = gen_create_table_plan_for_cdc_table(
                context,
                source,
                normalized_external_table_name,
                columns,
                column_catalogs,
                pk_names,
                cdc_with_options,
                col_id_gen,
                on_conflict,
                with_version_columns
                    .iter()
                    .map(|col| col.real_value())
                    .collect(),
                include_column_options,
                table_name,
                resolved_table_name,
                original_catalog.database_id,
                original_catalog.schema_id,
                original_catalog.id(),
                engine,
            )?;

            ((plan, None, table), TableJobType::SharedCdcSource)
        }
        // FORMAT/ENCODE does not apply to CDC tables.
        (Some(_), Some(_)) => {
            return Err(ErrorCode::NotSupported(
                "Data format and encoding format doesn't apply to table created from a CDC source"
                    .into(),
                "Remove the FORMAT and ENCODE specification".into(),
            )
            .into());
        }
    };

    // Replacement must not change the primary key.
    if table.pk_column_ids() != original_catalog.pk_column_ids() {
        Err(ErrorCode::InvalidInputSyntax(
            "alter primary key of table is not supported".to_owned(),
        ))?
    }

    let graph = build_graph(plan, Some(GraphJobType::Table))?;

    // Fill the original table ID.
    table.id = original_catalog.id();
    // Unless the connector is dropped, keep the original associated source ID
    // and re-link the source back to this table.
    if !is_drop_connector && let Some(source_id) = original_catalog.associated_source_id() {
        table.associated_source_id = Some(source_id);

        let source = source.as_mut().unwrap();
        source.id = source_id;
        source.associated_table_id = Some(table.id());
    }

    Ok((graph, table, source, job_type))
}
2599
2600fn get_source_and_resolved_table_name(
2601    session: &Arc<SessionImpl>,
2602    cdc_table: CdcTableInfo,
2603    table_name: ObjectName,
2604) -> Result<(Arc<SourceCatalog>, String)> {
2605    let db_name = &session.database();
2606    let (_, resolved_table_name) = Binder::resolve_schema_qualified_name(db_name, &table_name)?;
2607
2608    let (source_schema, source_name) =
2609        Binder::resolve_schema_qualified_name(db_name, &cdc_table.source_name)?;
2610
2611    let source = {
2612        let catalog_reader = session.env().catalog_reader().read_guard();
2613        let schema_name = source_schema.unwrap_or(DEFAULT_SCHEMA_NAME.to_owned());
2614        let (source, _) = catalog_reader.get_source_by_name(
2615            db_name,
2616            SchemaPath::Name(schema_name.as_str()),
2617            source_name.as_str(),
2618        )?;
2619        source.clone()
2620    };
2621
2622    Ok((source, resolved_table_name))
2623}
2624
2625// validate the webhook_info and also bind the webhook_info to protobuf
2626fn bind_webhook_info(
2627    session: &Arc<SessionImpl>,
2628    columns_defs: &[ColumnDef],
2629    webhook_info: WebhookSourceInfo,
2630) -> Result<PbWebhookSourceInfo> {
2631    // validate columns
2632    if columns_defs.len() != 1 || columns_defs[0].data_type.as_ref().unwrap() != &AstDataType::Jsonb
2633    {
2634        return Err(ErrorCode::InvalidInputSyntax(
2635            "Table with webhook source should have exactly one JSONB column".to_owned(),
2636        )
2637        .into());
2638    }
2639
2640    let WebhookSourceInfo {
2641        secret_ref,
2642        signature_expr,
2643        wait_for_persistence,
2644        is_batched,
2645    } = webhook_info;
2646
2647    // validate secret_ref
2648    let (pb_secret_ref, secret_name) = if let Some(secret_ref) = secret_ref {
2649        let db_name = &session.database();
2650        let (schema_name, secret_name) =
2651            Binder::resolve_schema_qualified_name(db_name, &secret_ref.secret_name)?;
2652        let secret_catalog = session.get_secret_by_name(schema_name, &secret_name)?;
2653        (
2654            Some(PbSecretRef {
2655                secret_id: secret_catalog.id,
2656                ref_as: match secret_ref.ref_as {
2657                    SecretRefAsType::Text => PbRefAsType::Text,
2658                    SecretRefAsType::File => PbRefAsType::File,
2659                }
2660                .into(),
2661            }),
2662            Some(secret_name),
2663        )
2664    } else {
2665        (None, None)
2666    };
2667
2668    let signature_expr = if let Some(signature_expr) = signature_expr {
2669        let secure_compare_context = SecureCompareContext {
2670            column_name: columns_defs[0].name.real_value(),
2671            secret_name,
2672        };
2673        let mut binder = Binder::new_for_ddl(session).with_secure_compare(secure_compare_context);
2674        let expr = binder.bind_expr(&signature_expr)?;
2675
2676        // validate expr, ensuring it is SECURE_COMPARE()
2677        if expr.as_function_call().is_none()
2678            || expr.as_function_call().unwrap().func_type()
2679                != crate::optimizer::plan_node::generic::ExprType::SecureCompare
2680        {
2681            return Err(ErrorCode::InvalidInputSyntax(
2682                "The signature verification function must be SECURE_COMPARE()".to_owned(),
2683            )
2684            .into());
2685        }
2686
2687        Some(expr.to_expr_proto())
2688    } else {
2689        session.notice_to_user(
2690            "VALIDATE clause is strongly recommended for safety or production usages",
2691        );
2692        None
2693    };
2694
2695    let pb_webhook_info = PbWebhookSourceInfo {
2696        secret_ref: pb_secret_ref,
2697        signature_expr,
2698        wait_for_persistence,
2699        is_batched,
2700    };
2701
2702    Ok(pb_webhook_info)
2703}
2704
#[cfg(test)]
mod tests {
    use risingwave_common::catalog::{
        DEFAULT_DATABASE_NAME, ROW_ID_COLUMN_NAME, RW_TIMESTAMP_COLUMN_NAME,
    };
    use risingwave_common::types::{DataType, StructType};

    use super::*;
    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    // End-to-end: creating an append-only table should register a catalog
    // entry whose columns include the implicit row-id and rw_timestamp
    // system columns alongside the user-declared ones.
    #[tokio::test]
    async fn test_create_table_handler() {
        let sql =
            "create table t (v1 smallint, v2 struct<v3 bigint, v4 float, v5 double>) append only;";
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        // Check table exists.
        let (table, _) = catalog_reader
            .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
            .unwrap();
        assert_eq!(table.name(), "t");

        let columns = table
            .columns
            .iter()
            .map(|col| (col.name(), col.data_type().clone()))
            .collect::<HashMap<&str, DataType>>();

        // Struct fields get column IDs 3..=5: IDs 1 and 2 go to v1/v2, and
        // ID 0 is reserved for the row ID column.
        let expected_columns = maplit::hashmap! {
            ROW_ID_COLUMN_NAME => DataType::Serial,
            "v1" => DataType::Int16,
            "v2" => StructType::new(
                vec![("v3", DataType::Int64),("v4", DataType::Float64),("v5", DataType::Float64)],
            )
            .with_ids([3, 4, 5].map(ColumnId::new))
            .into(),
            RW_TIMESTAMP_COLUMN_NAME => DataType::Timestamptz,
        };

        assert_eq!(columns, expected_columns, "{columns:#?}");
    }

    // Table-driven check of primary-key binding: expected values are the
    // bound pk column IDs on success, or a substring of the error message
    // on failure.
    #[test]
    fn test_bind_primary_key() {
        // Note: Column ID 0 is reserved for row ID column.

        for (sql, expected) in [
            ("create table t (v1 int, v2 int)", Ok(&[0] as &[_])),
            ("create table t (v1 int primary key, v2 int)", Ok(&[1])),
            ("create table t (v1 int, v2 int primary key)", Ok(&[2])),
            (
                "create table t (v1 int primary key, v2 int primary key)",
                Err("multiple primary keys are not allowed"),
            ),
            (
                "create table t (v1 int primary key primary key, v2 int)",
                Err("multiple primary keys are not allowed"),
            ),
            (
                "create table t (v1 int, v2 int, primary key (v1))",
                Ok(&[1]),
            ),
            (
                "create table t (v1 int, primary key (v2), v2 int)",
                Ok(&[2]),
            ),
            (
                "create table t (primary key (v2, v1), v1 int, v2 int)",
                Ok(&[2, 1]),
            ),
            (
                "create table t (v1 int, primary key (v1), v2 int, primary key (v1))",
                Err("multiple primary keys are not allowed"),
            ),
            (
                "create table t (v1 int primary key, primary key (v1), v2 int)",
                Err("multiple primary keys are not allowed"),
            ),
            (
                "create table t (v1 int, primary key (V3), v2 int)",
                Err("column \"v3\" named in key does not exist"),
            ),
        ] {
            let mut ast = risingwave_sqlparser::parser::Parser::parse_sql(sql).unwrap();
            let risingwave_sqlparser::ast::Statement::CreateTable {
                columns: column_defs,
                constraints,
                ..
            } = ast.remove(0)
            else {
                panic!("test case should be create table")
            };
            // Run the same binding pipeline the handler uses: bind columns,
            // assign IDs, collect pk names, then bind pk / row-id columns.
            let actual: Result<_> = (|| {
                let mut columns = bind_sql_columns(&column_defs, false)?;
                let mut col_id_gen = ColumnIdGenerator::new_initial();
                for c in &mut columns {
                    col_id_gen.generate(c)?;
                }

                let pk_names =
                    bind_sql_pk_names(&column_defs, bind_table_constraints(&constraints)?)?;
                let (_, pk_column_ids, _) =
                    bind_pk_and_row_id_on_relation(columns, pk_names, true)?;
                Ok(pk_column_ids)
            })();
            match (expected, actual) {
                (Ok(expected), Ok(actual)) => assert_eq!(
                    expected.iter().copied().map(ColumnId::new).collect_vec(),
                    actual,
                    "sql: {sql}"
                ),
                (Ok(_), Err(actual)) => panic!("sql: {sql}\nunexpected error: {actual:?}"),
                (Err(_), Ok(actual)) => panic!("sql: {sql}\nexpects error but got: {actual:?}"),
                (Err(expected), Err(actual)) => assert!(
                    actual.to_string().contains(expected),
                    "sql: {sql}\nexpected: {expected:?}\nactual: {actual:?}"
                ),
            }
        }
    }

    // The same AWS credential keys may appear in both the WITH clause and the
    // FORMAT ... ENCODE options; they must stay in their respective option
    // sets (props vs. format_encode_options) without being merged.
    #[tokio::test]
    async fn test_duplicate_props_options() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let sql = format!(
            r#"CREATE TABLE t
    WITH (
        connector = 'kinesis',
        aws.region='user_test_topic',
        endpoint='172.10.1.1:9090,172.10.1.2:9090',
        aws.credentials.access_key_id = 'your_access_key_1',
        aws.credentials.secret_access_key = 'your_secret_key_1'
    )
    FORMAT PLAIN ENCODE PROTOBUF (
        message = '.test.TestRecord',
        aws.credentials.access_key_id = 'your_access_key_2',
        aws.credentials.secret_access_key = 'your_secret_key_2',
        schema.location = 'file://{}',
    )"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        // Check source exists.
        let (source, _) = catalog_reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
            .unwrap();
        assert_eq!(source.name, "t");

        // AwsAuth params exist in options.
        assert_eq!(
            source
                .info
                .format_encode_options
                .get("aws.credentials.access_key_id")
                .unwrap(),
            "your_access_key_2"
        );
        assert_eq!(
            source
                .info
                .format_encode_options
                .get("aws.credentials.secret_access_key")
                .unwrap(),
            "your_secret_key_2"
        );

        // AwsAuth params exist in props.
        assert_eq!(
            source
                .with_properties
                .get("aws.credentials.access_key_id")
                .unwrap(),
            "your_access_key_1"
        );
        assert_eq!(
            source
                .with_properties
                .get("aws.credentials.secret_access_key")
                .unwrap(),
            "your_secret_key_1"
        );

        // Options are not merged into props.
        assert!(!source.with_properties.contains_key("schema.location"));
    }
}