risingwave_frontend/handler/create_table.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::rc::Rc;
17use std::sync::Arc;
18
19use anyhow::{Context, anyhow};
20use clap::ValueEnum;
21use either::Either;
22use fixedbitset::FixedBitSet;
23use itertools::Itertools;
24use percent_encoding::percent_decode_str;
25use pgwire::pg_response::{PgResponse, StatementType};
26use prost::Message as _;
27use risingwave_common::catalog::{
28    CdcTableDesc, ColumnCatalog, ColumnDesc, ConflictBehavior, DEFAULT_SCHEMA_NAME, Engine,
29    ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX, RISINGWAVE_ICEBERG_ROW_ID, ROW_ID_COLUMN_NAME,
30    TableId,
31};
32use risingwave_common::config::MetaBackend;
33use risingwave_common::global_jvm::Jvm;
34use risingwave_common::session_config::sink_decouple::SinkDecouple;
35use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
36use risingwave_common::util::value_encoding::DatumToProtoExt;
37use risingwave_common::{bail, bail_not_implemented};
38use risingwave_connector::sink::decouple_checkpoint_log_sink::COMMIT_CHECKPOINT_INTERVAL;
39use risingwave_connector::source::cdc::build_cdc_table_id;
40use risingwave_connector::source::cdc::external::{
41    DATABASE_NAME_KEY, ExternalCdcTableType, ExternalTableConfig, ExternalTableImpl,
42    SCHEMA_NAME_KEY, TABLE_NAME_KEY,
43};
44use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt, source};
45use risingwave_pb::catalog::connection::Info as ConnectionInfo;
46use risingwave_pb::catalog::connection_params::ConnectionType;
47use risingwave_pb::catalog::{PbSource, PbWebhookSourceInfo, WatermarkDesc};
48use risingwave_pb::ddl_service::{PbTableJobType, TableJobType};
49use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
50use risingwave_pb::plan_common::{
51    AdditionalColumn, ColumnDescVersion, DefaultColumnDesc, GeneratedColumnDesc,
52};
53use risingwave_pb::secret::PbSecretRef;
54use risingwave_pb::secret::secret_ref::PbRefAsType;
55use risingwave_pb::stream_plan::StreamFragmentGraph;
56use risingwave_sqlparser::ast::{
57    CdcTableInfo, ColumnDef, ColumnOption, CompatibleFormatEncode, ConnectionRefValue, CreateSink,
58    CreateSinkStatement, CreateSourceStatement, DataType as AstDataType, ExplainOptions, Format,
59    FormatEncodeOptions, Ident, ObjectName, OnConflict, SecretRefAsType, SourceWatermark,
60    Statement, TableConstraint, WebhookSourceInfo, WithProperties,
61};
62use risingwave_sqlparser::parser::{IncludeOption, Parser};
63use thiserror_ext::AsReport;
64
65use super::RwPgResponse;
66use super::create_source::{CreateSourceType, SqlColumnStrategy, bind_columns_from_source};
67use crate::binder::{Clause, SecureCompareContext, bind_data_type};
68use crate::catalog::root_catalog::SchemaPath;
69use crate::catalog::source_catalog::SourceCatalog;
70use crate::catalog::table_catalog::TableVersion;
71use crate::catalog::{ColumnId, DatabaseId, SchemaId, SourceId, check_column_name_not_reserved};
72use crate::error::{ErrorCode, Result, RwError, bail_bind_error};
73use crate::expr::{Expr, ExprImpl, ExprRewriter};
74use crate::handler::HandlerArgs;
75use crate::handler::create_source::{
76    UPSTREAM_SOURCE_KEY, bind_connector_props, bind_create_source_or_table_with_connector,
77    bind_source_watermark, handle_addition_columns,
78};
79use crate::handler::util::{
80    LongRunningNotificationAction, SourceSchemaCompatExt, execute_with_long_running_notification,
81};
82use crate::optimizer::plan_node::generic::{SourceNodeKind, build_cdc_scan_options_with_options};
83use crate::optimizer::plan_node::{
84    LogicalCdcScan, LogicalPlanRef, LogicalSource, StreamPlanRef as PlanRef,
85};
86use crate::optimizer::property::{Order, RequiredDist};
87use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRoot};
88use crate::session::SessionImpl;
89use crate::session::current::notice_to_user;
90use crate::stream_fragmenter::{GraphJobType, build_graph};
91use crate::utils::OverwriteOptions;
92use crate::{Binder, Explain, TableCatalog, WithOptions};
93
94mod col_id_gen;
95pub use col_id_gen::*;
96use risingwave_connector::sink::SinkParam;
97use risingwave_connector::sink::iceberg::{
98    COMPACTION_DELETE_FILES_COUNT_THRESHOLD, COMPACTION_INTERVAL_SEC, COMPACTION_MAX_SNAPSHOTS_NUM,
99    COMPACTION_SMALL_FILES_THRESHOLD_MB, COMPACTION_TARGET_FILE_SIZE_MB,
100    COMPACTION_TRIGGER_SNAPSHOT_COUNT, COMPACTION_TYPE, CompactionType, ENABLE_COMPACTION,
101    ENABLE_SNAPSHOT_EXPIRATION, ICEBERG_WRITE_MODE_COPY_ON_WRITE, ICEBERG_WRITE_MODE_MERGE_ON_READ,
102    IcebergSink, IcebergWriteMode, SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES,
103    SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA, SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS,
104    SNAPSHOT_EXPIRATION_RETAIN_LAST, WRITE_MODE, parse_partition_by_exprs,
105};
106use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
107
108use crate::handler::create_sink::{SinkPlanContext, gen_sink_plan};
109
110fn ensure_column_options_supported(c: &ColumnDef) -> Result<()> {
111    for option_def in &c.options {
112        match option_def.option {
113            ColumnOption::GeneratedColumns(_) => {}
114            ColumnOption::DefaultValue(_) => {}
115            ColumnOption::DefaultValueInternal { .. } => {}
116            ColumnOption::Unique { is_primary: true } => {}
117            ColumnOption::Null => {}
118            ColumnOption::NotNull => {}
119            _ => bail_not_implemented!("column constraints \"{}\"", option_def),
120        }
121    }
122    Ok(())
123}
124
125/// Binds the column schemas declared in a CREATE statement into `ColumnCatalog`s.
126/// Column-level `PRIMARY KEY` markers are not resolved here; they are collected
127/// later (together with table constraints) by `bind_sql_pk_names`.
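///
/// # Example
///
/// An illustrative statement (not from the original source) whose column
/// definitions this function binds, roughly:
///
/// ```sql
/// CREATE TABLE t (id INT PRIMARY KEY, v VARCHAR NOT NULL);
/// ```
///
/// `id` and `v` become `ColumnCatalog`s with placeholder column ids, `v` is
/// marked as non-nullable, and the `PRIMARY KEY` marker is handled later.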
128pub fn bind_sql_columns(
129    column_defs: &[ColumnDef],
130    is_for_drop_table_connector: bool,
131) -> Result<Vec<ColumnCatalog>> {
132    let mut columns = Vec::with_capacity(column_defs.len());
133
134    for column in column_defs {
135        ensure_column_options_supported(column)?;
136        // Destruct to make sure all fields are properly handled rather than ignored.
137        // Do NOT use `..` to ignore fields you do not want to deal with.
138        // Reject them with a clear NotImplemented error.
139        let ColumnDef {
140            name,
141            data_type,
142            collation,
143            options,
144            ..
145        } = column;
146
147        let data_type = data_type
148            .clone()
149            .ok_or_else(|| ErrorCode::InvalidInputSyntax("data type is not specified".into()))?;
150        if let Some(collation) = collation {
151            // PostgreSQL will limit the datatypes that collate can work on.
152            // https://www.postgresql.org/docs/16/collation.html#COLLATION-CONCEPTS
153            //   > The built-in collatable data types are `text`, `varchar`, and `char`.
154            //
155            // But we don't support real collation, we simply ignore it here.
156            if !["C", "POSIX"].contains(&collation.real_value().as_str()) {
157                bail_not_implemented!(
158                    "Collate collation other than `C` or `POSIX` is not implemented"
159                );
160            }
161
162            match data_type {
163                AstDataType::Text | AstDataType::Varchar | AstDataType::Char(_) => {}
164                _ => {
165                    return Err(ErrorCode::NotSupported(
166                        format!("{} is not a collatable data type", data_type),
167                        "The only built-in collatable data types are `varchar`, please check your type".into(),
168                    ).into());
169                }
170            }
171        }
172
173        if !is_for_drop_table_connector {
174            // Additional column names may have the `_rw` prefix.
175            // When dropping the connector from a table, additional columns are converted to
176            // normal columns and keep their original names, so we loosen the `_rw` prefix check here.
177            check_column_name_not_reserved(&name.real_value())?;
178        }
179
180        let nullable: bool = !options
181            .iter()
182            .any(|def| matches!(def.option, ColumnOption::NotNull));
183
184        columns.push(ColumnCatalog {
185            column_desc: ColumnDesc {
186                data_type: bind_data_type(&data_type)?,
187                column_id: ColumnId::placeholder(),
188                name: name.real_value(),
189                generated_or_default_column: None,
190                description: None,
191                additional_column: AdditionalColumn { column_type: None },
192                version: ColumnDescVersion::LATEST,
193                system_column: None,
194                nullable,
195            },
196            is_hidden: false,
197        });
198    }
199
200    Ok(columns)
201}
202
203fn check_generated_column_constraints(
204    column_name: &String,
205    column_id: ColumnId,
206    expr: &ExprImpl,
207    column_catalogs: &[ColumnCatalog],
208    generated_column_names: &[String],
209    pk_column_ids: &[ColumnId],
210) -> Result<()> {
211    let input_refs = expr.collect_input_refs(column_catalogs.len());
212    for idx in input_refs.ones() {
213        let referred_generated_column = &column_catalogs[idx].column_desc.name;
214        if generated_column_names
215            .iter()
216            .any(|c| c == referred_generated_column)
217        {
218            return Err(ErrorCode::BindError(format!(
219                "Generated can not reference another generated column. \
220                But here generated column \"{}\" referenced another generated column \"{}\"",
221                column_name, referred_generated_column
222            ))
223            .into());
224        }
225    }
226
227    if pk_column_ids.contains(&column_id) && expr.is_impure() {
228        return Err(ErrorCode::BindError(format!(
229            "Generated columns with impure expressions should not be part of the primary key. \
230            Here column \"{}\" is defined as part of the primary key.",
231            column_name
232        ))
233        .into());
234    }
235
236    Ok(())
237}
238
239/// Binds constraints that can be only specified in column definitions,
240/// currently generated columns and default columns.
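///
/// # Example
///
/// An illustrative definition (not from the original source) showing both kinds
/// of column-level constraints handled here:
///
/// ```sql
/// CREATE TABLE t (
///     v1 INT,
///     v2 INT AS v1 + 1,             -- generated column
///     v3 TIMESTAMPTZ DEFAULT now()  -- default column
/// );
/// ```
///
/// `v2` is bound into a `GeneratedColumnDesc` holding the expression, while `v3`
/// is bound into a `DefaultColumnDesc` whose snapshot value is evaluated at
/// binding time (with `now()` inlined).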
241pub fn bind_sql_column_constraints(
242    session: &SessionImpl,
243    table_name: String,
244    column_catalogs: &mut [ColumnCatalog],
245    columns: &[ColumnDef],
246    pk_column_ids: &[ColumnId],
247) -> Result<()> {
248    let generated_column_names = {
249        let mut names = vec![];
250        for column in columns {
251            for option_def in &column.options {
252                if let ColumnOption::GeneratedColumns(_) = option_def.option {
253                    names.push(column.name.real_value());
254                    break;
255                }
256            }
257        }
258        names
259    };
260
261    let mut binder = Binder::new_for_ddl(session);
262    binder.bind_columns_to_context(table_name, column_catalogs)?;
263
264    for column in columns {
265        let Some(idx) = column_catalogs
266            .iter()
267            .position(|c| c.name() == column.name.real_value())
268        else {
269            // It's possible that we don't follow the user defined columns in SQL but take the
270            // ones resolved from the source, thus missing some columns. Simply ignore them.
271            continue;
272        };
273
274        for option_def in &column.options {
275            match &option_def.option {
276                ColumnOption::GeneratedColumns(expr) => {
277                    binder.set_clause(Some(Clause::GeneratedColumn));
278
279                    let expr_impl = binder.bind_expr(expr).with_context(|| {
280                        format!(
281                            "fail to bind expression in generated column \"{}\"",
282                            column.name.real_value()
283                        )
284                    })?;
285
286                    check_generated_column_constraints(
287                        &column.name.real_value(),
288                        column_catalogs[idx].column_id(),
289                        &expr_impl,
290                        column_catalogs,
291                        &generated_column_names,
292                        pk_column_ids,
293                    )?;
294
295                    column_catalogs[idx].column_desc.generated_or_default_column = Some(
296                        GeneratedOrDefaultColumn::GeneratedColumn(GeneratedColumnDesc {
297                            expr: Some(expr_impl.to_expr_proto()),
298                        }),
299                    );
300                    binder.set_clause(None);
301                }
302                ColumnOption::DefaultValue(expr) => {
303                    let expr_impl = binder
304                        .bind_expr(expr)?
305                        .cast_assign(column_catalogs[idx].data_type())?;
306
307                    // Rewrite expressions to evaluate a snapshot value, used for missing values in the case of
308                    // schema change.
309                    //
310                    // TODO: Currently we don't support impure expressions other than `now()` (like `random()`),
311                    // so the rewritten expression should almost always be pure and we directly call `fold_const`
312                    // here. Actually we do not require purity of the expression here, since we only need to get a
313                    // snapshot value.
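                    //
                    // For illustration (hypothetical column): with `v TIMESTAMPTZ DEFAULT now()`,
                    // the snapshot value persisted below is the binding-time timestamp, while the
                    // original `now()` expression is kept in `expr` for later re-evaluation.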
314                    let rewritten_expr_impl = session
315                        .pinned_snapshot()
316                        .inline_now_proc_time()
317                        .rewrite_expr(expr_impl.clone());
318
319                    if let Some(snapshot_value) = rewritten_expr_impl.try_fold_const() {
320                        let snapshot_value = snapshot_value?;
321
322                        column_catalogs[idx].column_desc.generated_or_default_column =
323                            Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
324                                snapshot_value: Some(snapshot_value.to_protobuf()),
325                                expr: Some(expr_impl.to_expr_proto()),
326                                // persist the original expression
327                            }));
328                    } else {
329                        return Err(ErrorCode::BindError(format!(
330                            "Default expression used in column `{}` cannot be evaluated. \
331                            Use generated columns instead if you mean to reference other columns.",
332                            column.name
333                        ))
334                        .into());
335                    }
336                }
337                ColumnOption::DefaultValueInternal { persisted, expr: _ } => {
338                    // When a `DEFAULT INTERNAL` is used internally for schema change, the persisted value
339                    // should already be set during purification. So if we encounter an empty value here, it
340                    // means the user has specified it explicitly in the SQL statement, typically by
341                    // directly copying the result of `SHOW CREATE TABLE` and executing it.
342                    if persisted.is_empty() {
343                        bail_bind_error!(
344                            "DEFAULT INTERNAL is only used for internal purposes, \
345                             please specify a concrete default value"
346                        );
347                    }
348
349                    let desc = DefaultColumnDesc::decode(&**persisted)
350                        .expect("failed to decode persisted `DefaultColumnDesc`");
351
352                    column_catalogs[idx].column_desc.generated_or_default_column =
353                        Some(GeneratedOrDefaultColumn::DefaultColumn(desc));
354                }
355                _ => {}
356            }
357        }
358    }
359    Ok(())
360}
361
362/// Currently we only support the PRIMARY KEY table constraint, so we just return the pk column names if one exists.
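///
/// # Example
///
/// An illustrative table-level constraint (not from the original source):
///
/// ```sql
/// CREATE TABLE t (o_id INT, o_time TIMESTAMPTZ, PRIMARY KEY (o_id, o_time));
/// ```
///
/// For this statement the function returns `["o_id", "o_time"]`.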
363pub fn bind_table_constraints(table_constraints: &[TableConstraint]) -> Result<Vec<String>> {
364    let mut pk_column_names = vec![];
365
366    for constraint in table_constraints {
367        match constraint {
368            TableConstraint::Unique {
369                name: _,
370                columns,
371                is_primary: true,
372            } => {
373                if !pk_column_names.is_empty() {
374                    return Err(multiple_pk_definition_err());
375                }
376                pk_column_names = columns.iter().map(|c| c.real_value()).collect_vec();
377            }
378            _ => bail_not_implemented!("table constraint \"{}\"", constraint),
379        }
380    }
381    Ok(pk_column_names)
382}
383
384pub fn bind_sql_pk_names(
385    columns_defs: &[ColumnDef],
386    pk_names_from_table_constraints: Vec<String>,
387) -> Result<Vec<String>> {
388    let mut pk_column_names = pk_names_from_table_constraints;
389
390    for column in columns_defs {
391        for option_def in &column.options {
392            if let ColumnOption::Unique { is_primary: true } = option_def.option {
393                if !pk_column_names.is_empty() {
394                    return Err(multiple_pk_definition_err());
395                }
396                pk_column_names.push(column.name.real_value());
397            };
398        }
399    }
400
401    Ok(pk_column_names)
402}
403
404fn multiple_pk_definition_err() -> RwError {
405    ErrorCode::BindError("multiple primary keys are not allowed".into()).into()
406}
407
408/// Binds primary keys defined in SQL.
409///
410/// It returns the columns together with `pk_column_ids`, and an optional row id column index if
411/// added.
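///
/// # Example
///
/// A sketch of the two cases, with illustrative column names:
///
/// ```sql
/// CREATE TABLE t1 (id INT PRIMARY KEY, v INT); -- pk = [id], no extra column added
/// CREATE TABLE t2 (v INT);                     -- no pk given: a hidden `_row_id` column
///                                              -- is appended and used as the primary key
/// ```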
412pub fn bind_pk_and_row_id_on_relation(
413    mut columns: Vec<ColumnCatalog>,
414    pk_names: Vec<String>,
415    must_need_pk: bool,
416) -> Result<(Vec<ColumnCatalog>, Vec<ColumnId>, Option<usize>)> {
417    for c in &columns {
418        assert!(c.column_id() != ColumnId::placeholder());
419    }
420
421    // Mapping from column name to column id.
422    let name_to_id = columns
423        .iter()
424        .map(|c| (c.name(), c.column_id()))
425        .collect::<HashMap<_, _>>();
426
427    let mut pk_column_ids: Vec<_> = pk_names
428        .iter()
429        .map(|name| {
430            name_to_id.get(name.as_str()).copied().ok_or_else(|| {
431                ErrorCode::BindError(format!("column \"{name}\" named in key does not exist"))
432            })
433        })
434        .try_collect()?;
435
436    // Add `_row_id` column if `pk_column_ids` is empty and must_need_pk
437    let need_row_id = pk_column_ids.is_empty() && must_need_pk;
438
439    let row_id_index = need_row_id.then(|| {
440        let column = ColumnCatalog::row_id_column();
441        let index = columns.len();
442        pk_column_ids = vec![column.column_id()];
443        columns.push(column);
444        index
445    });
446
447    if let Some(col) = columns.iter().map(|c| c.name()).duplicates().next() {
448        Err(ErrorCode::InvalidInputSyntax(format!(
449            "column \"{col}\" specified more than once"
450        )))?;
451    }
452
453    Ok((columns, pk_column_ids, row_id_index))
454}
455
456/// `gen_create_table_plan_with_source` generates the plan for creating a table with an external
457/// stream source.
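///
/// # Example
///
/// An illustrative statement (connector properties are placeholders) that takes this path:
///
/// ```sql
/// CREATE TABLE t (id INT PRIMARY KEY, v VARCHAR)
/// WITH (connector = 'kafka', topic = 'my_topic', properties.bootstrap.server = 'broker:9092')
/// FORMAT PLAIN ENCODE JSON;
/// ```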
458#[allow(clippy::too_many_arguments)]
459pub(crate) async fn gen_create_table_plan_with_source(
460    mut handler_args: HandlerArgs,
461    explain_options: ExplainOptions,
462    table_name: ObjectName,
463    column_defs: Vec<ColumnDef>,
464    wildcard_idx: Option<usize>,
465    constraints: Vec<TableConstraint>,
466    format_encode: FormatEncodeOptions,
467    source_watermarks: Vec<SourceWatermark>,
468    mut col_id_gen: ColumnIdGenerator,
469    include_column_options: IncludeOption,
470    props: CreateTableProps,
471    sql_column_strategy: SqlColumnStrategy,
472) -> Result<(PlanRef, Option<SourceCatalog>, TableCatalog)> {
473    if props.append_only
474        && format_encode.format != Format::Plain
475        && format_encode.format != Format::Native
476    {
477        return Err(ErrorCode::BindError(format!(
478            "Append only table does not support format {}.",
479            format_encode.format
480        ))
481        .into());
482    }
483
484    let session = &handler_args.session;
485    let (with_properties, refresh_mode) =
486        bind_connector_props(&handler_args, &format_encode, false)?;
487    if with_properties.is_shareable_cdc_connector() {
488        generated_columns_check_for_cdc_table(&column_defs)?;
489        not_null_check_for_cdc_table(&wildcard_idx, &column_defs)?;
490    } else if column_defs.iter().any(|col| {
491        col.options
492            .iter()
493            .any(|def| matches!(def.option, ColumnOption::NotNull))
494    }) {
495        // if non-cdc source
496        notice_to_user(
497            "The table contains columns with NOT NULL constraints. Any rows from upstream violating the constraints will be ignored silently.",
498        );
499    }
500
501    let db_name: &str = &session.database();
502    let (schema_name, _) = Binder::resolve_schema_qualified_name(db_name, &table_name)?;
503
504    // TODO: omit this step if `sql_column_strategy` is `Follow`.
505    let (columns_from_resolve_source, source_info) = bind_columns_from_source(
506        session,
507        &format_encode,
508        Either::Left(&with_properties),
509        CreateSourceType::Table,
510    )
511    .await?;
512
513    let overwrite_options = OverwriteOptions::new(&mut handler_args);
514    let rate_limit = overwrite_options.source_rate_limit;
515    let source = bind_create_source_or_table_with_connector(
516        handler_args.clone(),
517        table_name,
518        format_encode,
519        with_properties,
520        &column_defs,
521        constraints,
522        wildcard_idx,
523        source_watermarks,
524        columns_from_resolve_source,
525        source_info,
526        include_column_options,
527        &mut col_id_gen,
528        CreateSourceType::Table,
529        rate_limit,
530        sql_column_strategy,
531        refresh_mode,
532    )
533    .await?;
534
535    let context = OptimizerContext::new(handler_args, explain_options);
536
537    let (plan, table) = gen_table_plan_with_source(
538        context.into(),
539        schema_name,
540        source.clone(),
541        col_id_gen.into_version(),
542        props,
543    )?;
544
545    Ok((plan, Some(source), table))
546}
547
548/// `gen_create_table_plan` generates the plan for creating a table without an external stream
549/// source.
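///
/// # Example
///
/// An illustrative statement (not from the original source) that takes this path:
///
/// ```sql
/// CREATE TABLE t (id INT PRIMARY KEY, v VARCHAR, ts TIMESTAMPTZ DEFAULT now());
/// ```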
550#[allow(clippy::too_many_arguments)]
551pub(crate) fn gen_create_table_plan(
552    context: OptimizerContext,
553    table_name: ObjectName,
554    column_defs: Vec<ColumnDef>,
555    constraints: Vec<TableConstraint>,
556    mut col_id_gen: ColumnIdGenerator,
557    source_watermarks: Vec<SourceWatermark>,
558    props: CreateTableProps,
559    is_for_replace_plan: bool,
560) -> Result<(PlanRef, TableCatalog)> {
561    let mut columns = bind_sql_columns(&column_defs, is_for_replace_plan)?;
562    for c in &mut columns {
563        col_id_gen.generate(c)?;
564    }
565
566    let (_, secret_refs, connection_refs) = context.with_options().clone().into_parts();
567    if !secret_refs.is_empty() || !connection_refs.is_empty() {
568        return Err(crate::error::ErrorCode::InvalidParameterValue("Secret reference and Connection reference are not allowed in options when creating table without external source".to_owned()).into());
569    }
570
571    gen_create_table_plan_without_source(
572        context,
573        table_name,
574        columns,
575        column_defs,
576        constraints,
577        source_watermarks,
578        col_id_gen.into_version(),
579        props,
580    )
581}
582
583#[allow(clippy::too_many_arguments)]
584pub(crate) fn gen_create_table_plan_without_source(
585    context: OptimizerContext,
586    table_name: ObjectName,
587    columns: Vec<ColumnCatalog>,
588    column_defs: Vec<ColumnDef>,
589    constraints: Vec<TableConstraint>,
590    source_watermarks: Vec<SourceWatermark>,
591    version: TableVersion,
592    props: CreateTableProps,
593) -> Result<(PlanRef, TableCatalog)> {
594    // XXX: Why not bind outside?
595    let pk_names = bind_sql_pk_names(&column_defs, bind_table_constraints(&constraints)?)?;
596    let (mut columns, pk_column_ids, row_id_index) =
597        bind_pk_and_row_id_on_relation(columns, pk_names, true)?;
598
599    let watermark_descs = bind_source_watermark(
600        context.session_ctx(),
601        table_name.real_value(),
602        source_watermarks,
603        &columns,
604    )?;
605
606    bind_sql_column_constraints(
607        context.session_ctx(),
608        table_name.real_value(),
609        &mut columns,
610        &column_defs,
611        &pk_column_ids,
612    )?;
613    let session = context.session_ctx().clone();
614
615    let db_name = &session.database();
616    let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, &table_name)?;
617
618    let info = CreateTableInfo {
619        columns,
620        pk_column_ids,
621        row_id_index,
622        watermark_descs,
623        source_catalog: None,
624        version,
625    };
626
627    gen_table_plan_inner(context.into(), schema_name, table_name, info, props)
628}
629
630fn gen_table_plan_with_source(
631    context: OptimizerContextRef,
632    schema_name: Option<String>,
633    source_catalog: SourceCatalog,
634    version: TableVersion,
635    props: CreateTableProps,
636) -> Result<(PlanRef, TableCatalog)> {
637    let table_name = source_catalog.name.clone();
638
639    let info = CreateTableInfo {
640        columns: source_catalog.columns.clone(),
641        pk_column_ids: source_catalog.pk_col_ids.clone(),
642        row_id_index: source_catalog.row_id_index,
643        watermark_descs: source_catalog.watermark_descs.clone(),
644        source_catalog: Some(source_catalog),
645        version,
646    };
647
648    gen_table_plan_inner(context, schema_name, table_name, info, props)
649}
650
651/// On-conflict behavior either from user input or existing table catalog.
652#[derive(Clone, Copy)]
653pub enum EitherOnConflict {
654    Ast(Option<OnConflict>),
655    Resolved(ConflictBehavior),
656}
657
658impl From<Option<OnConflict>> for EitherOnConflict {
659    fn from(v: Option<OnConflict>) -> Self {
660        Self::Ast(v)
661    }
662}
663
664impl From<ConflictBehavior> for EitherOnConflict {
665    fn from(v: ConflictBehavior) -> Self {
666        Self::Resolved(v)
667    }
668}
669
670impl EitherOnConflict {
671    /// Resolves the conflict behavior based on the given information.
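    ///
    /// As a quick reference, derived from the logic below:
    /// - append-only, row id as pk: `NoCheck`;
    /// - append-only, user-defined pk: must be (or defaults to) `DO NOTHING`, i.e. `IgnoreConflict`;
    /// - non-append-only: `UPDATE FULL` (the default) maps to `Overwrite`, `DO NOTHING` to
    ///   `IgnoreConflict`, and `UPDATE IF NOT NULL` to `DoUpdateIfNotNull`.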
672    pub fn to_behavior(self, append_only: bool, row_id_as_pk: bool) -> Result<ConflictBehavior> {
673        let conflict_behavior = match self {
674            EitherOnConflict::Ast(on_conflict) => {
675                if append_only {
676                    if row_id_as_pk {
677                        // Primary key will be generated, no conflict check needed.
678                        ConflictBehavior::NoCheck
679                    } else {
680                        // User defined PK on append-only table, enforce `DO NOTHING`.
681                        if let Some(on_conflict) = on_conflict
682                            && on_conflict != OnConflict::Nothing
683                        {
684                            return Err(ErrorCode::InvalidInputSyntax(
685                                "When PRIMARY KEY constraint applied to an APPEND ONLY table, \
686                                     the ON CONFLICT behavior must be DO NOTHING."
687                                    .to_owned(),
688                            )
689                            .into());
690                        }
691                        ConflictBehavior::IgnoreConflict
692                    }
693                } else {
694                    // Default to `UPDATE FULL` for non-append-only tables.
695                    match on_conflict.unwrap_or(OnConflict::UpdateFull) {
696                        OnConflict::UpdateFull => ConflictBehavior::Overwrite,
697                        OnConflict::Nothing => ConflictBehavior::IgnoreConflict,
698                        OnConflict::UpdateIfNotNull => ConflictBehavior::DoUpdateIfNotNull,
699                    }
700                }
701            }
702            EitherOnConflict::Resolved(b) => b,
703        };
704
705        Ok(conflict_behavior)
706    }
707}
708
709/// Arguments of the functions that generate a table plan, part 1.
710///
711/// Compared to [`CreateTableProps`], this struct contains fields that need some work of binding
712/// or resolving based on the user input.
713pub struct CreateTableInfo {
714    pub columns: Vec<ColumnCatalog>,
715    pub pk_column_ids: Vec<ColumnId>,
716    pub row_id_index: Option<usize>,
717    pub watermark_descs: Vec<WatermarkDesc>,
718    pub source_catalog: Option<SourceCatalog>,
719    pub version: TableVersion,
720}
721
722/// Arguments of the functions that generate a table plan, part 2.
723///
724/// Compared to [`CreateTableInfo`], this struct contains fields that can be (relatively) simply
725/// obtained from the input or the context.
726pub struct CreateTableProps {
727    pub definition: String,
728    pub append_only: bool,
729    pub on_conflict: EitherOnConflict,
730    pub with_version_columns: Vec<String>,
731    pub webhook_info: Option<PbWebhookSourceInfo>,
732    pub engine: Engine,
733}
734
735#[allow(clippy::too_many_arguments)]
736fn gen_table_plan_inner(
737    context: OptimizerContextRef,
738    schema_name: Option<String>,
739    table_name: String,
740    info: CreateTableInfo,
741    props: CreateTableProps,
742) -> Result<(PlanRef, TableCatalog)> {
743    let CreateTableInfo {
744        ref columns,
745        row_id_index,
746        ref watermark_descs,
747        ref source_catalog,
748        ..
749    } = info;
750    let CreateTableProps { append_only, .. } = props;
751
752    let (database_id, schema_id) = context
753        .session_ctx()
754        .get_database_and_schema_id_for_create(schema_name)?;
755
756    let session = context.session_ctx().clone();
757    let retention_seconds = context.with_options().retention_seconds();
758
759    let source_node: LogicalPlanRef = LogicalSource::new(
760        source_catalog.clone().map(Rc::new),
761        columns.clone(),
762        row_id_index,
763        SourceNodeKind::CreateTable,
764        context.clone(),
765        None,
766    )?
767    .into();
768
769    let required_cols = FixedBitSet::with_capacity(columns.len());
770    let plan_root = PlanRoot::new_with_logical_plan(
771        source_node,
772        RequiredDist::Any,
773        Order::any(),
774        required_cols,
775        vec![],
776    );
777
778    let has_non_ttl_watermark = watermark_descs.iter().any(|d| !d.with_ttl);
779
780    if !append_only && has_non_ttl_watermark {
781        return Err(ErrorCode::NotSupported(
782            "Defining watermarks on table requires the table to be append only.".to_owned(),
783            "Use the key words `APPEND ONLY`".to_owned(),
784        )
785        .into());
786    }
787
788    if !append_only && retention_seconds.is_some() {
789        if session
790            .config()
791            .unsafe_enable_storage_retention_for_non_append_only_tables()
792        {
793            tracing::warn!(
794                "Storage retention is enabled for non-append-only table {}. This may lead to stream inconsistency.",
795                table_name
796            );
797            const NOTICE: &str = "Storage retention is enabled for non-append-only table. \
798                                  This may lead to stream inconsistency and unrecoverable \
799                                  node failure if there is any row INSERT/UPDATE/DELETE operation \
800                                  corresponding to the TTLed primary keys";
801            session.notice_to_user(NOTICE);
802        } else {
803            return Err(ErrorCode::NotSupported(
804                "Defining retention seconds on table requires the table to be append only."
805                    .to_owned(),
806                "Use the key words `APPEND ONLY`".to_owned(),
807            )
808            .into());
809        }
810    }
811
812    let materialize =
813        plan_root.gen_table_plan(context, table_name, database_id, schema_id, info, props)?;
814
815    let mut table = materialize.table().clone();
816    table.owner = session.user_id();
817
818    Ok((materialize.into(), table))
819}
820
821/// Generates the stream plan for a CDC table based on a shared source.
822/// In the replace workflow, `table_id` is the id of the table to be replaced;
823/// in the create-table workflow, `table_id` is a placeholder that will be filled in by the meta node.
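///
/// # Example
///
/// An illustrative statement (names are placeholders) that ends up here, assuming a
/// shared CDC source `mysql_source` already exists:
///
/// ```sql
/// CREATE TABLE orders (order_id INT PRIMARY KEY, amount DECIMAL)
/// FROM mysql_source TABLE 'mydb.orders';
/// ```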
824#[allow(clippy::too_many_arguments)]
825pub(crate) fn gen_create_table_plan_for_cdc_table(
826    context: OptimizerContextRef,
827    source: Arc<SourceCatalog>,
828    external_table_name: String,
829    column_defs: Vec<ColumnDef>,
830    mut columns: Vec<ColumnCatalog>,
831    pk_names: Vec<String>,
832    cdc_with_options: WithOptionsSecResolved,
833    mut col_id_gen: ColumnIdGenerator,
834    on_conflict: Option<OnConflict>,
835    with_version_columns: Vec<String>,
836    include_column_options: IncludeOption,
837    table_name: ObjectName,
838    resolved_table_name: String, // table name without schema prefix
839    database_id: DatabaseId,
840    schema_id: SchemaId,
841    table_id: TableId,
842    engine: Engine,
843) -> Result<(PlanRef, TableCatalog)> {
844    let session = context.session_ctx().clone();
845
846    // append additional columns to the end
847    handle_addition_columns(
848        None,
849        &cdc_with_options,
850        include_column_options,
851        &mut columns,
852        true,
853    )?;
854
855    for c in &mut columns {
856        col_id_gen.generate(c)?;
857    }
858
859    let (mut columns, pk_column_ids, _row_id_index) =
860        bind_pk_and_row_id_on_relation(columns, pk_names, true)?;
861
862    // NOTE: in auto schema change, default values are not provided in the column definitions.
863    bind_sql_column_constraints(
864        context.session_ctx(),
865        table_name.real_value(),
866        &mut columns,
867        &column_defs,
868        &pk_column_ids,
869    )?;
870
871    let definition = context.normalized_sql().to_owned();
872
873    let pk_column_indices = {
874        let mut id_to_idx = HashMap::new();
875        columns.iter().enumerate().for_each(|(idx, c)| {
876            id_to_idx.insert(c.column_id(), idx);
877        });
878        // pk column id must exist in table columns.
879        pk_column_ids
880            .iter()
881            .map(|c| id_to_idx.get(c).copied().unwrap())
882            .collect_vec()
883    };
884    let table_pk = pk_column_indices
885        .iter()
886        .map(|idx| ColumnOrder::new(*idx, OrderType::ascending()))
887        .collect();
888
889    let (options, secret_refs) = cdc_with_options.into_parts();
890
891    let non_generated_column_descs = columns
892        .iter()
893        .filter(|&c| !c.is_generated())
894        .map(|c| c.column_desc.clone())
895        .collect_vec();
896    let non_generated_column_num = non_generated_column_descs.len();
897    let cdc_table_type = ExternalCdcTableType::from_properties(&options);
898    let cdc_table_desc = CdcTableDesc {
899        table_id,
900        source_id: source.id, // id of cdc source streaming job
901        external_table_name: external_table_name.clone(),
902        pk: table_pk,
903        columns: non_generated_column_descs,
904        stream_key: pk_column_indices,
905        connect_properties: options,
906        secret_refs,
907    };
908
909    tracing::debug!(?cdc_table_desc, "create cdc table");
910    let options = build_cdc_scan_options_with_options(context.with_options(), &cdc_table_type)?;
911
912    let logical_scan = LogicalCdcScan::create(
913        external_table_name.clone(),
914        Rc::new(cdc_table_desc),
915        context.clone(),
916        options,
917    );
918
919    let scan_node: LogicalPlanRef = logical_scan.into();
920    let required_cols = FixedBitSet::with_capacity(non_generated_column_num);
921    let plan_root = PlanRoot::new_with_logical_plan(
922        scan_node,
923        RequiredDist::Any,
924        Order::any(),
925        required_cols,
926        vec![],
927    );
928
929    let cdc_table_id = build_cdc_table_id(source.id, &external_table_name);
930    let materialize = plan_root.gen_table_plan(
931        context,
932        resolved_table_name,
933        database_id,
934        schema_id,
935        CreateTableInfo {
936            columns,
937            pk_column_ids,
938            row_id_index: None,
939            watermark_descs: vec![],
940            source_catalog: Some((*source).clone()),
941            version: col_id_gen.into_version(),
942        },
943        CreateTableProps {
944            definition,
945            append_only: false,
946            on_conflict: on_conflict.into(),
947            with_version_columns,
948            webhook_info: None,
949            engine,
950        },
951    )?;
952
953    let mut table = materialize.table().clone();
954    table.owner = session.user_id();
955    table.cdc_table_id = Some(cdc_table_id);
956    table.cdc_table_type = Some(cdc_table_type);
957    Ok((materialize.into(), table))
958}
959
960/// Derive connector properties and normalize `external_table_name` for CDC tables.
961///
962/// Returns (`connector_properties`, `normalized_external_table_name`) where:
963/// - For SQL Server: Normalizes 'db.schema.table' (3 parts) to 'schema.table' (2 parts),
964///   because users can optionally include database name for verification, but it needs to be
965///   stripped to match the format returned by Debezium's `extract_table_name()`.
966/// - For MySQL/Postgres: Returns the original `external_table_name` unchanged.
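///
/// For illustration (table names are placeholders):
/// - MySQL:      `'mydb.orders'`     stays as-is; `DATABASE_NAME_KEY` / `TABLE_NAME_KEY` are filled in.
/// - Postgres:   `'public.orders'`   stays as-is; `SCHEMA_NAME_KEY` / `TABLE_NAME_KEY` are filled in.
/// - SQL Server: `'mydb.dbo.orders'` is normalized to `'dbo.orders'`; `SCHEMA_NAME_KEY` / `TABLE_NAME_KEY` are filled in.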
967fn derive_with_options_for_cdc_table(
968    source_with_properties: &WithOptionsSecResolved,
969    external_table_name: String,
970) -> Result<(WithOptionsSecResolved, String)> {
971    use source::cdc::{MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR, SQL_SERVER_CDC_CONNECTOR};
972    // we should remove the prefix from `full_table_name`
973    let source_database_name: &str = source_with_properties
974        .get("database.name")
975        .ok_or_else(|| anyhow!("The source with properties does not contain 'database.name'"))?
976        .as_str();
977    let mut with_options = source_with_properties.clone();
978    if let Some(connector) = source_with_properties.get(UPSTREAM_SOURCE_KEY) {
979        match connector.as_str() {
980            MYSQL_CDC_CONNECTOR => {
981                // MySQL doesn't allow '.' in database name and table name, so we can split the
982                // external table name by '.' to get the table name
983                let (db_name, table_name) = external_table_name.split_once('.').ok_or_else(|| {
984                    anyhow!("The upstream table name must contain database name prefix, e.g. 'database.table'")
985                })?;
986                // We allow multiple database names in the source definition
987                if !source_database_name
988                    .split(',')
989                    .map(|s| s.trim())
990                    .any(|name| name == db_name)
991                {
992                    return Err(anyhow!(
993                        "The database name `{}` in the FROM clause is not included in the database name `{}` in source definition",
994                        db_name,
995                        source_database_name
996                    ).into());
997                }
998                with_options.insert(DATABASE_NAME_KEY.into(), db_name.into());
999                with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
1000                // Return original external_table_name unchanged for MySQL
1001                return Ok((with_options, external_table_name));
1002            }
1003            POSTGRES_CDC_CONNECTOR => {
1004                let (schema_name, table_name) = external_table_name
1005                    .split_once('.')
1006                    .ok_or_else(|| anyhow!("The upstream table name must contain schema name prefix, e.g. 'public.table'"))?;
1007
1008                // insert 'schema.name' into connect properties
1009                with_options.insert(SCHEMA_NAME_KEY.into(), schema_name.into());
1010                with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
1011                // Return original external_table_name unchanged for Postgres
1012                return Ok((with_options, external_table_name));
1013            }
1014            SQL_SERVER_CDC_CONNECTOR => {
1015                // SQL Server external table name must be in one of two formats:
1016                // 1. 'schemaName.tableName' (2 parts) - database is already specified in source
1017                // 2. 'databaseName.schemaName.tableName' (3 parts) - for explicit verification
1018                //
1019                // We do NOT allow single table name (e.g., 't') because:
1020                // - Unlike database name (already in source), schema name is NOT pre-specified
1021                // - User must explicitly provide schema (even if it's 'dbo')
1022                let parts: Vec<&str> = external_table_name.split('.').collect();
1023                let (schema_name, table_name) = match parts.len() {
1024                    3 => {
1025                        // Format: database.schema.table
1026                        // Verify that the database name matches the one in source definition
1027                        let db_name = parts[0];
1028                        let schema_name = parts[1];
1029                        let table_name = parts[2];
1030
1031                        if db_name != source_database_name {
1032                            return Err(anyhow!(
1033                                "The database name '{}' in FROM clause does not match the database name '{}' specified in source definition. \
1034                                 You can either use 'schema.table' format (recommended) or ensure the database name matches.",
1035                                db_name,
1036                                source_database_name
1037                            ).into());
1038                        }
1039                        (schema_name, table_name)
1040                    }
1041                    2 => {
1042                        // Format: schema.table (recommended)
1043                        // Database name is taken from source definition
1044                        let schema_name = parts[0];
1045                        let table_name = parts[1];
1046                        (schema_name, table_name)
1047                    }
1048                    1 => {
1049                        // Format: table only
1050                        // Reject with clear error message
1051                        return Err(anyhow!(
1052                            "Invalid table name format '{}'. For SQL Server CDC, you must specify the schema name. \
1053                             Use 'schema.table' format (e.g., 'dbo.{}') or 'database.schema.table' format (e.g., '{}.dbo.{}').",
1054                            external_table_name,
1055                            external_table_name,
1056                            source_database_name,
1057                            external_table_name
1058                        ).into());
1059                    }
1060                    _ => {
1061                        // Invalid format (4+ parts or empty)
1062                        return Err(anyhow!(
1063                            "Invalid table name format '{}'. Expected 'schema.table' or 'database.schema.table'.",
1064                            external_table_name
1065                        ).into());
1066                    }
1067                };
1068
1069                // Insert schema and table names into connector properties
1070                with_options.insert(SCHEMA_NAME_KEY.into(), schema_name.into());
1071                with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
1072
1073                // Normalize external_table_name to 'schema.table' format
1074                // This ensures consistency with extract_table_name() in message.rs
1075                let normalized_external_table_name = format!("{}.{}", schema_name, table_name);
1076                return Ok((with_options, normalized_external_table_name));
1077            }
1078            _ => {
1079                return Err(RwError::from(anyhow!(
1080                    "connector {} is not supported for cdc table",
1081                    connector
1082                )));
1083            }
1084        };
1085    }
1086    unreachable!("All valid CDC connectors should have returned by now")
1087}
1088
1089#[allow(clippy::too_many_arguments)]
1090pub(super) async fn handle_create_table_plan(
1091    handler_args: HandlerArgs,
1092    explain_options: ExplainOptions,
1093    format_encode: Option<FormatEncodeOptions>,
1094    cdc_table_info: Option<CdcTableInfo>,
1095    table_name: &ObjectName,
1096    column_defs: Vec<ColumnDef>,
1097    wildcard_idx: Option<usize>,
1098    constraints: Vec<TableConstraint>,
1099    source_watermarks: Vec<SourceWatermark>,
1100    append_only: bool,
1101    on_conflict: Option<OnConflict>,
1102    with_version_columns: Vec<String>,
1103    include_column_options: IncludeOption,
1104    webhook_info: Option<WebhookSourceInfo>,
1105    engine: Engine,
1106) -> Result<(
1107    PlanRef,
1108    Option<SourceCatalog>,
1109    TableCatalog,
1110    TableJobType,
1111    Option<SourceId>,
1112)> {
1113    let col_id_gen = ColumnIdGenerator::new_initial();
1114    let format_encode = check_create_table_with_source(
1115        &handler_args.with_options,
1116        format_encode,
1117        &include_column_options,
1118        &cdc_table_info,
1119    )?;
1120    let webhook_info = webhook_info
1121        .map(|info| bind_webhook_info(&handler_args.session, &column_defs, info))
1122        .transpose()?;
1123
1124    let props = CreateTableProps {
1125        definition: handler_args.normalized_sql.clone(),
1126        append_only,
1127        on_conflict: on_conflict.into(),
1128        with_version_columns: with_version_columns.clone(),
1129        webhook_info,
1130        engine,
1131    };
1132
1133    let ((plan, source, table), job_type, shared_source_id) = match (
1134        format_encode,
1135        cdc_table_info.as_ref(),
1136    ) {
1137        (Some(format_encode), None) => (
1138            gen_create_table_plan_with_source(
1139                handler_args,
1140                explain_options,
1141                table_name.clone(),
1142                column_defs,
1143                wildcard_idx,
1144                constraints,
1145                format_encode,
1146                source_watermarks,
1147                col_id_gen,
1148                include_column_options,
1149                props,
1150                SqlColumnStrategy::FollowChecked,
1151            )
1152            .await?,
1153            TableJobType::General,
1154            None,
1155        ),
1156        (None, None) => {
1157            let context = OptimizerContext::new(handler_args, explain_options);
1158            let (plan, table) = gen_create_table_plan(
1159                context,
1160                table_name.clone(),
1161                column_defs,
1162                constraints,
1163                col_id_gen,
1164                source_watermarks,
1165                props,
1166                false,
1167            )?;
1168
1169            ((plan, None, table), TableJobType::General, None)
1170        }
1171
1172        (None, Some(cdc_table)) => {
1173            sanity_check_for_table_on_cdc_source(
1174                append_only,
1175                &column_defs,
1176                &wildcard_idx,
1177                &constraints,
1178                &source_watermarks,
1179            )?;
1180
1181            generated_columns_check_for_cdc_table(&column_defs)?;
1182            not_null_check_for_cdc_table(&wildcard_idx, &column_defs)?;
1183
1184            let session = &handler_args.session;
1185            let db_name = &session.database();
1186            let user_name = &session.user_name();
1187            let search_path = session.config().search_path();
1188            let (schema_name, resolved_table_name) =
1189                Binder::resolve_schema_qualified_name(db_name, table_name)?;
1190            let (database_id, schema_id) =
1191                session.get_database_and_schema_id_for_create(schema_name.clone())?;
1192
1193            // cdc table cannot be append-only
1194            let (source_schema, source_name) =
1195                Binder::resolve_schema_qualified_name(db_name, &cdc_table.source_name)?;
1196
1197            let source = {
1198                let catalog_reader = session.env().catalog_reader().read_guard();
1199                let schema_path =
1200                    SchemaPath::new(source_schema.as_deref(), &search_path, user_name);
1201
1202                let (source, _) = catalog_reader.get_source_by_name(
1203                    db_name,
1204                    schema_path,
1205                    source_name.as_str(),
1206                )?;
1207                source.clone()
1208            };
1209            let (cdc_with_options, normalized_external_table_name) =
1210                derive_with_options_for_cdc_table(
1211                    &source.with_properties,
1212                    cdc_table.external_table_name.clone(),
1213                )?;
1214
1215            let (columns, pk_names) = match wildcard_idx {
1216                Some(_) => bind_cdc_table_schema_externally(cdc_with_options.clone()).await?,
1217                None => {
1218                    for column_def in &column_defs {
1219                        for option_def in &column_def.options {
1220                            if let ColumnOption::DefaultValue(_)
1221                            | ColumnOption::DefaultValueInternal { .. } = option_def.option
1222                            {
1223                                return Err(ErrorCode::NotSupported(
1224                                            "Default value for columns defined on the table created from a CDC source".into(),
1225                                            "Remove the default value expression in the column definitions".into(),
1226                                        )
1227                                            .into());
1228                            }
1229                        }
1230                    }
1231
1232                    let (columns, pk_names) =
1233                        bind_cdc_table_schema(&column_defs, &constraints, false)?;
1234                    // read default value definition from external db
1235                    let (options, secret_refs) = cdc_with_options.clone().into_parts();
1236                    let _config = ExternalTableConfig::try_from_btreemap(options, secret_refs)
1237                        .context("failed to extract external table config")?;
1238
1239                    (columns, pk_names)
1240                }
1241            };
1242
1243            let context: OptimizerContextRef =
1244                OptimizerContext::new(handler_args, explain_options).into();
1245            let shared_source_id = source.id;
1246            let (plan, table) = gen_create_table_plan_for_cdc_table(
1247                context,
1248                source,
1249                normalized_external_table_name,
1250                column_defs,
1251                columns,
1252                pk_names,
1253                cdc_with_options,
1254                col_id_gen,
1255                on_conflict,
1256                with_version_columns,
1257                include_column_options,
1258                table_name.clone(),
1259                resolved_table_name,
1260                database_id,
1261                schema_id,
1262                TableId::placeholder(),
1263                engine,
1264            )?;
1265
1266            (
1267                (plan, None, table),
1268                TableJobType::SharedCdcSource,
1269                Some(shared_source_id),
1270            )
1271        }
1272        (Some(_), Some(_)) => {
1273            return Err(ErrorCode::NotSupported(
1274                "Data format and encoding format doesn't apply to table created from a CDC source"
1275                    .into(),
1276                "Remove the FORMAT and ENCODE specification".into(),
1277            )
1278            .into());
1279        }
1280    };
1281    Ok((plan, source, table, job_type, shared_source_id))
1282}
1283
1284// For both table from cdc source and table with cdc connector
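//
// For illustration (column names are placeholders), generated columns must come last:
//   (id INT PRIMARY KEY, v INT, g1 INT AS v + 1)  -- accepted
//   (id INT PRIMARY KEY, g1 INT AS id + 1, v INT) -- rejected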
1285fn generated_columns_check_for_cdc_table(columns: &Vec<ColumnDef>) -> Result<()> {
1286    let mut found_generated_column = false;
1287    for column in columns {
1288        let mut is_generated = false;
1289
1290        for option_def in &column.options {
1291            if let ColumnOption::GeneratedColumns(_) = option_def.option {
1292                is_generated = true;
1293                break;
1294            }
1295        }
1296
1297        if is_generated {
1298            found_generated_column = true;
1299        } else if found_generated_column {
1300            return Err(ErrorCode::NotSupported(
1301                "Non-generated column found after a generated column.".into(),
1302                "Ensure that all generated columns appear at the end of the cdc table definition."
1303                    .into(),
1304            )
1305            .into());
1306        }
1307    }
1308    Ok(())
1309}
1310
1311// For both table from cdc source and table with cdc connector
1312fn not_null_check_for_cdc_table(
1313    wildcard_idx: &Option<usize>,
1314    column_defs: &Vec<ColumnDef>,
1315) -> Result<()> {
1316    if wildcard_idx.is_none()
1317        && column_defs.iter().any(|col| {
1318            col.options
1319                .iter()
1320                .any(|opt| matches!(opt.option, ColumnOption::NotNull))
1321        })
1322    {
1323        return Err(ErrorCode::NotSupported(
1324            "CDC table with NOT NULL constraint is not supported".to_owned(),
1325            "Please remove the NOT NULL constraint for columns".to_owned(),
1326        )
1327        .into());
1328    }
1329    Ok(())
1330}
1331
1332// Only for table from cdc source
1333fn sanity_check_for_table_on_cdc_source(
1334    append_only: bool,
1335    column_defs: &Vec<ColumnDef>,
1336    wildcard_idx: &Option<usize>,
1337    constraints: &Vec<TableConstraint>,
1338    source_watermarks: &Vec<SourceWatermark>,
1339) -> Result<()> {
1340    // wildcard cannot be used with column definitions
1341    if wildcard_idx.is_some() && !column_defs.is_empty() {
1342        return Err(ErrorCode::NotSupported(
1343            "wildcard(*) and column definitions cannot be used together".to_owned(),
1344            "Remove the wildcard or column definitions".to_owned(),
1345        )
1346        .into());
1347    }
1348
1349    // cdc table must have primary key constraint or primary key column
1350    if wildcard_idx.is_none()
1351        && !constraints.iter().any(|c| {
1352            matches!(
1353                c,
1354                TableConstraint::Unique {
1355                    is_primary: true,
1356                    ..
1357                }
1358            )
1359        })
1360        && !column_defs.iter().any(|col| {
1361            col.options
1362                .iter()
1363                .any(|opt| matches!(opt.option, ColumnOption::Unique { is_primary: true }))
1364        })
1365    {
1366        return Err(ErrorCode::NotSupported(
1367            "CDC table without primary key constraint is not supported".to_owned(),
1368            "Please define a primary key".to_owned(),
1369        )
1370        .into());
1371    }
1372
1373    if append_only {
1374        return Err(ErrorCode::NotSupported(
1375            "append only modifier on the table created from a CDC source".into(),
1376            "Remove the APPEND ONLY clause".into(),
1377        )
1378        .into());
1379    }
1380
1381    if !source_watermarks.is_empty() {
1382        return Err(ErrorCode::NotSupported(
            "watermark defined on a table created from a CDC source".into(),
            "Remove the WATERMARK definitions".into(),
1385        )
1386        .into());
1387    }
1388
1389    Ok(())
1390}
1391
/// Derive the schema for a CDC table, when creating a new table or altering an existing one,
/// by connecting to the external database.
1393async fn bind_cdc_table_schema_externally(
1394    cdc_with_options: WithOptionsSecResolved,
1395) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
    // Read the CDC table schema from the external database.
1397    let (options, secret_refs) = cdc_with_options.into_parts();
1398    let config = ExternalTableConfig::try_from_btreemap(options, secret_refs)
1399        .context("failed to extract external table config")?;
1400
1401    let table = ExternalTableImpl::connect(config)
1402        .await
1403        .context("failed to auto derive table schema")?;
1404
1405    Ok((
1406        table
1407            .column_descs()
1408            .iter()
1409            .cloned()
1410            .map(|column_desc| ColumnCatalog {
1411                column_desc,
1412                is_hidden: false,
1413            })
1414            .collect(),
1415        table.pk_names().clone(),
1416    ))
1417}
1418
/// Derive the schema for a CDC table, when creating a new table or altering an existing one,
/// from the column definitions and constraints in the SQL statement.
1420fn bind_cdc_table_schema(
1421    column_defs: &Vec<ColumnDef>,
1422    constraints: &Vec<TableConstraint>,
1423    is_for_replace_plan: bool,
1424) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
1425    let columns = bind_sql_columns(column_defs, is_for_replace_plan)?;
1426
1427    let pk_names = bind_sql_pk_names(column_defs, bind_table_constraints(constraints)?)?;
1428    Ok((columns, pk_names))
1429}
1430
1431#[allow(clippy::too_many_arguments)]
1432pub async fn handle_create_table(
1433    handler_args: HandlerArgs,
1434    table_name: ObjectName,
1435    column_defs: Vec<ColumnDef>,
1436    wildcard_idx: Option<usize>,
1437    constraints: Vec<TableConstraint>,
1438    if_not_exists: bool,
1439    format_encode: Option<FormatEncodeOptions>,
1440    source_watermarks: Vec<SourceWatermark>,
1441    append_only: bool,
1442    on_conflict: Option<OnConflict>,
1443    with_version_columns: Vec<String>,
1444    cdc_table_info: Option<CdcTableInfo>,
1445    include_column_options: IncludeOption,
1446    webhook_info: Option<WebhookSourceInfo>,
1447    ast_engine: risingwave_sqlparser::ast::Engine,
1448) -> Result<RwPgResponse> {
1449    let session = handler_args.session.clone();
1450
1451    if append_only {
1452        session.notice_to_user("APPEND ONLY TABLE is currently an experimental feature.");
1453    }
1454
1455    session.check_cluster_limits().await?;
1456
1457    let engine = match ast_engine {
1458        risingwave_sqlparser::ast::Engine::Hummock => Engine::Hummock,
1459        risingwave_sqlparser::ast::Engine::Iceberg => Engine::Iceberg,
1460    };
1461
1462    if let Either::Right(resp) = session.check_relation_name_duplicated(
1463        table_name.clone(),
1464        StatementType::CREATE_TABLE,
1465        if_not_exists,
1466    )? {
1467        return Ok(resp);
1468    }
1469
1470    let (graph, source, hummock_table, job_type, shared_source_id) = {
1471        let (plan, source, table, job_type, shared_source_id) = handle_create_table_plan(
1472            handler_args.clone(),
1473            ExplainOptions::default(),
1474            format_encode,
1475            cdc_table_info,
1476            &table_name,
1477            column_defs.clone(),
1478            wildcard_idx,
1479            constraints.clone(),
1480            source_watermarks,
1481            append_only,
1482            on_conflict,
1483            with_version_columns,
1484            include_column_options,
1485            webhook_info,
1486            engine,
1487        )
1488        .await?;
1489        tracing::trace!("table_plan: {:?}", plan.explain_to_string());
1490
1491        let graph = build_graph(plan, Some(GraphJobType::Table))?;
1492
1493        (graph, source, table, job_type, shared_source_id)
1494    };
1495
1496    tracing::trace!(
1497        "name={}, graph=\n{}",
1498        table_name,
1499        serde_json::to_string_pretty(&graph).unwrap()
1500    );
1501
1502    let dependencies = shared_source_id
1503        .map(|id| HashSet::from([id.as_object_id()]))
1504        .unwrap_or_default();
1505
1506    // Handle engine
1507    match engine {
1508        Engine::Hummock => {
1509            let catalog_writer = session.catalog_writer()?;
1510            let action = match job_type {
1511                TableJobType::SharedCdcSource => LongRunningNotificationAction::MonitorBackfillJob,
1512                _ => LongRunningNotificationAction::DiagnoseBarrierLatency,
1513            };
1514            execute_with_long_running_notification(
1515                catalog_writer.create_table(
1516                    source.map(|s| s.to_prost()),
1517                    hummock_table.to_prost(),
1518                    graph,
1519                    job_type,
1520                    if_not_exists,
1521                    dependencies,
1522                ),
1523                &session,
1524                "CREATE TABLE",
1525                action,
1526            )
1527            .await?;
1528        }
1529        Engine::Iceberg => {
1530            let hummock_table_name = hummock_table.name.clone();
1531            session.create_staging_table(hummock_table.clone());
1532            let res = create_iceberg_engine_table(
1533                session.clone(),
1534                handler_args,
1535                source.map(|s| s.to_prost()),
1536                hummock_table,
1537                graph,
1538                table_name,
1539                job_type,
1540                if_not_exists,
1541            )
1542            .await;
1543            session.drop_staging_table(&hummock_table_name);
1544            res?
1545        }
1546    }
1547
1548    Ok(PgResponse::empty_result(StatementType::CREATE_TABLE))
1549}
1550
/// An Iceberg engine table is composed of a hummock table, an iceberg sink and an iceberg source.
///
/// 1. Fetch the iceberg engine options from the meta node, or use the iceberg engine connection provided by the user.
/// 2. Create a hummock table.
/// 3. Create an iceberg sink.
/// 4. Create an iceberg source.
1557///
1558/// See <https://github.com/risingwavelabs/risingwave/issues/21586> for an architecture diagram.
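///
/// A rough usage sketch (illustrative only; the connection name and table definition are
/// placeholders):
///
/// ```sql
/// SET iceberg_engine_connection = 'public.my_iceberg_connection';
/// CREATE TABLE t (id INT PRIMARY KEY, v INT) ENGINE = iceberg;
/// ```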
1559#[allow(clippy::too_many_arguments)]
1560pub async fn create_iceberg_engine_table(
1561    session: Arc<SessionImpl>,
1562    handler_args: HandlerArgs,
1563    mut source: Option<PbSource>,
1564    table: TableCatalog,
1565    graph: StreamFragmentGraph,
1566    table_name: ObjectName,
1567    job_type: PbTableJobType,
1568    if_not_exists: bool,
1569) -> Result<()> {
1570    let rw_db_name = session
1571        .env()
1572        .catalog_reader()
1573        .read_guard()
1574        .get_database_by_id(table.database_id)?
1575        .name()
1576        .to_owned();
1577    let rw_schema_name = session
1578        .env()
1579        .catalog_reader()
1580        .read_guard()
1581        .get_schema_by_id(table.database_id, table.schema_id)?
1582        .name()
1583        .clone();
1584    let iceberg_catalog_name = rw_db_name.clone();
1585    let iceberg_database_name = rw_schema_name.clone();
1586    let iceberg_table_name = table_name.0.last().unwrap().real_value();
1587
1588    let iceberg_engine_connection: String = session.config().iceberg_engine_connection();
1589    let sink_decouple = session.config().sink_decouple();
1590    if matches!(sink_decouple, SinkDecouple::Disable) {
1591        bail!(
            "Iceberg engine tables require sink decoupling; try `SET sink_decouple = true` to resolve it"
1593        );
1594    }
1595
1596    let mut connection_ref = BTreeMap::new();
1597    let with_common = if iceberg_engine_connection.is_empty() {
        bail!("To use an iceberg engine table, the session variable `iceberg_engine_connection` must be set.");
1599    } else {
1600        let parts: Vec<&str> = iceberg_engine_connection.split('.').collect();
1601        assert_eq!(parts.len(), 2);
1602        let connection_catalog =
1603            session.get_connection_by_name(Some(parts[0].to_owned()), parts[1])?;
1604        if let ConnectionInfo::ConnectionParams(params) = &connection_catalog.info {
1605            if params.connection_type == ConnectionType::Iceberg as i32 {
1606                // With iceberg engine connection:
1607                connection_ref.insert(
1608                    "connection".to_owned(),
1609                    ConnectionRefValue {
1610                        connection_name: ObjectName::from(vec![
1611                            Ident::from(parts[0]),
1612                            Ident::from(parts[1]),
1613                        ]),
1614                    },
1615                );
1616
1617                let mut with_common = BTreeMap::new();
1618                with_common.insert("connector".to_owned(), "iceberg".to_owned());
1619                with_common.insert("database.name".to_owned(), iceberg_database_name);
1620                with_common.insert("table.name".to_owned(), iceberg_table_name);
1621
1622                let hosted_catalog = params
1623                    .properties
1624                    .get("hosted_catalog")
1625                    .map(|s| s.eq_ignore_ascii_case("true"))
1626                    .unwrap_or(false);
1627                if hosted_catalog {
1628                    let meta_client = session.env().meta_client();
1629                    let meta_store_endpoint = meta_client.get_meta_store_endpoint().await?;
1630
1631                    let meta_store_endpoint =
1632                        url::Url::parse(&meta_store_endpoint).map_err(|_| {
1633                            ErrorCode::InternalError(
1634                                "failed to parse the meta store endpoint".to_owned(),
1635                            )
1636                        })?;
1637                    let meta_store_backend = meta_store_endpoint.scheme().to_owned();
1638                    let meta_store_user = meta_store_endpoint.username().to_owned();
1639                    let meta_store_password = match meta_store_endpoint.password() {
1640                        Some(password) => percent_decode_str(password)
1641                            .decode_utf8()
1642                            .map_err(|_| {
1643                                ErrorCode::InternalError(
1644                                    "failed to parse password from meta store endpoint".to_owned(),
1645                                )
1646                            })?
1647                            .into_owned(),
1648                        None => "".to_owned(),
1649                    };
1650                    let meta_store_host = meta_store_endpoint
1651                        .host_str()
1652                        .ok_or_else(|| {
1653                            ErrorCode::InternalError(
1654                                "failed to parse host from meta store endpoint".to_owned(),
1655                            )
1656                        })?
1657                        .to_owned();
1658                    let meta_store_port = meta_store_endpoint.port().ok_or_else(|| {
1659                        ErrorCode::InternalError(
1660                            "failed to parse port from meta store endpoint".to_owned(),
1661                        )
1662                    })?;
1663                    let meta_store_database = meta_store_endpoint
1664                        .path()
1665                        .trim_start_matches('/')
1666                        .to_owned();
1667
1668                    let Ok(meta_backend) = MetaBackend::from_str(&meta_store_backend, true) else {
1669                        bail!("failed to parse meta backend: {}", meta_store_backend);
1670                    };
1671
1672                    let catalog_uri = match meta_backend {
1673                        MetaBackend::Postgres => {
1674                            format!(
1675                                "jdbc:postgresql://{}:{}/{}",
1676                                meta_store_host, meta_store_port, meta_store_database
1677                            )
1678                        }
1679                        MetaBackend::Mysql => {
1680                            format!(
1681                                "jdbc:mysql://{}:{}/{}",
1682                                meta_store_host, meta_store_port, meta_store_database
1683                            )
1684                        }
1685                        MetaBackend::Sqlite | MetaBackend::Sql | MetaBackend::Mem => {
1686                            bail!(
1687                                "Unsupported meta backend for iceberg engine table: {}",
1688                                meta_store_backend
1689                            );
1690                        }
1691                    };
1692
1693                    with_common.insert("catalog.type".to_owned(), "jdbc".to_owned());
1694                    with_common.insert("catalog.uri".to_owned(), catalog_uri);
1695                    with_common.insert("catalog.jdbc.user".to_owned(), meta_store_user);
1696                    with_common.insert("catalog.jdbc.password".to_owned(), meta_store_password);
1697                    with_common.insert("catalog.name".to_owned(), iceberg_catalog_name);
1698                }
1699
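                // `with_common` now holds the WITH options shared by the iceberg sink and
                // source: roughly `connector = 'iceberg'`, `database.name`, `table.name`,
                // plus the `catalog.*` JDBC options when the connection has `hosted_catalog`
                // enabled (illustrative summary of the code above).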
1700                with_common
1701            } else {
1702                return Err(RwError::from(ErrorCode::InvalidParameterValue(
1703                    "Only iceberg connection could be used in iceberg engine".to_owned(),
1704                )));
1705            }
1706        } else {
1707            return Err(RwError::from(ErrorCode::InvalidParameterValue(
1708                "Private Link Service has been deprecated. Please create a new connection instead."
1709                    .to_owned(),
1710            )));
1711        }
1712    };
1713
    // Iceberg sinks require a primary key. If none is provided, we fall back to the `_row_id` column.
    // Fetch the primary key from the table's columns.
1716    let mut pks = table
1717        .pk_column_names()
1718        .iter()
1719        .map(|c| c.to_string())
1720        .collect::<Vec<String>>();
1721
    // For a table without a primary key, use the `_row_id` column as the primary key.
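    // Illustratively, for a table `t` the generated statement is roughly
    // `SELECT _row_id AS <RISINGWAVE_ICEBERG_ROW_ID>, * FROM t` (a sketch; the actual
    // identifiers come from the constants used below), so the hidden row id is exposed to
    // the sink under a dedicated column name.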
1723    let sink_from = if pks.len() == 1 && pks[0].eq(ROW_ID_COLUMN_NAME) {
1724        pks = vec![RISINGWAVE_ICEBERG_ROW_ID.to_owned()];
1725        let [stmt]: [_; 1] = Parser::parse_sql(&format!(
1726            "select {} as {}, * from {}",
1727            ROW_ID_COLUMN_NAME, RISINGWAVE_ICEBERG_ROW_ID, table_name
1728        ))
1729        .context("unable to parse query")?
1730        .try_into()
1731        .unwrap();
1732
1733        let Statement::Query(query) = &stmt else {
1734            panic!("unexpected statement: {:?}", stmt);
1735        };
1736        CreateSink::AsQuery(query.clone())
1737    } else {
1738        CreateSink::From(table_name.clone())
1739    };
1740
1741    let mut sink_name = table_name.clone();
1742    *sink_name.0.last_mut().unwrap() = Ident::from(
1743        (ICEBERG_SINK_PREFIX.to_owned() + &sink_name.0.last().unwrap().real_value()).as_str(),
1744    );
1745    let create_sink_stmt = CreateSinkStatement {
1746        if_not_exists: false,
1747        sink_name,
1748        with_properties: WithProperties(vec![]),
1749        sink_from,
1750        columns: vec![],
1751        emit_mode: None,
1752        sink_schema: None,
1753        into_table_name: None,
1754    };
1755
1756    let mut sink_handler_args = handler_args.clone();
1757
1758    let mut sink_with = with_common.clone();
1759
1760    if table.append_only {
1761        sink_with.insert("type".to_owned(), "append-only".to_owned());
1762    } else {
1763        sink_with.insert("primary_key".to_owned(), pks.join(","));
1764        sink_with.insert("type".to_owned(), "upsert".to_owned());
1765    }
1766    // sink_with.insert(SINK_SNAPSHOT_OPTION.to_owned(), "false".to_owned());
1767    //
1768    // Note: in theory, we don't need to backfill from the table to the sink,
    // but we don't have atomic DDL yet (https://github.com/risingwavelabs/risingwave/issues/21863),
    // so there may be a potential data-loss problem at the first barrier.
1771    //
1772    // For non-append-only table, we can always solve it by the initial sink with backfill, since
1773    // data will be present in hummock table.
1774    //
1775    // For append-only table, we need to be more careful.
1776    //
1777    // The possible cases for a table:
1778    // - For table without connector: it doesn't matter, since there's no data before the table is created
    // - For a table with a connector: we worked around it by setting SOURCE_RATE_LIMIT to 0
1780    //   + If we support blocking DDL for table with connector, we need to be careful.
1781    // - For table with an upstream job: Specifically, CDC table from shared CDC source.
1782    //   + Data may come from both upstream connector, and CDC table backfill, so we need to pause both of them.
1783    //   + For now we don't support APPEND ONLY CDC table, so it's safe.
1784    let commit_checkpoint_interval = handler_args
1785        .with_options
1786        .get(COMMIT_CHECKPOINT_INTERVAL)
1787        .map(|v| v.to_owned())
1788        .unwrap_or_else(|| "60".to_owned());
1789    let commit_checkpoint_interval = commit_checkpoint_interval.parse::<u32>().map_err(|_| {
1790        ErrorCode::InvalidInputSyntax(format!(
1791            "commit_checkpoint_interval must be greater than 0: {}",
1792            commit_checkpoint_interval
1793        ))
1794    })?;
1795
1796    if commit_checkpoint_interval == 0 {
1797        bail!("commit_checkpoint_interval must be greater than 0");
1798    }
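    // E.g. `WITH (commit_checkpoint_interval = '120')` would make the iceberg sink commit
    // once every 120 checkpoints instead of the default of 60 used above (assuming the
    // option keeps its usual checkpoint-count semantics).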
1799
1800    // remove commit_checkpoint_interval from source options, otherwise it will be considered as an unknown field.
1801    source
1802        .as_mut()
1803        .map(|x| x.with_properties.remove(COMMIT_CHECKPOINT_INTERVAL));
1804
1805    let sink_decouple = session.config().sink_decouple();
1806    if matches!(sink_decouple, SinkDecouple::Disable) && commit_checkpoint_interval > 1 {
1807        bail!(
1808            "config conflict: `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
1809        )
1810    }
1811
1812    sink_with.insert(
1813        COMMIT_CHECKPOINT_INTERVAL.to_owned(),
1814        commit_checkpoint_interval.to_string(),
1815    );
1816    sink_with.insert("create_table_if_not_exists".to_owned(), "true".to_owned());
1817
1818    sink_with.insert("is_exactly_once".to_owned(), "true".to_owned());
1819
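    // The iceberg maintenance options handled below (compaction, snapshot expiration,
    // write mode, ...) all follow the same pattern: validate the user-provided value, copy
    // it into `sink_with`, and strip it from the source options so the iceberg source does
    // not treat it as an unknown field.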
1820    if let Some(enable_compaction) = handler_args.with_options.get(ENABLE_COMPACTION) {
1821        match enable_compaction.to_lowercase().as_str() {
1822            "true" => {
1823                sink_with.insert(ENABLE_COMPACTION.to_owned(), "true".to_owned());
1824            }
1825            "false" => {
1826                sink_with.insert(ENABLE_COMPACTION.to_owned(), "false".to_owned());
1827            }
1828            _ => {
1829                return Err(ErrorCode::InvalidInputSyntax(format!(
1830                    "enable_compaction must be true or false: {}",
1831                    enable_compaction
1832                ))
1833                .into());
1834            }
1835        }
1836
1837        // remove enable_compaction from source options, otherwise it will be considered as an unknown field.
1838        source
1839            .as_mut()
1840            .map(|x| x.with_properties.remove("enable_compaction"));
1841    } else {
1842        sink_with.insert(ENABLE_COMPACTION.to_owned(), "true".to_owned());
1843    }
1844
1845    if let Some(compaction_interval_sec) = handler_args.with_options.get(COMPACTION_INTERVAL_SEC) {
1846        let compaction_interval_sec = compaction_interval_sec.parse::<u64>().map_err(|_| {
1847            ErrorCode::InvalidInputSyntax(format!(
1848                "compaction_interval_sec must be greater than 0: {}",
                compaction_interval_sec
1850            ))
1851        })?;
1852        if compaction_interval_sec == 0 {
1853            bail!("compaction_interval_sec must be greater than 0");
1854        }
1855        sink_with.insert(
1856            "compaction_interval_sec".to_owned(),
1857            compaction_interval_sec.to_string(),
1858        );
1859        // remove compaction_interval_sec from source options, otherwise it will be considered as an unknown field.
1860        source
1861            .as_mut()
1862            .map(|x| x.with_properties.remove("compaction_interval_sec"));
1863    }
1864
1865    let has_enabled_snapshot_expiration = if let Some(enable_snapshot_expiration) =
1866        handler_args.with_options.get(ENABLE_SNAPSHOT_EXPIRATION)
1867    {
1868        // remove enable_snapshot_expiration from source options, otherwise it will be considered as an unknown field.
1869        source
1870            .as_mut()
1871            .map(|x| x.with_properties.remove(ENABLE_SNAPSHOT_EXPIRATION));
1872        match enable_snapshot_expiration.to_lowercase().as_str() {
1873            "true" => {
1874                sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "true".to_owned());
1875                true
1876            }
1877            "false" => {
1878                sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "false".to_owned());
1879                false
1880            }
1881            _ => {
1882                return Err(ErrorCode::InvalidInputSyntax(format!(
1883                    "enable_snapshot_expiration must be true or false: {}",
1884                    enable_snapshot_expiration
1885                ))
1886                .into());
1887            }
1888        }
1889    } else {
1890        sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "true".to_owned());
1891        true
1892    };
1893
1894    if has_enabled_snapshot_expiration {
1895        // configuration for snapshot expiration
1896        if let Some(snapshot_expiration_retain_last) = handler_args
1897            .with_options
1898            .get(SNAPSHOT_EXPIRATION_RETAIN_LAST)
1899        {
1900            sink_with.insert(
1901                SNAPSHOT_EXPIRATION_RETAIN_LAST.to_owned(),
1902                snapshot_expiration_retain_last.to_owned(),
1903            );
1904            // remove snapshot_expiration_retain_last from source options, otherwise it will be considered as an unknown field.
1905            source
1906                .as_mut()
1907                .map(|x| x.with_properties.remove(SNAPSHOT_EXPIRATION_RETAIN_LAST));
1908        }
1909
1910        if let Some(snapshot_expiration_max_age) = handler_args
1911            .with_options
1912            .get(SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS)
1913        {
1914            sink_with.insert(
1915                SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS.to_owned(),
1916                snapshot_expiration_max_age.to_owned(),
1917            );
1918            // remove snapshot_expiration_max_age from source options, otherwise it will be considered as an unknown field.
1919            source
1920                .as_mut()
1921                .map(|x| x.with_properties.remove(SNAPSHOT_EXPIRATION_MAX_AGE_MILLIS));
1922        }
1923
1924        if let Some(snapshot_expiration_clear_expired_files) = handler_args
1925            .with_options
1926            .get(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES)
1927        {
1928            sink_with.insert(
1929                SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES.to_owned(),
1930                snapshot_expiration_clear_expired_files.to_owned(),
1931            );
1932            // remove snapshot_expiration_clear_expired_files from source options, otherwise it will be considered as an unknown field.
1933            source.as_mut().map(|x| {
1934                x.with_properties
1935                    .remove(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_FILES)
1936            });
1937        }
1938
1939        if let Some(snapshot_expiration_clear_expired_meta_data) = handler_args
1940            .with_options
1941            .get(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA)
1942        {
1943            sink_with.insert(
1944                SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA.to_owned(),
1945                snapshot_expiration_clear_expired_meta_data.to_owned(),
1946            );
1947            // remove snapshot_expiration_clear_expired_meta_data from source options, otherwise it will be considered as an unknown field.
1948            source.as_mut().map(|x| {
1949                x.with_properties
1950                    .remove(SNAPSHOT_EXPIRATION_CLEAR_EXPIRED_META_DATA)
1951            });
1952        }
1953    }
1954
1955    if let Some(write_mode) = handler_args.with_options.get(WRITE_MODE) {
1956        let write_mode = IcebergWriteMode::try_from(write_mode.as_str()).map_err(|_| {
1957            ErrorCode::InvalidInputSyntax(format!(
1958                "invalid write_mode: {}, must be one of: {}, {}",
1959                write_mode, ICEBERG_WRITE_MODE_MERGE_ON_READ, ICEBERG_WRITE_MODE_COPY_ON_WRITE
1960            ))
1961        })?;
1962
1963        match write_mode {
1964            IcebergWriteMode::MergeOnRead => {
1965                sink_with.insert(WRITE_MODE.to_owned(), write_mode.as_str().to_owned());
1966            }
1967
1968            IcebergWriteMode::CopyOnWrite => {
1969                if table.append_only {
1970                    return Err(ErrorCode::NotSupported(
1971                        "COPY ON WRITE is not supported for append-only iceberg table".to_owned(),
1972                        "Please use MERGE ON READ instead".to_owned(),
1973                    )
1974                    .into());
1975                }
1976
1977                sink_with.insert(WRITE_MODE.to_owned(), write_mode.as_str().to_owned());
1978            }
1979        }
1980
1981        // remove write_mode from source options, otherwise it will be considered as an unknown field.
1982        source
1983            .as_mut()
1984            .map(|x| x.with_properties.remove("write_mode"));
1985    } else {
1986        sink_with.insert(
1987            WRITE_MODE.to_owned(),
1988            ICEBERG_WRITE_MODE_MERGE_ON_READ.to_owned(),
1989        );
1990    }
1991
1992    if let Some(max_snapshots_num_before_compaction) =
1993        handler_args.with_options.get(COMPACTION_MAX_SNAPSHOTS_NUM)
1994    {
1995        let max_snapshots_num_before_compaction = max_snapshots_num_before_compaction
1996            .parse::<u32>()
1997            .map_err(|_| {
1998                ErrorCode::InvalidInputSyntax(format!(
1999                    "{} must be greater than 0: {}",
2000                    COMPACTION_MAX_SNAPSHOTS_NUM, max_snapshots_num_before_compaction
2001                ))
2002            })?;
2003
2004        if max_snapshots_num_before_compaction == 0 {
2005            bail!(format!(
2006                "{} must be greater than 0",
2007                COMPACTION_MAX_SNAPSHOTS_NUM
2008            ));
2009        }
2010
2011        sink_with.insert(
2012            COMPACTION_MAX_SNAPSHOTS_NUM.to_owned(),
2013            max_snapshots_num_before_compaction.to_string(),
2014        );
2015
2016        // remove from source options, otherwise it will be considered as an unknown field.
2017        source
2018            .as_mut()
2019            .map(|x| x.with_properties.remove(COMPACTION_MAX_SNAPSHOTS_NUM));
2020    }
2021
2022    if let Some(small_files_threshold_mb) = handler_args
2023        .with_options
2024        .get(COMPACTION_SMALL_FILES_THRESHOLD_MB)
2025    {
2026        let small_files_threshold_mb = small_files_threshold_mb.parse::<u64>().map_err(|_| {
2027            ErrorCode::InvalidInputSyntax(format!(
2028                "{} must be greater than 0: {}",
2029                COMPACTION_SMALL_FILES_THRESHOLD_MB, small_files_threshold_mb
2030            ))
2031        })?;
2032        if small_files_threshold_mb == 0 {
2033            bail!(format!(
                "{} must be greater than 0",
2035                COMPACTION_SMALL_FILES_THRESHOLD_MB
2036            ));
2037        }
2038        sink_with.insert(
2039            COMPACTION_SMALL_FILES_THRESHOLD_MB.to_owned(),
2040            small_files_threshold_mb.to_string(),
2041        );
2042
2043        // remove from source options, otherwise it will be considered as an unknown field.
2044        source.as_mut().map(|x| {
2045            x.with_properties
2046                .remove(COMPACTION_SMALL_FILES_THRESHOLD_MB)
2047        });
2048    }
2049
2050    if let Some(delete_files_count_threshold) = handler_args
2051        .with_options
2052        .get(COMPACTION_DELETE_FILES_COUNT_THRESHOLD)
2053    {
2054        let delete_files_count_threshold =
2055            delete_files_count_threshold.parse::<usize>().map_err(|_| {
2056                ErrorCode::InvalidInputSyntax(format!(
2057                    "{} must be greater than 0: {}",
2058                    COMPACTION_DELETE_FILES_COUNT_THRESHOLD, delete_files_count_threshold
2059                ))
2060            })?;
2061        if delete_files_count_threshold == 0 {
2062            bail!(format!(
2063                "{} must be greater than 0",
2064                COMPACTION_DELETE_FILES_COUNT_THRESHOLD
2065            ));
2066        }
2067        sink_with.insert(
2068            COMPACTION_DELETE_FILES_COUNT_THRESHOLD.to_owned(),
2069            delete_files_count_threshold.to_string(),
2070        );
2071
2072        // remove from source options, otherwise it will be considered as an unknown field.
2073        source.as_mut().map(|x| {
2074            x.with_properties
2075                .remove(COMPACTION_DELETE_FILES_COUNT_THRESHOLD)
2076        });
2077    }
2078
2079    if let Some(trigger_snapshot_count) = handler_args
2080        .with_options
2081        .get(COMPACTION_TRIGGER_SNAPSHOT_COUNT)
2082    {
2083        let trigger_snapshot_count = trigger_snapshot_count.parse::<usize>().map_err(|_| {
2084            ErrorCode::InvalidInputSyntax(format!(
2085                "{} must be greater than 0: {}",
2086                COMPACTION_TRIGGER_SNAPSHOT_COUNT, trigger_snapshot_count
2087            ))
2088        })?;
2089        if trigger_snapshot_count == 0 {
2090            bail!(format!(
2091                "{} must be greater than 0",
2092                COMPACTION_TRIGGER_SNAPSHOT_COUNT
2093            ));
2094        }
2095        sink_with.insert(
2096            COMPACTION_TRIGGER_SNAPSHOT_COUNT.to_owned(),
2097            trigger_snapshot_count.to_string(),
2098        );
2099
2100        // remove from source options, otherwise it will be considered as an unknown field.
2101        source
2102            .as_mut()
2103            .map(|x| x.with_properties.remove(COMPACTION_TRIGGER_SNAPSHOT_COUNT));
2104    }
2105
2106    if let Some(target_file_size_mb) = handler_args
2107        .with_options
2108        .get(COMPACTION_TARGET_FILE_SIZE_MB)
2109    {
2110        let target_file_size_mb = target_file_size_mb.parse::<u64>().map_err(|_| {
2111            ErrorCode::InvalidInputSyntax(format!(
2112                "{} must be greater than 0: {}",
2113                COMPACTION_TARGET_FILE_SIZE_MB, target_file_size_mb
2114            ))
2115        })?;
2116        if target_file_size_mb == 0 {
2117            bail!(format!(
2118                "{} must be greater than 0",
2119                COMPACTION_TARGET_FILE_SIZE_MB
2120            ));
2121        }
2122        sink_with.insert(
2123            COMPACTION_TARGET_FILE_SIZE_MB.to_owned(),
2124            target_file_size_mb.to_string(),
2125        );
2126        // remove from source options, otherwise it will be considered as an unknown field.
2127        source
2128            .as_mut()
2129            .map(|x| x.with_properties.remove(COMPACTION_TARGET_FILE_SIZE_MB));
2130    }
2131
2132    if let Some(compaction_type) = handler_args.with_options.get(COMPACTION_TYPE) {
2133        let compaction_type = CompactionType::try_from(compaction_type.as_str()).map_err(|_| {
2134            ErrorCode::InvalidInputSyntax(format!(
2135                "invalid compaction_type: {}, must be one of {:?}",
2136                compaction_type,
2137                &[
2138                    CompactionType::Full,
2139                    CompactionType::SmallFiles,
2140                    CompactionType::FilesWithDelete
2141                ]
2142            ))
2143        })?;
2144
2145        sink_with.insert(
2146            COMPACTION_TYPE.to_owned(),
2147            compaction_type.as_str().to_owned(),
2148        );
2149
2150        // remove from source options, otherwise it will be considered as an unknown field.
2151        source
2152            .as_mut()
2153            .map(|x| x.with_properties.remove(COMPACTION_TYPE));
2154    }
2155
2156    let partition_by = handler_args
2157        .with_options
2158        .get("partition_by")
2159        .map(|v| v.to_owned());
2160
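    // E.g. `WITH (partition_by = 'c1,c2')` partitions the iceberg table by `c1` and `c2`
    // (illustrative); each entry must resolve to an existing column, and the partition
    // columns must form a prefix of the primary key, as checked below.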
2161    if let Some(partition_by) = &partition_by {
2162        let mut partition_columns = vec![];
2163        for (column, _) in parse_partition_by_exprs(partition_by.clone())? {
2164            table
2165                .columns()
2166                .iter()
2167                .find(|col| col.name().eq_ignore_ascii_case(&column))
2168                .ok_or_else(|| {
2169                    ErrorCode::InvalidInputSyntax(format!(
2170                        "Partition source column does not exist in schema: {}",
2171                        column
2172                    ))
2173                })?;
2174
2175            partition_columns.push(column);
2176        }
2177
2178        ensure_partition_columns_are_prefix_of_primary_key(&partition_columns, &pks).map_err(
2179            |_| {
2180                ErrorCode::InvalidInputSyntax(
                    "The partition columns must be a prefix of the primary key".to_owned(),
2182                )
2183            },
2184        )?;
2185
2186        sink_with.insert("partition_by".to_owned(), partition_by.to_owned());
2187
2188        // remove partition_by from source options, otherwise it will be considered as an unknown field.
2189        source
2190            .as_mut()
2191            .map(|x| x.with_properties.remove("partition_by"));
2192    }
2193
2194    sink_handler_args.with_options =
2195        WithOptions::new(sink_with, Default::default(), connection_ref.clone());
2196    let SinkPlanContext {
2197        sink_plan,
2198        sink_catalog,
2199        ..
2200    } = gen_sink_plan(sink_handler_args, create_sink_stmt, None, true).await?;
2201    let sink_graph = build_graph(sink_plan, Some(GraphJobType::Sink))?;
2202
2203    let mut source_name = table_name.clone();
2204    *source_name.0.last_mut().unwrap() = Ident::from(
2205        (ICEBERG_SOURCE_PREFIX.to_owned() + &source_name.0.last().unwrap().real_value()).as_str(),
2206    );
2207    let create_source_stmt = CreateSourceStatement {
2208        temporary: false,
2209        if_not_exists: false,
2210        columns: vec![],
2211        source_name,
2212        wildcard_idx: Some(0),
2213        constraints: vec![],
2214        with_properties: WithProperties(vec![]),
2215        format_encode: CompatibleFormatEncode::V2(FormatEncodeOptions::none()),
2216        source_watermarks: vec![],
2217        include_column_options: vec![],
2218    };
2219
2220    let mut source_handler_args = handler_args.clone();
2221    let source_with = with_common;
2222    source_handler_args.with_options =
2223        WithOptions::new(source_with, Default::default(), connection_ref);
2224
2225    let overwrite_options = OverwriteOptions::new(&mut source_handler_args);
2226    let format_encode = create_source_stmt.format_encode.into_v2_with_warning();
2227    let (with_properties, refresh_mode) =
2228        bind_connector_props(&source_handler_args, &format_encode, true)?;
2229
2230    // Create iceberg sink table, used for iceberg source column binding. See `bind_columns_from_source_for_non_cdc` for more details.
2231    // TODO: We can derive the columns directly from table definition in the future, so that we don't need to pre-create the table catalog.
2232    let (iceberg_catalog, table_identifier) = {
2233        let sink_param = SinkParam::try_from_sink_catalog(sink_catalog.clone())?;
2234        let iceberg_sink = IcebergSink::try_from(sink_param)?;
2235        iceberg_sink.create_table_if_not_exists().await?;
2236
2237        let iceberg_catalog = iceberg_sink.config.create_catalog().await?;
2238        let table_identifier = iceberg_sink.config.full_table_name()?;
2239        (iceberg_catalog, table_identifier)
2240    };
2241
2242    let create_source_type = CreateSourceType::for_newly_created(&session, &*with_properties);
2243    let (columns_from_resolve_source, source_info) = bind_columns_from_source(
2244        &session,
2245        &format_encode,
2246        Either::Left(&with_properties),
2247        create_source_type,
2248    )
2249    .await?;
2250    let mut col_id_gen = ColumnIdGenerator::new_initial();
2251
2252    let iceberg_source_catalog = bind_create_source_or_table_with_connector(
2253        source_handler_args,
2254        create_source_stmt.source_name,
2255        format_encode,
2256        with_properties,
2257        &create_source_stmt.columns,
2258        create_source_stmt.constraints,
2259        create_source_stmt.wildcard_idx,
2260        create_source_stmt.source_watermarks,
2261        columns_from_resolve_source,
2262        source_info,
2263        create_source_stmt.include_column_options,
2264        &mut col_id_gen,
2265        create_source_type,
2266        overwrite_options.source_rate_limit,
2267        SqlColumnStrategy::FollowChecked,
2268        refresh_mode,
2269    )
2270    .await?;
2271
    // Before we create the table, ensure the JVM is initialized, as we use the JDBC catalog right now.
    // If the JVM fails to initialize, the current non-atomic DDL would leave a partially created iceberg engine table behind.
2274    let _ = Jvm::get_or_init()?;
2275
2276    let catalog_writer = session.catalog_writer()?;
2277    let action = match job_type {
2278        TableJobType::SharedCdcSource => LongRunningNotificationAction::MonitorBackfillJob,
2279        _ => LongRunningNotificationAction::DiagnoseBarrierLatency,
2280    };
2281    let res = execute_with_long_running_notification(
2282        catalog_writer.create_iceberg_table(
2283            PbTableJobInfo {
2284                source,
2285                table: Some(table.to_prost()),
2286                fragment_graph: Some(graph),
2287                job_type: job_type as _,
2288            },
2289            PbSinkJobInfo {
2290                sink: Some(sink_catalog.to_proto()),
2291                fragment_graph: Some(sink_graph),
2292            },
2293            iceberg_source_catalog.to_prost(),
2294            if_not_exists,
2295        ),
2296        &session,
2297        "CREATE TABLE",
2298        action,
2299    )
2300    .await;
2301
2302    if res.is_err() {
2303        let _ = iceberg_catalog
2304            .drop_table(&table_identifier)
2305            .await
2306            .inspect_err(|err| {
2307                tracing::error!(
                    "failed to drop iceberg table {} after creating the iceberg engine table failed: {}",
2309                    table_identifier,
2310                    err.as_report()
2311                );
2312            });
2313        res?
2314    }
2315
2316    Ok(())
2317}
2318
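/// Illustratively, `CREATE TABLE t (v INT) WITH (connector = 'kafka', ...) FORMAT PLAIN ENCODE JSON`
/// passes this check, while declaring a source connector without a FORMAT ... ENCODE clause,
/// or using INCLUDE without any connector, is rejected (CDC tables skip the check entirely).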
2319pub fn check_create_table_with_source(
2320    with_options: &WithOptions,
2321    format_encode: Option<FormatEncodeOptions>,
2322    include_column_options: &IncludeOption,
2323    cdc_table_info: &Option<CdcTableInfo>,
2324) -> Result<Option<FormatEncodeOptions>> {
2325    // skip check for cdc table
2326    if cdc_table_info.is_some() {
2327        return Ok(format_encode);
2328    }
2329    let defined_source = with_options.is_source_connector();
2330
2331    if !include_column_options.is_empty() && !defined_source {
2332        return Err(ErrorCode::InvalidInputSyntax(
2333            "INCLUDE should be used with a connector".to_owned(),
2334        )
2335        .into());
2336    }
2337    if defined_source {
2338        format_encode.as_ref().ok_or_else(|| {
2339            ErrorCode::InvalidInputSyntax("Please specify a source schema using FORMAT".to_owned())
2340        })?;
2341    }
2342    Ok(format_encode)
2343}
2344
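/// For example, with primary key `["c1", "c2", "c3"]`, partition columns `["c1"]` or
/// `["c1", "c2"]` are accepted, while `["c2"]` or `["c1", "c3"]` are rejected.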
2345fn ensure_partition_columns_are_prefix_of_primary_key(
2346    partition_columns: &[String],
2347    primary_key_columns: &[String],
2348) -> std::result::Result<(), String> {
2349    if partition_columns.len() > primary_key_columns.len() {
2350        return Err("Partition columns cannot be longer than primary key columns.".to_owned());
2351    }
2352
2353    for (i, partition_col) in partition_columns.iter().enumerate() {
2354        if primary_key_columns.get(i) != Some(partition_col) {
2355            return Err(format!(
2356                "Partition column '{}' is not a prefix of the primary key.",
2357                partition_col
2358            ));
2359        }
2360    }
2361
2362    Ok(())
2363}
2364
2365#[allow(clippy::too_many_arguments)]
2366pub async fn generate_stream_graph_for_replace_table(
2367    _session: &Arc<SessionImpl>,
2368    table_name: ObjectName,
2369    original_catalog: &Arc<TableCatalog>,
2370    handler_args: HandlerArgs,
2371    statement: Statement,
2372    col_id_gen: ColumnIdGenerator,
2373    sql_column_strategy: SqlColumnStrategy,
2374) -> Result<(
2375    StreamFragmentGraph,
2376    TableCatalog,
2377    Option<SourceCatalog>,
2378    TableJobType,
2379)> {
2380    let Statement::CreateTable {
2381        columns,
2382        constraints,
2383        source_watermarks,
2384        append_only,
2385        on_conflict,
2386        with_version_columns,
2387        wildcard_idx,
2388        cdc_table_info,
2389        format_encode,
2390        include_column_options,
2391        engine,
2392        with_options,
2393        ..
2394    } = statement
2395    else {
2396        panic!("unexpected statement type: {:?}", statement);
2397    };
2398
2399    let format_encode = format_encode
2400        .clone()
2401        .map(|format_encode| format_encode.into_v2_with_warning());
2402
2403    let engine = match engine {
2404        risingwave_sqlparser::ast::Engine::Hummock => Engine::Hummock,
2405        risingwave_sqlparser::ast::Engine::Iceberg => Engine::Iceberg,
2406    };
2407
2408    let is_drop_connector =
2409        original_catalog.associated_source_id().is_some() && format_encode.is_none();
2410    if is_drop_connector {
2411        debug_assert!(
2412            source_watermarks.is_empty()
2413                && include_column_options.is_empty()
2414                && with_options
2415                    .iter()
2416                    .all(|opt| opt.name.real_value().to_lowercase() != "connector")
2417        );
2418    }
2419
2420    let props = CreateTableProps {
2421        definition: handler_args.normalized_sql.clone(),
2422        append_only,
2423        on_conflict: on_conflict.into(),
2424        with_version_columns: with_version_columns
2425            .iter()
2426            .map(|col| col.real_value())
2427            .collect(),
2428        webhook_info: original_catalog.webhook_info.clone(),
2429        engine,
2430    };
2431
2432    let ((plan, mut source, mut table), job_type) = match (format_encode, cdc_table_info.as_ref()) {
2433        (Some(format_encode), None) => (
2434            gen_create_table_plan_with_source(
2435                handler_args,
2436                ExplainOptions::default(),
2437                table_name,
2438                columns,
2439                wildcard_idx,
2440                constraints,
2441                format_encode,
2442                source_watermarks,
2443                col_id_gen,
2444                include_column_options,
2445                props,
2446                sql_column_strategy,
2447            )
2448            .await?,
2449            TableJobType::General,
2450        ),
2451        (None, None) => {
2452            let context = OptimizerContext::from_handler_args(handler_args);
2453            let (plan, table) = gen_create_table_plan(
2454                context,
2455                table_name,
2456                columns,
2457                constraints,
2458                col_id_gen,
2459                source_watermarks,
2460                props,
2461                true,
2462            )?;
2463            ((plan, None, table), TableJobType::General)
2464        }
2465        (None, Some(cdc_table)) => {
2466            let session = &handler_args.session;
2467            let (source, resolved_table_name) =
2468                get_source_and_resolved_table_name(session, cdc_table.clone(), table_name.clone())?;
2469
2470            let (cdc_with_options, normalized_external_table_name) =
2471                derive_with_options_for_cdc_table(
2472                    &source.with_properties,
2473                    cdc_table.external_table_name.clone(),
2474                )?;
2475
2476            let (column_catalogs, pk_names) = bind_cdc_table_schema(&columns, &constraints, true)?;
2477
2478            let context: OptimizerContextRef =
2479                OptimizerContext::new(handler_args, ExplainOptions::default()).into();
2480            let (plan, table) = gen_create_table_plan_for_cdc_table(
2481                context,
2482                source,
2483                normalized_external_table_name,
2484                columns,
2485                column_catalogs,
2486                pk_names,
2487                cdc_with_options,
2488                col_id_gen,
2489                on_conflict,
2490                with_version_columns
2491                    .iter()
2492                    .map(|col| col.real_value())
2493                    .collect(),
2494                include_column_options,
2495                table_name,
2496                resolved_table_name,
2497                original_catalog.database_id,
2498                original_catalog.schema_id,
2499                original_catalog.id(),
2500                engine,
2501            )?;
2502
2503            ((plan, None, table), TableJobType::SharedCdcSource)
2504        }
2505        (Some(_), Some(_)) => {
2506            return Err(ErrorCode::NotSupported(
                "Data format and encoding format don't apply to a table created from a CDC source"
2508                    .into(),
2509                "Remove the FORMAT and ENCODE specification".into(),
2510            )
2511            .into());
2512        }
2513    };
2514
2515    if table.pk_column_ids() != original_catalog.pk_column_ids() {
2516        Err(ErrorCode::InvalidInputSyntax(
2517            "alter primary key of table is not supported".to_owned(),
2518        ))?
2519    }
2520
2521    let graph = build_graph(plan, Some(GraphJobType::Table))?;
2522
2523    // Fill the original table ID.
2524    table.id = original_catalog.id();
2525    if !is_drop_connector && let Some(source_id) = original_catalog.associated_source_id() {
2526        table.associated_source_id = Some(source_id);
2527
2528        let source = source.as_mut().unwrap();
2529        source.id = source_id;
2530        source.associated_table_id = Some(table.id());
2531    }
2532
2533    Ok((graph, table, source, job_type))
2534}
2535
2536fn get_source_and_resolved_table_name(
2537    session: &Arc<SessionImpl>,
2538    cdc_table: CdcTableInfo,
2539    table_name: ObjectName,
2540) -> Result<(Arc<SourceCatalog>, String)> {
2541    let db_name = &session.database();
2542    let (_, resolved_table_name) = Binder::resolve_schema_qualified_name(db_name, &table_name)?;
2543
2544    let (source_schema, source_name) =
2545        Binder::resolve_schema_qualified_name(db_name, &cdc_table.source_name)?;
2546
2547    let source = {
2548        let catalog_reader = session.env().catalog_reader().read_guard();
2549        let schema_name = source_schema.unwrap_or(DEFAULT_SCHEMA_NAME.to_owned());
2550        let (source, _) = catalog_reader.get_source_by_name(
2551            db_name,
2552            SchemaPath::Name(schema_name.as_str()),
2553            source_name.as_str(),
2554        )?;
2555        source.clone()
2556    };
2557
2558    Ok((source, resolved_table_name))
2559}
2560
// Validate the webhook_info and bind it to its protobuf representation.
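// Illustrative shape (derived from the checks below): the table must declare exactly one
// JSONB column, and the optional VALIDATE clause must wrap a SECURE_COMPARE(...) call that
// may reference a named secret.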
2562fn bind_webhook_info(
2563    session: &Arc<SessionImpl>,
2564    columns_defs: &[ColumnDef],
2565    webhook_info: WebhookSourceInfo,
2566) -> Result<PbWebhookSourceInfo> {
2567    // validate columns
2568    if columns_defs.len() != 1 || columns_defs[0].data_type.as_ref().unwrap() != &AstDataType::Jsonb
2569    {
2570        return Err(ErrorCode::InvalidInputSyntax(
2571            "Table with webhook source should have exactly one JSONB column".to_owned(),
2572        )
2573        .into());
2574    }
2575
2576    let WebhookSourceInfo {
2577        secret_ref,
2578        signature_expr,
2579        wait_for_persistence,
2580        is_batched,
2581    } = webhook_info;
2582
2583    // validate secret_ref
2584    let (pb_secret_ref, secret_name) = if let Some(secret_ref) = secret_ref {
2585        let db_name = &session.database();
2586        let (schema_name, secret_name) =
2587            Binder::resolve_schema_qualified_name(db_name, &secret_ref.secret_name)?;
2588        let secret_catalog = session.get_secret_by_name(schema_name, &secret_name)?;
2589        (
2590            Some(PbSecretRef {
2591                secret_id: secret_catalog.id,
2592                ref_as: match secret_ref.ref_as {
2593                    SecretRefAsType::Text => PbRefAsType::Text,
2594                    SecretRefAsType::File => PbRefAsType::File,
2595                }
2596                .into(),
2597            }),
2598            Some(secret_name),
2599        )
2600    } else {
2601        (None, None)
2602    };
2603
2604    let signature_expr = if let Some(signature_expr) = signature_expr {
2605        let secure_compare_context = SecureCompareContext {
2606            column_name: columns_defs[0].name.real_value(),
2607            secret_name,
2608        };
2609        let mut binder = Binder::new_for_ddl(session).with_secure_compare(secure_compare_context);
2610        let expr = binder.bind_expr(&signature_expr)?;
2611
2612        // validate expr, ensuring it is SECURE_COMPARE()
2613        if expr.as_function_call().is_none()
2614            || expr.as_function_call().unwrap().func_type()
2615                != crate::optimizer::plan_node::generic::ExprType::SecureCompare
2616        {
2617            return Err(ErrorCode::InvalidInputSyntax(
2618                "The signature verification function must be SECURE_COMPARE()".to_owned(),
2619            )
2620            .into());
2621        }
2622
2623        Some(expr.to_expr_proto())
2624    } else {
2625        session.notice_to_user(
            "A VALIDATE clause is strongly recommended for safety in production usage",
2627        );
2628        None
2629    };
2630
2631    let pb_webhook_info = PbWebhookSourceInfo {
2632        secret_ref: pb_secret_ref,
2633        signature_expr,
2634        wait_for_persistence,
2635        is_batched,
2636    };
2637
2638    Ok(pb_webhook_info)
2639}
2640
2641#[cfg(test)]
2642mod tests {
2643    use risingwave_common::catalog::{
2644        DEFAULT_DATABASE_NAME, ROW_ID_COLUMN_NAME, RW_TIMESTAMP_COLUMN_NAME,
2645    };
2646    use risingwave_common::types::{DataType, StructType};
2647
2648    use super::*;
2649    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};
2650
2651    #[tokio::test]
2652    async fn test_create_table_handler() {
2653        let sql =
2654            "create table t (v1 smallint, v2 struct<v3 bigint, v4 float, v5 double>) append only;";
2655        let frontend = LocalFrontend::new(Default::default()).await;
2656        frontend.run_sql(sql).await.unwrap();
2657
2658        let session = frontend.session_ref();
2659        let catalog_reader = session.env().catalog_reader().read_guard();
2660        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);
2661
2662        // Check table exists.
2663        let (table, _) = catalog_reader
2664            .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
2665            .unwrap();
2666        assert_eq!(table.name(), "t");
2667
2668        let columns = table
2669            .columns
2670            .iter()
2671            .map(|col| (col.name(), col.data_type().clone()))
2672            .collect::<HashMap<&str, DataType>>();
2673
2674        let expected_columns = maplit::hashmap! {
2675            ROW_ID_COLUMN_NAME => DataType::Serial,
2676            "v1" => DataType::Int16,
2677            "v2" => StructType::new(
2678                vec![("v3", DataType::Int64),("v4", DataType::Float64),("v5", DataType::Float64)],
2679            )
2680            .with_ids([3, 4, 5].map(ColumnId::new))
2681            .into(),
2682            RW_TIMESTAMP_COLUMN_NAME => DataType::Timestamptz,
2683        };
2684
2685        assert_eq!(columns, expected_columns, "{columns:#?}");
2686    }
2687
2688    #[test]
2689    fn test_bind_primary_key() {
2690        // Note: Column ID 0 is reserved for row ID column.
2691
2692        for (sql, expected) in [
2693            ("create table t (v1 int, v2 int)", Ok(&[0] as &[_])),
2694            ("create table t (v1 int primary key, v2 int)", Ok(&[1])),
2695            ("create table t (v1 int, v2 int primary key)", Ok(&[2])),
2696            (
2697                "create table t (v1 int primary key, v2 int primary key)",
2698                Err("multiple primary keys are not allowed"),
2699            ),
2700            (
2701                "create table t (v1 int primary key primary key, v2 int)",
2702                Err("multiple primary keys are not allowed"),
2703            ),
2704            (
2705                "create table t (v1 int, v2 int, primary key (v1))",
2706                Ok(&[1]),
2707            ),
2708            (
2709                "create table t (v1 int, primary key (v2), v2 int)",
2710                Ok(&[2]),
2711            ),
2712            (
2713                "create table t (primary key (v2, v1), v1 int, v2 int)",
2714                Ok(&[2, 1]),
2715            ),
2716            (
2717                "create table t (v1 int, primary key (v1), v2 int, primary key (v1))",
2718                Err("multiple primary keys are not allowed"),
2719            ),
2720            (
2721                "create table t (v1 int primary key, primary key (v1), v2 int)",
2722                Err("multiple primary keys are not allowed"),
2723            ),
2724            (
2725                "create table t (v1 int, primary key (V3), v2 int)",
2726                Err("column \"v3\" named in key does not exist"),
2727            ),
2728        ] {
2729            let mut ast = risingwave_sqlparser::parser::Parser::parse_sql(sql).unwrap();
2730            let risingwave_sqlparser::ast::Statement::CreateTable {
2731                columns: column_defs,
2732                constraints,
2733                ..
2734            } = ast.remove(0)
2735            else {
2736                panic!("test case should be create table")
2737            };
            let actual: Result<_> = (|| {
                let mut columns = bind_sql_columns(&column_defs, false)?;
                let mut col_id_gen = ColumnIdGenerator::new_initial();
                for c in &mut columns {
                    col_id_gen.generate(c)?;
                }

                let pk_names =
                    bind_sql_pk_names(&column_defs, bind_table_constraints(&constraints)?)?;
                let (_, pk_column_ids, _) =
                    bind_pk_and_row_id_on_relation(columns, pk_names, true)?;
                Ok(pk_column_ids)
            })();
            match (expected, actual) {
                (Ok(expected), Ok(actual)) => assert_eq!(
                    expected.iter().copied().map(ColumnId::new).collect_vec(),
                    actual,
                    "sql: {sql}"
                ),
                (Ok(_), Err(actual)) => panic!("sql: {sql}\nunexpected error: {actual:?}"),
                (Err(_), Ok(actual)) => panic!("sql: {sql}\nexpects error but got: {actual:?}"),
                (Err(expected), Err(actual)) => assert!(
                    actual.to_string().contains(expected),
                    "sql: {sql}\nexpected: {expected:?}\nactual: {actual:?}"
                ),
            }
        }
    }

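    // The same AWS credential keys are specified both in the WITH clause and in the
    // FORMAT ... ENCODE clause; the two copies must be kept separate rather than merged.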
    #[tokio::test]
    async fn test_duplicate_props_options() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let sql = format!(
            r#"CREATE TABLE t
    WITH (
        connector = 'kinesis',
        aws.region='user_test_topic',
        endpoint='172.10.1.1:9090,172.10.1.2:9090',
        aws.credentials.access_key_id = 'your_access_key_1',
        aws.credentials.secret_access_key = 'your_secret_key_1'
    )
    FORMAT PLAIN ENCODE PROTOBUF (
        message = '.test.TestRecord',
        aws.credentials.access_key_id = 'your_access_key_2',
        aws.credentials.secret_access_key = 'your_secret_key_2',
        schema.location = 'file://{}',
    )"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        // Check source exists.
        let (source, _) = catalog_reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
            .unwrap();
        assert_eq!(source.name, "t");

        // The AWS credentials given in the ENCODE clause are kept in the format/encode options.
        assert_eq!(
            source
                .info
                .format_encode_options
                .get("aws.credentials.access_key_id")
                .unwrap(),
            "your_access_key_2"
        );
        assert_eq!(
            source
                .info
                .format_encode_options
                .get("aws.credentials.secret_access_key")
                .unwrap(),
            "your_secret_key_2"
        );

        // The AWS credentials given in the WITH clause are kept in the connector properties.
        assert_eq!(
            source
                .with_properties
                .get("aws.credentials.access_key_id")
                .unwrap(),
            "your_access_key_1"
        );
        assert_eq!(
            source
                .with_properties
                .get("aws.credentials.secret_access_key")
                .unwrap(),
            "your_secret_key_1"
        );

        // ENCODE options are not merged back into the connector WITH properties.
        assert!(!source.with_properties.contains_key("schema.location"));
    }
}