risingwave_frontend/handler/create_table.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::rc::Rc;
use std::sync::Arc;

use anyhow::{Context, anyhow};
use clap::ValueEnum;
use either::Either;
use fixedbitset::FixedBitSet;
use itertools::Itertools;
use percent_encoding::percent_decode_str;
use pgwire::pg_response::{PgResponse, StatementType};
use prost::Message as _;
use risingwave_common::catalog::{
    CdcTableDesc, ColumnCatalog, ColumnDesc, ConflictBehavior, DEFAULT_SCHEMA_NAME, Engine,
    ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX, RISINGWAVE_ICEBERG_ROW_ID, ROW_ID_COLUMN_NAME,
    TableId,
};
use risingwave_common::config::MetaBackend;
use risingwave_common::global_jvm::Jvm;
use risingwave_common::session_config::sink_decouple::SinkDecouple;
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_common::util::value_encoding::DatumToProtoExt;
use risingwave_common::{bail, bail_not_implemented};
use risingwave_connector::sink::decouple_checkpoint_log_sink::COMMIT_CHECKPOINT_INTERVAL;
use risingwave_connector::source::cdc::build_cdc_table_id;
use risingwave_connector::source::cdc::external::{
    DATABASE_NAME_KEY, ExternalCdcTableType, ExternalTableConfig, ExternalTableImpl,
    SCHEMA_NAME_KEY, TABLE_NAME_KEY,
};
use risingwave_connector::{
    AUTO_SCHEMA_CHANGE_KEY, WithOptionsSecResolved, WithPropertiesExt, source,
};
use risingwave_pb::catalog::connection::Info as ConnectionInfo;
use risingwave_pb::catalog::connection_params::ConnectionType;
use risingwave_pb::catalog::{PbSource, PbWebhookSourceInfo, WatermarkDesc};
use risingwave_pb::ddl_service::{PbTableJobType, TableJobType};
use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
use risingwave_pb::plan_common::{
    AdditionalColumn, ColumnDescVersion, DefaultColumnDesc, GeneratedColumnDesc,
};
use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::secret::secret_ref::PbRefAsType;
use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_sqlparser::ast::{
    CdcTableInfo, ColumnDef, ColumnOption, CompatibleFormatEncode, ConnectionRefValue, CreateSink,
    CreateSinkStatement, CreateSourceStatement, DataType as AstDataType, ExplainOptions, Format,
    FormatEncodeOptions, Ident, ObjectName, OnConflict, SecretRefAsType, SourceWatermark,
    Statement, TableConstraint, WebhookSourceInfo, WithProperties,
};
use risingwave_sqlparser::parser::IncludeOption;
use thiserror_ext::AsReport;

use super::RwPgResponse;
use super::create_source::{CreateSourceType, SqlColumnStrategy, bind_columns_from_source};
use crate::binder::{Clause, SecureCompareContext, WEBHOOK_PAYLOAD_FIELD_NAME, bind_data_type};
use crate::catalog::root_catalog::SchemaPath;
use crate::catalog::source_catalog::SourceCatalog;
use crate::catalog::table_catalog::TableVersion;
use crate::catalog::{ColumnId, DatabaseId, SchemaId, SourceId, check_column_name_not_reserved};
use crate::error::{ErrorCode, Result, RwError, bail_bind_error};
use crate::expr::{Expr, ExprImpl, ExprRewriter};
use crate::handler::HandlerArgs;
use crate::handler::create_source::{
    UPSTREAM_SOURCE_KEY, bind_connector_props, bind_create_source_or_table_with_connector,
    bind_source_watermark, handle_addition_columns,
};
use crate::handler::util::{
    LongRunningNotificationAction, SourceSchemaCompatExt, execute_with_long_running_notification,
};
use crate::optimizer::plan_node::generic::{SourceNodeKind, build_cdc_scan_options_with_options};
use crate::optimizer::plan_node::{
    LogicalCdcScan, LogicalPlanRef, LogicalSource, StreamPlanRef as PlanRef,
};
use crate::optimizer::property::{Order, RequiredDist};
use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRoot};
use crate::session::SessionImpl;
use crate::session::current::notice_to_user;
use crate::stream_fragmenter::{GraphJobType, build_graph};
use crate::utils::OverwriteOptions;
use crate::{Binder, Explain, TableCatalog, WithOptions};

mod col_id_gen;
pub use col_id_gen::*;
use risingwave_connector::sink::SinkParam;
use risingwave_connector::sink::iceberg::{
    COMMIT_CHECKPOINT_SIZE_THRESHOLD_MB, COMPACTION_DELETE_FILES_COUNT_THRESHOLD,
    COMPACTION_INTERVAL_SEC, COMPACTION_MAX_SNAPSHOTS_NUM, COMPACTION_SMALL_FILES_THRESHOLD_MB,
    COMPACTION_TARGET_FILE_SIZE_MB, COMPACTION_TRIGGER_SNAPSHOT_COUNT, COMPACTION_TYPE,
    COMPACTION_WRITE_PARQUET_COMPRESSION, COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_BYTES,
    COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS, CompactionType, ENABLE_COMPACTION,
    ENABLE_SNAPSHOT_EXPIRATION, FORMAT_VERSION,
    ICEBERG_DEFAULT_COMMIT_CHECKPOINT_SIZE_THRESHOLD_MB, ICEBERG_WRITE_MODE_COPY_ON_WRITE,
    ICEBERG_WRITE_MODE_MERGE_ON_READ, IcebergSink, IcebergWriteMode, ORDER_KEY,
    SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES, SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA,
    SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS, SNAPSHOT_EXPIRATION_RETAIN_LAST, WRITE_MODE,
    parse_partition_by_exprs, validate_order_key_columns,
};
use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};

use crate::handler::create_sink::{SinkPlanContext, gen_sink_plan};

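/// Rejects column options that the binder does not support yet, so that
/// unsupported constraints fail early with a clear not-implemented error
/// instead of being silently ignored.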
fn ensure_column_options_supported(c: &ColumnDef) -> Result<()> {
    for option_def in &c.options {
        match option_def.option {
            ColumnOption::GeneratedColumns(_) => {}
            ColumnOption::DefaultValue(_) => {}
            ColumnOption::DefaultValueInternal { .. } => {}
            ColumnOption::Unique { is_primary: true } => {}
            ColumnOption::Null => {}
            ColumnOption::NotNull => {}
            _ => bail_not_implemented!("column constraints \"{}\"", option_def),
        }
    }
    Ok(())
}

/// Binds the column schemas declared in the CREATE statement into `ColumnCatalog`s.
/// Primary key columns are not handled here; they are resolved later together
/// with table constraints (see [`bind_sql_pk_names`]).
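///
/// For example (illustrative), given `v INT NOT NULL`, this returns a
/// `ColumnCatalog` whose `data_type` is `Int32`, whose `column_id` is
/// `ColumnId::placeholder()` (real ids are assigned later by a
/// `ColumnIdGenerator`), and whose `nullable` flag is `false`; note that
/// `nullable` is derived only from an explicit `NOT NULL` option.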
pub fn bind_sql_columns(
    column_defs: &[ColumnDef],
    is_for_drop_table_connector: bool,
) -> Result<Vec<ColumnCatalog>> {
    let mut columns = Vec::with_capacity(column_defs.len());

    for column in column_defs {
        ensure_column_options_supported(column)?;
        // Destruct to make sure all fields are properly handled rather than ignored.
        // Do NOT use `..` to ignore fields you do not want to deal with.
        // Reject them with a clear NotImplemented error.
        let ColumnDef {
            name,
            data_type,
            collation,
            options,
            ..
        } = column;

        let data_type = data_type
            .clone()
            .ok_or_else(|| ErrorCode::InvalidInputSyntax("data type is not specified".into()))?;
        if let Some(collation) = collation {
            // PostgreSQL limits the data types that collation can work on.
            // https://www.postgresql.org/docs/16/collation.html#COLLATION-CONCEPTS
            //   > The built-in collatable data types are `text`, `varchar`, and `char`.
            //
            // We don't support real collation; we simply ignore it here.
            if !["C", "POSIX"].contains(&collation.real_value().as_str()) {
                bail_not_implemented!(
                    "collations other than `C` or `POSIX` are not implemented"
                );
            }

            match data_type {
                AstDataType::Text | AstDataType::Varchar | AstDataType::Char(_) => {}
                _ => {
                    return Err(ErrorCode::NotSupported(
                        format!("{} is not a collatable data type", data_type),
                        "The built-in collatable data types are `text`, `varchar`, and `char`; please check your type".into(),
                    ).into());
                }
            }
        }

        if !is_for_drop_table_connector {
            // Additional column names may have the prefix `_rw`.
            // When dropping the connector from a table, additional columns are converted
            // to normal columns and keep their original names, so we loosen the check
            // for the `_rw` prefix in that case.
            check_column_name_not_reserved(&name.real_value())?;
        }

        let nullable: bool = !options
            .iter()
            .any(|def| matches!(def.option, ColumnOption::NotNull));

        columns.push(ColumnCatalog {
            column_desc: ColumnDesc {
                data_type: bind_data_type(&data_type)?,
                column_id: ColumnId::placeholder(),
                name: name.real_value(),
                generated_or_default_column: None,
                description: None,
                additional_column: AdditionalColumn { column_type: None },
                version: ColumnDescVersion::LATEST,
                system_column: None,
                nullable,
            },
            is_hidden: false,
        });
    }

    Ok(columns)
}

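/// Validates a bound generated-column expression: it must not reference another
/// generated column, and if the column is part of the primary key, the
/// expression must be pure (e.g. not `now()`).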
fn check_generated_column_constraints(
    column_name: &String,
    column_id: ColumnId,
    expr: &ExprImpl,
    column_catalogs: &[ColumnCatalog],
    generated_column_names: &[String],
    pk_column_ids: &[ColumnId],
) -> Result<()> {
    let input_refs = expr.collect_input_refs(column_catalogs.len());
    for idx in input_refs.ones() {
        let referred_generated_column = &column_catalogs[idx].column_desc.name;
        if generated_column_names
            .iter()
            .any(|c| c == referred_generated_column)
        {
            return Err(ErrorCode::BindError(format!(
                "Generated columns cannot reference other generated columns. \
                Here generated column \"{}\" references another generated column \"{}\"",
                column_name, referred_generated_column
            ))
            .into());
        }
    }

    if pk_column_ids.contains(&column_id) && expr.is_impure() {
        return Err(ErrorCode::BindError(format!(
            "Generated columns with impure expressions should not be part of the primary key. \
            Here column \"{}\" is defined as part of the primary key.",
            column_name
        ))
        .into());
    }

    Ok(())
}

/// Binds constraints that can only be specified in column definitions,
/// currently generated columns and default columns.
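///
/// For example (an illustrative sketch), for a table defined as
///
/// ```sql
/// CREATE TABLE t (v INT, d INT DEFAULT 1 + 1, g INT AS (v * 2));
/// ```
///
/// the `DEFAULT` expression is bound, cast, and folded to a snapshot value
/// (so a default referencing another column would be rejected), while the
/// generated column `g` keeps its bound expression in the catalog.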
pub fn bind_sql_column_constraints(
    session: &SessionImpl,
    table_name: String,
    column_catalogs: &mut [ColumnCatalog],
    columns: &[ColumnDef],
    pk_column_ids: &[ColumnId],
) -> Result<()> {
    let generated_column_names = {
        let mut names = vec![];
        for column in columns {
            for option_def in &column.options {
                if let ColumnOption::GeneratedColumns(_) = option_def.option {
                    names.push(column.name.real_value());
                    break;
                }
            }
        }
        names
    };

    let mut binder = Binder::new_for_ddl(session);
    binder.bind_columns_to_context(table_name, column_catalogs)?;

    for column in columns {
        let Some(idx) = column_catalogs
            .iter()
            .position(|c| c.name() == column.name.real_value())
        else {
            // It's possible that we don't follow the user-defined columns in SQL but take the
            // ones resolved from the source, thus missing some columns. Simply ignore them.
            continue;
        };

        for option_def in &column.options {
            match &option_def.option {
                ColumnOption::GeneratedColumns(expr) => {
                    binder.set_clause(Some(Clause::GeneratedColumn));

                    let expr_impl = binder.bind_expr(expr).with_context(|| {
                        format!(
                            "failed to bind expression in generated column \"{}\"",
                            column.name.real_value()
                        )
                    })?;

                    check_generated_column_constraints(
                        &column.name.real_value(),
                        column_catalogs[idx].column_id(),
                        &expr_impl,
                        column_catalogs,
                        &generated_column_names,
                        pk_column_ids,
                    )?;

                    column_catalogs[idx].column_desc.generated_or_default_column = Some(
                        GeneratedOrDefaultColumn::GeneratedColumn(GeneratedColumnDesc {
                            expr: Some(expr_impl.to_expr_proto()),
                        }),
                    );
                    binder.set_clause(None);
                }
                ColumnOption::DefaultValue(expr) => {
                    let expr_impl = binder
                        .bind_expr(expr)?
                        .cast_assign(column_catalogs[idx].data_type())?;

                    // Rewrite expressions to evaluate a snapshot value, used for missing values in the case of
                    // schema change.
                    //
                    // TODO: Currently we don't support impure expressions other than `now()` (like `random()`),
                    // so the rewritten expression should almost always be pure and we directly call `fold_const`
                    // here. Actually we do not require purity of the expression here since we're only to get a
                    // snapshot value.
                    let rewritten_expr_impl = session
                        .pinned_snapshot()
                        .inline_now_proc_time()
                        .rewrite_expr(expr_impl.clone());

                    if let Some(snapshot_value) = rewritten_expr_impl.try_fold_const() {
                        let snapshot_value = snapshot_value?;

                        column_catalogs[idx].column_desc.generated_or_default_column =
                            Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
                                snapshot_value: Some(snapshot_value.to_protobuf()),
                                // Persist the original expression as well.
                                expr: Some(expr_impl.to_expr_proto()),
                            }));
                    } else {
                        return Err(ErrorCode::BindError(format!(
                            "Default expression used in column `{}` cannot be evaluated. \
                            Use generated columns instead if you mean to reference other columns.",
                            column.name
                        ))
                        .into());
                    }
                }
                ColumnOption::DefaultValueInternal { persisted, expr: _ } => {
                    // When a `DEFAULT INTERNAL` is used internally for schema change, the persisted value
                    // should already be set during purification. So if we encounter an empty value here, it
                    // means the user has specified it explicitly in the SQL statement, typically by
                    // directly copying the result of `SHOW CREATE TABLE` and executing it.
                    if persisted.is_empty() {
                        bail_bind_error!(
                            "DEFAULT INTERNAL is only used for internal purposes, \
                             please specify a concrete default value"
                        );
                    }

                    let desc = DefaultColumnDesc::decode(&**persisted)
                        .expect("failed to decode persisted `DefaultColumnDesc`");

                    column_catalogs[idx].column_desc.generated_or_default_column =
                        Some(GeneratedOrDefaultColumn::DefaultColumn(desc));
                }
                _ => {}
            }
        }
    }
    Ok(())
}

/// Currently we only support the `PRIMARY KEY` table constraint, so we just
/// return the PK column names if such a constraint exists.
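/// For example (illustrative), `PRIMARY KEY (a, b)` yields `["a", "b"]`, while
/// any other table constraint (e.g. a non-primary `UNIQUE`) bails with a
/// not-implemented error.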
pub fn bind_table_constraints(table_constraints: &[TableConstraint]) -> Result<Vec<String>> {
    let mut pk_column_names = vec![];

    for constraint in table_constraints {
        match constraint {
            TableConstraint::Unique {
                name: _,
                columns,
                is_primary: true,
            } => {
                if !pk_column_names.is_empty() {
                    return Err(multiple_pk_definition_err());
                }
                pk_column_names = columns.iter().map(|c| c.real_value()).collect_vec();
            }
            _ => bail_not_implemented!("table constraint \"{}\"", constraint),
        }
    }
    Ok(pk_column_names)
}

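/// Merges primary-key names from table-level constraints with column-level
/// `PRIMARY KEY` options. For example (illustrative),
/// `CREATE TABLE t (id INT PRIMARY KEY, PRIMARY KEY (id))` is rejected with
/// "multiple primary keys are not allowed", even though both refer to `id`.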
pub fn bind_sql_pk_names(
    columns_defs: &[ColumnDef],
    pk_names_from_table_constraints: Vec<String>,
) -> Result<Vec<String>> {
    let mut pk_column_names = pk_names_from_table_constraints;

    for column in columns_defs {
        for option_def in &column.options {
            if let ColumnOption::Unique { is_primary: true } = option_def.option {
                if !pk_column_names.is_empty() {
                    return Err(multiple_pk_definition_err());
                }
                pk_column_names.push(column.name.real_value());
            };
        }
    }

    Ok(pk_column_names)
}

fn multiple_pk_definition_err() -> RwError {
    ErrorCode::BindError("multiple primary keys are not allowed".into()).into()
}

/// Binds primary keys defined in SQL.
///
/// It returns the columns together with `pk_column_ids`, and an optional row id column index if
/// added.
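///
/// For example (illustrative), with columns `[a, b]` and `pk_names = ["b"]`,
/// the columns are returned unchanged with `pk_column_ids` pointing at `b` and
/// no row id index; with empty `pk_names` and `must_need_pk`, a `_row_id`
/// column is appended, used as the primary key, and its index returned.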
pub fn bind_pk_and_row_id_on_relation(
    mut columns: Vec<ColumnCatalog>,
    pk_names: Vec<String>,
    must_need_pk: bool,
) -> Result<(Vec<ColumnCatalog>, Vec<ColumnId>, Option<usize>)> {
    for c in &columns {
        assert!(c.column_id() != ColumnId::placeholder());
    }

    // Mapping from column name to column id.
    let name_to_id = columns
        .iter()
        .map(|c| (c.name(), c.column_id()))
        .collect::<HashMap<_, _>>();

    let mut pk_column_ids: Vec<_> = pk_names
        .iter()
        .map(|name| {
            name_to_id.get(name.as_str()).copied().ok_or_else(|| {
                ErrorCode::BindError(format!("column \"{name}\" named in key does not exist"))
            })
        })
        .try_collect()?;

    // Add the `_row_id` column if `pk_column_ids` is empty and `must_need_pk` is set.
    let need_row_id = pk_column_ids.is_empty() && must_need_pk;

    let row_id_index = need_row_id.then(|| {
        let column = ColumnCatalog::row_id_column();
        let index = columns.len();
        pk_column_ids = vec![column.column_id()];
        columns.push(column);
        index
    });

    if let Some(col) = columns.iter().map(|c| c.name()).duplicates().next() {
        Err(ErrorCode::InvalidInputSyntax(format!(
            "column \"{col}\" specified more than once"
        )))?;
    }

    Ok((columns, pk_column_ids, row_id_index))
}

/// `gen_create_table_plan_with_source` generates the plan for creating a table with an external
/// stream source.
#[allow(clippy::too_many_arguments)]
pub(crate) async fn gen_create_table_plan_with_source(
    mut handler_args: HandlerArgs,
    explain_options: ExplainOptions,
    table_name: ObjectName,
    column_defs: Vec<ColumnDef>,
    wildcard_idx: Option<usize>,
    constraints: Vec<TableConstraint>,
    format_encode: FormatEncodeOptions,
    source_watermarks: Vec<SourceWatermark>,
    mut col_id_gen: ColumnIdGenerator,
    include_column_options: IncludeOption,
    props: CreateTableProps,
    sql_column_strategy: SqlColumnStrategy,
) -> Result<(PlanRef, Option<SourceCatalog>, TableCatalog)> {
    if props.append_only
        && format_encode.format != Format::Plain
        && format_encode.format != Format::Native
    {
        return Err(ErrorCode::BindError(format!(
            "Append-only tables do not support format {}.",
            format_encode.format
        ))
        .into());
    }

    let session = &handler_args.session;
    let (with_properties, refresh_mode) =
        bind_connector_props(&handler_args, &format_encode, false)?;
    if with_properties.is_shareable_cdc_connector() {
        generated_columns_check_for_cdc_table(&column_defs)?;
        not_null_check_for_cdc_table(&wildcard_idx, &column_defs)?;
    } else if column_defs.iter().any(|col| {
        col.options
            .iter()
            .any(|def| matches!(def.option, ColumnOption::NotNull))
    }) {
        // Non-CDC source: NOT NULL is not enforced, only warned about.
        notice_to_user(
            "The table contains columns with NOT NULL constraints. Any rows from upstream violating the constraints will be ignored silently.",
        );
    }

    let db_name: &str = &session.database();
    let (schema_name, _) = Binder::resolve_schema_qualified_name(db_name, &table_name)?;

    // TODO: omit this step if `sql_column_strategy` is `Follow`.
    let (columns_from_resolve_source, source_info) = bind_columns_from_source(
        session,
        &format_encode,
        Either::Left(&with_properties),
        CreateSourceType::Table,
    )
    .await?;

    let overwrite_options = OverwriteOptions::new(&mut handler_args);
    let rate_limit = overwrite_options.source_rate_limit;
    let source = bind_create_source_or_table_with_connector(
        handler_args.clone(),
        table_name,
        format_encode,
        with_properties,
        &column_defs,
        constraints,
        wildcard_idx,
        source_watermarks,
        columns_from_resolve_source,
        source_info,
        include_column_options,
        &mut col_id_gen,
        CreateSourceType::Table,
        rate_limit,
        sql_column_strategy,
        refresh_mode,
    )
    .await?;

    let context = OptimizerContext::new(handler_args, explain_options);

    let (plan, table) = gen_table_plan_with_source(
        context.into(),
        schema_name,
        source.clone(),
        col_id_gen.into_version(),
        props,
    )?;

    Ok((plan, Some(source), table))
}

/// `gen_create_table_plan` generates the plan for creating a table without an external stream
/// source.
#[allow(clippy::too_many_arguments)]
pub(crate) fn gen_create_table_plan(
    context: OptimizerContext,
    table_name: ObjectName,
    column_defs: Vec<ColumnDef>,
    constraints: Vec<TableConstraint>,
    mut col_id_gen: ColumnIdGenerator,
    source_watermarks: Vec<SourceWatermark>,
    props: CreateTableProps,
    is_for_replace_plan: bool,
) -> Result<(PlanRef, TableCatalog)> {
    let mut columns = bind_sql_columns(&column_defs, is_for_replace_plan)?;
    for c in &mut columns {
        col_id_gen.generate(c)?;
    }

    let (_, secret_refs, connection_refs) = context.with_options().clone().into_parts();
    if !secret_refs.is_empty() || !connection_refs.is_empty() {
        return Err(crate::error::ErrorCode::InvalidParameterValue(
            "Secret reference and Connection reference are not allowed in options when creating table without external source"
                .to_owned(),
        )
        .into());
    }

    gen_create_table_plan_without_source(
        context,
        table_name,
        columns,
        column_defs,
        constraints,
        source_watermarks,
        col_id_gen.into_version(),
        props,
    )
}

#[allow(clippy::too_many_arguments)]
pub(crate) fn gen_create_table_plan_without_source(
    context: OptimizerContext,
    table_name: ObjectName,
    columns: Vec<ColumnCatalog>,
    column_defs: Vec<ColumnDef>,
    constraints: Vec<TableConstraint>,
    source_watermarks: Vec<SourceWatermark>,
    version: TableVersion,
    props: CreateTableProps,
) -> Result<(PlanRef, TableCatalog)> {
    // XXX: Why not bind outside?
    let pk_names = bind_sql_pk_names(&column_defs, bind_table_constraints(&constraints)?)?;
    let (mut columns, pk_column_ids, row_id_index) =
        bind_pk_and_row_id_on_relation(columns, pk_names, true)?;

    let watermark_descs = bind_source_watermark(
        context.session_ctx(),
        table_name.real_value(),
        source_watermarks,
        &columns,
    )?;

    bind_sql_column_constraints(
        context.session_ctx(),
        table_name.real_value(),
        &mut columns,
        &column_defs,
        &pk_column_ids,
    )?;
    let session = context.session_ctx().clone();

    let db_name = &session.database();
    let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, &table_name)?;

    let info = CreateTableInfo {
        columns,
        pk_column_ids,
        row_id_index,
        watermark_descs,
        source_catalog: None,
        version,
    };

    gen_table_plan_inner(context.into(), schema_name, table_name, info, props)
}

fn gen_table_plan_with_source(
    context: OptimizerContextRef,
    schema_name: Option<String>,
    source_catalog: SourceCatalog,
    version: TableVersion,
    props: CreateTableProps,
) -> Result<(PlanRef, TableCatalog)> {
    let table_name = source_catalog.name.clone();

    let info = CreateTableInfo {
        columns: source_catalog.columns.clone(),
        pk_column_ids: source_catalog.pk_col_ids.clone(),
        row_id_index: source_catalog.row_id_index,
        watermark_descs: source_catalog.watermark_descs.clone(),
        source_catalog: Some(source_catalog),
        version,
    };

    gen_table_plan_inner(context, schema_name, table_name, info, props)
}

/// On-conflict behavior, either from user input or from the existing table catalog.
#[derive(Clone, Copy)]
pub enum EitherOnConflict {
    Ast(Option<OnConflict>),
    Resolved(ConflictBehavior),
}

impl From<Option<OnConflict>> for EitherOnConflict {
    fn from(v: Option<OnConflict>) -> Self {
        Self::Ast(v)
    }
}

impl From<ConflictBehavior> for EitherOnConflict {
    fn from(v: ConflictBehavior) -> Self {
        Self::Resolved(v)
    }
}

impl EitherOnConflict {
    /// Resolves the conflict behavior based on the given information.
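    ///
    /// For example (illustrative): an append-only table whose primary key is the
    /// generated row id resolves to `NoCheck`; an append-only table with a
    /// user-defined primary key must use `DO NOTHING` and resolves to
    /// `IgnoreConflict`; otherwise the user's choice applies, defaulting to
    /// `UPDATE FULL` (i.e. `Overwrite`).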
    pub fn to_behavior(self, append_only: bool, row_id_as_pk: bool) -> Result<ConflictBehavior> {
        let conflict_behavior = match self {
            EitherOnConflict::Ast(on_conflict) => {
                if append_only {
                    if row_id_as_pk {
                        // The primary key will be generated, so no conflict check is needed.
                        ConflictBehavior::NoCheck
                    } else {
                        // User-defined PK on an append-only table: enforce `DO NOTHING`.
                        if let Some(on_conflict) = on_conflict
                            && on_conflict != OnConflict::Nothing
                        {
                            return Err(ErrorCode::InvalidInputSyntax(
                                "When the PRIMARY KEY constraint is applied to an APPEND ONLY table, \
                                     the ON CONFLICT behavior must be DO NOTHING."
                                    .to_owned(),
                            )
                            .into());
                        }
                        ConflictBehavior::IgnoreConflict
                    }
                } else {
                    // Default to `UPDATE FULL` for non-append-only tables.
                    match on_conflict.unwrap_or(OnConflict::UpdateFull) {
                        OnConflict::UpdateFull => ConflictBehavior::Overwrite,
                        OnConflict::Nothing => ConflictBehavior::IgnoreConflict,
                        OnConflict::UpdateIfNotNull => ConflictBehavior::DoUpdateIfNotNull,
                    }
                }
            }
            EitherOnConflict::Resolved(b) => b,
        };

        Ok(conflict_behavior)
    }
}

/// Arguments of the functions that generate a table plan, part 1.
///
/// Compared to [`CreateTableProps`], this struct contains fields that require some binding
/// or resolving work based on the user input.
pub struct CreateTableInfo {
    pub columns: Vec<ColumnCatalog>,
    pub pk_column_ids: Vec<ColumnId>,
    pub row_id_index: Option<usize>,
    pub watermark_descs: Vec<WatermarkDesc>,
    pub source_catalog: Option<SourceCatalog>,
    pub version: TableVersion,
}

/// Arguments of the functions that generate a table plan, part 2.
///
/// Compared to [`CreateTableInfo`], this struct contains fields that can be (relatively) simply
/// obtained from the input or the context.
pub struct CreateTableProps {
    pub definition: String,
    pub append_only: bool,
    pub on_conflict: EitherOnConflict,
    pub with_version_columns: Vec<String>,
    pub webhook_info: Option<PbWebhookSourceInfo>,
    pub engine: Engine,
}

#[allow(clippy::too_many_arguments)]
fn gen_table_plan_inner(
    context: OptimizerContextRef,
    schema_name: Option<String>,
    table_name: String,
    info: CreateTableInfo,
    props: CreateTableProps,
) -> Result<(PlanRef, TableCatalog)> {
    let CreateTableInfo {
        ref columns,
        row_id_index,
        ref watermark_descs,
        ref source_catalog,
        ..
    } = info;
    let CreateTableProps { append_only, .. } = props;

    let (database_id, schema_id) = context
        .session_ctx()
        .get_database_and_schema_id_for_create(schema_name)?;

    let session = context.session_ctx().clone();
    let retention_seconds = context.with_options().retention_seconds();

    let source_node: LogicalPlanRef = LogicalSource::new(
        source_catalog.clone().map(Rc::new),
        columns.clone(),
        row_id_index,
        SourceNodeKind::CreateTable,
        context.clone(),
        None,
    )?
    .into();

    let required_cols = FixedBitSet::with_capacity(columns.len());
    let plan_root = PlanRoot::new_with_logical_plan(
        source_node,
        RequiredDist::Any,
        Order::any(),
        required_cols,
        vec![],
    );

    let has_non_ttl_watermark = watermark_descs.iter().any(|d| !d.with_ttl);

    if !append_only && has_non_ttl_watermark {
        return Err(ErrorCode::NotSupported(
            "Defining watermarks on a table requires the table to be append-only.".to_owned(),
            "Use the keywords `APPEND ONLY`".to_owned(),
        )
        .into());
    }

    if !append_only && retention_seconds.is_some() {
        if session
            .config()
            .unsafe_enable_storage_retention_for_non_append_only_tables()
        {
            tracing::warn!(
                "Storage retention is enabled for non-append-only table {}. This may lead to stream inconsistency.",
                table_name
            );
            const NOTICE: &str = "Storage retention is enabled for a non-append-only table. \
                                  This may lead to stream inconsistency and unrecoverable \
                                  node failure if there is any INSERT/UPDATE/DELETE operation \
                                  on rows corresponding to the TTLed primary keys";
            session.notice_to_user(NOTICE);
        } else {
            return Err(ErrorCode::NotSupported(
                "Defining retention seconds on a table requires the table to be append-only."
                    .to_owned(),
                "Use the keywords `APPEND ONLY`".to_owned(),
            )
            .into());
        }
    }

    let materialize =
        plan_root.gen_table_plan(context, table_name, database_id, schema_id, info, props)?;

    let mut table = materialize.table().clone();
    table.owner = session.user_id();

    Ok((materialize.into(), table))
}

/// Generates the stream plan for a cdc table based on a shared source.
///
/// In the replace workflow, `table_id` is the id of the table to be replaced;
/// in the create-table workflow, `table_id` is a placeholder that will be
/// filled in by the meta service.
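///
/// For example (illustrative), `CREATE TABLE t (...) FROM my_cdc_source TABLE
/// 'public.orders'` reaches this function with `external_table_name` set to
/// `"public.orders"` and, in the create path, a placeholder `table_id`.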
#[allow(clippy::too_many_arguments)]
pub(crate) fn gen_create_table_plan_for_cdc_table(
    context: OptimizerContextRef,
    source: Arc<SourceCatalog>,
    external_table_name: String,
    column_defs: Vec<ColumnDef>,
    source_watermarks: Vec<SourceWatermark>,
    mut columns: Vec<ColumnCatalog>,
    pk_names: Vec<String>,
    cdc_with_options: WithOptionsSecResolved,
    mut col_id_gen: ColumnIdGenerator,
    on_conflict: Option<OnConflict>,
    with_version_columns: Vec<String>,
    include_column_options: IncludeOption,
    table_name: ObjectName,
    resolved_table_name: String, // table name without the schema prefix
    database_id: DatabaseId,
    schema_id: SchemaId,
    table_id: TableId,
    engine: Engine,
) -> Result<(PlanRef, TableCatalog)> {
    let session = context.session_ctx().clone();

    // Append additional columns to the end.
    handle_addition_columns(
        None,
        &cdc_with_options,
        include_column_options,
        &mut columns,
        true,
    )?;

    for c in &mut columns {
        col_id_gen.generate(c)?;
    }

    let (mut columns, pk_column_ids, _row_id_index) =
        bind_pk_and_row_id_on_relation(columns, pk_names, true)?;

    let watermark_descs = bind_source_watermark(
        context.session_ctx(),
        table_name.real_value(),
        source_watermarks,
        &columns,
    )?;

    // NOTE: during auto schema change, default values are not provided in the column definitions.
    bind_sql_column_constraints(
        context.session_ctx(),
        table_name.real_value(),
        &mut columns,
        &column_defs,
        &pk_column_ids,
    )?;

    let definition = context.normalized_sql().to_owned();

    let pk_column_indices = {
        let mut id_to_idx = HashMap::new();
        columns.iter().enumerate().for_each(|(idx, c)| {
            id_to_idx.insert(c.column_id(), idx);
        });
        // A pk column id must exist in the table columns.
        pk_column_ids
            .iter()
            .map(|c| id_to_idx.get(c).copied().unwrap())
            .collect_vec()
    };
    let table_pk = pk_column_indices
        .iter()
        .map(|idx| ColumnOrder::new(*idx, OrderType::ascending()))
        .collect();

    let (options, secret_refs) = cdc_with_options.into_parts();

    let non_generated_column_descs = columns
        .iter()
        .filter(|&c| !c.is_generated())
        .map(|c| c.column_desc.clone())
        .collect_vec();
    let non_generated_column_num = non_generated_column_descs.len();
    let cdc_table_type = ExternalCdcTableType::from_properties(&options);
    let cdc_table_desc = CdcTableDesc {
        table_id,
        source_id: source.id, // id of the cdc source streaming job
        external_table_name: external_table_name.clone(),
        pk: table_pk,
        columns: non_generated_column_descs,
        stream_key: pk_column_indices,
        connect_properties: options,
        secret_refs,
    };

    tracing::debug!(?cdc_table_desc, "create cdc table");
    let options = build_cdc_scan_options_with_options(context.with_options(), &cdc_table_type)?;

    let logical_scan = LogicalCdcScan::create(
        external_table_name.clone(),
        Rc::new(cdc_table_desc),
        context.clone(),
        options,
    );

    let scan_node: LogicalPlanRef = logical_scan.into();
    let required_cols = FixedBitSet::with_capacity(non_generated_column_num);
    let plan_root = PlanRoot::new_with_logical_plan(
        scan_node,
        RequiredDist::Any,
        Order::any(),
        required_cols,
        vec![],
    );

    let cdc_table_id = build_cdc_table_id(source.id, &external_table_name);
    let materialize = plan_root.gen_table_plan(
        context,
        resolved_table_name,
        database_id,
        schema_id,
        CreateTableInfo {
            columns,
            pk_column_ids,
            row_id_index: None,
            watermark_descs,
            source_catalog: Some((*source).clone()),
            version: col_id_gen.into_version(),
        },
        CreateTableProps {
            definition,
            append_only: false,
            on_conflict: on_conflict.into(),
            with_version_columns,
            webhook_info: None,
            engine,
        },
    )?;

    let mut table = materialize.table().clone();
    table.owner = session.user_id();
    table.cdc_table_id = Some(cdc_table_id);
    table.cdc_table_type = Some(cdc_table_type);
    Ok((materialize.into(), table))
}

/// Derives connector properties and normalizes `external_table_name` for CDC tables.
///
/// Returns (`connector_properties`, `normalized_external_table_name`) where:
/// - For SQL Server: normalizes 'db.schema.table' (3 parts) to 'schema.table' (2 parts),
///   because users can optionally include the database name for verification, but it needs to be
///   stripped to match the format returned by Debezium's `extract_table_name()`.
/// - For MySQL/Postgres: returns the original `external_table_name` unchanged.
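///
/// For example (illustrative), with a SQL Server CDC source whose
/// `database.name` is `mydb`, both `'mydb.dbo.orders'` and `'dbo.orders'`
/// normalize to `dbo.orders`, and `schema.name`/`table.name` are inserted
/// into the returned connector properties.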
fn derive_with_options_for_cdc_table(
    source_with_properties: &WithOptionsSecResolved,
    external_table_name: String,
) -> Result<(WithOptionsSecResolved, String)> {
    use source::cdc::{MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR, SQL_SERVER_CDC_CONNECTOR};
    // We should remove the database/schema prefix from the full table name.
    let source_database_name: &str = source_with_properties
        .get("database.name")
        .ok_or_else(|| anyhow!("The source with properties does not contain 'database.name'"))?
        .as_str();
    let mut with_options = source_with_properties.clone();
    if let Some(connector) = source_with_properties.get(UPSTREAM_SOURCE_KEY) {
        match connector.as_str() {
            MYSQL_CDC_CONNECTOR => {
                // MySQL doesn't allow '.' in database names and table names, so we can split the
                // external table name by '.' to get the table name.
                let (db_name, table_name) = external_table_name.split_once('.').ok_or_else(|| {
                    anyhow!("The upstream table name must contain database name prefix, e.g. 'database.table'")
                })?;
                // We allow multiple database names in the source definition.
                if !source_database_name
                    .split(',')
                    .map(|s| s.trim())
                    .any(|name| name == db_name)
                {
                    return Err(anyhow!(
                        "The database name `{}` in the FROM clause is not included in the database name `{}` in the source definition",
                        db_name,
                        source_database_name
                    ).into());
                }
                with_options.insert(DATABASE_NAME_KEY.into(), db_name.into());
                with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
                // Return the original external_table_name unchanged for MySQL.
                return Ok((with_options, external_table_name));
            }
            POSTGRES_CDC_CONNECTOR => {
                let (schema_name, table_name) = external_table_name
                    .split_once('.')
                    .ok_or_else(|| anyhow!("The upstream table name must contain schema name prefix, e.g. 'public.table'"))?;

                // Insert 'schema.name' into the connector properties.
                with_options.insert(SCHEMA_NAME_KEY.into(), schema_name.into());
                with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
                // Return the original external_table_name unchanged for Postgres.
                return Ok((with_options, external_table_name));
            }
            SQL_SERVER_CDC_CONNECTOR => {
                // The SQL Server external table name must be in one of two formats:
                // 1. 'schemaName.tableName' (2 parts) - the database is already specified in the source
                // 2. 'databaseName.schemaName.tableName' (3 parts) - for explicit verification
                //
                // We do NOT allow a bare table name (e.g., 't') because:
                // - Unlike the database name (already in the source), the schema name is NOT pre-specified
                // - The user must explicitly provide the schema (even if it's 'dbo')
                let parts: Vec<&str> = external_table_name.split('.').collect();
                let (schema_name, table_name) = match parts.len() {
                    3 => {
                        // Format: database.schema.table
                        // Verify that the database name matches the one in the source definition.
                        let db_name = parts[0];
                        let schema_name = parts[1];
                        let table_name = parts[2];

                        if db_name != source_database_name {
                            return Err(anyhow!(
                                "The database name '{}' in the FROM clause does not match the database name '{}' specified in the source definition. \
                                 You can either use the 'schema.table' format (recommended) or ensure the database name matches.",
                                db_name,
                                source_database_name
                            ).into());
                        }
                        (schema_name, table_name)
                    }
                    2 => {
                        // Format: schema.table (recommended)
                        // The database name is taken from the source definition.
                        let schema_name = parts[0];
                        let table_name = parts[1];
                        (schema_name, table_name)
                    }
                    1 => {
                        // Format: table only
                        // Reject with a clear error message.
                        return Err(anyhow!(
                            "Invalid table name format '{}'. For SQL Server CDC, you must specify the schema name. \
                             Use the 'schema.table' format (e.g., 'dbo.{}') or the 'database.schema.table' format (e.g., '{}.dbo.{}').",
                            external_table_name,
                            external_table_name,
                            source_database_name,
                            external_table_name
                        ).into());
                    }
                    _ => {
                        // Invalid format (4+ parts or empty)
                        return Err(anyhow!(
                            "Invalid table name format '{}'. Expected 'schema.table' or 'database.schema.table'.",
                            external_table_name
                        ).into());
                    }
                };

                // Insert the schema and table names into the connector properties.
                with_options.insert(SCHEMA_NAME_KEY.into(), schema_name.into());
                with_options.insert(TABLE_NAME_KEY.into(), table_name.into());

                // Normalize external_table_name to the 'schema.table' format.
                // This ensures consistency with extract_table_name() in message.rs.
                let normalized_external_table_name = format!("{}.{}", schema_name, table_name);
                return Ok((with_options, normalized_external_table_name));
            }
            _ => {
                return Err(RwError::from(anyhow!(
                    "connector {} is not supported for cdc table",
                    connector
                )));
            }
        };
    }
    // Every match arm above returns, so we only get here when the source lacks a
    // `connector` property; report it as an error instead of panicking.
    Err(RwError::from(anyhow!(
        "the source lacks the `{}` property required for creating a cdc table",
        UPSTREAM_SOURCE_KEY
    )))
}

#[allow(clippy::too_many_arguments)]
pub(super) async fn handle_create_table_plan(
    handler_args: HandlerArgs,
    explain_options: ExplainOptions,
    format_encode: Option<FormatEncodeOptions>,
    cdc_table_info: Option<CdcTableInfo>,
    table_name: &ObjectName,
    column_defs: Vec<ColumnDef>,
    wildcard_idx: Option<usize>,
    constraints: Vec<TableConstraint>,
    source_watermarks: Vec<SourceWatermark>,
    append_only: bool,
    on_conflict: Option<OnConflict>,
    with_version_columns: Vec<String>,
    include_column_options: IncludeOption,
    webhook_info: Option<WebhookSourceInfo>,
    engine: Engine,
) -> Result<(
    PlanRef,
    Option<SourceCatalog>,
    TableCatalog,
    TableJobType,
    Option<SourceId>,
)> {
    let col_id_gen = ColumnIdGenerator::new_initial();
    let format_encode = check_create_table_with_source(
        &handler_args.with_options,
        format_encode,
        &include_column_options,
        &cdc_table_info,
    )?;
    let webhook_info = webhook_info
        .map(|info| bind_webhook_info(&handler_args.session, &column_defs, info))
        .transpose()?;

    let props = CreateTableProps {
        definition: handler_args.normalized_sql.clone(),
        append_only,
        on_conflict: on_conflict.into(),
        with_version_columns: with_version_columns.clone(),
        webhook_info,
        engine,
    };

    let ((plan, source, table), job_type, shared_source_id) = match (
        format_encode,
        cdc_table_info.as_ref(),
    ) {
        (Some(format_encode), None) => (
            gen_create_table_plan_with_source(
                handler_args,
                explain_options,
                table_name.clone(),
                column_defs,
                wildcard_idx,
                constraints,
                format_encode,
                source_watermarks,
                col_id_gen,
                include_column_options,
                props,
                SqlColumnStrategy::FollowChecked,
            )
            .await?,
            TableJobType::General,
            None,
        ),
        (None, None) => {
            let context = OptimizerContext::new(handler_args, explain_options);
            let (plan, table) = gen_create_table_plan(
                context,
                table_name.clone(),
                column_defs,
                constraints,
                col_id_gen,
                source_watermarks,
                props,
                false,
            )?;

            ((plan, None, table), TableJobType::General, None)
        }

        (None, Some(cdc_table)) => {
            sanity_check_for_table_on_cdc_source(
                append_only,
                &column_defs,
                &wildcard_idx,
                &constraints,
                &source_watermarks,
            )?;

            generated_columns_check_for_cdc_table(&column_defs)?;
            not_null_check_for_cdc_table(&wildcard_idx, &column_defs)?;

            let session = &handler_args.session;
            let db_name = &session.database();
            let user_name = &session.user_name();
            let search_path = session.config().search_path();
            let (schema_name, resolved_table_name) =
                Binder::resolve_schema_qualified_name(db_name, table_name)?;
            let (database_id, schema_id) =
                session.get_database_and_schema_id_for_create(schema_name.clone())?;

            // A cdc table cannot be append-only.
            let (source_schema, source_name) =
                Binder::resolve_schema_qualified_name(db_name, &cdc_table.source_name)?;

            let source = {
                let catalog_reader = session.env().catalog_reader().read_guard();
                let schema_path =
                    SchemaPath::new(source_schema.as_deref(), &search_path, user_name);

                let (source, _) = catalog_reader.get_source_by_name(
                    db_name,
                    schema_path,
                    source_name.as_str(),
                )?;
                source.clone()
            };
            let (cdc_with_options, normalized_external_table_name) =
                derive_with_options_for_cdc_table(
                    &source.with_properties,
                    cdc_table.external_table_name.clone(),
                )?;

            let (columns, pk_names) = match wildcard_idx {
                Some(_) => bind_cdc_table_schema_externally(cdc_with_options.clone()).await?,
                None => {
                    for column_def in &column_defs {
                        for option_def in &column_def.options {
                            if let ColumnOption::DefaultValue(_)
                            | ColumnOption::DefaultValueInternal { .. } = option_def.option
                            {
                                return Err(ErrorCode::NotSupported(
                                    "Default value for columns defined on the table created from a CDC source".into(),
                                    "Remove the default value expression in the column definitions".into(),
                                )
                                .into());
                            }
                        }
                    }

                    let (columns, pk_names) =
                        bind_cdc_table_schema(&column_defs, &constraints, false)?;
                    // Validate the connector properties for the external table.
                    let (options, secret_refs) = cdc_with_options.clone().into_parts();
                    let _config = ExternalTableConfig::try_from_btreemap(options, secret_refs)
                        .context("failed to extract external table config")?;

                    (columns, pk_names)
                }
            };

            let context: OptimizerContextRef =
                OptimizerContext::new(handler_args, explain_options).into();
            let shared_source_id = source.id;
            let (plan, table) = gen_create_table_plan_for_cdc_table(
                context,
                source,
                normalized_external_table_name,
                column_defs,
                source_watermarks,
                columns,
                pk_names,
                cdc_with_options,
                col_id_gen,
                on_conflict,
                with_version_columns,
                include_column_options,
                table_name.clone(),
                resolved_table_name,
                database_id,
                schema_id,
                TableId::placeholder(),
                engine,
            )?;

            (
                (plan, None, table),
                TableJobType::SharedCdcSource,
                Some(shared_source_id),
            )
        }
        (Some(_), Some(_)) => {
            return Err(ErrorCode::NotSupported(
                "Data format and encoding format do not apply to a table created from a CDC source"
                    .into(),
                "Remove the FORMAT and ENCODE specification".into(),
            )
            .into());
        }
    };
    Ok((plan, source, table, job_type, shared_source_id))
}

// For both tables from a cdc source and tables with a cdc connector
fn generated_columns_check_for_cdc_table(columns: &Vec<ColumnDef>) -> Result<()> {
    let mut found_generated_column = false;
    for column in columns {
        let mut is_generated = false;

        for option_def in &column.options {
            if let ColumnOption::GeneratedColumns(_) = option_def.option {
                is_generated = true;
                break;
            }
        }

        if is_generated {
            found_generated_column = true;
        } else if found_generated_column {
            return Err(ErrorCode::NotSupported(
                "Non-generated column found after a generated column.".into(),
                "Ensure that all generated columns appear at the end of the cdc table definition."
                    .into(),
            )
            .into());
        }
    }
    Ok(())
}

// For both tables from a cdc source and tables with a cdc connector
fn not_null_check_for_cdc_table(
    wildcard_idx: &Option<usize>,
    column_defs: &Vec<ColumnDef>,
) -> Result<()> {
    if wildcard_idx.is_none()
        && column_defs.iter().any(|col| {
            col.options
                .iter()
                .any(|opt| matches!(opt.option, ColumnOption::NotNull))
        })
    {
        return Err(ErrorCode::NotSupported(
            "CDC table with NOT NULL constraint is not supported".to_owned(),
            "Please remove the NOT NULL constraint for columns".to_owned(),
        )
        .into());
    }
    Ok(())
}

1347// Only for table from cdc source
1348fn sanity_check_for_table_on_cdc_source(
1349    append_only: bool,
1350    column_defs: &Vec<ColumnDef>,
1351    wildcard_idx: &Option<usize>,
1352    constraints: &Vec<TableConstraint>,
1353    source_watermarks: &Vec<SourceWatermark>,
1354) -> Result<()> {
1355    // wildcard cannot be used with column definitions
1356    if wildcard_idx.is_some() && !column_defs.is_empty() {
1357        return Err(ErrorCode::NotSupported(
1358            "wildcard(*) and column definitions cannot be used together".to_owned(),
1359            "Remove the wildcard or column definitions".to_owned(),
1360        )
1361        .into());
1362    }
1363
    // A CDC table must have a primary key constraint or a primary key column.
    if wildcard_idx.is_none()
1366        && !constraints.iter().any(|c| {
1367            matches!(
1368                c,
1369                TableConstraint::Unique {
1370                    is_primary: true,
1371                    ..
1372                }
1373            )
1374        })
1375        && !column_defs.iter().any(|col| {
1376            col.options
1377                .iter()
1378                .any(|opt| matches!(opt.option, ColumnOption::Unique { is_primary: true }))
1379        })
1380    {
1381        return Err(ErrorCode::NotSupported(
1382            "CDC table without primary key constraint is not supported".to_owned(),
1383            "Please define a primary key".to_owned(),
1384        )
1385        .into());
1386    }
1387
1388    if append_only {
1389        return Err(ErrorCode::NotSupported(
1390            "append only modifier on the table created from a CDC source".into(),
1391            "Remove the APPEND ONLY clause".into(),
1392        )
1393        .into());
1394    }
1395
1396    if !source_watermarks.is_empty()
1397        && source_watermarks
1398            .iter()
1399            .any(|watermark| !watermark.with_ttl)
1400    {
1401        return Err(ErrorCode::NotSupported(
1402            "non-TTL watermark defined on the table created from a CDC source".into(),
1403            "Use `WATERMARK ... WITH TTL` instead.".into(),
1404        )
1405        .into());
1406    }
1407
1408    Ok(())
1409}
1410
/// Derive the schema of a CDC table from the external database. Used when creating a new table or altering an existing one.
1412async fn bind_cdc_table_schema_externally(
1413    cdc_with_options: WithOptionsSecResolved,
1414) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
    // Read the CDC table schema from the external database.
1416    let (options, secret_refs) = cdc_with_options.into_parts();
1417    let config = ExternalTableConfig::try_from_btreemap(options, secret_refs)
1418        .context("failed to extract external table config")?;
1419
1420    let table = ExternalTableImpl::connect(config)
1421        .await
1422        .context("failed to auto derive table schema")?;
1423
1424    Ok((
1425        table
1426            .column_descs()
1427            .iter()
1428            .cloned()
1429            .map(|column_desc| ColumnCatalog {
1430                column_desc,
1431                is_hidden: false,
1432            })
1433            .collect(),
1434        table.pk_names().clone(),
1435    ))
1436}
1437
/// Derive the schema of a CDC table from the SQL column definitions and constraints. Used when creating a new table or altering an existing one.
1439fn bind_cdc_table_schema(
1440    column_defs: &Vec<ColumnDef>,
1441    constraints: &Vec<TableConstraint>,
1442    is_for_replace_plan: bool,
1443) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
1444    let columns = bind_sql_columns(column_defs, is_for_replace_plan)?;
1445
1446    let pk_names = bind_sql_pk_names(column_defs, bind_table_constraints(constraints)?)?;
1447    Ok((columns, pk_names))
1448}
1449
1450#[allow(clippy::too_many_arguments)]
1451pub async fn handle_create_table(
1452    handler_args: HandlerArgs,
1453    table_name: ObjectName,
1454    column_defs: Vec<ColumnDef>,
1455    wildcard_idx: Option<usize>,
1456    constraints: Vec<TableConstraint>,
1457    if_not_exists: bool,
1458    format_encode: Option<FormatEncodeOptions>,
1459    source_watermarks: Vec<SourceWatermark>,
1460    append_only: bool,
1461    on_conflict: Option<OnConflict>,
1462    with_version_columns: Vec<String>,
1463    cdc_table_info: Option<CdcTableInfo>,
1464    include_column_options: IncludeOption,
1465    webhook_info: Option<WebhookSourceInfo>,
1466    ast_engine: risingwave_sqlparser::ast::Engine,
1467) -> Result<RwPgResponse> {
1468    let session = handler_args.session.clone();
1469
1470    if append_only {
1471        session.notice_to_user("APPEND ONLY TABLE is currently an experimental feature.");
1472    }
1473
1474    session.check_cluster_limits().await?;
1475
1476    let engine = match ast_engine {
1477        risingwave_sqlparser::ast::Engine::Hummock => Engine::Hummock,
1478        risingwave_sqlparser::ast::Engine::Iceberg => Engine::Iceberg,
1479    };
1480
1481    if let Either::Right(resp) = session.check_relation_name_duplicated(
1482        table_name.clone(),
1483        StatementType::CREATE_TABLE,
1484        if_not_exists,
1485    )? {
1486        return Ok(resp);
1487    }
1488
1489    let (graph, source, hummock_table, job_type, shared_source_id) = {
1490        let (plan, source, table, job_type, shared_source_id) = handle_create_table_plan(
1491            handler_args.clone(),
1492            ExplainOptions::default(),
1493            format_encode,
1494            cdc_table_info,
1495            &table_name,
1496            column_defs.clone(),
1497            wildcard_idx,
1498            constraints.clone(),
1499            source_watermarks,
1500            append_only,
1501            on_conflict,
1502            with_version_columns,
1503            include_column_options,
1504            webhook_info,
1505            engine,
1506        )
1507        .await?;
1508        tracing::trace!("table_plan: {:?}", plan.explain_to_string());
1509
1510        let graph = build_graph(plan, Some(GraphJobType::Table))?;
1511
1512        (graph, source, table, job_type, shared_source_id)
1513    };
1514
1515    tracing::trace!(
1516        "name={}, graph=\n{}",
1517        table_name,
1518        serde_json::to_string_pretty(&graph).unwrap()
1519    );
1520
1521    let dependencies = shared_source_id
1522        .map(|id| HashSet::from([id.as_object_id()]))
1523        .unwrap_or_default();
1524
1525    // Handle engine
1526    match engine {
1527        Engine::Hummock => {
1528            let catalog_writer = session.catalog_writer()?;
1529            let action = match job_type {
1530                TableJobType::SharedCdcSource => LongRunningNotificationAction::MonitorBackfillJob,
1531                _ => LongRunningNotificationAction::DiagnoseBarrierLatency,
1532            };
1533            execute_with_long_running_notification(
1534                catalog_writer.create_table(
1535                    source.map(|s| s.to_prost()),
1536                    hummock_table.to_prost(),
1537                    graph,
1538                    job_type,
1539                    if_not_exists,
1540                    dependencies,
1541                ),
1542                &session,
1543                "CREATE TABLE",
1544                action,
1545            )
1546            .await?;
1547        }
1548        Engine::Iceberg => {
1549            let hummock_table_name = hummock_table.name.clone();
1550            session.create_staging_table(hummock_table.clone());
1551            let res = Box::pin(create_iceberg_engine_table(
1552                session.clone(),
1553                handler_args,
1554                source.map(|s| s.to_prost()),
1555                hummock_table,
1556                graph,
1557                table_name,
1558                job_type,
1559                if_not_exists,
1560            ))
1561            .await;
1562            session.drop_staging_table(&hummock_table_name);
1563            res?
1564        }
1565    }
1566
1567    Ok(PgResponse::empty_result(StatementType::CREATE_TABLE))
1568}
1569
/// An iceberg engine table is composed of a hummock table, an iceberg sink, and an iceberg source.
///
/// 1. Fetch the iceberg engine options from the meta node, or use the iceberg engine connection provided by the user.
/// 2. Create a hummock table.
/// 3. Create an iceberg sink.
/// 4. Create an iceberg source.
1576///
1577/// See <https://github.com/risingwavelabs/risingwave/issues/21586> for an architecture diagram.
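///
/// A hypothetical usage sketch (the connection name and option values are illustrative):
///
/// ```sql
/// SET iceberg_engine_connection = 'public.my_iceberg_conn';
/// CREATE TABLE t (id INT PRIMARY KEY, v INT)
/// WITH (commit_checkpoint_interval = 120)
/// ENGINE = iceberg;
/// ```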
1578#[allow(clippy::too_many_arguments)]
1579pub async fn create_iceberg_engine_table(
1580    session: Arc<SessionImpl>,
1581    handler_args: HandlerArgs,
1582    mut source: Option<PbSource>,
1583    table: TableCatalog,
1584    graph: StreamFragmentGraph,
1585    table_name: ObjectName,
1586    job_type: PbTableJobType,
1587    if_not_exists: bool,
1588) -> Result<()> {
1589    let rw_db_name = session
1590        .env()
1591        .catalog_reader()
1592        .read_guard()
1593        .get_database_by_id(table.database_id)?
1594        .name()
1595        .to_owned();
1596    let rw_schema_name = session
1597        .env()
1598        .catalog_reader()
1599        .read_guard()
1600        .get_schema_by_id(table.database_id, table.schema_id)?
1601        .name()
1602        .clone();
1603    let iceberg_catalog_name = rw_db_name.clone();
1604    let iceberg_database_name = rw_schema_name.clone();
1605    let iceberg_table_name = table_name.0.last().unwrap().real_value();
1606
1607    let iceberg_engine_connection: String = session.config().iceberg_engine_connection();
1608    let sink_decouple = session.config().sink_decouple();
1609    if matches!(sink_decouple, SinkDecouple::Disable) {
1610        bail!(
            "Iceberg engine tables require sink decouple; try `SET sink_decouple = true` to resolve it"
1612        );
1613    }
1614
1615    let mut connection_ref = BTreeMap::new();
1616    let with_common = if iceberg_engine_connection.is_empty() {
        bail!("to use an iceberg engine table, the session variable `iceberg_engine_connection` must be set.");
1618    } else {
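        // The value is expected to be `<schema>.<connection>`, set e.g. (illustrative)
        // via `SET iceberg_engine_connection = 'public.my_iceberg_conn';`.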
        let parts: Vec<&str> = iceberg_engine_connection.split('.').collect();
        if parts.len() != 2 {
            bail!(
                "invalid `iceberg_engine_connection`: expected `<schema>.<connection>`, got `{}`",
                iceberg_engine_connection
            );
        }
1621        let connection_catalog =
1622            session.get_connection_by_name(Some(parts[0].to_owned()), parts[1])?;
1623        if let ConnectionInfo::ConnectionParams(params) = &connection_catalog.info {
1624            if params.connection_type == ConnectionType::Iceberg as i32 {
1625                // With iceberg engine connection:
1626                connection_ref.insert(
1627                    "connection".to_owned(),
1628                    ConnectionRefValue {
1629                        connection_name: ObjectName::from(vec![
1630                            Ident::from(parts[0]),
1631                            Ident::from(parts[1]),
1632                        ]),
1633                    },
1634                );
1635
1636                let mut with_common = BTreeMap::new();
1637                with_common.insert("connector".to_owned(), "iceberg".to_owned());
1638                with_common.insert("database.name".to_owned(), iceberg_database_name);
1639                with_common.insert("table.name".to_owned(), iceberg_table_name);
1640
1641                let hosted_catalog = params
1642                    .properties
1643                    .get("hosted_catalog")
1644                    .map(|s| s.eq_ignore_ascii_case("true"))
1645                    .unwrap_or(false);
1646                if hosted_catalog {
1647                    let meta_client = session.env().meta_client();
1648                    let meta_store_endpoint = meta_client.get_meta_store_endpoint().await?;
1649
1650                    let meta_store_endpoint =
1651                        url::Url::parse(&meta_store_endpoint).map_err(|_| {
1652                            ErrorCode::InternalError(
1653                                "failed to parse the meta store endpoint".to_owned(),
1654                            )
1655                        })?;
1656                    let meta_store_backend = meta_store_endpoint.scheme().to_owned();
1657                    let meta_store_user = meta_store_endpoint.username().to_owned();
1658                    let meta_store_password = match meta_store_endpoint.password() {
1659                        Some(password) => percent_decode_str(password)
1660                            .decode_utf8()
1661                            .map_err(|_| {
1662                                ErrorCode::InternalError(
1663                                    "failed to parse password from meta store endpoint".to_owned(),
1664                                )
1665                            })?
1666                            .into_owned(),
1667                        None => "".to_owned(),
1668                    };
1669                    let meta_store_host = meta_store_endpoint
1670                        .host_str()
1671                        .ok_or_else(|| {
1672                            ErrorCode::InternalError(
1673                                "failed to parse host from meta store endpoint".to_owned(),
1674                            )
1675                        })?
1676                        .to_owned();
1677                    let meta_store_port = meta_store_endpoint.port().ok_or_else(|| {
1678                        ErrorCode::InternalError(
1679                            "failed to parse port from meta store endpoint".to_owned(),
1680                        )
1681                    })?;
1682                    let meta_store_database = meta_store_endpoint
1683                        .path()
1684                        .trim_start_matches('/')
1685                        .to_owned();
1686
1687                    let Ok(meta_backend) = MetaBackend::from_str(&meta_store_backend, true) else {
1688                        bail!("failed to parse meta backend: {}", meta_store_backend);
1689                    };
1690
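                    // The resulting URI has the form (illustrative)
                    // `jdbc:postgresql://<host>:<port>/<database>` for a Postgres meta store.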
1691                    let catalog_uri = match meta_backend {
1692                        MetaBackend::Postgres => {
1693                            format!(
1694                                "jdbc:postgresql://{}:{}/{}",
1695                                meta_store_host, meta_store_port, meta_store_database
1696                            )
1697                        }
1698                        MetaBackend::Mysql => {
1699                            format!(
1700                                "jdbc:mysql://{}:{}/{}",
1701                                meta_store_host, meta_store_port, meta_store_database
1702                            )
1703                        }
1704                        MetaBackend::Sqlite | MetaBackend::Sql | MetaBackend::Mem => {
1705                            bail!(
1706                                "Unsupported meta backend for iceberg engine table: {}",
1707                                meta_store_backend
1708                            );
1709                        }
1710                    };
1711
1712                    with_common.insert("catalog.type".to_owned(), "jdbc".to_owned());
1713                    with_common.insert("catalog.uri".to_owned(), catalog_uri);
1714                    with_common.insert("catalog.jdbc.user".to_owned(), meta_store_user);
1715                    with_common.insert("catalog.jdbc.password".to_owned(), meta_store_password);
1716                    with_common.insert("catalog.name".to_owned(), iceberg_catalog_name);
1717                }
1718
1719                with_common
1720            } else {
1721                return Err(RwError::from(ErrorCode::InvalidParameterValue(
1722                    "Only iceberg connection could be used in iceberg engine".to_owned(),
1723                )));
1724            }
1725        } else {
1726            return Err(RwError::from(ErrorCode::InvalidParameterValue(
1727                "Private Link Service has been deprecated. Please create a new connection instead."
1728                    .to_owned(),
1729            )));
1730        }
1731    };
1732
    // Iceberg sinks require a primary key; if none is provided, we will use the hidden `_row_id` column.
    // Fetch the primary key from the table's columns.
1735    let mut pks = table
1736        .pk_column_names()
1737        .iter()
1738        .map(|c| c.to_string())
1739        .collect::<Vec<String>>();
1740
    // For a table without a primary key, we use `_row_id` as the primary key.
1742    if pks.len() == 1 && pks[0].eq(ROW_ID_COLUMN_NAME) {
1743        pks = vec![RISINGWAVE_ICEBERG_ROW_ID.to_owned()];
1744    }
1745
1746    let sink_from = CreateSink::From(table_name.clone());
1747
1748    let mut sink_name = table_name.clone();
1749    *sink_name.0.last_mut().unwrap() = Ident::from(
1750        (ICEBERG_SINK_PREFIX.to_owned() + &sink_name.0.last().unwrap().real_value()).as_str(),
1751    );
1752    let create_sink_stmt = CreateSinkStatement {
1753        if_not_exists: false,
1754        sink_name,
1755        with_properties: WithProperties(vec![]),
1756        sink_from,
1757        columns: vec![],
1758        emit_mode: None,
1759        sink_schema: None,
1760        into_table_name: None,
1761    };
1762
1763    let mut sink_handler_args = handler_args.clone();
1764
1765    let mut sink_with = with_common.clone();
1766
1767    // TODO: Iceberg with pk index doesn't support auto schema change
1768    if !handler_args
1769        .with_options
1770        .get(ENABLE_COMPACTION)
1771        .is_some_and(|val| val.eq_ignore_ascii_case("true"))
1772    {
1773        sink_with.insert(AUTO_SCHEMA_CHANGE_KEY.to_owned(), "true".to_owned());
1774    }
1775
1776    if table.append_only {
1777        sink_with.insert("type".to_owned(), "append-only".to_owned());
1778    } else {
1779        sink_with.insert("primary_key".to_owned(), pks.join(","));
1780        sink_with.insert("type".to_owned(), "upsert".to_owned());
1781    }
1782    // sink_with.insert(SINK_SNAPSHOT_OPTION.to_owned(), "false".to_owned());
1783    //
1784    // Note: in theory, we don't need to backfill from the table to the sink,
1785    // but we don't have atomic DDL now https://github.com/risingwavelabs/risingwave/issues/21863
1786    // so it may have potential data loss problem on the first barrier.
1787    //
1788    // For non-append-only table, we can always solve it by the initial sink with backfill, since
1789    // data will be present in hummock table.
1790    //
1791    // For append-only table, we need to be more careful.
1792    //
1793    // The possible cases for a table:
1794    // - For table without connector: it doesn't matter, since there's no data before the table is created
    // - For table with connector: we worked around it by setting SOURCE_RATE_LIMIT to 0
1796    //   + If we support blocking DDL for table with connector, we need to be careful.
1797    // - For table with an upstream job: Specifically, CDC table from shared CDC source.
1798    //   + Data may come from both upstream connector, and CDC table backfill, so we need to pause both of them.
1799    //   + For now we don't support APPEND ONLY CDC table, so it's safe.
1800    let commit_checkpoint_interval = handler_args
1801        .with_options
1802        .get(COMMIT_CHECKPOINT_INTERVAL)
1803        .map(|v| v.to_owned())
1804        .unwrap_or_else(|| "60".to_owned());
1805    let commit_checkpoint_interval = commit_checkpoint_interval.parse::<u32>().map_err(|_| {
1806        ErrorCode::InvalidInputSyntax(format!(
1807            "commit_checkpoint_interval must be greater than 0: {}",
1808            commit_checkpoint_interval
1809        ))
1810    })?;
1811
1812    if commit_checkpoint_interval == 0 {
1813        bail!("commit_checkpoint_interval must be greater than 0");
1814    }
1815
1816    // remove commit_checkpoint_interval from source options, otherwise it will be considered as an unknown field.
1817    source
1818        .as_mut()
1819        .map(|x| x.with_properties.remove(COMMIT_CHECKPOINT_INTERVAL));
1820
1821    let sink_decouple = session.config().sink_decouple();
1822    if matches!(sink_decouple, SinkDecouple::Disable) && commit_checkpoint_interval > 1 {
1823        bail!(
1824            "config conflict: `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
1825        )
1826    }
1827
1828    sink_with.insert(
1829        COMMIT_CHECKPOINT_INTERVAL.to_owned(),
1830        commit_checkpoint_interval.to_string(),
1831    );
1832
1833    let commit_checkpoint_size_threshold_mb = handler_args
1834        .with_options
1835        .get(COMMIT_CHECKPOINT_SIZE_THRESHOLD_MB)
1836        .map(|v| v.to_owned())
1837        .unwrap_or_else(|| ICEBERG_DEFAULT_COMMIT_CHECKPOINT_SIZE_THRESHOLD_MB.to_string());
1838    let commit_checkpoint_size_threshold_mb = commit_checkpoint_size_threshold_mb
1839        .parse::<u64>()
1840        .map_err(|_| {
1841            ErrorCode::InvalidInputSyntax(format!(
1842                "{} must be greater than 0: {}",
1843                COMMIT_CHECKPOINT_SIZE_THRESHOLD_MB, commit_checkpoint_size_threshold_mb
1844            ))
1845        })?;
1846    if commit_checkpoint_size_threshold_mb == 0 {
1847        bail!("{COMMIT_CHECKPOINT_SIZE_THRESHOLD_MB} must be greater than 0");
1848    }
1849
1850    // remove commit_checkpoint_size_threshold_mb from source options, otherwise it will be considered as an unknown field.
1851    source.as_mut().map(|x| {
1852        x.with_properties
1853            .remove(COMMIT_CHECKPOINT_SIZE_THRESHOLD_MB)
1854    });
1855    sink_with.insert(
1856        COMMIT_CHECKPOINT_SIZE_THRESHOLD_MB.to_owned(),
1857        commit_checkpoint_size_threshold_mb.to_string(),
1858    );
1859    sink_with.insert("create_table_if_not_exists".to_owned(), "true".to_owned());
1860
1861    sink_with.insert("is_exactly_once".to_owned(), "true".to_owned());
1862
1863    if let Some(enable_compaction) = handler_args.with_options.get(ENABLE_COMPACTION) {
1864        match enable_compaction.to_lowercase().as_str() {
1865            "true" => {
1866                sink_with.insert(ENABLE_COMPACTION.to_owned(), "true".to_owned());
1867            }
1868            "false" => {
1869                sink_with.insert(ENABLE_COMPACTION.to_owned(), "false".to_owned());
1870            }
1871            _ => {
1872                return Err(ErrorCode::InvalidInputSyntax(format!(
1873                    "enable_compaction must be true or false: {}",
1874                    enable_compaction
1875                ))
1876                .into());
1877            }
1878        }
1879
1880        // remove enable_compaction from source options, otherwise it will be considered as an unknown field.
1881        source
1882            .as_mut()
1883            .map(|x| x.with_properties.remove("enable_compaction"));
1884    } else {
1885        sink_with.insert(ENABLE_COMPACTION.to_owned(), "true".to_owned());
1886    }
1887
1888    if let Some(compaction_interval_sec) = handler_args.with_options.get(COMPACTION_INTERVAL_SEC) {
1889        let compaction_interval_sec = compaction_interval_sec.parse::<u64>().map_err(|_| {
1890            ErrorCode::InvalidInputSyntax(format!(
1891                "compaction_interval_sec must be greater than 0: {}",
                compaction_interval_sec
1893            ))
1894        })?;
1895        if compaction_interval_sec == 0 {
1896            bail!("compaction_interval_sec must be greater than 0");
1897        }
1898        sink_with.insert(
1899            "compaction_interval_sec".to_owned(),
1900            compaction_interval_sec.to_string(),
1901        );
1902        // remove compaction_interval_sec from source options, otherwise it will be considered as an unknown field.
1903        source
1904            .as_mut()
1905            .map(|x| x.with_properties.remove("compaction_interval_sec"));
1906    }
1907
1908    let has_enabled_snapshot_expiration = if let Some(enable_snapshot_expiration) =
1909        handler_args.with_options.get(ENABLE_SNAPSHOT_EXPIRATION)
1910    {
1911        // remove enable_snapshot_expiration from source options, otherwise it will be considered as an unknown field.
1912        source
1913            .as_mut()
1914            .map(|x| x.with_properties.remove(ENABLE_SNAPSHOT_EXPIRATION));
1915        match enable_snapshot_expiration.to_lowercase().as_str() {
1916            "true" => {
1917                sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "true".to_owned());
1918                true
1919            }
1920            "false" => {
1921                sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "false".to_owned());
1922                false
1923            }
1924            _ => {
1925                return Err(ErrorCode::InvalidInputSyntax(format!(
1926                    "enable_snapshot_expiration must be true or false: {}",
1927                    enable_snapshot_expiration
1928                ))
1929                .into());
1930            }
1931        }
1932    } else {
1933        sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "true".to_owned());
1934        true
1935    };
1936
1937    if has_enabled_snapshot_expiration {
1938        // configuration for snapshot expiration
1939        if let Some(snapshot_expiration_retain_last) = handler_args
1940            .with_options
1941            .get(SNAPSHOT_EXPIRATION_RETAIN_LAST)
1942        {
1943            sink_with.insert(
1944                SNAPSHOT_EXPIRATION_RETAIN_LAST.to_owned(),
1945                snapshot_expiration_retain_last.to_owned(),
1946            );
1947            // remove snapshot_expiration_retain_last from source options, otherwise it will be considered as an unknown field.
1948            source
1949                .as_mut()
1950                .map(|x| x.with_properties.remove(SNAPSHOT_EXPIRATION_RETAIN_LAST));
1951        }
1952
1953        if let Some(snapshot_expiration_max_age) = handler_args
1954            .with_options
1955            .get(SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS)
1956        {
1957            sink_with.insert(
1958                SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS.to_owned(),
1959                snapshot_expiration_max_age.to_owned(),
1960            );
1961            // remove snapshot_expiration_max_age from source options, otherwise it will be considered as an unknown field.
1962            source
1963                .as_mut()
1964                .map(|x| x.with_properties.remove(SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS));
1965        }
1966
1967        if let Some(snapshot_expiration_clear_expired_files) = handler_args
1968            .with_options
1969            .get(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES)
1970        {
1971            sink_with.insert(
1972                SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES.to_owned(),
1973                snapshot_expiration_clear_expired_files.to_owned(),
1974            );
1975            // remove snapshot_expiration_clear_expired_files from source options, otherwise it will be considered as an unknown field.
1976            source.as_mut().map(|x| {
1977                x.with_properties
1978                    .remove(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES)
1979            });
1980        }
1981
1982        if let Some(snapshot_expiration_clear_expired_meta_data) = handler_args
1983            .with_options
1984            .get(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA)
1985        {
1986            sink_with.insert(
1987                SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA.to_owned(),
1988                snapshot_expiration_clear_expired_meta_data.to_owned(),
1989            );
1990            // remove snapshot_expiration_clear_expired_meta_data from source options, otherwise it will be considered as an unknown field.
1991            source.as_mut().map(|x| {
1992                x.with_properties
1993                    .remove(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA)
1994            });
1995        }
1996    }
1997
1998    if let Some(format_version) = handler_args.with_options.get(FORMAT_VERSION) {
1999        let format_version = format_version.parse::<u8>().map_err(|_| {
2000            ErrorCode::InvalidInputSyntax(format!(
2001                "format_version must be 1, 2 or 3: {}",
2002                format_version
2003            ))
2004        })?;
2005        if format_version != 1 && format_version != 2 && format_version != 3 {
2006            bail!("format_version must be 1, 2 or 3");
2007        }
2008        sink_with.insert(FORMAT_VERSION.to_owned(), format_version.to_string());
2009
2010        // remove format_version from source options, otherwise it will be considered as an unknown field.
2011        source
2012            .as_mut()
2013            .map(|x| x.with_properties.remove(FORMAT_VERSION));
2014    }
2015
2016    if let Some(write_mode) = handler_args.with_options.get(WRITE_MODE) {
2017        let write_mode = IcebergWriteMode::try_from(write_mode.as_str()).map_err(|_| {
2018            ErrorCode::InvalidInputSyntax(format!(
2019                "invalid write_mode: {}, must be one of: {}, {}",
2020                write_mode, ICEBERG_WRITE_MODE_MERGE_ON_READ, ICEBERG_WRITE_MODE_COPY_ON_WRITE
2021            ))
2022        })?;
2023
2024        match write_mode {
2025            IcebergWriteMode::MergeOnRead => {
2026                sink_with.insert(WRITE_MODE.to_owned(), write_mode.as_str().to_owned());
2027            }
2028
2029            IcebergWriteMode::CopyOnWrite => {
2030                if table.append_only {
2031                    return Err(ErrorCode::NotSupported(
2032                        "COPY ON WRITE is not supported for append-only iceberg table".to_owned(),
2033                        "Please use MERGE ON READ instead".to_owned(),
2034                    )
2035                    .into());
2036                }
2037
2038                sink_with.insert(WRITE_MODE.to_owned(), write_mode.as_str().to_owned());
2039            }
2040        }
2041
2042        // remove write_mode from source options, otherwise it will be considered as an unknown field.
2043        source
2044            .as_mut()
2045            .map(|x| x.with_properties.remove("write_mode"));
2046    } else {
2047        sink_with.insert(
2048            WRITE_MODE.to_owned(),
2049            ICEBERG_WRITE_MODE_MERGE_ON_READ.to_owned(),
2050        );
2051    }
2052
2053    if let Some(max_snapshots_num_before_compaction) =
2054        handler_args.with_options.get(COMPACTION_MAX_SNAPSHOTS_NUM)
2055    {
2056        let max_snapshots_num_before_compaction = max_snapshots_num_before_compaction
2057            .parse::<u32>()
2058            .map_err(|_| {
2059                ErrorCode::InvalidInputSyntax(format!(
2060                    "{} must be greater than 0: {}",
2061                    COMPACTION_MAX_SNAPSHOTS_NUM, max_snapshots_num_before_compaction
2062                ))
2063            })?;
2064
2065        if max_snapshots_num_before_compaction == 0 {
            bail!("{} must be greater than 0", COMPACTION_MAX_SNAPSHOTS_NUM);
2070        }
2071
2072        sink_with.insert(
2073            COMPACTION_MAX_SNAPSHOTS_NUM.to_owned(),
2074            max_snapshots_num_before_compaction.to_string(),
2075        );
2076
2077        // remove from source options, otherwise it will be considered as an unknown field.
2078        source
2079            .as_mut()
2080            .map(|x| x.with_properties.remove(COMPACTION_MAX_SNAPSHOTS_NUM));
2081    }
2082
2083    if let Some(small_files_threshold_mb) = handler_args
2084        .with_options
2085        .get(COMPACTION_SMALL_FILES_THRESHOLD_MB)
2086    {
2087        let small_files_threshold_mb = small_files_threshold_mb.parse::<u64>().map_err(|_| {
2088            ErrorCode::InvalidInputSyntax(format!(
2089                "{} must be greater than 0: {}",
2090                COMPACTION_SMALL_FILES_THRESHOLD_MB, small_files_threshold_mb
2091            ))
2092        })?;
2093        if small_files_threshold_mb == 0 {
            bail!("{} must be greater than 0", COMPACTION_SMALL_FILES_THRESHOLD_MB);
2098        }
2099        sink_with.insert(
2100            COMPACTION_SMALL_FILES_THRESHOLD_MB.to_owned(),
2101            small_files_threshold_mb.to_string(),
2102        );
2103
2104        // remove from source options, otherwise it will be considered as an unknown field.
2105        source.as_mut().map(|x| {
2106            x.with_properties
2107                .remove(COMPACTION_SMALL_FILES_THRESHOLD_MB)
2108        });
2109    }
2110
2111    if let Some(delete_files_count_threshold) = handler_args
2112        .with_options
2113        .get(COMPACTION_DELETE_FILES_COUNT_THRESHOLD)
2114    {
2115        let delete_files_count_threshold =
2116            delete_files_count_threshold.parse::<usize>().map_err(|_| {
2117                ErrorCode::InvalidInputSyntax(format!(
2118                    "{} must be greater than 0: {}",
2119                    COMPACTION_DELETE_FILES_COUNT_THRESHOLD, delete_files_count_threshold
2120                ))
2121            })?;
2122        if delete_files_count_threshold == 0 {
            bail!("{} must be greater than 0", COMPACTION_DELETE_FILES_COUNT_THRESHOLD);
2127        }
2128        sink_with.insert(
2129            COMPACTION_DELETE_FILES_COUNT_THRESHOLD.to_owned(),
2130            delete_files_count_threshold.to_string(),
2131        );
2132
2133        // remove from source options, otherwise it will be considered as an unknown field.
2134        source.as_mut().map(|x| {
2135            x.with_properties
2136                .remove(COMPACTION_DELETE_FILES_COUNT_THRESHOLD)
2137        });
2138    }
2139
2140    if let Some(trigger_snapshot_count) = handler_args
2141        .with_options
2142        .get(COMPACTION_TRIGGER_SNAPSHOT_COUNT)
2143    {
2144        let trigger_snapshot_count = trigger_snapshot_count.parse::<usize>().map_err(|_| {
2145            ErrorCode::InvalidInputSyntax(format!(
2146                "{} must be greater than 0: {}",
2147                COMPACTION_TRIGGER_SNAPSHOT_COUNT, trigger_snapshot_count
2148            ))
2149        })?;
2150        if trigger_snapshot_count == 0 {
            bail!("{} must be greater than 0", COMPACTION_TRIGGER_SNAPSHOT_COUNT);
2155        }
2156        sink_with.insert(
2157            COMPACTION_TRIGGER_SNAPSHOT_COUNT.to_owned(),
2158            trigger_snapshot_count.to_string(),
2159        );
2160
2161        // remove from source options, otherwise it will be considered as an unknown field.
2162        source
2163            .as_mut()
2164            .map(|x| x.with_properties.remove(COMPACTION_TRIGGER_SNAPSHOT_COUNT));
2165    }
2166
2167    if let Some(target_file_size_mb) = handler_args
2168        .with_options
2169        .get(COMPACTION_TARGET_FILE_SIZE_MB)
2170    {
2171        let target_file_size_mb = target_file_size_mb.parse::<u64>().map_err(|_| {
2172            ErrorCode::InvalidInputSyntax(format!(
2173                "{} must be greater than 0: {}",
2174                COMPACTION_TARGET_FILE_SIZE_MB, target_file_size_mb
2175            ))
2176        })?;
2177        if target_file_size_mb == 0 {
            bail!("{} must be greater than 0", COMPACTION_TARGET_FILE_SIZE_MB);
2182        }
2183        sink_with.insert(
2184            COMPACTION_TARGET_FILE_SIZE_MB.to_owned(),
2185            target_file_size_mb.to_string(),
2186        );
2187        // remove from source options, otherwise it will be considered as an unknown field.
2188        source
2189            .as_mut()
2190            .map(|x| x.with_properties.remove(COMPACTION_TARGET_FILE_SIZE_MB));
2191    }
2192
2193    if let Some(compaction_type) = handler_args.with_options.get(COMPACTION_TYPE) {
2194        let compaction_type = CompactionType::try_from(compaction_type.as_str()).map_err(|_| {
2195            ErrorCode::InvalidInputSyntax(format!(
2196                "invalid compaction_type: {}, must be one of {:?}",
2197                compaction_type,
2198                &[
2199                    CompactionType::Full,
2200                    CompactionType::SmallFiles,
2201                    CompactionType::FilesWithDelete
2202                ]
2203            ))
2204        })?;
2205
2206        sink_with.insert(
2207            COMPACTION_TYPE.to_owned(),
2208            compaction_type.as_str().to_owned(),
2209        );
2210
2211        // remove from source options, otherwise it will be considered as an unknown field.
2212        source
2213            .as_mut()
2214            .map(|x| x.with_properties.remove(COMPACTION_TYPE));
2215    }
2216
2217    if let Some(write_parquet_compression) = handler_args
2218        .with_options
2219        .get(COMPACTION_WRITE_PARQUET_COMPRESSION)
2220    {
2221        sink_with.insert(
2222            COMPACTION_WRITE_PARQUET_COMPRESSION.to_owned(),
2223            write_parquet_compression.to_owned(),
2224        );
2225        // remove from source options, otherwise it will be considered as an unknown field.
2226        source.as_mut().map(|x| {
2227            x.with_properties
2228                .remove(COMPACTION_WRITE_PARQUET_COMPRESSION)
2229        });
2230    }
2231
2232    if let Some(write_parquet_max_row_group_rows) = handler_args
2233        .with_options
2234        .get(COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS)
2235    {
2236        let write_parquet_max_row_group_rows = write_parquet_max_row_group_rows
2237            .parse::<usize>()
2238            .map_err(|_| {
2239                ErrorCode::InvalidInputSyntax(format!(
2240                    "{} must be a positive integer: {}",
2241                    COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS, write_parquet_max_row_group_rows
2242                ))
2243            })?;
2244        if write_parquet_max_row_group_rows == 0 {
            bail!("{} must be greater than 0", COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS);
2249        }
2250        sink_with.insert(
2251            COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS.to_owned(),
2252            write_parquet_max_row_group_rows.to_string(),
2253        );
2254        // remove from source options, otherwise it will be considered as an unknown field.
2255        source.as_mut().map(|x| {
2256            x.with_properties
2257                .remove(COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_ROWS)
2258        });
2259    }
2260
2261    if let Some(write_parquet_max_row_group_bytes) = handler_args
2262        .with_options
2263        .get(COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_BYTES)
2264    {
2265        let write_parquet_max_row_group_bytes = write_parquet_max_row_group_bytes
2266            .parse::<usize>()
2267            .map_err(|_| {
2268                ErrorCode::InvalidInputSyntax(format!(
2269                    "{} must be a positive integer: {}",
2270                    COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_BYTES, write_parquet_max_row_group_bytes
2271                ))
2272            })?;
2273        if write_parquet_max_row_group_bytes == 0 {
            bail!("{} must be greater than 0", COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_BYTES);
2278        }
2279        sink_with.insert(
2280            COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_BYTES.to_owned(),
2281            write_parquet_max_row_group_bytes.to_string(),
2282        );
2283        source.as_mut().map(|x| {
2284            x.with_properties
2285                .remove(COMPACTION_WRITE_PARQUET_MAX_ROW_GROUP_BYTES)
2286        });
2287    }
2288
2289    let partition_by = handler_args
2290        .with_options
2291        .get("partition_by")
2292        .map(|v| v.to_owned());
2293
2294    if let Some(partition_by) = &partition_by {
2295        let mut partition_columns = vec![];
2296        for (column, _) in parse_partition_by_exprs(partition_by.clone())? {
2297            table
2298                .columns()
2299                .iter()
2300                .find(|col| col.name().eq_ignore_ascii_case(&column))
2301                .ok_or_else(|| {
2302                    ErrorCode::InvalidInputSyntax(format!(
2303                        "Partition source column does not exist in schema: {}",
2304                        column
2305                    ))
2306                })?;
2307
2308            partition_columns.push(column);
2309        }
2310
2311        ensure_partition_columns_are_prefix_of_primary_key(&partition_columns, &pks).map_err(
2312            |_| {
2313                ErrorCode::InvalidInputSyntax(
                    "The partition columns must be a prefix of the primary key".to_owned(),
2315                )
2316            },
2317        )?;
2318
2319        sink_with.insert("partition_by".to_owned(), partition_by.to_owned());
2320
2321        // remove partition_by from source options, otherwise it will be considered as an unknown field.
2322        source
2323            .as_mut()
2324            .map(|x| x.with_properties.remove("partition_by"));
2325    }
2326
2327    let order_key = handler_args
2328        .with_options
2329        .get(ORDER_KEY)
2330        .map(|v| v.to_owned());
2331    if let Some(order_key) = &order_key {
2332        validate_order_key_columns(order_key, table.columns().iter().map(|col| col.name()))
2333            .map_err(|err| ErrorCode::InvalidInputSyntax(err.to_report_string()))?;
2334
2335        sink_with.insert(ORDER_KEY.to_owned(), order_key.to_owned());
2336
2337        source.as_mut().map(|x| x.with_properties.remove(ORDER_KEY));
2338    }
2339
2340    sink_handler_args.with_options =
2341        WithOptions::new(sink_with, Default::default(), connection_ref.clone());
2342    let SinkPlanContext {
2343        sink_plan,
2344        sink_catalog,
2345        ..
2346    } = gen_sink_plan(sink_handler_args, create_sink_stmt, None, true).await?;
2347    let sink_graph = build_graph(sink_plan, Some(GraphJobType::Sink))?;
2348
2349    let mut source_name = table_name.clone();
2350    *source_name.0.last_mut().unwrap() = Ident::from(
2351        (ICEBERG_SOURCE_PREFIX.to_owned() + &source_name.0.last().unwrap().real_value()).as_str(),
2352    );
2353    let create_source_stmt = CreateSourceStatement {
2354        temporary: false,
2355        if_not_exists: false,
2356        columns: vec![],
2357        source_name,
2358        wildcard_idx: Some(0),
2359        constraints: vec![],
2360        with_properties: WithProperties(vec![]),
2361        format_encode: CompatibleFormatEncode::V2(FormatEncodeOptions::none()),
2362        source_watermarks: vec![],
2363        include_column_options: vec![],
2364    };
2365
2366    let mut source_handler_args = handler_args.clone();
2367    let source_with = with_common;
2368    source_handler_args.with_options =
2369        WithOptions::new(source_with, Default::default(), connection_ref);
2370
2371    let overwrite_options = OverwriteOptions::new(&mut source_handler_args);
2372    let format_encode = create_source_stmt.format_encode.into_v2_with_warning();
2373    let (with_properties, refresh_mode) =
2374        bind_connector_props(&source_handler_args, &format_encode, true)?;
2375
    // Create the iceberg table through the sink first; it is then used for iceberg source column binding. See `bind_columns_from_source_for_non_cdc` for more details.
2377    // TODO: We can derive the columns directly from table definition in the future, so that we don't need to pre-create the table catalog.
2378    let (iceberg_catalog, table_identifier) = {
2379        let sink_param = SinkParam::try_from_sink_catalog(sink_catalog.clone())?;
2380        let iceberg_sink = IcebergSink::try_from(sink_param)?;
2381        iceberg_sink.create_table_if_not_exists().await?;
2382
2383        let iceberg_catalog = iceberg_sink.config.create_catalog().await?;
2384        let table_identifier = iceberg_sink.config.full_table_name()?;
2385        (iceberg_catalog, table_identifier)
2386    };
2387
2388    let create_source_type = CreateSourceType::for_newly_created(&session, &*with_properties);
2389    let (columns_from_resolve_source, source_info) = bind_columns_from_source(
2390        &session,
2391        &format_encode,
2392        Either::Left(&with_properties),
2393        create_source_type,
2394    )
2395    .await?;
2396    let mut col_id_gen = ColumnIdGenerator::new_initial();
2397
2398    let iceberg_source_catalog = bind_create_source_or_table_with_connector(
2399        source_handler_args,
2400        create_source_stmt.source_name,
2401        format_encode,
2402        with_properties,
2403        &create_source_stmt.columns,
2404        create_source_stmt.constraints,
2405        create_source_stmt.wildcard_idx,
2406        create_source_stmt.source_watermarks,
2407        columns_from_resolve_source,
2408        source_info,
2409        create_source_stmt.include_column_options,
2410        &mut col_id_gen,
2411        create_source_type,
2412        overwrite_options.source_rate_limit,
2413        SqlColumnStrategy::FollowChecked,
2414        refresh_mode,
2415    )
2416    .await?;
2417
    // Before we create the table, ensure the JVM is initialized, since we use the JDBC catalog right now.
    // If the JVM fails to initialize, the currently non-atomic DDL would leave a partially created iceberg engine table.
2420    let _ = Jvm::get_or_init()?;
2421
2422    let catalog_writer = session.catalog_writer()?;
2423    let action = match job_type {
2424        TableJobType::SharedCdcSource => LongRunningNotificationAction::MonitorBackfillJob,
2425        _ => LongRunningNotificationAction::DiagnoseBarrierLatency,
2426    };
2427    let res = execute_with_long_running_notification(
2428        catalog_writer.create_iceberg_table(
2429            PbTableJobInfo {
2430                source,
2431                table: Some(table.to_prost()),
2432                fragment_graph: Some(graph),
2433                job_type: job_type as _,
2434            },
2435            PbSinkJobInfo {
2436                sink: Some(sink_catalog.to_proto()),
2437                fragment_graph: Some(sink_graph),
2438            },
2439            iceberg_source_catalog.to_prost(),
2440            if_not_exists,
2441        ),
2442        &session,
2443        "CREATE TABLE",
2444        action,
2445    )
2446    .await;
2447
2448    if res.is_err() {
2449        let _ = iceberg_catalog
2450            .drop_table(&table_identifier)
2451            .await
2452            .inspect_err(|err| {
2453                tracing::error!(
                    "failed to drop iceberg table {} after creating the iceberg engine table failed: {}",
2455                    table_identifier,
2456                    err.as_report()
2457                );
2458            });
2459        res?
2460    }
2461
2462    Ok(())
2463}
2464
2465pub fn check_create_table_with_source(
2466    with_options: &WithOptions,
2467    format_encode: Option<FormatEncodeOptions>,
2468    include_column_options: &IncludeOption,
2469    cdc_table_info: &Option<CdcTableInfo>,
2470) -> Result<Option<FormatEncodeOptions>> {
2471    // skip check for cdc table
2472    if cdc_table_info.is_some() {
2473        return Ok(format_encode);
2474    }
2475    let defined_source = with_options.is_source_connector();
2476
2477    if !include_column_options.is_empty() && !defined_source {
2478        return Err(ErrorCode::InvalidInputSyntax(
2479            "INCLUDE should be used with a connector".to_owned(),
2480        )
2481        .into());
2482    }
2483    if defined_source {
2484        format_encode.as_ref().ok_or_else(|| {
2485            ErrorCode::InvalidInputSyntax("Please specify a source schema using FORMAT".to_owned())
2486        })?;
2487    }
2488    Ok(format_encode)
2489}
2490
2491fn ensure_partition_columns_are_prefix_of_primary_key(
2492    partition_columns: &[String],
2493    primary_key_columns: &[String],
2494) -> std::result::Result<(), String> {
2495    if partition_columns.len() > primary_key_columns.len() {
2496        return Err("Partition columns cannot be longer than primary key columns.".to_owned());
2497    }
2498
2499    for (i, partition_col) in partition_columns.iter().enumerate() {
2500        if primary_key_columns.get(i) != Some(partition_col) {
2501            return Err(format!(
2502                "Partition column '{}' is not a prefix of the primary key.",
2503                partition_col
2504            ));
2505        }
2506    }
2507
2508    Ok(())
2509}
2510
2511#[allow(clippy::too_many_arguments)]
2512pub async fn generate_stream_graph_for_replace_table(
2513    _session: &Arc<SessionImpl>,
2514    table_name: ObjectName,
2515    original_catalog: &Arc<TableCatalog>,
2516    handler_args: HandlerArgs,
2517    statement: Statement,
2518    col_id_gen: ColumnIdGenerator,
2519    sql_column_strategy: SqlColumnStrategy,
2520) -> Result<(
2521    StreamFragmentGraph,
2522    TableCatalog,
2523    Option<SourceCatalog>,
2524    TableJobType,
2525)> {
2526    let Statement::CreateTable {
2527        columns,
2528        constraints,
2529        source_watermarks,
2530        append_only,
2531        on_conflict,
2532        with_version_columns,
2533        wildcard_idx,
2534        cdc_table_info,
2535        format_encode,
2536        include_column_options,
2537        engine,
2538        with_options,
2539        ..
2540    } = statement
2541    else {
2542        panic!("unexpected statement type: {:?}", statement);
2543    };
2544
2545    let format_encode = format_encode
2546        .clone()
2547        .map(|format_encode| format_encode.into_v2_with_warning());
2548
2549    let engine = match engine {
2550        risingwave_sqlparser::ast::Engine::Hummock => Engine::Hummock,
2551        risingwave_sqlparser::ast::Engine::Iceberg => Engine::Iceberg,
2552    };
2553
2554    let is_drop_connector =
2555        original_catalog.associated_source_id().is_some() && format_encode.is_none();
2556    if is_drop_connector {
2557        debug_assert!(
2558            source_watermarks.is_empty()
2559                && include_column_options.is_empty()
2560                && with_options
2561                    .iter()
2562                    .all(|opt| opt.name.real_value().to_lowercase() != "connector")
2563        );
2564    }
2565
2566    let props = CreateTableProps {
2567        definition: handler_args.normalized_sql.clone(),
2568        append_only,
2569        on_conflict: on_conflict.into(),
2570        with_version_columns: with_version_columns
2571            .iter()
2572            .map(|col| col.real_value())
2573            .collect(),
2574        webhook_info: original_catalog.webhook_info.clone(),
2575        engine,
2576    };
2577
2578    let ((plan, mut source, mut table), job_type) = match (format_encode, cdc_table_info.as_ref()) {
2579        (Some(format_encode), None) => (
2580            gen_create_table_plan_with_source(
2581                handler_args,
2582                ExplainOptions::default(),
2583                table_name,
2584                columns,
2585                wildcard_idx,
2586                constraints,
2587                format_encode,
2588                source_watermarks,
2589                col_id_gen,
2590                include_column_options,
2591                props,
2592                sql_column_strategy,
2593            )
2594            .await?,
2595            TableJobType::General,
2596        ),
2597        (None, None) => {
2598            let context = OptimizerContext::from_handler_args(handler_args);
2599            let (plan, table) = gen_create_table_plan(
2600                context,
2601                table_name,
2602                columns,
2603                constraints,
2604                col_id_gen,
2605                source_watermarks,
2606                props,
2607                true,
2608            )?;
2609            ((plan, None, table), TableJobType::General)
2610        }
2611        (None, Some(cdc_table)) => {
2612            sanity_check_for_table_on_cdc_source(
2613                append_only,
2614                &columns,
2615                &wildcard_idx,
2616                &constraints,
2617                &source_watermarks,
2618            )?;
2619
2620            let session = &handler_args.session;
2621            let (source, resolved_table_name) =
2622                get_source_and_resolved_table_name(session, cdc_table.clone(), table_name.clone())?;
2623
2624            let (cdc_with_options, normalized_external_table_name) =
2625                derive_with_options_for_cdc_table(
2626                    &source.with_properties,
2627                    cdc_table.external_table_name.clone(),
2628                )?;
2629
2630            let (column_catalogs, pk_names) = bind_cdc_table_schema(&columns, &constraints, true)?;
2631
2632            let context: OptimizerContextRef =
2633                OptimizerContext::new(handler_args, ExplainOptions::default()).into();
2634            let (plan, table) = gen_create_table_plan_for_cdc_table(
2635                context,
2636                source,
2637                normalized_external_table_name,
2638                columns,
2639                source_watermarks,
2640                column_catalogs,
2641                pk_names,
2642                cdc_with_options,
2643                col_id_gen,
2644                on_conflict,
2645                with_version_columns
2646                    .iter()
2647                    .map(|col| col.real_value())
2648                    .collect(),
2649                include_column_options,
2650                table_name,
2651                resolved_table_name,
2652                original_catalog.database_id,
2653                original_catalog.schema_id,
2654                original_catalog.id(),
2655                engine,
2656            )?;
2657
2658            ((plan, None, table), TableJobType::SharedCdcSource)
2659        }
2660        (Some(_), Some(_)) => {
2661            return Err(ErrorCode::NotSupported(
                "Data format and encoding format don't apply to a table created from a CDC source"
2663                    .into(),
2664                "Remove the FORMAT and ENCODE specification".into(),
2665            )
2666            .into());
2667        }
2668    };
2669
2670    if table.pk_column_ids() != original_catalog.pk_column_ids() {
        return Err(ErrorCode::InvalidInputSyntax(
            "altering the primary key of a table is not supported".to_owned(),
        )
        .into());
2674    }
2675
2676    let graph = build_graph(plan, Some(GraphJobType::Table))?;
2677
2678    // Fill the original table ID.
2679    table.id = original_catalog.id();
2680    if !is_drop_connector && let Some(source_id) = original_catalog.associated_source_id() {
2681        table.associated_source_id = Some(source_id);
2682
2683        let source = source.as_mut().unwrap();
2684        source.id = source_id;
2685        source.associated_table_id = Some(table.id());
2686    }
2687
2688    Ok((graph, table, source, job_type))
2689}
2690
2691fn get_source_and_resolved_table_name(
2692    session: &Arc<SessionImpl>,
2693    cdc_table: CdcTableInfo,
2694    table_name: ObjectName,
2695) -> Result<(Arc<SourceCatalog>, String)> {
2696    let db_name = &session.database();
2697    let (_, resolved_table_name) = Binder::resolve_schema_qualified_name(db_name, &table_name)?;
2698
2699    let (source_schema, source_name) =
2700        Binder::resolve_schema_qualified_name(db_name, &cdc_table.source_name)?;
2701
2702    let source = {
2703        let catalog_reader = session.env().catalog_reader().read_guard();
2704        let schema_name = source_schema.unwrap_or(DEFAULT_SCHEMA_NAME.to_owned());
2705        let (source, _) = catalog_reader.get_source_by_name(
2706            db_name,
2707            SchemaPath::Name(schema_name.as_str()),
2708            source_name.as_str(),
2709        )?;
2710        source.clone()
2711    };
2712
2713    Ok((source, resolved_table_name))
2714}
2715
// Validate the `webhook_info` and bind it to the protobuf representation.
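// A hypothetical statement that passes this validation (the secret, header, and
// connector options are illustrative and may differ from the exact syntax):
//   CREATE TABLE wh (payload JSONB)
//   WITH (connector = 'webhook')
//   VALIDATE SECRET my_secret AS secure_compare(headers->>'x-signature', my_secret);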
2717fn bind_webhook_info(
2718    session: &Arc<SessionImpl>,
2719    column_defs: &[ColumnDef],
2720    webhook_info: WebhookSourceInfo,
2721) -> Result<PbWebhookSourceInfo> {
2722    let WebhookSourceInfo {
2723        secret_ref,
2724        signature_expr,
2725        wait_for_persistence,
2726        is_batched,
2727    } = webhook_info;
2728
    for column in column_defs {
        for option_def in &column.options {
            match option_def.option {
                ColumnOption::Null => {}
                ColumnOption::GeneratedColumns(_) => {
                    return Err(ErrorCode::InvalidInputSyntax(
                        "generated columns are not supported for webhook tables".to_owned(),
                    )
                    .into());
                }
                ColumnOption::DefaultValue(_) | ColumnOption::DefaultValueInternal { .. } => {
                    return Err(ErrorCode::InvalidInputSyntax(
                        "default values are not supported for webhook tables".to_owned(),
                    )
                    .into());
                }
                ColumnOption::NotNull
                | ColumnOption::Unique { .. }
                | ColumnOption::ForeignKey { .. }
                | ColumnOption::Check(_)
                | ColumnOption::DialectSpecific(_) => {
                    return Err(ErrorCode::InvalidInputSyntax(
                        "only NULL column option is supported for webhook tables".to_owned(),
                    )
                    .into());
                }
            }
        }
    }

    // Validate and resolve the secret reference, if present.
    let (pb_secret_ref, secret_name) = if let Some(secret_ref) = secret_ref {
        let db_name = &session.database();
        let (schema_name, secret_name) =
            Binder::resolve_schema_qualified_name(db_name, &secret_ref.secret_name)?;
        let secret_catalog = session.get_secret_by_name(schema_name, &secret_name)?;
        (
            Some(PbSecretRef {
                secret_id: secret_catalog.id,
                ref_as: match secret_ref.ref_as {
                    SecretRefAsType::Text => PbRefAsType::Text,
                    SecretRefAsType::File => PbRefAsType::File,
                }
                .into(),
            }),
            Some(secret_name),
        )
    } else {
        (None, None)
    };

    let signature_expr = if let Some(signature_expr) = signature_expr {
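        // If the table has a single JSONB column, the VALIDATE expression may refer
        // to the payload by that column's name; otherwise the canonical webhook
        // payload field name is used.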
        let payload_name = if column_defs.len() == 1
            && column_defs[0].data_type.as_ref() == Some(&AstDataType::Jsonb)
        {
            column_defs[0].name.real_value()
        } else {
            WEBHOOK_PAYLOAD_FIELD_NAME.to_owned()
        };
        let secure_compare_context = SecureCompareContext {
            payload_name,
            secret_name,
        };
        let mut binder = Binder::new_for_ddl(session).with_secure_compare(secure_compare_context);
        let expr = binder.bind_expr(&signature_expr)?;

        // Validate the expression, ensuring it is a SECURE_COMPARE() call.
        if !expr.as_function_call().is_some_and(|call| {
            call.func_type() == crate::optimizer::plan_node::generic::ExprType::SecureCompare
        }) {
            return Err(ErrorCode::InvalidInputSyntax(
                "The signature verification function must be SECURE_COMPARE()".to_owned(),
            )
            .into());
        }

        Some(expr.to_expr_proto())
    } else {
        session.notice_to_user(
            "VALIDATE clause is strongly recommended for safety in production usage",
        );
        None
    };

    let pb_webhook_info = PbWebhookSourceInfo {
        secret_ref: pb_secret_ref,
        signature_expr,
        wait_for_persistence,
        is_batched,
    };

    Ok(pb_webhook_info)
}

#[cfg(test)]
mod tests {
    use risingwave_common::catalog::{
        DEFAULT_DATABASE_NAME, ROW_ID_COLUMN_NAME, RW_TIMESTAMP_COLUMN_NAME,
    };
    use risingwave_common::types::{DataType, StructType};

    use super::*;
    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

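    // Verifies that CREATE TABLE binds the user-defined columns together with the
    // hidden row-id and rw_timestamp columns, with the expected types and struct
    // field IDs.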
    #[tokio::test]
    async fn test_create_table_handler() {
        let sql =
            "create table t (v1 smallint, v2 struct<v3 bigint, v4 float, v5 double>) append only;";
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        // Check table exists.
        let (table, _) = catalog_reader
            .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
            .unwrap();
        assert_eq!(table.name(), "t");

        let columns = table
            .columns
            .iter()
            .map(|col| (col.name(), col.data_type().clone()))
            .collect::<HashMap<&str, DataType>>();

        let expected_columns = maplit::hashmap! {
            ROW_ID_COLUMN_NAME => DataType::Serial,
            "v1" => DataType::Int16,
            "v2" => StructType::new(
                vec![("v3", DataType::Int64), ("v4", DataType::Float64), ("v5", DataType::Float64)],
            )
            .with_ids([3, 4, 5].map(ColumnId::new))
            .into(),
            RW_TIMESTAMP_COLUMN_NAME => DataType::Timestamptz,
        };

        assert_eq!(columns, expected_columns, "{columns:#?}");
    }

    #[tokio::test]
    async fn test_create_webhook_table_with_arbitrary_columns() {
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend
            .run_sql("create schema ingest_schema;")
            .await
            .unwrap();
        frontend
            .run_sql(
                r#"
                create table ingest_schema.orders (
                    id int,
                    customer_name varchar,
                    amount double precision,
                    primary key (id)
                ) with (
                    connector = 'webhook'
                ) validate as secure_compare(
                    headers->>'x-rw-signature',
                    'sha256=' || encode(hmac('webhook-secret', payload, 'sha256'), 'hex')
                );
                "#,
            )
            .await
            .unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let (table, _) = catalog_reader
            .get_created_table_by_name(
                DEFAULT_DATABASE_NAME,
                SchemaPath::Name("ingest_schema"),
                "orders",
            )
            .unwrap();

        assert!(table.webhook_info.is_some());
        assert_eq!(
            table
                .columns
                .iter()
                .filter(|column| column.can_dml())
                .count(),
            3
        );
    }

    #[tokio::test]
    async fn test_create_webhook_table_uses_single_jsonb_column_name_in_validate() {
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend
            .run_sql(
                r#"
                create table webhook_single_column (
                    body jsonb
                ) with (
                    connector = 'webhook'
                ) validate as secure_compare(
                    headers->>'x-rw-signature',
                    'sha256=' || encode(hmac('webhook-secret', body, 'sha256'), 'hex')
                );
                "#,
            )
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn test_create_webhook_table_with_generated_columns() {
        let frontend = LocalFrontend::new(Default::default()).await;
        let err = frontend
            .run_sql(
                r#"
                create table webhook_generated_columns (
                    id int,
                    amount double precision,
                    amount_with_fee double precision as amount + 1.0
                ) with (
                    connector = 'webhook'
                );
                "#,
            )
            .await
            .unwrap_err();

        assert!(
            err.to_string()
                .contains("generated columns are not supported for webhook tables"),
            "{err:?}"
        );
    }

    #[tokio::test]
    async fn test_create_webhook_table_with_default_value() {
        let frontend = LocalFrontend::new(Default::default()).await;
        let err = frontend
            .run_sql(
                r#"
                create table webhook_default_value (
                    id int default 42,
                    amount double precision
                ) with (
                    connector = 'webhook'
                );
                "#,
            )
            .await
            .unwrap_err();

        assert!(
            err.to_string()
                .contains("default values are not supported for webhook tables"),
            "{err:?}"
        );
    }

    #[tokio::test]
    async fn test_create_webhook_table_with_not_null_option() {
        let frontend = LocalFrontend::new(Default::default()).await;
        let err = frontend
            .run_sql(
                r#"
                create table webhook_not_null (
                    id int not null,
                    amount double precision
                ) with (
                    connector = 'webhook'
                );
                "#,
            )
            .await
            .unwrap_err();

        assert!(
            err.to_string()
                .contains("only NULL column option is supported for webhook tables"),
            "{err:?}"
        );
    }

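    // Exercises primary-key binding across inline and table-level PRIMARY KEY
    // syntax, including duplicate and unknown-column error cases.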
    #[test]
    fn test_bind_primary_key() {
        // Note: Column ID 0 is reserved for the row ID column.

        for (sql, expected) in [
            ("create table t (v1 int, v2 int)", Ok(&[0] as &[_])),
            ("create table t (v1 int primary key, v2 int)", Ok(&[1])),
            ("create table t (v1 int, v2 int primary key)", Ok(&[2])),
            (
                "create table t (v1 int primary key, v2 int primary key)",
                Err("multiple primary keys are not allowed"),
            ),
            (
                "create table t (v1 int primary key primary key, v2 int)",
                Err("multiple primary keys are not allowed"),
            ),
            (
                "create table t (v1 int, v2 int, primary key (v1))",
                Ok(&[1]),
            ),
            (
                "create table t (v1 int, primary key (v2), v2 int)",
                Ok(&[2]),
            ),
            (
                "create table t (primary key (v2, v1), v1 int, v2 int)",
                Ok(&[2, 1]),
            ),
            (
                "create table t (v1 int, primary key (v1), v2 int, primary key (v1))",
                Err("multiple primary keys are not allowed"),
            ),
            (
                "create table t (v1 int primary key, primary key (v1), v2 int)",
                Err("multiple primary keys are not allowed"),
            ),
            (
                "create table t (v1 int, primary key (V3), v2 int)",
                Err("column \"v3\" named in key does not exist"),
            ),
        ] {
            let mut ast = risingwave_sqlparser::parser::Parser::parse_sql(sql).unwrap();
            let risingwave_sqlparser::ast::Statement::CreateTable {
                columns: column_defs,
                constraints,
                ..
            } = ast.remove(0)
            else {
                panic!("test case should be a CREATE TABLE statement")
            };
            let actual: Result<_> = (|| {
                let mut columns = bind_sql_columns(&column_defs, false)?;
                let mut col_id_gen = ColumnIdGenerator::new_initial();
                for c in &mut columns {
                    col_id_gen.generate(c)?;
                }

                let pk_names =
                    bind_sql_pk_names(&column_defs, bind_table_constraints(&constraints)?)?;
                let (_, pk_column_ids, _) =
                    bind_pk_and_row_id_on_relation(columns, pk_names, true)?;
                Ok(pk_column_ids)
            })();
            match (expected, actual) {
                (Ok(expected), Ok(actual)) => assert_eq!(
                    expected.iter().copied().map(ColumnId::new).collect_vec(),
                    actual,
                    "sql: {sql}"
                ),
                (Ok(_), Err(actual)) => panic!("sql: {sql}\nunexpected error: {actual:?}"),
                (Err(_), Ok(actual)) => panic!("sql: {sql}\nexpected an error but got: {actual:?}"),
                (Err(expected), Err(actual)) => assert!(
                    actual.to_string().contains(expected),
                    "sql: {sql}\nexpected: {expected:?}\nactual: {actual:?}"
                ),
            }
        }
    }

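    // When the same property appears in both the WITH clause and the
    // FORMAT ... ENCODE options, the two sets are kept separate rather than
    // merged or deduplicated.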
    #[tokio::test]
    async fn test_duplicate_props_options() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let sql = format!(
            r#"CREATE TABLE t
    WITH (
        connector = 'kinesis',
        aws.region='user_test_topic',
        endpoint='172.10.1.1:9090,172.10.1.2:9090',
        aws.credentials.access_key_id = 'your_access_key_1',
        aws.credentials.secret_access_key = 'your_secret_key_1'
    )
    FORMAT PLAIN ENCODE PROTOBUF (
        message = '.test.TestRecord',
        aws.credentials.access_key_id = 'your_access_key_2',
        aws.credentials.secret_access_key = 'your_secret_key_2',
        schema.location = 'file://{}',
    )"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        // Check source exists.
        let (source, _) = catalog_reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
            .unwrap();
        assert_eq!(source.name, "t");

        // AwsAuth params exist in options.
        assert_eq!(
            source
                .info
                .format_encode_options
                .get("aws.credentials.access_key_id")
                .unwrap(),
            "your_access_key_2"
        );
        assert_eq!(
            source
                .info
                .format_encode_options
                .get("aws.credentials.secret_access_key")
                .unwrap(),
            "your_secret_key_2"
        );

        // AwsAuth params exist in props.
        assert_eq!(
            source
                .with_properties
                .get("aws.credentials.access_key_id")
                .unwrap(),
            "your_access_key_1"
        );
        assert_eq!(
            source
                .with_properties
                .get("aws.credentials.secret_access_key")
                .unwrap(),
            "your_secret_key_1"
        );

        // Options are not merged into props.
        assert!(!source.with_properties.contains_key("schema.location"));
    }
}