risingwave_frontend/handler/create_table.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap};
use std::rc::Rc;
use std::sync::Arc;

use anyhow::{Context, anyhow};
use clap::ValueEnum;
use either::Either;
use fixedbitset::FixedBitSet;
use itertools::Itertools;
use percent_encoding::percent_decode_str;
use pgwire::pg_response::{PgResponse, StatementType};
use prost::Message as _;
use risingwave_common::catalog::{
    CdcTableDesc, ColumnCatalog, ColumnDesc, ConflictBehavior, DEFAULT_SCHEMA_NAME, Engine,
    RISINGWAVE_ICEBERG_ROW_ID, ROW_ID_COLUMN_NAME, TableId,
};
use risingwave_common::config::MetaBackend;
use risingwave_common::session_config::sink_decouple::SinkDecouple;
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_common::util::value_encoding::DatumToProtoExt;
use risingwave_common::{bail, bail_not_implemented};
use risingwave_connector::jvm_runtime::JVM;
use risingwave_connector::sink::decouple_checkpoint_log_sink::COMMIT_CHECKPOINT_INTERVAL;
use risingwave_connector::source::cdc::build_cdc_table_id;
use risingwave_connector::source::cdc::external::{
    DATABASE_NAME_KEY, ExternalTableConfig, ExternalTableImpl, SCHEMA_NAME_KEY, TABLE_NAME_KEY,
};
use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt, source};
use risingwave_pb::catalog::connection::Info as ConnectionInfo;
use risingwave_pb::catalog::connection_params::ConnectionType;
use risingwave_pb::catalog::{PbSource, PbWebhookSourceInfo, WatermarkDesc};
use risingwave_pb::ddl_service::{PbTableJobType, TableJobType};
use risingwave_pb::meta::PbThrottleTarget;
use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
use risingwave_pb::plan_common::{
    AdditionalColumn, ColumnDescVersion, DefaultColumnDesc, GeneratedColumnDesc,
};
use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::secret::secret_ref::PbRefAsType;
use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_sqlparser::ast::{
    CdcTableInfo, ColumnDef, ColumnOption, CompatibleFormatEncode, ConnectionRefValue, CreateSink,
    CreateSinkStatement, CreateSourceStatement, DataType as AstDataType, ExplainOptions, Format,
    FormatEncodeOptions, Ident, ObjectName, OnConflict, SecretRefAsType, SourceWatermark,
    Statement, TableConstraint, WebhookSourceInfo, WithProperties,
};
use risingwave_sqlparser::parser::{IncludeOption, Parser};

use super::create_source::{CreateSourceType, SqlColumnStrategy, bind_columns_from_source};
use super::{RwPgResponse, alter_streaming_rate_limit, create_sink, create_source};
use crate::binder::{Clause, SecureCompareContext, bind_data_type};
use crate::catalog::root_catalog::SchemaPath;
use crate::catalog::source_catalog::SourceCatalog;
use crate::catalog::table_catalog::{ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX, TableVersion};
use crate::catalog::{ColumnId, DatabaseId, SchemaId, check_column_name_not_reserved};
use crate::error::{ErrorCode, Result, RwError, bail_bind_error};
use crate::expr::{Expr, ExprImpl, ExprRewriter};
use crate::handler::HandlerArgs;
use crate::handler::create_source::{
    UPSTREAM_SOURCE_KEY, bind_connector_props, bind_create_source_or_table_with_connector,
    bind_source_watermark, handle_addition_columns,
};
use crate::handler::util::SourceSchemaCompatExt;
use crate::optimizer::plan_node::generic::{CdcScanOptions, SourceNodeKind};
use crate::optimizer::plan_node::{LogicalCdcScan, LogicalSource};
use crate::optimizer::property::{Order, RequiredDist};
use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRef, PlanRoot};
use crate::session::SessionImpl;
use crate::session::current::notice_to_user;
use crate::stream_fragmenter::{GraphJobType, build_graph};
use crate::utils::OverwriteOptions;
use crate::{Binder, Explain, TableCatalog, WithOptions};

mod col_id_gen;
pub use col_id_gen::*;
use risingwave_connector::sink::iceberg::parse_partition_by_exprs;

use crate::handler::drop_table::handle_drop_table;

fn ensure_column_options_supported(c: &ColumnDef) -> Result<()> {
    for option_def in &c.options {
        match option_def.option {
            ColumnOption::GeneratedColumns(_) => {}
            ColumnOption::DefaultValue(_) => {}
            ColumnOption::DefaultValueInternal { .. } => {}
            ColumnOption::Unique { is_primary: true } => {}
            ColumnOption::Null => {}
            ColumnOption::NotNull => {}
            _ => bail_not_implemented!("column constraints \"{}\"", option_def),
        }
    }
    Ok(())
}

/// Binds the column schemas declared in the CREATE statement into `ColumnCatalog`s.
///
/// Note that primary key markers on columns are not handled here; they are extracted
/// separately by [`bind_sql_pk_names`] and combined with table constraints afterwards.
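///
/// For example (illustrative only), `CREATE TABLE t (v1 INT, v2 VARCHAR NOT NULL)` binds two
/// catalogs: `v1` with `nullable = true` and `v2` with `nullable = false`, both with
/// placeholder column ids to be filled in later by a `ColumnIdGenerator`.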
pub fn bind_sql_columns(
    column_defs: &[ColumnDef],
    is_for_drop_table_connector: bool,
) -> Result<Vec<ColumnCatalog>> {
    let mut columns = Vec::with_capacity(column_defs.len());

    for column in column_defs {
        ensure_column_options_supported(column)?;
        // Destructure to make sure all fields are properly handled rather than ignored.
        // Do NOT use `..` to ignore fields you do not want to deal with.
        // Reject them with a clear NotImplemented error.
        let ColumnDef {
            name,
            data_type,
            collation,
            options,
            ..
        } = column;

        let data_type = data_type
            .clone()
            .ok_or_else(|| ErrorCode::InvalidInputSyntax("data type is not specified".into()))?;
        if let Some(collation) = collation {
            // PostgreSQL limits the data types that collation can work on.
            // https://www.postgresql.org/docs/16/collation.html#COLLATION-CONCEPTS
            //   > The built-in collatable data types are `text`, `varchar`, and `char`.
            //
            // But we don't support real collation; we simply ignore it here.
            if !["C", "POSIX"].contains(&collation.real_value().as_str()) {
                bail_not_implemented!("collation other than `C` or `POSIX` is not implemented");
            }

            match data_type {
                AstDataType::Text | AstDataType::Varchar | AstDataType::Char(_) => {}
                _ => {
                    return Err(ErrorCode::NotSupported(
                        format!("{} is not a collatable data type", data_type),
                        "The built-in collatable data types are `text`, `varchar`, and `char`; please check your type".into(),
                    ).into());
                }
            }
        }

        if !is_for_drop_table_connector {
            // An additional column name may have the prefix `_rw`. When dropping the connector
            // from a table, the additional columns are converted to normal columns and keep
            // their original names, so we loosen the check for the `_rw` prefix in that case.
            check_column_name_not_reserved(&name.real_value())?;
        }

        let nullable: bool = !options
            .iter()
            .any(|def| matches!(def.option, ColumnOption::NotNull));

        columns.push(ColumnCatalog {
            column_desc: ColumnDesc {
                data_type: bind_data_type(&data_type)?,
                column_id: ColumnId::placeholder(),
                name: name.real_value(),
                generated_or_default_column: None,
                description: None,
                additional_column: AdditionalColumn { column_type: None },
                version: ColumnDescVersion::LATEST,
                system_column: None,
                nullable,
            },
            is_hidden: false,
        });
    }

    Ok(columns)
}

fn check_generated_column_constraints(
    column_name: &str,
    column_id: ColumnId,
    expr: &ExprImpl,
    column_catalogs: &[ColumnCatalog],
    generated_column_names: &[String],
    pk_column_ids: &[ColumnId],
) -> Result<()> {
    let input_refs = expr.collect_input_refs(column_catalogs.len());
    for idx in input_refs.ones() {
        let referred_generated_column = &column_catalogs[idx].column_desc.name;
        if generated_column_names
            .iter()
            .any(|c| c == referred_generated_column)
        {
            return Err(ErrorCode::BindError(format!(
                "A generated column cannot reference another generated column, \
                but generated column \"{}\" references generated column \"{}\"",
                column_name, referred_generated_column
            ))
            .into());
        }
    }

    if pk_column_ids.contains(&column_id) && expr.is_impure() {
        return Err(ErrorCode::BindError(format!(
            "Generated columns with impure expressions should not be part of the primary key. \
            Here column \"{}\" is defined as part of the primary key.",
            column_name
        ))
        .into());
    }

    Ok(())
}

/// Binds constraints that can only be specified in column definitions,
/// currently generated columns and default columns.
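///
/// For example (illustrative only), in
/// `CREATE TABLE t (v1 INT, v2 INT AS v1 + 1, v3 TIMESTAMPTZ DEFAULT now())`,
/// `v2` is bound as a generated column and `v3` as a default column whose snapshot value
/// (the result of `now()` at DDL time) is persisted for filling missing values on schema
/// change.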
pub fn bind_sql_column_constraints(
    session: &SessionImpl,
    table_name: String,
    column_catalogs: &mut [ColumnCatalog],
    columns: Vec<ColumnDef>,
    pk_column_ids: &[ColumnId],
) -> Result<()> {
    let generated_column_names = {
        let mut names = vec![];
        for column in &columns {
            for option_def in &column.options {
                if let ColumnOption::GeneratedColumns(_) = option_def.option {
                    names.push(column.name.real_value());
                    break;
                }
            }
        }
        names
    };

    let mut binder = Binder::new_for_ddl(session);
    binder.bind_columns_to_context(table_name.clone(), column_catalogs)?;

    for column in columns {
        let Some(idx) = column_catalogs
            .iter()
            .position(|c| c.name() == column.name.real_value())
        else {
            // It's possible that we don't follow the user-defined columns in SQL but take the
            // ones resolved from the source, thus missing some columns. Simply ignore them.
            continue;
        };

        for option_def in column.options {
            match option_def.option {
                ColumnOption::GeneratedColumns(expr) => {
                    binder.set_clause(Some(Clause::GeneratedColumn));

                    let expr_impl = binder.bind_expr(expr).with_context(|| {
                        format!(
                            "failed to bind expression in generated column \"{}\"",
                            column.name.real_value()
                        )
                    })?;

                    check_generated_column_constraints(
                        &column.name.real_value(),
                        column_catalogs[idx].column_id(),
                        &expr_impl,
                        column_catalogs,
                        &generated_column_names,
                        pk_column_ids,
                    )?;

                    column_catalogs[idx].column_desc.generated_or_default_column = Some(
                        GeneratedOrDefaultColumn::GeneratedColumn(GeneratedColumnDesc {
                            expr: Some(expr_impl.to_expr_proto()),
                        }),
                    );
                    binder.set_clause(None);
                }
                ColumnOption::DefaultValue(expr) => {
                    let expr_impl = binder
                        .bind_expr(expr)?
                        .cast_assign(column_catalogs[idx].data_type().clone())?;

                    // Rewrite expressions to evaluate a snapshot value, used for filling in
                    // missing values in the case of a schema change.
                    //
                    // TODO: Currently we don't support impure expressions other than `now()`
                    // (like `random()`), so the rewritten expression should almost always be
                    // pure and we directly call `fold_const` here. We do not actually require
                    // purity of the expression, since we only need a snapshot value.
                    let rewritten_expr_impl = session
                        .pinned_snapshot()
                        .inline_now_proc_time()
                        .rewrite_expr(expr_impl.clone());

                    if let Some(snapshot_value) = rewritten_expr_impl.try_fold_const() {
                        let snapshot_value = snapshot_value?;

                        column_catalogs[idx].column_desc.generated_or_default_column =
                            Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
                                snapshot_value: Some(snapshot_value.to_protobuf()),
                                // Persist the original expression.
                                expr: Some(expr_impl.to_expr_proto()),
                            }));
                    } else {
                        return Err(ErrorCode::BindError(format!(
                            "Default expression used in column `{}` cannot be evaluated. \
                            Use generated columns instead if you mean to reference other columns.",
                            column.name
                        ))
                        .into());
                    }
                }
                ColumnOption::DefaultValueInternal { persisted, expr: _ } => {
                    // When `DEFAULT INTERNAL` is used internally for schema change, the persisted
                    // value should already be set during purification. So if we encounter an empty
                    // value here, it means the user has specified it explicitly in the SQL
                    // statement, typically by directly copying the result of `SHOW CREATE TABLE`
                    // and executing it.
                    if persisted.is_empty() {
                        bail_bind_error!(
                            "DEFAULT INTERNAL is only used for internal purposes, \
                             please specify a concrete default value"
                        );
                    }

                    let desc = DefaultColumnDesc::decode(&*persisted)
                        .expect("failed to decode persisted `DefaultColumnDesc`");

                    column_catalogs[idx].column_desc.generated_or_default_column =
                        Some(GeneratedOrDefaultColumn::DefaultColumn(desc));
                }
                _ => {}
            }
        }
    }
    Ok(())
}

/// Currently we only support the PRIMARY KEY table constraint, so we simply return the PK
/// column names if such a constraint exists.
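///
/// For example (illustrative only), `CREATE TABLE t (a INT, b INT, PRIMARY KEY (a, b))`
/// yields `["a", "b"]`, while any other kind of table constraint results in a
/// "not implemented" error.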
pub fn bind_table_constraints(table_constraints: &[TableConstraint]) -> Result<Vec<String>> {
    let mut pk_column_names = vec![];

    for constraint in table_constraints {
        match constraint {
            TableConstraint::Unique {
                name: _,
                columns,
                is_primary: true,
            } => {
                if !pk_column_names.is_empty() {
                    return Err(multiple_pk_definition_err());
                }
                pk_column_names = columns.iter().map(|c| c.real_value()).collect_vec();
            }
            _ => bail_not_implemented!("table constraint \"{}\"", constraint),
        }
    }
    Ok(pk_column_names)
}

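/// Merges primary key names from column definitions with those from table constraints,
/// rejecting multiple primary key definitions.
///
/// For example (illustrative only), `CREATE TABLE t (a INT PRIMARY KEY)` and
/// `CREATE TABLE t (a INT, PRIMARY KEY (a))` both yield `["a"]`, while specifying both at
/// once raises a "multiple primary keys are not allowed" error.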
pub fn bind_sql_pk_names(
    columns_defs: &[ColumnDef],
    pk_names_from_table_constraints: Vec<String>,
) -> Result<Vec<String>> {
    let mut pk_column_names = pk_names_from_table_constraints;

    for column in columns_defs {
        for option_def in &column.options {
            if let ColumnOption::Unique { is_primary: true } = option_def.option {
                if !pk_column_names.is_empty() {
                    return Err(multiple_pk_definition_err());
                }
                pk_column_names.push(column.name.real_value());
            };
        }
    }

    Ok(pk_column_names)
}

fn multiple_pk_definition_err() -> RwError {
    ErrorCode::BindError("multiple primary keys are not allowed".into()).into()
}

/// Binds primary keys defined in SQL.
///
/// It returns the columns together with `pk_column_ids`, and an optional row id column index if
/// added.
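///
/// For example (illustrative only), if `pk_names` is empty and `must_need_pk` is set, a
/// hidden `_row_id` column is appended and used as the primary key; if `pk_names` is
/// `["a"]`, the returned `pk_column_ids` contains the id of column `a` and no row id
/// column is added.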
pub fn bind_pk_and_row_id_on_relation(
    mut columns: Vec<ColumnCatalog>,
    pk_names: Vec<String>,
    must_need_pk: bool,
) -> Result<(Vec<ColumnCatalog>, Vec<ColumnId>, Option<usize>)> {
    for c in &columns {
        assert!(c.column_id() != ColumnId::placeholder());
    }

    // Mapping from column name to column id.
    let name_to_id = columns
        .iter()
        .map(|c| (c.name(), c.column_id()))
        .collect::<HashMap<_, _>>();

    let mut pk_column_ids: Vec<_> = pk_names
        .iter()
        .map(|name| {
            name_to_id.get(name.as_str()).copied().ok_or_else(|| {
                ErrorCode::BindError(format!("column \"{name}\" named in key does not exist"))
            })
        })
        .try_collect()?;

    // Add the `_row_id` column if `pk_column_ids` is empty and `must_need_pk` is set.
    let need_row_id = pk_column_ids.is_empty() && must_need_pk;

    let row_id_index = need_row_id.then(|| {
        let column = ColumnCatalog::row_id_column();
        let index = columns.len();
        pk_column_ids = vec![column.column_id()];
        columns.push(column);
        index
    });

    if let Some(col) = columns.iter().map(|c| c.name()).duplicates().next() {
        Err(ErrorCode::InvalidInputSyntax(format!(
            "column \"{col}\" specified more than once"
        )))?;
    }

    Ok((columns, pk_column_ids, row_id_index))
}

/// `gen_create_table_plan_with_source` generates the plan for creating a table with an external
/// stream source.
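///
/// For example (illustrative SQL), this handles statements like
/// `CREATE TABLE t (v1 INT) WITH (connector = 'kafka', ...) FORMAT PLAIN ENCODE JSON`,
/// where the final columns may come from the SQL definition, the schema resolved from the
/// source, or both, depending on `sql_column_strategy`.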
#[allow(clippy::too_many_arguments)]
pub(crate) async fn gen_create_table_plan_with_source(
    mut handler_args: HandlerArgs,
    explain_options: ExplainOptions,
    table_name: ObjectName,
    column_defs: Vec<ColumnDef>,
    wildcard_idx: Option<usize>,
    constraints: Vec<TableConstraint>,
    format_encode: FormatEncodeOptions,
    source_watermarks: Vec<SourceWatermark>,
    mut col_id_gen: ColumnIdGenerator,
    include_column_options: IncludeOption,
    props: CreateTableProps,
    sql_column_strategy: SqlColumnStrategy,
) -> Result<(PlanRef, Option<SourceCatalog>, TableCatalog)> {
    if props.append_only
        && format_encode.format != Format::Plain
        && format_encode.format != Format::Native
    {
        return Err(ErrorCode::BindError(format!(
            "Append-only tables do not support format {}.",
            format_encode.format
        ))
        .into());
    }

    let session = &handler_args.session;
    let with_properties = bind_connector_props(&handler_args, &format_encode, false)?;
    if with_properties.is_shareable_cdc_connector() {
        generated_columns_check_for_cdc_table(&column_defs)?;
        not_null_check_for_cdc_table(&wildcard_idx, &column_defs)?;
    } else if column_defs.iter().any(|col| {
        col.options
            .iter()
            .any(|def| matches!(def.option, ColumnOption::NotNull))
    }) {
        // For non-cdc sources.
        notice_to_user(
            "The table contains columns with NOT NULL constraints. Any rows from upstream violating the constraints will be ignored silently.",
        );
    }

    let db_name: &str = &session.database();
    let (schema_name, _) = Binder::resolve_schema_qualified_name(db_name, table_name.clone())?;

    // TODO: omit this step if `sql_column_strategy` is `Follow`.
    let (columns_from_resolve_source, source_info) = bind_columns_from_source(
        session,
        &format_encode,
        Either::Left(&with_properties),
        CreateSourceType::Table,
    )
    .await?;

    let overwrite_options = OverwriteOptions::new(&mut handler_args);
    let rate_limit = overwrite_options.source_rate_limit;
    let source = bind_create_source_or_table_with_connector(
        handler_args.clone(),
        table_name,
        format_encode,
        with_properties,
        &column_defs,
        constraints,
        wildcard_idx,
        source_watermarks,
        columns_from_resolve_source,
        source_info,
        include_column_options,
        &mut col_id_gen,
        CreateSourceType::Table,
        rate_limit,
        sql_column_strategy,
    )
    .await?;

    let context = OptimizerContext::new(handler_args, explain_options);

    let (plan, table) = gen_table_plan_with_source(
        context.into(),
        schema_name,
        source.clone(),
        col_id_gen.into_version(),
        props,
    )?;

    Ok((plan, Some(source), table))
}

/// `gen_create_table_plan` generates the plan for creating a table without an external stream
/// source.
#[allow(clippy::too_many_arguments)]
pub(crate) fn gen_create_table_plan(
    context: OptimizerContext,
    table_name: ObjectName,
    column_defs: Vec<ColumnDef>,
    constraints: Vec<TableConstraint>,
    mut col_id_gen: ColumnIdGenerator,
    source_watermarks: Vec<SourceWatermark>,
    props: CreateTableProps,
    is_for_replace_plan: bool,
) -> Result<(PlanRef, TableCatalog)> {
    let mut columns = bind_sql_columns(&column_defs, is_for_replace_plan)?;
    for c in &mut columns {
        col_id_gen.generate(c)?;
    }

    let (_, secret_refs, connection_refs) = context.with_options().clone().into_parts();
    if !secret_refs.is_empty() || !connection_refs.is_empty() {
        return Err(crate::error::ErrorCode::InvalidParameterValue(
            "Secret references and connection references are not allowed in options when \
             creating a table without an external source"
                .to_owned(),
        )
        .into());
    }

    gen_create_table_plan_without_source(
        context,
        table_name,
        columns,
        column_defs,
        constraints,
        source_watermarks,
        col_id_gen.into_version(),
        props,
    )
}

#[allow(clippy::too_many_arguments)]
pub(crate) fn gen_create_table_plan_without_source(
    context: OptimizerContext,
    table_name: ObjectName,
    columns: Vec<ColumnCatalog>,
    column_defs: Vec<ColumnDef>,
    constraints: Vec<TableConstraint>,
    source_watermarks: Vec<SourceWatermark>,
    version: TableVersion,
    props: CreateTableProps,
) -> Result<(PlanRef, TableCatalog)> {
    // XXX: Why not bind outside?
    let pk_names = bind_sql_pk_names(&column_defs, bind_table_constraints(&constraints)?)?;
    let (mut columns, pk_column_ids, row_id_index) =
        bind_pk_and_row_id_on_relation(columns, pk_names, true)?;

    let watermark_descs: Vec<WatermarkDesc> = bind_source_watermark(
        context.session_ctx(),
        table_name.real_value(),
        source_watermarks,
        &columns,
    )?;

    bind_sql_column_constraints(
        context.session_ctx(),
        table_name.real_value(),
        &mut columns,
        column_defs,
        &pk_column_ids,
    )?;
    let session = context.session_ctx().clone();

    let db_name = &session.database();
    let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, table_name)?;

    let info = CreateTableInfo {
        columns,
        pk_column_ids,
        row_id_index,
        watermark_descs,
        source_catalog: None,
        version,
    };

    gen_table_plan_inner(context.into(), schema_name, table_name, info, props)
}

fn gen_table_plan_with_source(
    context: OptimizerContextRef,
    schema_name: Option<String>,
    source_catalog: SourceCatalog,
    version: TableVersion,
    props: CreateTableProps,
) -> Result<(PlanRef, TableCatalog)> {
    let table_name = source_catalog.name.clone();

    let info = CreateTableInfo {
        columns: source_catalog.columns.clone(),
        pk_column_ids: source_catalog.pk_col_ids.clone(),
        row_id_index: source_catalog.row_id_index,
        watermark_descs: source_catalog.watermark_descs.clone(),
        source_catalog: Some(source_catalog),
        version,
    };

    gen_table_plan_inner(context, schema_name, table_name, info, props)
}

/// On-conflict behavior, either from user input or from an existing table catalog.
#[derive(Clone, Copy)]
pub enum EitherOnConflict {
    Ast(Option<OnConflict>),
    Resolved(ConflictBehavior),
}

impl From<Option<OnConflict>> for EitherOnConflict {
    fn from(v: Option<OnConflict>) -> Self {
        Self::Ast(v)
    }
}

impl From<ConflictBehavior> for EitherOnConflict {
    fn from(v: ConflictBehavior) -> Self {
        Self::Resolved(v)
    }
}

impl EitherOnConflict {
    /// Resolves the conflict behavior based on the given information.
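    ///
    /// A sketch of the resulting mapping (from the match below):
    /// - append-only with row id as PK: `NoCheck`, since generated keys never conflict;
    /// - append-only with a user-defined PK: `IgnoreConflict`; any explicit `ON CONFLICT`
    ///   other than `DO NOTHING` is rejected;
    /// - non-append-only: the user's `ON CONFLICT` choice, defaulting to `DO UPDATE FULL`
    ///   (i.e. `Overwrite`).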
    pub fn to_behavior(self, append_only: bool, row_id_as_pk: bool) -> Result<ConflictBehavior> {
        let conflict_behavior = match self {
            EitherOnConflict::Ast(on_conflict) => {
                if append_only {
                    if row_id_as_pk {
                        // The primary key will be generated, so no conflict check is needed.
                        ConflictBehavior::NoCheck
                    } else {
                        // User-defined PK on an append-only table: enforce `DO NOTHING`.
                        if let Some(on_conflict) = on_conflict
                            && on_conflict != OnConflict::Nothing
                        {
                            return Err(ErrorCode::InvalidInputSyntax(
                                "When the PRIMARY KEY constraint is applied to an APPEND ONLY \
                                 table, the ON CONFLICT behavior must be DO NOTHING."
                                    .to_owned(),
                            )
                            .into());
                        }
                        ConflictBehavior::IgnoreConflict
                    }
                } else {
                    // Default to `UPDATE FULL` for non-append-only tables.
                    match on_conflict.unwrap_or(OnConflict::UpdateFull) {
                        OnConflict::UpdateFull => ConflictBehavior::Overwrite,
                        OnConflict::Nothing => ConflictBehavior::IgnoreConflict,
                        OnConflict::UpdateIfNotNull => ConflictBehavior::DoUpdateIfNotNull,
                    }
                }
            }
            EitherOnConflict::Resolved(b) => b,
        };

        Ok(conflict_behavior)
    }
}

/// Arguments of the functions that generate a table plan, part 1.
///
/// Compared to [`CreateTableProps`], this struct contains fields that need some work of binding
/// or resolving based on the user input.
pub struct CreateTableInfo {
    pub columns: Vec<ColumnCatalog>,
    pub pk_column_ids: Vec<ColumnId>,
    pub row_id_index: Option<usize>,
    pub watermark_descs: Vec<WatermarkDesc>,
    pub source_catalog: Option<SourceCatalog>,
    pub version: TableVersion,
}

/// Arguments of the functions that generate a table plan, part 2.
///
/// Compared to [`CreateTableInfo`], this struct contains fields that can be (relatively) simply
/// obtained from the input or the context.
pub struct CreateTableProps {
    pub definition: String,
    pub append_only: bool,
    pub on_conflict: EitherOnConflict,
    pub with_version_column: Option<String>,
    pub webhook_info: Option<PbWebhookSourceInfo>,
    pub engine: Engine,
}

#[allow(clippy::too_many_arguments)]
fn gen_table_plan_inner(
    context: OptimizerContextRef,
    schema_name: Option<String>,
    table_name: String,
    info: CreateTableInfo,
    props: CreateTableProps,
) -> Result<(PlanRef, TableCatalog)> {
    let CreateTableInfo {
        ref columns,
        row_id_index,
        ref watermark_descs,
        ref source_catalog,
        ..
    } = info;
    let CreateTableProps { append_only, .. } = props;

    let (database_id, schema_id) = context
        .session_ctx()
        .get_database_and_schema_id_for_create(schema_name)?;

    let session = context.session_ctx().clone();
    let retention_seconds = context.with_options().retention_seconds();

    let source_node: PlanRef = LogicalSource::new(
        source_catalog.clone().map(Rc::new),
        columns.clone(),
        row_id_index,
        SourceNodeKind::CreateTable,
        context.clone(),
        None,
    )?
    .into();

    let required_cols = FixedBitSet::with_capacity(columns.len());
    let plan_root = PlanRoot::new_with_logical_plan(
        source_node,
        RequiredDist::Any,
        Order::any(),
        required_cols,
        vec![],
    );

    if !append_only && !watermark_descs.is_empty() {
        return Err(ErrorCode::NotSupported(
            "Defining watermarks on a table requires the table to be append-only.".to_owned(),
            "Use the keywords `APPEND ONLY`".to_owned(),
        )
        .into());
    }

    if !append_only && retention_seconds.is_some() {
        return Err(ErrorCode::NotSupported(
            "Defining retention seconds on a table requires the table to be append-only."
                .to_owned(),
            "Use the keywords `APPEND ONLY`".to_owned(),
        )
        .into());
    }

    let materialize =
        plan_root.gen_table_plan(context, table_name, database_id, schema_id, info, props)?;

    let mut table = materialize.table().clone();
    table.owner = session.user_id();

    Ok((materialize.into(), table))
}

/// Generates the stream plan for a cdc table based on a shared source.
///
/// In the replace workflow, `table_id` is the id of the table to be replaced;
/// in the create table workflow, `table_id` is a placeholder that will be filled in by the
/// meta node.
#[allow(clippy::too_many_arguments)]
pub(crate) fn gen_create_table_plan_for_cdc_table(
    context: OptimizerContextRef,
    source: Arc<SourceCatalog>,
    external_table_name: String,
    column_defs: Vec<ColumnDef>,
    mut columns: Vec<ColumnCatalog>,
    pk_names: Vec<String>,
    cdc_with_options: WithOptionsSecResolved,
    mut col_id_gen: ColumnIdGenerator,
    on_conflict: Option<OnConflict>,
    with_version_column: Option<String>,
    include_column_options: IncludeOption,
    table_name: ObjectName,
    resolved_table_name: String, // table name without the schema prefix
    database_id: DatabaseId,
    schema_id: SchemaId,
    table_id: TableId,
    engine: Engine,
) -> Result<(PlanRef, TableCatalog)> {
    let session = context.session_ctx().clone();

    // Append additional columns to the end.
    handle_addition_columns(
        None,
        &cdc_with_options,
        include_column_options,
        &mut columns,
        true,
    )?;

    for c in &mut columns {
        col_id_gen.generate(c)?;
    }

    let (mut columns, pk_column_ids, _row_id_index) =
        bind_pk_and_row_id_on_relation(columns, pk_names, true)?;

    // NOTE: In auto schema change, the default value is not provided in the column definition.
    bind_sql_column_constraints(
        context.session_ctx(),
        table_name.real_value(),
        &mut columns,
        column_defs,
        &pk_column_ids,
    )?;

    let definition = context.normalized_sql().to_owned();

    let pk_column_indices = {
        let mut id_to_idx = HashMap::new();
        columns.iter().enumerate().for_each(|(idx, c)| {
            id_to_idx.insert(c.column_id(), idx);
        });
        // The pk column ids must exist in the table columns.
        pk_column_ids
            .iter()
            .map(|c| id_to_idx.get(c).copied().unwrap())
            .collect_vec()
    };
    let table_pk = pk_column_indices
        .iter()
        .map(|idx| ColumnOrder::new(*idx, OrderType::ascending()))
        .collect();

    let (options, secret_refs) = cdc_with_options.into_parts();

    let non_generated_column_descs = columns
        .iter()
        .filter(|&c| (!c.is_generated()))
        .map(|c| c.column_desc.clone())
        .collect_vec();
    let non_generated_column_num = non_generated_column_descs.len();

    let cdc_table_desc = CdcTableDesc {
        table_id,
        source_id: source.id.into(), // id of the cdc source streaming job
        external_table_name: external_table_name.clone(),
        pk: table_pk,
        columns: non_generated_column_descs,
        stream_key: pk_column_indices,
        connect_properties: options,
        secret_refs,
    };

    tracing::debug!(?cdc_table_desc, "create cdc table");

    let options = CdcScanOptions::from_with_options(context.with_options())?;

    let logical_scan = LogicalCdcScan::create(
        external_table_name.clone(),
        Rc::new(cdc_table_desc),
        context.clone(),
        options,
    );

    let scan_node: PlanRef = logical_scan.into();
    let required_cols = FixedBitSet::with_capacity(non_generated_column_num);
    let plan_root = PlanRoot::new_with_logical_plan(
        scan_node,
        RequiredDist::Any,
        Order::any(),
        required_cols,
        vec![],
    );

    let cdc_table_id = build_cdc_table_id(source.id, &external_table_name);
    let materialize = plan_root.gen_table_plan(
        context,
        resolved_table_name,
        database_id,
        schema_id,
        CreateTableInfo {
            columns,
            pk_column_ids,
            row_id_index: None,
            watermark_descs: vec![],
            source_catalog: Some((*source).clone()),
            version: col_id_gen.into_version(),
        },
        CreateTableProps {
            definition,
            append_only: false,
            on_conflict: on_conflict.into(),
            with_version_column,
            webhook_info: None,
            engine,
        },
    )?;

    let mut table = materialize.table().clone();
    table.owner = session.user_id();
    table.cdc_table_id = Some(cdc_table_id);
    table.dependent_relations = vec![source.id.into()];

    Ok((materialize.into(), table))
}

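/// Derives the `WITH` options for a cdc table from its source's properties and the external
/// table name.
///
/// For example (illustrative names only): for a MySQL cdc source with
/// `database.name = 'mydb'`, the external table name `mydb.orders` yields
/// `database.name = 'mydb'` and `table.name = 'orders'`; for Postgres, `public.orders`
/// yields `schema.name = 'public'` and `table.name = 'orders'`.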
fn derive_with_options_for_cdc_table(
    source_with_properties: &WithOptionsSecResolved,
    external_table_name: String,
) -> Result<WithOptionsSecResolved> {
    use source::cdc::{MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR, SQL_SERVER_CDC_CONNECTOR};
    // We need to strip the database (or schema) prefix from the external table name below.
    let source_database_name: &str = source_with_properties
        .get("database.name")
        .ok_or_else(|| anyhow!("The source properties do not contain 'database.name'"))?
        .as_str();
    let mut with_options = source_with_properties.clone();
    if let Some(connector) = source_with_properties.get(UPSTREAM_SOURCE_KEY) {
        match connector.as_str() {
            MYSQL_CDC_CONNECTOR => {
                // MySQL doesn't allow '.' in database names or table names, so we can split the
                // external table name by '.' to get the table name.
                let (db_name, table_name) = external_table_name.split_once('.').ok_or_else(|| {
                    anyhow!("The upstream table name must contain database name prefix, e.g. 'database.table'")
                })?;
                // We allow multiple database names in the source definition.
                if !source_database_name
                    .split(',')
                    .map(|s| s.trim())
                    .any(|name| name == db_name)
                {
                    return Err(anyhow!(
                        "The database name `{}` in the FROM clause is not included in the database name `{}` in source definition",
                        db_name,
                        source_database_name
                    ).into());
                }
                with_options.insert(DATABASE_NAME_KEY.into(), db_name.into());
                with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
            }
            POSTGRES_CDC_CONNECTOR => {
                let (schema_name, table_name) = external_table_name
                    .split_once('.')
                    .ok_or_else(|| anyhow!("The upstream table name must contain schema name prefix, e.g. 'public.table'"))?;

                // Insert 'schema.name' into the connect properties.
                with_options.insert(SCHEMA_NAME_KEY.into(), schema_name.into());
                with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
            }
            SQL_SERVER_CDC_CONNECTOR => {
                // A SQL Server external table name can be in one of these formats:
                // 1. 'databaseName.schemaName.tableName' (full format)
                // 2. 'schemaName.tableName' (schema and table only)
                // 3. 'tableName' (table only, using the default schema 'dbo')
                // Missing parts are auto-filled from the source configuration.
                let parts: Vec<&str> = external_table_name.split('.').collect();
                let (_, schema_name, table_name) = match parts.len() {
                    3 => {
                        // Full format: database.schema.table
                        let db_name = parts[0];
                        let schema_name = parts[1];
                        let table_name = parts[2];

                        // Verify that the database name matches the source configuration.
                        if db_name != source_database_name {
                            return Err(anyhow!(
                                "The database name `{}` in the FROM clause is not the same as the database name `{}` in source definition",
                                db_name,
                                source_database_name
                            ).into());
                        }
                        (db_name, schema_name, table_name)
                    }
                    2 => {
                        // Schema and table only: schema.table
                        let schema_name = parts[0];
                        let table_name = parts[1];
                        (source_database_name, schema_name, table_name)
                    }
                    1 => {
                        // Table only: table (use the default schema 'dbo')
                        let table_name = parts[0];
                        (source_database_name, "dbo", table_name)
                    }
                    _ => {
                        return Err(anyhow!(
                            "The upstream table name must be in one of these formats: 'database.schema.table', 'schema.table', or 'table'"
                        ).into());
                    }
                };

                // Insert 'schema.name' into the connect properties.
                with_options.insert(SCHEMA_NAME_KEY.into(), schema_name.into());
                with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
            }
            _ => {
                return Err(RwError::from(anyhow!(
                    "connector {} is not supported for cdc table",
                    connector
                )));
            }
        };
    }
    Ok(with_options)
}

#[allow(clippy::too_many_arguments)]
pub(super) async fn handle_create_table_plan(
    handler_args: HandlerArgs,
    explain_options: ExplainOptions,
    format_encode: Option<FormatEncodeOptions>,
    cdc_table_info: Option<CdcTableInfo>,
    table_name: ObjectName,
    column_defs: Vec<ColumnDef>,
    wildcard_idx: Option<usize>,
    constraints: Vec<TableConstraint>,
    source_watermarks: Vec<SourceWatermark>,
    append_only: bool,
    on_conflict: Option<OnConflict>,
    with_version_column: Option<String>,
    include_column_options: IncludeOption,
    webhook_info: Option<WebhookSourceInfo>,
    engine: Engine,
) -> Result<(PlanRef, Option<SourceCatalog>, TableCatalog, TableJobType)> {
    let col_id_gen = ColumnIdGenerator::new_initial();
    let format_encode = check_create_table_with_source(
        &handler_args.with_options,
        format_encode,
        &include_column_options,
        &cdc_table_info,
    )?;
    let webhook_info = webhook_info
        .map(|info| bind_webhook_info(&handler_args.session, &column_defs, info))
        .transpose()?;

    let props = CreateTableProps {
        definition: handler_args.normalized_sql.clone(),
        append_only,
        on_conflict: on_conflict.into(),
        with_version_column: with_version_column.clone(),
        webhook_info,
        engine,
    };

    let ((plan, source, table), job_type) = match (format_encode, cdc_table_info.as_ref()) {
        (Some(format_encode), None) => (
            gen_create_table_plan_with_source(
                handler_args,
                explain_options,
                table_name.clone(),
                column_defs,
                wildcard_idx,
                constraints,
                format_encode,
                source_watermarks,
                col_id_gen,
                include_column_options,
                props,
                SqlColumnStrategy::FollowChecked,
            )
            .await?,
            TableJobType::General,
        ),
        (None, None) => {
            let context = OptimizerContext::new(handler_args, explain_options);
            let (plan, table) = gen_create_table_plan(
                context,
                table_name.clone(),
                column_defs,
                constraints,
                col_id_gen,
                source_watermarks,
                props,
                false,
            )?;

            ((plan, None, table), TableJobType::General)
        }

        (None, Some(cdc_table)) => {
            sanity_check_for_table_on_cdc_source(
                append_only,
                &column_defs,
                &wildcard_idx,
                &constraints,
                &source_watermarks,
            )?;

            generated_columns_check_for_cdc_table(&column_defs)?;
            not_null_check_for_cdc_table(&wildcard_idx, &column_defs)?;

            let session = &handler_args.session;
            let db_name = &session.database();
            let user_name = &session.user_name();
            let search_path = session.config().search_path();
            let (schema_name, resolved_table_name) =
                Binder::resolve_schema_qualified_name(db_name, table_name.clone())?;
            let (database_id, schema_id) =
                session.get_database_and_schema_id_for_create(schema_name.clone())?;

            // A cdc table cannot be append-only.
            let (format_encode, source_name) =
                Binder::resolve_schema_qualified_name(db_name, cdc_table.source_name.clone())?;

            let source = {
                let catalog_reader = session.env().catalog_reader().read_guard();
                let schema_path =
                    SchemaPath::new(format_encode.as_deref(), &search_path, user_name);

                let (source, _) = catalog_reader.get_source_by_name(
                    db_name,
                    schema_path,
                    source_name.as_str(),
                )?;
                source.clone()
            };
            let cdc_with_options: WithOptionsSecResolved = derive_with_options_for_cdc_table(
                &source.with_properties,
                cdc_table.external_table_name.clone(),
            )?;

            let (columns, pk_names) = match wildcard_idx {
                Some(_) => bind_cdc_table_schema_externally(cdc_with_options.clone()).await?,
                None => {
                    for column_def in &column_defs {
                        for option_def in &column_def.options {
                            if let ColumnOption::DefaultValue(_)
                            | ColumnOption::DefaultValueInternal { .. } = option_def.option
                            {
                                return Err(ErrorCode::NotSupported(
                                    "Default value for columns defined on the table created from a CDC source".into(),
                                    "Remove the default value expression in the column definitions".into(),
                                )
                                .into());
                            }
                        }
                    }

                    let (columns, pk_names) =
                        bind_cdc_table_schema(&column_defs, &constraints, false)?;
                    // Read the default value definitions from the external db.
                    let (options, secret_refs) = cdc_with_options.clone().into_parts();
                    let _config = ExternalTableConfig::try_from_btreemap(options, secret_refs)
                        .context("failed to extract external table config")?;

                    // NOTE: if the external table has a default column, we will only treat it as a normal column.
                    (columns, pk_names)
                }
            };

            let context: OptimizerContextRef =
                OptimizerContext::new(handler_args, explain_options).into();
            let (plan, table) = gen_create_table_plan_for_cdc_table(
                context,
                source,
                cdc_table.external_table_name.clone(),
                column_defs,
                columns,
                pk_names,
                cdc_with_options,
                col_id_gen,
                on_conflict,
                with_version_column,
                include_column_options,
                table_name,
                resolved_table_name,
                database_id,
                schema_id,
                TableId::placeholder(),
                engine,
            )?;

            ((plan, None, table), TableJobType::SharedCdcSource)
        }
        (Some(_), Some(_)) => {
            return Err(ErrorCode::NotSupported(
                "Data format and encoding format don't apply to a table created from a CDC source"
                    .into(),
                "Remove the FORMAT and ENCODE specification".into(),
            )
            .into());
        }
    };
    Ok((plan, source, table, job_type))
}

// For both tables created from a CDC source and tables with a CDC connector.
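//
// For example (illustrative only), `(id INT PRIMARY KEY, v INT, g INT AS v + 1)` passes the
// check below, while `(id INT PRIMARY KEY, g INT AS id + 1, v INT)` is rejected because a
// non-generated column follows a generated one.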
fn generated_columns_check_for_cdc_table(columns: &Vec<ColumnDef>) -> Result<()> {
    let mut found_generated_column = false;
    for column in columns {
        let mut is_generated = false;

        for option_def in &column.options {
            if let ColumnOption::GeneratedColumns(_) = option_def.option {
                is_generated = true;
                break;
            }
        }

        if is_generated {
            found_generated_column = true;
        } else if found_generated_column {
            return Err(ErrorCode::NotSupported(
                "Non-generated column found after a generated column.".into(),
                "Ensure that all generated columns appear at the end of the cdc table definition."
                    .into(),
            )
            .into());
        }
    }
    Ok(())
}

// For both tables created from a CDC source and tables with a CDC connector.
fn not_null_check_for_cdc_table(
    wildcard_idx: &Option<usize>,
    column_defs: &Vec<ColumnDef>,
) -> Result<()> {
    if wildcard_idx.is_none()
        && column_defs.iter().any(|col| {
            col.options
                .iter()
                .any(|opt| matches!(opt.option, ColumnOption::NotNull))
        })
    {
        return Err(ErrorCode::NotSupported(
            "CDC table with NOT NULL constraint is not supported".to_owned(),
            "Please remove the NOT NULL constraint for columns".to_owned(),
        )
        .into());
    }
    Ok(())
}

// Only for tables created from a CDC source.
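//
// For example (illustrative SQL, assuming a shared cdc source `src`), a valid statement looks
// like `CREATE TABLE t (id INT PRIMARY KEY, v INT) FROM src TABLE 'mydb.t'`; omitting the
// primary key, adding APPEND ONLY, combining a wildcard with column definitions, or defining
// watermarks are all rejected below.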
1255fn sanity_check_for_table_on_cdc_source(
1256    append_only: bool,
1257    column_defs: &Vec<ColumnDef>,
1258    wildcard_idx: &Option<usize>,
1259    constraints: &Vec<TableConstraint>,
1260    source_watermarks: &Vec<SourceWatermark>,
1261) -> Result<()> {
1262    // wildcard cannot be used with column definitions
1263    if wildcard_idx.is_some() && !column_defs.is_empty() {
1264        return Err(ErrorCode::NotSupported(
1265            "wildcard(*) and column definitions cannot be used together".to_owned(),
1266            "Remove the wildcard or column definitions".to_owned(),
1267        )
1268        .into());
1269    }
1270
1271    // cdc table must have primary key constraint or primary key column
1272    if !wildcard_idx.is_some()
1273        && !constraints.iter().any(|c| {
1274            matches!(
1275                c,
1276                TableConstraint::Unique {
1277                    is_primary: true,
1278                    ..
1279                }
1280            )
1281        })
1282        && !column_defs.iter().any(|col| {
1283            col.options
1284                .iter()
1285                .any(|opt| matches!(opt.option, ColumnOption::Unique { is_primary: true }))
1286        })
1287    {
1288        return Err(ErrorCode::NotSupported(
1289            "CDC table without primary key constraint is not supported".to_owned(),
1290            "Please define a primary key".to_owned(),
1291        )
1292        .into());
1293    }
1294
1295    if append_only {
1296        return Err(ErrorCode::NotSupported(
1297            "append only modifier on the table created from a CDC source".into(),
1298            "Remove the APPEND ONLY clause".into(),
1299        )
1300        .into());
1301    }
1302
1303    if !source_watermarks.is_empty() {
1304        return Err(ErrorCode::NotSupported(
1305            "watermark defined on the table created from a CDC source".into(),
1306            "Remove the Watermark definitions".into(),
1307        )
1308        .into());
1309    }
1310
1311    Ok(())
1312}
1313
1314/// Derive schema for cdc table when create a new Table or alter an existing Table
1315async fn bind_cdc_table_schema_externally(
1316    cdc_with_options: WithOptionsSecResolved,
1317) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
1318    // read cdc table schema from external db or parsing the schema from SQL definitions
1319    let (options, secret_refs) = cdc_with_options.into_parts();
1320    let config = ExternalTableConfig::try_from_btreemap(options, secret_refs)
1321        .context("failed to extract external table config")?;
1322
1323    let table = ExternalTableImpl::connect(config)
1324        .await
1325        .context("failed to auto derive table schema")?;
1326    Ok((
1327        table
1328            .column_descs()
1329            .iter()
1330            .cloned()
1331            .map(|column_desc| ColumnCatalog {
1332                column_desc,
1333                is_hidden: false,
1334            })
1335            .collect(),
1336        table.pk_names().clone(),
1337    ))
1338}
1339
1340/// Derive schema for cdc table when create a new Table or alter an existing Table
1341fn bind_cdc_table_schema(
1342    column_defs: &Vec<ColumnDef>,
1343    constraints: &Vec<TableConstraint>,
1344    is_for_replace_plan: bool,
1345) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
1346    let columns = bind_sql_columns(column_defs, is_for_replace_plan)?;
1347
1348    let pk_names = bind_sql_pk_names(column_defs, bind_table_constraints(constraints)?)?;
1349    Ok((columns, pk_names))
1350}
1351
1352#[allow(clippy::too_many_arguments)]
1353pub async fn handle_create_table(
1354    mut handler_args: HandlerArgs,
1355    table_name: ObjectName,
1356    column_defs: Vec<ColumnDef>,
1357    wildcard_idx: Option<usize>,
1358    constraints: Vec<TableConstraint>,
1359    if_not_exists: bool,
1360    format_encode: Option<FormatEncodeOptions>,
1361    source_watermarks: Vec<SourceWatermark>,
1362    append_only: bool,
1363    on_conflict: Option<OnConflict>,
1364    with_version_column: Option<String>,
1365    cdc_table_info: Option<CdcTableInfo>,
1366    include_column_options: IncludeOption,
1367    webhook_info: Option<WebhookSourceInfo>,
1368    ast_engine: risingwave_sqlparser::ast::Engine,
1369) -> Result<RwPgResponse> {
1370    let session = handler_args.session.clone();
1371
1372    if append_only {
1373        session.notice_to_user("APPEND ONLY TABLE is currently an experimental feature.");
1374    }
1375
1376    session.check_cluster_limits().await?;
1377
1378    let engine = match ast_engine {
1379        risingwave_sqlparser::ast::Engine::Hummock => Engine::Hummock,
1380        risingwave_sqlparser::ast::Engine::Iceberg => Engine::Iceberg,
1381    };
1382    if engine == Engine::Iceberg && handler_args.with_options.get_connector().is_some() {
1383        // HACK: since we don't have atomic DDL, table with connector may lose data.
1384        // FIXME: remove this after https://github.com/risingwavelabs/risingwave/issues/21863
1385        if let Some(_rate_limit) = handler_args.with_options.insert(
1386            OverwriteOptions::SOURCE_RATE_LIMIT_KEY.to_owned(),
1387            "0".to_owned(),
1388        ) {
1389            // Reject a user-specified rate limit.
1390            return Err(ErrorCode::NotSupported(
1391                "source_rate_limit for iceberg table engine during table creation".to_owned(),
1392                "Please remove source_rate_limit from WITH options.".to_owned(),
1393            )
1394            .into());
1395        }
1396    }
1397
1398    if let Either::Right(resp) = session.check_relation_name_duplicated(
1399        table_name.clone(),
1400        StatementType::CREATE_TABLE,
1401        if_not_exists,
1402    )? {
1403        return Ok(resp);
1404    }
1405
1406    let (graph, source, hummock_table, job_type) = {
1407        let (plan, source, table, job_type) = handle_create_table_plan(
1408            handler_args.clone(),
1409            ExplainOptions::default(),
1410            format_encode,
1411            cdc_table_info,
1412            table_name.clone(),
1413            column_defs.clone(),
1414            wildcard_idx,
1415            constraints.clone(),
1416            source_watermarks,
1417            append_only,
1418            on_conflict,
1419            with_version_column,
1420            include_column_options,
1421            webhook_info,
1422            engine,
1423        )
1424        .await?;
1425        tracing::trace!("table_plan: {:?}", plan.explain_to_string());
1426
1427        let graph = build_graph(plan, Some(GraphJobType::Table))?;
1428
1429        (graph, source, table, job_type)
1430    };
1431
1432    tracing::trace!(
1433        "name={}, graph=\n{}",
1434        table_name,
1435        serde_json::to_string_pretty(&graph).unwrap()
1436    );
1437
1438    // Handle engine
1439    match engine {
1440        Engine::Hummock => {
1441            let catalog_writer = session.catalog_writer()?;
1442            catalog_writer
1443                .create_table(
1444                    source.map(|s| s.to_prost()),
1445                    hummock_table.to_prost(),
1446                    graph,
1447                    job_type,
1448                    if_not_exists,
1449                )
1450                .await?;
1451        }
1452        Engine::Iceberg => {
1453            create_iceberg_engine_table(
1454                session,
1455                handler_args,
1456                source.map(|s| s.to_prost()),
1457                hummock_table,
1458                graph,
1459                table_name,
1460                job_type,
1461                if_not_exists,
1462            )
1463            .await?;
1464        }
1465    }
1466
1467    Ok(PgResponse::empty_result(StatementType::CREATE_TABLE))
1468}
1469
1470/// A table with the iceberg engine is composed of a hummock table, an iceberg sink, and an iceberg source.
1471///
1472/// 1. Fetch the iceberg engine options from the meta node, or use the iceberg engine connection provided by the user.
1473/// 2. Create a hummock table.
1474/// 3. Create an iceberg sink.
1475/// 4. Create an iceberg source.
1476///
1477/// See <https://github.com/risingwavelabs/risingwave/issues/21586> for an architecture diagram.
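///
/// A minimal usage sketch (illustrative; the connection name and column list are
/// assumptions, not taken from this file):
///
/// ```sql
/// SET iceberg_engine_connection = 'public.my_iceberg_conn';
/// CREATE TABLE t (id INT PRIMARY KEY, v INT) ENGINE = iceberg;
/// ```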
1478#[allow(clippy::too_many_arguments)]
1479pub async fn create_iceberg_engine_table(
1480    session: Arc<SessionImpl>,
1481    handler_args: HandlerArgs,
1482    mut source: Option<PbSource>,
1483    table: TableCatalog,
1484    graph: StreamFragmentGraph,
1485    table_name: ObjectName,
1486    job_type: PbTableJobType,
1487    if_not_exists: bool,
1488) -> Result<()> {
1489    let meta_client = session.env().meta_client();
1490    let meta_store_endpoint = meta_client.get_meta_store_endpoint().await?;
1491
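    // The meta store endpoint is expected to look like, e.g.,
    // `postgres://user:password@host:5432/metadata` (illustrative); the scheme,
    // credentials, host, port, and database name are extracted from it below.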
1492    let meta_store_endpoint = url::Url::parse(&meta_store_endpoint).map_err(|_| {
1493        ErrorCode::InternalError("failed to parse the meta store endpoint".to_owned())
1494    })?;
1495    let meta_store_backend = meta_store_endpoint.scheme().to_owned();
1496    let meta_store_user = meta_store_endpoint.username().to_owned();
1497    let meta_store_password = match meta_store_endpoint.password() {
1498        Some(password) => percent_decode_str(password)
1499            .decode_utf8()
1500            .map_err(|_| {
1501                ErrorCode::InternalError(
1502                    "failed to parse password from meta store endpoint".to_owned(),
1503                )
1504            })?
1505            .into_owned(),
1506        None => "".to_owned(),
1507    };
1508    let meta_store_host = meta_store_endpoint
1509        .host_str()
1510        .ok_or_else(|| {
1511            ErrorCode::InternalError("failed to parse host from meta store endpoint".to_owned())
1512        })?
1513        .to_owned();
1514    let meta_store_port = meta_store_endpoint.port().ok_or_else(|| {
1515        ErrorCode::InternalError("failed to parse port from meta store endpoint".to_owned())
1516    })?;
1517    let meta_store_database = meta_store_endpoint
1518        .path()
1519        .trim_start_matches('/')
1520        .to_owned();
1521
1522    let Ok(meta_backend) = MetaBackend::from_str(&meta_store_backend, true) else {
1523        bail!("failed to parse meta backend: {}", meta_store_backend);
1524    };
1525
1526    let catalog_uri = match meta_backend {
1527        MetaBackend::Postgres => {
1528            format!(
1529                "jdbc:postgresql://{}:{}/{}",
1530                meta_store_host,
1531                meta_store_port,
1532                meta_store_database
1533            )
1534        }
1535        MetaBackend::Mysql => {
1536            format!(
1537                "jdbc:mysql://{}:{}/{}",
1538                meta_store_host,
1539                meta_store_port,
1540                meta_store_database
1541            )
1542        }
1543        MetaBackend::Sqlite | MetaBackend::Sql | MetaBackend::Mem => {
1544            bail!(
1545                "Unsupported meta backend for iceberg engine table: {}",
1546                meta_store_backend
1547            );
1548        }
1549    };
1550
1551    let rw_db_name = session
1552        .env()
1553        .catalog_reader()
1554        .read_guard()
1555        .get_database_by_id(&table.database_id)?
1556        .name()
1557        .to_owned();
1558    let rw_schema_name = session
1559        .env()
1560        .catalog_reader()
1561        .read_guard()
1562        .get_schema_by_id(&table.database_id, &table.schema_id)?
1563        .name()
1564        .clone();
1565    let iceberg_catalog_name = rw_db_name.clone();
1566    let iceberg_database_name = rw_schema_name.clone();
1567    let iceberg_table_name = table_name.0.last().unwrap().real_value();
1568
1569    let iceberg_engine_connection: String = session.config().iceberg_engine_connection();
1570    let sink_decouple = session.config().sink_decouple();
1571    if matches!(sink_decouple, SinkDecouple::Disable) {
1572        bail!(
1573            "Iceberg engine table only supports with sink decouple, try `set sink_decouple = true` to resolve it"
1574        );
1575    }
1576
1577    let mut connection_ref = BTreeMap::new();
1578    let with_common = if iceberg_engine_connection.is_empty() {
1579        bail!("to use iceberg engine table, the variable `iceberg_engine_connection` must be set.");
1580    } else {
1581        let parts: Vec<&str> = iceberg_engine_connection.split('.').collect();
1582        assert_eq!(parts.len(), 2);
1583        let connection_catalog =
1584            session.get_connection_by_name(Some(parts[0].to_owned()), parts[1])?;
1585        if let ConnectionInfo::ConnectionParams(params) = &connection_catalog.info {
1586            if params.connection_type == ConnectionType::Iceberg as i32 {
1587                // With iceberg engine connection:
1588                connection_ref.insert(
1589                    "connection".to_owned(),
1590                    ConnectionRefValue {
1591                        connection_name: ObjectName::from(vec![
1592                            Ident::from(parts[0]),
1593                            Ident::from(parts[1]),
1594                        ]),
1595                    },
1596                );
1597
1598                let mut with_common = BTreeMap::new();
1599                with_common.insert("connector".to_owned(), "iceberg".to_owned());
1600                with_common.insert("database.name".to_owned(), iceberg_database_name.to_owned());
1601                with_common.insert("table.name".to_owned(), iceberg_table_name.to_owned());
1602
1603                if let Some(s) = params.properties.get("hosted_catalog")
1604                    && s.eq_ignore_ascii_case("true")
1605                {
1606                    with_common.insert("catalog.type".to_owned(), "jdbc".to_owned());
1607                    with_common.insert("catalog.uri".to_owned(), catalog_uri.to_owned());
1608                    with_common.insert("catalog.jdbc.user".to_owned(), meta_store_user.to_owned());
1609                    with_common.insert(
1610                        "catalog.jdbc.password".to_owned(),
1611                        meta_store_password.clone(),
1612                    );
1613                    with_common.insert("catalog.name".to_owned(), iceberg_catalog_name.to_owned());
1614                }
1617
1618                with_common
1619            } else {
1620                return Err(RwError::from(ErrorCode::InvalidParameterValue(
1621                    "Only iceberg connection could be used in iceberg engine".to_owned(),
1622                )));
1623            }
1624        } else {
1625            return Err(RwError::from(ErrorCode::InvalidParameterValue(
1626                "Private Link Service has been deprecated. Please create a new connection instead."
1627                    .to_owned(),
1628            )));
1629        }
1630    };
1631
1632    // Iceberg sinks require a primary key; if none is provided, we fall back to the hidden `_row_id` column.
1633    // Fetch the primary key from the table's columns first.
1634    let mut pks = table
1635        .pk_column_names()
1636        .iter()
1637        .map(|c| c.to_string())
1638        .collect::<Vec<String>>();
1639
1640    // For a table without a primary key, use the hidden `_row_id` column as the primary key.
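    // In that case the generated sink query has the following shape (the column-name
    // constants are shown with assumed values, for illustration only):
    //   SELECT _row_id AS _risingwave_iceberg_row_id, * FROM t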
1641    let sink_from = if pks.len() == 1 && pks[0].eq(ROW_ID_COLUMN_NAME) {
1642        pks = vec![RISINGWAVE_ICEBERG_ROW_ID.to_owned()];
1643        let [stmt]: [_; 1] = Parser::parse_sql(&format!(
1644            "select {} as {}, * from {}",
1645            ROW_ID_COLUMN_NAME, RISINGWAVE_ICEBERG_ROW_ID, table_name
1646        ))
1647        .context("unable to parse query")?
1648        .try_into()
1649        .unwrap();
1650
1651        let Statement::Query(query) = &stmt else {
1652            panic!("unexpected statement: {:?}", stmt);
1653        };
1654        CreateSink::AsQuery(query.clone())
1655    } else {
1656        CreateSink::From(table_name.clone())
1657    };
1658
1659    let mut sink_name = table_name.clone();
1660    *sink_name.0.last_mut().unwrap() = Ident::from(
1661        (ICEBERG_SINK_PREFIX.to_owned() + &sink_name.0.last().unwrap().real_value()).as_str(),
1662    );
1663    let create_sink_stmt = CreateSinkStatement {
1664        if_not_exists: false,
1665        sink_name,
1666        with_properties: WithProperties(vec![]),
1667        sink_from,
1668        columns: vec![],
1669        emit_mode: None,
1670        sink_schema: None,
1671        into_table_name: None,
1672    };
1673
1674    let mut sink_handler_args = handler_args.clone();
1675
1676    let mut sink_with = with_common.clone();
1677
1678    sink_with.insert("primary_key".to_owned(), pks.join(","));
1679    sink_with.insert("type".to_owned(), "upsert".to_owned());
1680    // sink_with.insert(SINK_SNAPSHOT_OPTION.to_owned(), "false".to_owned());
1681    //
1682    // Note: in theory, we don't need to backfill from the table to the sink,
1683    // but we don't have atomic DDL yet (https://github.com/risingwavelabs/risingwave/issues/21863),
1684    // so there is a potential data-loss problem at the first barrier.
1685    //
1686    // For a non-append-only table, we can always solve it with the initial sink backfill, since
1687    // the data will be present in the hummock table.
1688    //
1689    // For an append-only table, we need to be more careful.
1690    //
1691    // The possible cases for a table:
1692    // - For a table without a connector: it doesn't matter, since there's no data before the table is created.
1693    // - For a table with a connector: we worked around it by setting SOURCE_RATE_LIMIT to 0.
1694    //   + If we ever support blocking DDL for tables with connectors, we need to be careful here.
1695    // - For a table with an upstream job: specifically, a CDC table from a shared CDC source.
1696    //   + Data may come from both the upstream connector and the CDC table backfill, so we would need to pause both.
1697    //   + For now we don't support APPEND ONLY CDC tables, so it's safe.
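    //
    // End to end, the workaround is roughly equivalent to this SQL sequence (illustrative only):
    //   CREATE TABLE ... WITH (source_rate_limit = 0);   -- injected in handle_create_table
    //   ALTER TABLE ... SET SOURCE_RATE_LIMIT TO -1;     -- applied at the end of this function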
1698    let commit_checkpoint_interval = handler_args
1699        .with_options
1700        .get(COMMIT_CHECKPOINT_INTERVAL)
1701        .map(|v| v.to_owned())
1702        .unwrap_or_else(|| "60".to_owned());
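    // For example, WITH (commit_checkpoint_interval = '120') makes the iceberg sink
    // commit roughly once every 120 checkpoints; it defaults to 60 when unspecified.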
1703    let commit_checkpoint_interval = commit_checkpoint_interval.parse::<u32>().map_err(|_| {
1704        ErrorCode::InvalidInputSyntax(format!(
1705            "commit_checkpoint_interval must be a positive integer: {}",
1706            commit_checkpoint_interval
1707        ))
1708    })?;
1709
1710    if commit_checkpoint_interval == 0 {
1711        bail!("commit_checkpoint_interval must be a positive integer: 0");
1712    }
1713
1714    // Remove commit_checkpoint_interval from the source options; otherwise it will be treated as an unknown field.
1715    source
1716        .as_mut()
1717        .map(|x| x.with_properties.remove(COMMIT_CHECKPOINT_INTERVAL));
1718
1719    let sink_decouple = session.config().sink_decouple();
1720    if matches!(sink_decouple, SinkDecouple::Disable) && commit_checkpoint_interval > 1 {
1721        bail!(
1722            "config conflict: `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
1723        )
1724    }
1725
1726    sink_with.insert(
1727        COMMIT_CHECKPOINT_INTERVAL.to_owned(),
1728        commit_checkpoint_interval.to_string(),
1729    );
1730    sink_with.insert("create_table_if_not_exists".to_owned(), "true".to_owned());
1731
1732    sink_with.insert("is_exactly_once".to_owned(), "true".to_owned());
1733
1734    const ENABLE_COMPACTION: &str = "enable_compaction";
1735    const COMPACTION_INTERVAL_SEC: &str = "compaction_interval_sec";
1736    const ENABLE_SNAPSHOT_EXPIRATION: &str = "enable_snapshot_expiration";
1737
1738    if let Some(enable_compaction) = handler_args.with_options.get(ENABLE_COMPACTION) {
1739        match enable_compaction.to_lowercase().as_str() {
1740            "true" => {
1741                risingwave_common::license::Feature::IcebergCompaction
1742                    .check_available()
1743                    .map_err(|e| anyhow::anyhow!(e))?;
1744
1745                sink_with.insert(ENABLE_COMPACTION.to_owned(), "true".to_owned());
1746            }
1747            "false" => {
1748                sink_with.insert(ENABLE_COMPACTION.to_owned(), "false".to_owned());
1749            }
1750            _ => {
1751                return Err(ErrorCode::InvalidInputSyntax(format!(
1752                    "enable_compaction must be true or false: {}",
1753                    enable_compaction
1754                ))
1755                .into());
1756            }
1757        }
1758
1759        // Remove enable_compaction from the source options; otherwise it will be treated as an unknown field.
1760        source
1761            .as_mut()
1762            .map(|x| x.with_properties.remove(ENABLE_COMPACTION));
1763    } else {
1764        sink_with.insert(
1765            ENABLE_COMPACTION.to_owned(),
1766            risingwave_common::license::Feature::IcebergCompaction
1767                .check_available()
1768                .is_ok()
1769                .to_string(),
1770        );
1771    }
1772
1773    if let Some(compaction_interval_sec) = handler_args.with_options.get(COMPACTION_INTERVAL_SEC) {
1774        let compaction_interval_sec = compaction_interval_sec.parse::<u64>().map_err(|_| {
1775            ErrorCode::InvalidInputSyntax(format!(
1776                "compaction_interval_sec must be a positive integer: {}",
1777                compaction_interval_sec
1778            ))
1779        })?;
1780        if compaction_interval_sec == 0 {
1781            bail!("compaction_interval_sec must be a positive integer: 0");
1782        }
1783        sink_with.insert(
1784            "compaction_interval_sec".to_owned(),
1785            compaction_interval_sec.to_string(),
1786        );
1787        // Remove compaction_interval_sec from the source options; otherwise it will be treated as an unknown field.
1788        source
1789            .as_mut()
1790            .map(|x| x.with_properties.remove(COMPACTION_INTERVAL_SEC));
1791    }
1792
1793    if let Some(enable_snapshot_expiration) =
1794        handler_args.with_options.get(ENABLE_SNAPSHOT_EXPIRATION)
1795    {
1796        match enable_snapshot_expiration.to_lowercase().as_str() {
1797            "true" => {
1798                risingwave_common::license::Feature::IcebergCompaction
1799                    .check_available()
1800                    .map_err(|e| anyhow::anyhow!(e))?;
1801                sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "true".to_owned());
1802            }
1803            "false" => {
1804                sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "false".to_owned());
1805            }
1806            _ => {
1807                return Err(ErrorCode::InvalidInputSyntax(format!(
1808                    "enable_snapshot_expiration must be true or false: {}",
1809                    enable_snapshot_expiration
1810                ))
1811                .into());
1812            }
1813        }
1814
1815        // Remove enable_snapshot_expiration from the source options; otherwise it will be treated as an unknown field.
1816        source
1817            .as_mut()
1818            .map(|x| x.with_properties.remove(ENABLE_SNAPSHOT_EXPIRATION));
1819    } else {
1820        sink_with.insert(
1821            ENABLE_SNAPSHOT_EXPIRATION.to_owned(),
1822            risingwave_common::license::Feature::IcebergCompaction
1823                .check_available()
1824                .is_ok()
1825                .to_string(),
1826        );
1827    }
1828
1829    let partition_by = handler_args
1830        .with_options
1831        .get("partition_by")
1832        .map(|v| v.to_owned());
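    // `partition_by` is a comma-separated list of partition expressions over table
    // columns, e.g. (illustrative) `partition_by = 'c1, bucket(16, c2)'`; every
    // referenced source column must exist in the table schema, as checked below.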
1833
1834    if let Some(partition_by) = &partition_by {
1835        let mut partition_columns = vec![];
1836        for (column, _) in parse_partition_by_exprs(partition_by.clone())? {
1837            table
1838                .columns()
1839                .iter()
1840                .find(|col| col.name().eq_ignore_ascii_case(&column))
1841                .ok_or_else(|| {
1842                    ErrorCode::InvalidInputSyntax(format!(
1843                        "Partition source column does not exist in schema: {}",
1844                        column
1845                    ))
1846                })?;
1847
1848            partition_columns.push(column.to_owned());
1849        }
1850
1851        ensure_partition_columns_are_prefix_of_primary_key(&partition_columns, &pks).map_err(
1852            |_| {
1853                ErrorCode::InvalidInputSyntax(
1854                    "The partition columns should be the prefix of the primary key".to_owned(),
1855                )
1856            },
1857        )?;
1858
1859        sink_with.insert("partition_by".to_owned(), partition_by.to_owned());
1860
1861        // Remove partition_by from the source options; otherwise it will be treated as an unknown field.
1862        source
1863            .as_mut()
1864            .map(|x| x.with_properties.remove("partition_by"));
1865    }
1866
1867    sink_handler_args.with_options =
1868        WithOptions::new(sink_with, Default::default(), connection_ref.clone());
1869
1870    let mut source_name = table_name.clone();
1871    *source_name.0.last_mut().unwrap() = Ident::from(
1872        (ICEBERG_SOURCE_PREFIX.to_owned() + &source_name.0.last().unwrap().real_value()).as_str(),
1873    );
1874    let create_source_stmt = CreateSourceStatement {
1875        temporary: false,
1876        if_not_exists: false,
1877        columns: vec![],
1878        source_name,
1879        wildcard_idx: Some(0),
1880        constraints: vec![],
1881        with_properties: WithProperties(vec![]),
1882        format_encode: CompatibleFormatEncode::V2(FormatEncodeOptions::none()),
1883        source_watermarks: vec![],
1884        include_column_options: vec![],
1885    };
1886
1887    let mut source_handler_args = handler_args.clone();
1888    let source_with = with_common;
1889    source_handler_args.with_options =
1890        WithOptions::new(source_with, Default::default(), connection_ref);
1891
1892    // Before creating the table, ensure the JVM is initialized, since we currently use the JDBC catalog.
1893    // DDL is not atomic, so a JVM initialization failure here would leave a partially created iceberg engine table.
1894    let _ = JVM.get_or_init()?;
1895
1896    let catalog_writer = session.catalog_writer()?;
1897    // TODO(iceberg): make iceberg engine table creation ddl atomic
1898    let has_connector = source.is_some();
1899    catalog_writer
1900        .create_table(source, table.to_prost(), graph, job_type, if_not_exists)
1901        .await?;
1902    let res = create_sink::handle_create_sink(sink_handler_args, create_sink_stmt, true).await;
1903    if res.is_err() {
1904        // Since we don't support DDL atomicity, we need to drop the partially created table.
1905        handle_drop_table(handler_args.clone(), table_name.clone(), true, true).await?;
1906        res?;
1907    }
1908    let res = create_source::handle_create_source(source_handler_args, create_source_stmt).await;
1909    if res.is_err() {
1910        // Since we don't support DDL atomicity, we need to drop the partially created table.
1911        handle_drop_table(handler_args.clone(), table_name.clone(), true, true).await?;
1912        res?;
1913    }
1914
1915    if has_connector {
1916        alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1917            handler_args,
1918            PbThrottleTarget::TableWithSource,
1919            table_name,
1920            -1,
1921        )
1922        .await?;
1923    }
1924
1925    Ok(())
1926}
1927
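/// Validate the combination of connector `WITH` options, `FORMAT ... ENCODE ...`, and
/// `INCLUDE` options. For example, `CREATE TABLE t (v INT) WITH (connector = 'kafka', ...)`
/// must also specify a schema via `FORMAT`, while a table without a connector must not
/// use `INCLUDE` options. Returns `format_encode` unchanged on success.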
1928pub fn check_create_table_with_source(
1929    with_options: &WithOptions,
1930    format_encode: Option<FormatEncodeOptions>,
1931    include_column_options: &IncludeOption,
1932    cdc_table_info: &Option<CdcTableInfo>,
1933) -> Result<Option<FormatEncodeOptions>> {
1934    // skip check for cdc table
1935    if cdc_table_info.is_some() {
1936        return Ok(format_encode);
1937    }
1938    let defined_source = with_options.is_source_connector();
1939
1940    if !include_column_options.is_empty() && !defined_source {
1941        return Err(ErrorCode::InvalidInputSyntax(
1942            "INCLUDE should be used with a connector".to_owned(),
1943        )
1944        .into());
1945    }
1946    if defined_source {
1947        format_encode.as_ref().ok_or_else(|| {
1948            ErrorCode::InvalidInputSyntax("Please specify a source schema using FORMAT".to_owned())
1949        })?;
1950    }
1951    Ok(format_encode)
1952}
1953
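/// Check that `partition_columns` is a prefix of `primary_key_columns`: e.g. partition
/// columns `["a", "b"]` against primary key `["a", "b", "c"]` pass, while `["b"]` fails.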
1954fn ensure_partition_columns_are_prefix_of_primary_key(
1955    partition_columns: &[String],
1956    primary_key_columns: &[String],
1957) -> std::result::Result<(), String> {
1958    if partition_columns.len() > primary_key_columns.len() {
1959        return Err("Partition columns cannot be longer than primary key columns.".to_owned());
1960    }
1961
1962    for (i, partition_col) in partition_columns.iter().enumerate() {
1963        if primary_key_columns.get(i) != Some(partition_col) {
1964            return Err(format!(
1965                "Partition column '{}' is not a prefix of the primary key.",
1966                partition_col
1967            ));
1968        }
1969    }
1970
1971    Ok(())
1972}
1973
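/// Generate the stream graph, table catalog, and optional source catalog for replacing
/// an existing table (e.g. in `ALTER TABLE` style replace plans), reusing the original
/// table's ID.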
1974#[allow(clippy::too_many_arguments)]
1975pub async fn generate_stream_graph_for_replace_table(
1976    _session: &Arc<SessionImpl>,
1977    table_name: ObjectName,
1978    original_catalog: &Arc<TableCatalog>,
1979    handler_args: HandlerArgs,
1980    statement: Statement,
1981    col_id_gen: ColumnIdGenerator,
1982    sql_column_strategy: SqlColumnStrategy,
1983) -> Result<(
1984    StreamFragmentGraph,
1985    TableCatalog,
1986    Option<SourceCatalog>,
1987    TableJobType,
1988)> {
1989    let Statement::CreateTable {
1990        columns,
1991        constraints,
1992        source_watermarks,
1993        append_only,
1994        on_conflict,
1995        with_version_column,
1996        wildcard_idx,
1997        cdc_table_info,
1998        format_encode,
1999        include_column_options,
2000        engine,
2001        with_options,
2002        ..
2003    } = statement
2004    else {
2005        panic!("unexpected statement type: {:?}", statement);
2006    };
2007
2008    let format_encode = format_encode
2009        .clone()
2010        .map(|format_encode| format_encode.into_v2_with_warning());
2011
2012    let engine = match engine {
2013        risingwave_sqlparser::ast::Engine::Hummock => Engine::Hummock,
2014        risingwave_sqlparser::ast::Engine::Iceberg => Engine::Iceberg,
2015    };
2016
2017    let is_drop_connector =
2018        original_catalog.associated_source_id().is_some() && format_encode.is_none();
2019    if is_drop_connector {
2020        debug_assert!(
2021            source_watermarks.is_empty()
2022                && include_column_options.is_empty()
2023                && with_options
2024                    .iter()
2025                    .all(|opt| opt.name.real_value().to_lowercase() != "connector")
2026        );
2027    }
2028
2029    let props = CreateTableProps {
2030        definition: handler_args.normalized_sql.clone(),
2031        append_only,
2032        on_conflict: on_conflict.into(),
2033        with_version_column: with_version_column.as_ref().map(|x| x.real_value()),
2034        webhook_info: original_catalog.webhook_info.clone(),
2035        engine,
2036    };
2037
2038    let ((plan, mut source, table), job_type) = match (format_encode, cdc_table_info.as_ref()) {
2039        (Some(format_encode), None) => (
2040            gen_create_table_plan_with_source(
2041                handler_args,
2042                ExplainOptions::default(),
2043                table_name,
2044                columns,
2045                wildcard_idx,
2046                constraints,
2047                format_encode,
2048                source_watermarks,
2049                col_id_gen,
2050                include_column_options,
2051                props,
2052                sql_column_strategy,
2053            )
2054            .await?,
2055            TableJobType::General,
2056        ),
2057        (None, None) => {
2058            let context = OptimizerContext::from_handler_args(handler_args);
2059            let (plan, table) = gen_create_table_plan(
2060                context,
2061                table_name,
2062                columns,
2063                constraints,
2064                col_id_gen,
2065                source_watermarks,
2066                props,
2067                true,
2068            )?;
2069            ((plan, None, table), TableJobType::General)
2070        }
2071        (None, Some(cdc_table)) => {
2072            let session = &handler_args.session;
2073            let (source, resolved_table_name, database_id, schema_id) =
2074                get_source_and_resolved_table_name(session, cdc_table.clone(), table_name.clone())?;
2075
2076            let cdc_with_options = derive_with_options_for_cdc_table(
2077                &source.with_properties,
2078                cdc_table.external_table_name.clone(),
2079            )?;
2080
2081            let (column_catalogs, pk_names) = bind_cdc_table_schema(&columns, &constraints, true)?;
2082
2083            let context: OptimizerContextRef =
2084                OptimizerContext::new(handler_args, ExplainOptions::default()).into();
2085            let (plan, table) = gen_create_table_plan_for_cdc_table(
2086                context,
2087                source,
2088                cdc_table.external_table_name.clone(),
2089                columns,
2090                column_catalogs,
2091                pk_names,
2092                cdc_with_options,
2093                col_id_gen,
2094                on_conflict,
2095                with_version_column.map(|x| x.real_value()),
2096                IncludeOption::default(),
2097                table_name,
2098                resolved_table_name,
2099                database_id,
2100                schema_id,
2101                original_catalog.id(),
2102                engine,
2103            )?;
2104
2105            ((plan, None, table), TableJobType::SharedCdcSource)
2106        }
2107        (Some(_), Some(_)) => {
2108            return Err(ErrorCode::NotSupported(
2109                "Data format and encoding format doesn't apply to table created from a CDC source"
2110                    .into(),
2111                "Remove the FORMAT and ENCODE specification".into(),
2112            )
2113            .into());
2114        }
2115    };
2116
2117    if table.pk_column_ids() != original_catalog.pk_column_ids() {
2118        Err(ErrorCode::InvalidInputSyntax(
2119            "alter primary key of table is not supported".to_owned(),
2120        ))?
2121    }
2122
2123    let graph = build_graph(plan, Some(GraphJobType::Table))?;
2124
2125    // Fill the original table ID.
2126    let mut table = TableCatalog {
2127        id: original_catalog.id(),
2128        ..table
2129    };
2130    if !is_drop_connector && let Some(source_id) = original_catalog.associated_source_id() {
2131        table.associated_source_id = Some(source_id);
2132
2133        let source = source.as_mut().unwrap();
2134        source.id = source_id.table_id;
2135        source.associated_table_id = Some(table.id());
2136    }
2137
2138    Ok((graph, table, source, job_type))
2139}
2140
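/// Resolve the shared CDC source referenced by a `CREATE TABLE ... FROM source TABLE
/// 'external.table'` statement, along with the resolved table name and the target
/// database and schema IDs.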
2141fn get_source_and_resolved_table_name(
2142    session: &Arc<SessionImpl>,
2143    cdc_table: CdcTableInfo,
2144    table_name: ObjectName,
2145) -> Result<(Arc<SourceCatalog>, String, DatabaseId, SchemaId)> {
2146    let db_name = &session.database();
2147    let (schema_name, resolved_table_name) =
2148        Binder::resolve_schema_qualified_name(db_name, table_name)?;
2149    let (database_id, schema_id) =
2150        session.get_database_and_schema_id_for_create(schema_name.clone())?;
2151
2152    let (source_schema, source_name) =
2153        Binder::resolve_schema_qualified_name(db_name, cdc_table.source_name.clone())?;
2154
2155    let source = {
2156        let catalog_reader = session.env().catalog_reader().read_guard();
2157        let schema_name = source_schema.unwrap_or(DEFAULT_SCHEMA_NAME.to_owned());
2158        let (source, _) = catalog_reader.get_source_by_name(
2159            db_name,
2160            SchemaPath::Name(schema_name.as_str()),
2161            source_name.as_str(),
2162        )?;
2163        source.clone()
2164    };
2165
2166    Ok((source, resolved_table_name, database_id, schema_id))
2167}
2168
2169// Validate the webhook_info and bind it to its protobuf representation.
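// A hedged sketch of the SQL shape being validated (syntax illustrative; the secret
// and header names are made up):
//   CREATE TABLE wh (payload JSONB)
//   VALIDATE SECRET my_secret AS secure_compare(headers->>'x-signature', my_secret);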
2170fn bind_webhook_info(
2171    session: &Arc<SessionImpl>,
2172    columns_defs: &[ColumnDef],
2173    webhook_info: WebhookSourceInfo,
2174) -> Result<PbWebhookSourceInfo> {
2175    // validate columns
2176    if columns_defs.len() != 1 || columns_defs[0].data_type.as_ref().unwrap() != &AstDataType::Jsonb
2177    {
2178        return Err(ErrorCode::InvalidInputSyntax(
2179            "Table with webhook source should have exactly one JSONB column".to_owned(),
2180        )
2181        .into());
2182    }
2183
2184    let WebhookSourceInfo {
2185        secret_ref,
2186        signature_expr,
2187        wait_for_persistence,
2188        is_batched,
2189    } = webhook_info;
2190
2191    // validate secret_ref
2192    let (pb_secret_ref, secret_name) = if let Some(secret_ref) = secret_ref {
2193        let db_name = &session.database();
2194        let (schema_name, secret_name) =
2195            Binder::resolve_schema_qualified_name(db_name, secret_ref.secret_name.clone())?;
2196        let secret_catalog = session.get_secret_by_name(schema_name, &secret_name)?;
2197        (
2198            Some(PbSecretRef {
2199                secret_id: secret_catalog.id.secret_id(),
2200                ref_as: match secret_ref.ref_as {
2201                    SecretRefAsType::Text => PbRefAsType::Text,
2202                    SecretRefAsType::File => PbRefAsType::File,
2203                }
2204                .into(),
2205            }),
2206            Some(secret_name),
2207        )
2208    } else {
2209        (None, None)
2210    };
2211
2212    let secure_compare_context = SecureCompareContext {
2213        column_name: columns_defs[0].name.real_value(),
2214        secret_name,
2215    };
2216    let mut binder = Binder::new_for_ddl_with_secure_compare(session, secure_compare_context);
2217    let expr = binder.bind_expr(signature_expr.clone())?;
2218
2219    // validate expr, ensuring it is SECURE_COMPARE()
2220    if expr.as_function_call().map_or(true, |func| {
2221        func.func_type()
2222            != crate::optimizer::plan_node::generic::ExprType::SecureCompare
2223    }) {
2224        return Err(ErrorCode::InvalidInputSyntax(
2225            "The signature verification function must be SECURE_COMPARE()".to_owned(),
2226        )
2227        .into());
2228    }
2229
2230    let pb_webhook_info = PbWebhookSourceInfo {
2231        secret_ref: pb_secret_ref,
2232        signature_expr: Some(expr.to_expr_proto()),
2233        wait_for_persistence,
2234        is_batched,
2235    };
2236
2237    Ok(pb_webhook_info)
2238}
2239
2240#[cfg(test)]
2241mod tests {
2242    use risingwave_common::catalog::{
2243        DEFAULT_DATABASE_NAME, ROW_ID_COLUMN_NAME, RW_TIMESTAMP_COLUMN_NAME,
2244    };
2245    use risingwave_common::types::{DataType, StructType};
2246
2247    use super::*;
2248    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};
2249
2250    #[tokio::test]
2251    async fn test_create_table_handler() {
2252        let sql =
2253            "create table t (v1 smallint, v2 struct<v3 bigint, v4 float, v5 double>) append only;";
2254        let frontend = LocalFrontend::new(Default::default()).await;
2255        frontend.run_sql(sql).await.unwrap();
2256
2257        let session = frontend.session_ref();
2258        let catalog_reader = session.env().catalog_reader().read_guard();
2259        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);
2260
2261        // Check table exists.
2262        let (table, _) = catalog_reader
2263            .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
2264            .unwrap();
2265        assert_eq!(table.name(), "t");
2266
2267        let columns = table
2268            .columns
2269            .iter()
2270            .map(|col| (col.name(), col.data_type().clone()))
2271            .collect::<HashMap<&str, DataType>>();
2272
2273        let expected_columns = maplit::hashmap! {
2274            ROW_ID_COLUMN_NAME => DataType::Serial,
2275            "v1" => DataType::Int16,
2276            "v2" => StructType::new(
2277                vec![("v3", DataType::Int64),("v4", DataType::Float64),("v5", DataType::Float64)],
2278            )
2279            .with_ids([3, 4, 5].map(ColumnId::new))
2280            .into(),
2281            RW_TIMESTAMP_COLUMN_NAME => DataType::Timestamptz,
2282        };
2283
2284        assert_eq!(columns, expected_columns, "{columns:#?}");
2285    }
2286
2287    #[test]
2288    fn test_bind_primary_key() {
2289        // Note: Column ID 0 is reserved for the row ID column.
2290
2291        for (sql, expected) in [
2292            ("create table t (v1 int, v2 int)", Ok(&[0] as &[_])),
2293            ("create table t (v1 int primary key, v2 int)", Ok(&[1])),
2294            ("create table t (v1 int, v2 int primary key)", Ok(&[2])),
2295            (
2296                "create table t (v1 int primary key, v2 int primary key)",
2297                Err("multiple primary keys are not allowed"),
2298            ),
2299            (
2300                "create table t (v1 int primary key primary key, v2 int)",
2301                Err("multiple primary keys are not allowed"),
2302            ),
2303            (
2304                "create table t (v1 int, v2 int, primary key (v1))",
2305                Ok(&[1]),
2306            ),
2307            (
2308                "create table t (v1 int, primary key (v2), v2 int)",
2309                Ok(&[2]),
2310            ),
2311            (
2312                "create table t (primary key (v2, v1), v1 int, v2 int)",
2313                Ok(&[2, 1]),
2314            ),
2315            (
2316                "create table t (v1 int, primary key (v1), v2 int, primary key (v1))",
2317                Err("multiple primary keys are not allowed"),
2318            ),
2319            (
2320                "create table t (v1 int primary key, primary key (v1), v2 int)",
2321                Err("multiple primary keys are not allowed"),
2322            ),
2323            (
2324                "create table t (v1 int, primary key (V3), v2 int)",
2325                Err("column \"v3\" named in key does not exist"),
2326            ),
2327        ] {
2328            let mut ast = risingwave_sqlparser::parser::Parser::parse_sql(sql).unwrap();
2329            let risingwave_sqlparser::ast::Statement::CreateTable {
2330                columns: column_defs,
2331                constraints,
2332                ..
2333            } = ast.remove(0)
2334            else {
2335                panic!("test case should be create table")
2336            };
2337            let actual: Result<_> = (|| {
2338                let mut columns = bind_sql_columns(&column_defs, false)?;
2339                let mut col_id_gen = ColumnIdGenerator::new_initial();
2340                for c in &mut columns {
2341                    col_id_gen.generate(c)?;
2342                }
2343
2344                let pk_names =
2345                    bind_sql_pk_names(&column_defs, bind_table_constraints(&constraints)?)?;
2346                let (_, pk_column_ids, _) =
2347                    bind_pk_and_row_id_on_relation(columns, pk_names, true)?;
2348                Ok(pk_column_ids)
2349            })();
2350            match (expected, actual) {
2351                (Ok(expected), Ok(actual)) => assert_eq!(
2352                    expected.iter().copied().map(ColumnId::new).collect_vec(),
2353                    actual,
2354                    "sql: {sql}"
2355                ),
2356                (Ok(_), Err(actual)) => panic!("sql: {sql}\nunexpected error: {actual:?}"),
2357                (Err(_), Ok(actual)) => panic!("sql: {sql}\nexpects error but got: {actual:?}"),
2358                (Err(expected), Err(actual)) => assert!(
2359                    actual.to_string().contains(expected),
2360                    "sql: {sql}\nexpected: {expected:?}\nactual: {actual:?}"
2361                ),
2362            }
2363        }
2364    }
2365
2366    #[tokio::test]
2367    async fn test_duplicate_props_options() {
2368        let proto_file = create_proto_file(PROTO_FILE_DATA);
2369        let sql = format!(
2370            r#"CREATE TABLE t
2371    WITH (
2372        connector = 'kinesis',
2373        aws.region='user_test_topic',
2374        endpoint='172.10.1.1:9090,172.10.1.2:9090',
2375        aws.credentials.access_key_id = 'your_access_key_1',
2376        aws.credentials.secret_access_key = 'your_secret_key_1'
2377    )
2378    FORMAT PLAIN ENCODE PROTOBUF (
2379        message = '.test.TestRecord',
2380        aws.credentials.access_key_id = 'your_access_key_2',
2381        aws.credentials.secret_access_key = 'your_secret_key_2',
2382        schema.location = 'file://{}',
2383    )"#,
2384            proto_file.path().to_str().unwrap()
2385        );
2386        let frontend = LocalFrontend::new(Default::default()).await;
2387        frontend.run_sql(sql).await.unwrap();
2388
2389        let session = frontend.session_ref();
2390        let catalog_reader = session.env().catalog_reader().read_guard();
2391        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);
2392
2393        // Check source exists.
2394        let (source, _) = catalog_reader
2395            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
2396            .unwrap();
2397        assert_eq!(source.name, "t");
2398
2399        // AwsAuth params exist in options.
2400        assert_eq!(
2401            source
2402                .info
2403                .format_encode_options
2404                .get("aws.credentials.access_key_id")
2405                .unwrap(),
2406            "your_access_key_2"
2407        );
2408        assert_eq!(
2409            source
2410                .info
2411                .format_encode_options
2412                .get("aws.credentials.secret_access_key")
2413                .unwrap(),
2414            "your_secret_key_2"
2415        );
2416
2417        // AwsAuth params exist in props.
2418        assert_eq!(
2419            source
2420                .with_properties
2421                .get("aws.credentials.access_key_id")
2422                .unwrap(),
2423            "your_access_key_1"
2424        );
2425        assert_eq!(
2426            source
2427                .with_properties
2428                .get("aws.credentials.secret_access_key")
2429                .unwrap(),
2430            "your_secret_key_1"
2431        );
2432
2433        // Options are not merged into props.
2434        assert!(!source.with_properties.contains_key("schema.location"));
2435    }
2436}