risingwave_frontend/handler/create_table.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap};
use std::rc::Rc;
use std::sync::Arc;

use anyhow::{Context, anyhow};
use clap::ValueEnum;
use either::Either;
use fixedbitset::FixedBitSet;
use itertools::Itertools;
use pgwire::pg_response::{PgResponse, StatementType};
use prost::Message as _;
use risingwave_common::catalog::{
    CdcTableDesc, ColumnCatalog, ColumnDesc, ConflictBehavior, DEFAULT_SCHEMA_NAME, Engine,
    RISINGWAVE_ICEBERG_ROW_ID, ROW_ID_COLUMN_NAME, TableId,
};
use risingwave_common::config::MetaBackend;
use risingwave_common::license::Feature;
use risingwave_common::session_config::sink_decouple::SinkDecouple;
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_common::util::value_encoding::DatumToProtoExt;
use risingwave_common::{bail, bail_not_implemented};
use risingwave_connector::jvm_runtime::JVM;
use risingwave_connector::sink::decouple_checkpoint_log_sink::COMMIT_CHECKPOINT_INTERVAL;
use risingwave_connector::source::cdc::build_cdc_table_id;
use risingwave_connector::source::cdc::external::{
    DATABASE_NAME_KEY, ExternalTableConfig, ExternalTableImpl, SCHEMA_NAME_KEY, TABLE_NAME_KEY,
};
use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt, source};
use risingwave_pb::catalog::connection::Info as ConnectionInfo;
use risingwave_pb::catalog::connection_params::ConnectionType;
use risingwave_pb::catalog::source::OptionalAssociatedTableId;
use risingwave_pb::catalog::{PbSource, PbTable, PbWebhookSourceInfo, Table, WatermarkDesc};
use risingwave_pb::ddl_service::{PbTableJobType, TableJobType};
use risingwave_pb::meta::PbThrottleTarget;
use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
use risingwave_pb::plan_common::{
    AdditionalColumn, ColumnDescVersion, DefaultColumnDesc, GeneratedColumnDesc,
};
use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::secret::secret_ref::PbRefAsType;
use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_sqlparser::ast::{
    CdcTableInfo, ColumnDef, ColumnOption, CompatibleFormatEncode, ConnectionRefValue, CreateSink,
    CreateSinkStatement, CreateSourceStatement, DataType as AstDataType, ExplainOptions, Format,
    FormatEncodeOptions, Ident, ObjectName, OnConflict, SecretRefAsType, SourceWatermark,
    Statement, TableConstraint, WebhookSourceInfo, WithProperties,
};
use risingwave_sqlparser::parser::{IncludeOption, Parser};
use thiserror_ext::AsReport;

use super::create_source::{CreateSourceType, SqlColumnStrategy, bind_columns_from_source};
use super::{RwPgResponse, alter_streaming_rate_limit, create_sink, create_source};
use crate::binder::{Clause, SecureCompareContext, bind_data_type};
use crate::catalog::root_catalog::SchemaPath;
use crate::catalog::source_catalog::SourceCatalog;
use crate::catalog::table_catalog::{ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX, TableVersion};
use crate::catalog::{ColumnId, DatabaseId, SchemaId, check_column_name_not_reserved};
use crate::error::{ErrorCode, Result, RwError, bail_bind_error};
use crate::expr::{Expr, ExprImpl, ExprRewriter};
use crate::handler::HandlerArgs;
use crate::handler::create_source::{
    UPSTREAM_SOURCE_KEY, bind_connector_props, bind_create_source_or_table_with_connector,
    bind_source_watermark, handle_addition_columns,
};
use crate::handler::util::SourceSchemaCompatExt;
use crate::optimizer::plan_node::generic::{CdcScanOptions, SourceNodeKind};
use crate::optimizer::plan_node::{LogicalCdcScan, LogicalSource};
use crate::optimizer::property::{Order, RequiredDist};
use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRef, PlanRoot};
use crate::session::SessionImpl;
use crate::session::current::notice_to_user;
use crate::stream_fragmenter::{GraphJobType, build_graph};
use crate::utils::OverwriteOptions;
use crate::{Binder, Explain, TableCatalog, WithOptions};

mod col_id_gen;
pub use col_id_gen::*;
use risingwave_connector::sink::iceberg::parse_partition_by_exprs;

use crate::handler::drop_table::handle_drop_table;

fn ensure_column_options_supported(c: &ColumnDef) -> Result<()> {
    for option_def in &c.options {
        match option_def.option {
            ColumnOption::GeneratedColumns(_) => {}
            ColumnOption::DefaultValue(_) => {}
            ColumnOption::DefaultValueInternal { .. } => {}
            ColumnOption::Unique { is_primary: true } => {}
            ColumnOption::Null => {}
            ColumnOption::NotNull => {}
            _ => bail_not_implemented!("column constraints \"{}\"", option_def),
        }
    }
    Ok(())
}

/// Binds the column schemas declared in a CREATE statement into `ColumnCatalog`s.
///
/// Primary key options are not handled here; they are collected separately by
/// [`bind_sql_pk_names`] and bound in [`bind_pk_and_row_id_on_relation`].
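///
/// For example (an illustrative sketch), `CREATE TABLE t (v1 INT, v2 VARCHAR NOT NULL)`
/// binds to two `ColumnCatalog`s: a nullable INT column `v1` and a non-nullable VARCHAR
/// column `v2`, both carrying placeholder column ids to be filled in later.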
pub fn bind_sql_columns(
    column_defs: &[ColumnDef],
    is_for_drop_table_connector: bool,
) -> Result<Vec<ColumnCatalog>> {
    let mut columns = Vec::with_capacity(column_defs.len());

    for column in column_defs {
        ensure_column_options_supported(column)?;
        // Destruct to make sure all fields are properly handled rather than ignored.
        // Do NOT use `..` to ignore fields you do not want to deal with.
        // Reject them with a clear NotImplemented error.
        let ColumnDef {
            name,
            data_type,
            collation,
            options,
            ..
        } = column;

        let data_type = data_type
            .clone()
            .ok_or_else(|| ErrorCode::InvalidInputSyntax("data type is not specified".into()))?;
        if let Some(collation) = collation {
            // PostgreSQL limits the data types that a collation can apply to.
            // https://www.postgresql.org/docs/16/collation.html#COLLATION-CONCEPTS
            //   > The built-in collatable data types are `text`, `varchar`, and `char`.
            //
            // We don't support real collation; we simply ignore it here.
            if !["C", "POSIX"].contains(&collation.real_value().as_str()) {
                bail_not_implemented!(
                    "collation other than `C` or `POSIX` is not implemented"
                );
            }

            match data_type {
                AstDataType::Text | AstDataType::Varchar | AstDataType::Char(_) => {}
                _ => {
                    return Err(ErrorCode::NotSupported(
                        format!("{} is not a collatable data type", data_type),
                        "The built-in collatable data types are `text`, `varchar`, and `char`, please check your type".into(),
                    ).into());
                }
            }
        }

        if !is_for_drop_table_connector {
            // An additional column name may have the prefix `_rw`.
            // When dropping the connector from a table, additional columns are converted to
            // normal columns and keep their original names, so we loosen the `_rw` prefix
            // check in that case.
            check_column_name_not_reserved(&name.real_value())?;
        }

        // A column is nullable unless it carries a NOT NULL option.
        let nullable: bool = !options
            .iter()
            .any(|def| matches!(def.option, ColumnOption::NotNull));

        columns.push(ColumnCatalog {
            column_desc: ColumnDesc {
                data_type: bind_data_type(&data_type)?,
                column_id: ColumnId::placeholder(),
                name: name.real_value(),
                generated_or_default_column: None,
                description: None,
                additional_column: AdditionalColumn { column_type: None },
                version: ColumnDescVersion::LATEST,
                system_column: None,
                nullable,
            },
            is_hidden: false,
        });
    }

    Ok(columns)
}

fn check_generated_column_constraints(
    column_name: &String,
    column_id: ColumnId,
    expr: &ExprImpl,
    column_catalogs: &[ColumnCatalog],
    generated_column_names: &[String],
    pk_column_ids: &[ColumnId],
) -> Result<()> {
    let input_refs = expr.collect_input_refs(column_catalogs.len());
    for idx in input_refs.ones() {
        let referred_generated_column = &column_catalogs[idx].column_desc.name;
        if generated_column_names
            .iter()
            .any(|c| c == referred_generated_column)
        {
            return Err(ErrorCode::BindError(format!(
                "A generated column cannot reference another generated column. \
                But here generated column \"{}\" referenced another generated column \"{}\"",
                column_name, referred_generated_column
            ))
            .into());
        }
    }

    if pk_column_ids.contains(&column_id) && expr.is_impure() {
        return Err(ErrorCode::BindError(format!(
            "Generated columns with impure expressions should not be part of the primary key. \
            Here column \"{}\" is defined as part of the primary key.",
            column_name
        ))
        .into());
    }

    Ok(())
}

/// Binds constraints that can only be specified in column definitions,
/// currently generated columns and default columns.
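///
/// For example (an illustrative sketch),
/// `CREATE TABLE t (v1 INT, v2 INT AS v1 + 1, v3 TIMESTAMPTZ DEFAULT now())`
/// binds `v2` as a generated column and `v3` as a default column whose snapshot
/// value is evaluated against the pinned snapshot at binding time.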
pub fn bind_sql_column_constraints(
    session: &SessionImpl,
    table_name: String,
    column_catalogs: &mut [ColumnCatalog],
    columns: Vec<ColumnDef>,
    pk_column_ids: &[ColumnId],
) -> Result<()> {
    let generated_column_names = {
        let mut names = vec![];
        for column in &columns {
            for option_def in &column.options {
                if let ColumnOption::GeneratedColumns(_) = option_def.option {
                    names.push(column.name.real_value());
                    break;
                }
            }
        }
        names
    };

    let mut binder = Binder::new_for_ddl(session);
    binder.bind_columns_to_context(table_name.clone(), column_catalogs)?;

    for column in columns {
        let Some(idx) = column_catalogs
            .iter()
            .position(|c| c.name() == column.name.real_value())
        else {
            // It's possible that we don't follow the user defined columns in SQL but take the
            // ones resolved from the source, thus missing some columns. Simply ignore them.
            continue;
        };

        for option_def in column.options {
            match option_def.option {
                ColumnOption::GeneratedColumns(expr) => {
                    binder.set_clause(Some(Clause::GeneratedColumn));

                    let expr_impl = binder.bind_expr(expr).with_context(|| {
                        format!(
                            "fail to bind expression in generated column \"{}\"",
                            column.name.real_value()
                        )
                    })?;

                    check_generated_column_constraints(
                        &column.name.real_value(),
                        column_catalogs[idx].column_id(),
                        &expr_impl,
                        column_catalogs,
                        &generated_column_names,
                        pk_column_ids,
                    )?;

                    column_catalogs[idx].column_desc.generated_or_default_column = Some(
                        GeneratedOrDefaultColumn::GeneratedColumn(GeneratedColumnDesc {
                            expr: Some(expr_impl.to_expr_proto()),
                        }),
                    );
                    binder.set_clause(None);
                }
                ColumnOption::DefaultValue(expr) => {
                    let expr_impl = binder
                        .bind_expr(expr)?
                        .cast_assign(column_catalogs[idx].data_type().clone())?;

                    // Rewrite expressions to evaluate a snapshot value, used for missing values in the case of
                    // schema change.
                    //
                    // TODO: Currently we don't support impure expressions other than `now()` (like `random()`),
                    // so the rewritten expression should almost always be pure and we directly call `fold_const`
                    // here. Actually we do not require purity of the expression here since we're only to get a
                    // snapshot value.
                    let rewritten_expr_impl = session
                        .pinned_snapshot()
                        .inline_now_proc_time()
                        .rewrite_expr(expr_impl.clone());

                    if let Some(snapshot_value) = rewritten_expr_impl.try_fold_const() {
                        let snapshot_value = snapshot_value?;

                        column_catalogs[idx].column_desc.generated_or_default_column =
                            Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
                                snapshot_value: Some(snapshot_value.to_protobuf()),
                                // Persist the original expression.
                                expr: Some(expr_impl.to_expr_proto()),
                            }));
                    } else {
                        return Err(ErrorCode::BindError(format!(
                            "Default expression used in column `{}` cannot be evaluated. \
                            Use generated columns instead if you mean to reference other columns.",
                            column.name
                        ))
                        .into());
                    }
                }
                ColumnOption::DefaultValueInternal { persisted, expr: _ } => {
                    // When a `DEFAULT INTERNAL` is used internally for schema change, the persisted value
                    // should already be set during purification. So if we encounter an empty value here, it
                    // means the user has specified it explicitly in the SQL statement, typically by
                    // directly copying the result of `SHOW CREATE TABLE` and executing it.
                    if persisted.is_empty() {
                        bail_bind_error!(
                            "DEFAULT INTERNAL is only used for internal purposes, \
                             please specify a concrete default value"
                        );
                    }

                    let desc = DefaultColumnDesc::decode(&*persisted)
                        .expect("failed to decode persisted `DefaultColumnDesc`");

                    column_catalogs[idx].column_desc.generated_or_default_column =
                        Some(GeneratedOrDefaultColumn::DefaultColumn(desc));
                }
                _ => {}
            }
        }
    }
    Ok(())
}

/// Currently we only support the PRIMARY KEY table constraint, so this just returns the
/// PK column names if one exists.
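///
/// For example (an illustrative sketch), `PRIMARY KEY (a, b)` yields `["a", "b"]`, while
/// any other table constraint results in a "not implemented" error.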
pub fn bind_table_constraints(table_constraints: &[TableConstraint]) -> Result<Vec<String>> {
    let mut pk_column_names = vec![];

    for constraint in table_constraints {
        match constraint {
            TableConstraint::Unique {
                name: _,
                columns,
                is_primary: true,
            } => {
                if !pk_column_names.is_empty() {
                    return Err(multiple_pk_definition_err());
                }
                pk_column_names = columns.iter().map(|c| c.real_value()).collect_vec();
            }
            _ => bail_not_implemented!("table constraint \"{}\"", constraint),
        }
    }
    Ok(pk_column_names)
}

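/// Merges primary key names from column-level `PRIMARY KEY` options into those collected
/// from table-level constraints, rejecting multiple definitions.
///
/// For example (an illustrative sketch), `CREATE TABLE t (v INT PRIMARY KEY)` and
/// `CREATE TABLE t (v INT, PRIMARY KEY (v))` both yield `["v"]`, while specifying both
/// forms at once is rejected as multiple primary keys.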
pub fn bind_sql_pk_names(
    columns_defs: &[ColumnDef],
    pk_names_from_table_constraints: Vec<String>,
) -> Result<Vec<String>> {
    let mut pk_column_names = pk_names_from_table_constraints;

    for column in columns_defs {
        for option_def in &column.options {
            if let ColumnOption::Unique { is_primary: true } = option_def.option {
                if !pk_column_names.is_empty() {
                    return Err(multiple_pk_definition_err());
                }
                pk_column_names.push(column.name.real_value());
            };
        }
    }

    Ok(pk_column_names)
}

fn multiple_pk_definition_err() -> RwError {
    ErrorCode::BindError("multiple primary keys are not allowed".into()).into()
}

/// Binds primary keys defined in SQL.
///
/// It returns the columns together with `pk_column_ids`, and an optional row id column index if
/// added.
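///
/// For example (an illustrative sketch), binding `pk_names = ["id"]` against columns
/// `[id, name]` yields `pk_column_ids = [id]` with no row id column, while binding an
/// empty `pk_names` with `must_need_pk` appends a hidden `_row_id` column and uses it
/// as the primary key.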
pub fn bind_pk_and_row_id_on_relation(
    mut columns: Vec<ColumnCatalog>,
    pk_names: Vec<String>,
    must_need_pk: bool,
) -> Result<(Vec<ColumnCatalog>, Vec<ColumnId>, Option<usize>)> {
    for c in &columns {
        assert!(c.column_id() != ColumnId::placeholder());
    }

    // Mapping from column name to column id.
    let name_to_id = columns
        .iter()
        .map(|c| (c.name(), c.column_id()))
        .collect::<HashMap<_, _>>();

    let mut pk_column_ids: Vec<_> = pk_names
        .iter()
        .map(|name| {
            name_to_id.get(name.as_str()).copied().ok_or_else(|| {
                ErrorCode::BindError(format!("column \"{name}\" named in key does not exist"))
            })
        })
        .try_collect()?;

    // Add the `_row_id` column if `pk_column_ids` is empty and `must_need_pk`.
    let need_row_id = pk_column_ids.is_empty() && must_need_pk;

    let row_id_index = need_row_id.then(|| {
        let column = ColumnCatalog::row_id_column();
        let index = columns.len();
        pk_column_ids = vec![column.column_id()];
        columns.push(column);
        index
    });

    if let Some(col) = columns.iter().map(|c| c.name()).duplicates().next() {
        Err(ErrorCode::InvalidInputSyntax(format!(
            "column \"{col}\" specified more than once"
        )))?;
    }

    Ok((columns, pk_column_ids, row_id_index))
}

/// `gen_create_table_plan_with_source` generates the plan for creating a table with an external
/// stream source.
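///
/// For example (an illustrative sketch), it handles statements like
/// `CREATE TABLE t (v INT) WITH (connector = 'kafka', ...) FORMAT PLAIN ENCODE JSON`,
/// producing a `PbSource` for the connector alongside the `PbTable` and its plan.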
#[allow(clippy::too_many_arguments)]
pub(crate) async fn gen_create_table_plan_with_source(
    mut handler_args: HandlerArgs,
    explain_options: ExplainOptions,
    table_name: ObjectName,
    column_defs: Vec<ColumnDef>,
    wildcard_idx: Option<usize>,
    constraints: Vec<TableConstraint>,
    format_encode: FormatEncodeOptions,
    source_watermarks: Vec<SourceWatermark>,
    mut col_id_gen: ColumnIdGenerator,
    include_column_options: IncludeOption,
    props: CreateTableProps,
    sql_column_strategy: SqlColumnStrategy,
) -> Result<(PlanRef, Option<PbSource>, PbTable)> {
    if props.append_only
        && format_encode.format != Format::Plain
        && format_encode.format != Format::Native
    {
        return Err(ErrorCode::BindError(format!(
            "Append only table does not support format {}.",
            format_encode.format
        ))
        .into());
    }

    let session = &handler_args.session;
    let with_properties = bind_connector_props(&handler_args, &format_encode, false)?;
    if with_properties.is_shareable_cdc_connector() {
        generated_columns_check_for_cdc_table(&column_defs)?;
        not_null_check_for_cdc_table(&wildcard_idx, &column_defs)?;
    } else if column_defs.iter().any(|col| {
        col.options
            .iter()
            .any(|def| matches!(def.option, ColumnOption::NotNull))
    }) {
        // For non-cdc sources.
        notice_to_user(
            "The table contains columns with NOT NULL constraints. Any rows from upstream violating the constraints will be ignored silently.",
        );
    }

    let db_name: &str = &session.database();
    let (schema_name, _) = Binder::resolve_schema_qualified_name(db_name, table_name.clone())?;

    // TODO: omit this step if `sql_column_strategy` is `Follow`.
    let (columns_from_resolve_source, source_info) = bind_columns_from_source(
        session,
        &format_encode,
        Either::Left(&with_properties),
        CreateSourceType::Table,
    )
    .await?;

    let overwrite_options = OverwriteOptions::new(&mut handler_args);
    let rate_limit = overwrite_options.source_rate_limit;
    let source_catalog = bind_create_source_or_table_with_connector(
        handler_args.clone(),
        table_name,
        format_encode,
        with_properties,
        &column_defs,
        constraints,
        wildcard_idx,
        source_watermarks,
        columns_from_resolve_source,
        source_info,
        include_column_options,
        &mut col_id_gen,
        CreateSourceType::Table,
        rate_limit,
        sql_column_strategy,
    )
    .await?;

    let pb_source = source_catalog.to_prost();

    let context = OptimizerContext::new(handler_args, explain_options);

    let (plan, table) = gen_table_plan_with_source(
        context.into(),
        schema_name,
        source_catalog,
        col_id_gen.into_version(),
        props,
    )?;

    Ok((plan, Some(pb_source), table))
}

/// `gen_create_table_plan` generates the plan for creating a table without an external stream
/// source.
#[allow(clippy::too_many_arguments)]
pub(crate) fn gen_create_table_plan(
    context: OptimizerContext,
    table_name: ObjectName,
    column_defs: Vec<ColumnDef>,
    constraints: Vec<TableConstraint>,
    mut col_id_gen: ColumnIdGenerator,
    source_watermarks: Vec<SourceWatermark>,
    props: CreateTableProps,
    is_for_replace_plan: bool,
) -> Result<(PlanRef, PbTable)> {
    let mut columns = bind_sql_columns(&column_defs, is_for_replace_plan)?;
    for c in &mut columns {
        col_id_gen.generate(c)?;
    }

    let (_, secret_refs, connection_refs) = context.with_options().clone().into_parts();
    if !secret_refs.is_empty() || !connection_refs.is_empty() {
        return Err(crate::error::ErrorCode::InvalidParameterValue(
            "Secret reference and Connection reference are not allowed in options when creating table without external source"
                .to_owned(),
        )
        .into());
    }

    gen_create_table_plan_without_source(
        context,
        table_name,
        columns,
        column_defs,
        constraints,
        source_watermarks,
        col_id_gen.into_version(),
        props,
    )
}

#[allow(clippy::too_many_arguments)]
pub(crate) fn gen_create_table_plan_without_source(
    context: OptimizerContext,
    table_name: ObjectName,
    columns: Vec<ColumnCatalog>,
    column_defs: Vec<ColumnDef>,
    constraints: Vec<TableConstraint>,
    source_watermarks: Vec<SourceWatermark>,
    version: TableVersion,
    props: CreateTableProps,
) -> Result<(PlanRef, PbTable)> {
    // XXX: Why not bind outside?
    let pk_names = bind_sql_pk_names(&column_defs, bind_table_constraints(&constraints)?)?;
    let (mut columns, pk_column_ids, row_id_index) =
        bind_pk_and_row_id_on_relation(columns, pk_names, true)?;

    let watermark_descs: Vec<WatermarkDesc> = bind_source_watermark(
        context.session_ctx(),
        table_name.real_value(),
        source_watermarks,
        &columns,
    )?;

    bind_sql_column_constraints(
        context.session_ctx(),
        table_name.real_value(),
        &mut columns,
        column_defs,
        &pk_column_ids,
    )?;
    let session = context.session_ctx().clone();

    let db_name = &session.database();
    let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, table_name)?;

    let info = CreateTableInfo {
        columns,
        pk_column_ids,
        row_id_index,
        watermark_descs,
        source_catalog: None,
        version,
    };

    gen_table_plan_inner(context.into(), schema_name, table_name, info, props)
}

fn gen_table_plan_with_source(
    context: OptimizerContextRef,
    schema_name: Option<String>,
    source_catalog: SourceCatalog,
    version: TableVersion,
    props: CreateTableProps,
) -> Result<(PlanRef, PbTable)> {
    let table_name = source_catalog.name.clone();

    let info = CreateTableInfo {
        columns: source_catalog.columns.clone(),
        pk_column_ids: source_catalog.pk_col_ids.clone(),
        row_id_index: source_catalog.row_id_index,
        watermark_descs: source_catalog.watermark_descs.clone(),
        source_catalog: Some(source_catalog),
        version,
    };

    gen_table_plan_inner(context, schema_name, table_name, info, props)
}

/// On-conflict behavior either from user input or existing table catalog.
#[derive(Clone, Copy)]
pub enum EitherOnConflict {
    Ast(Option<OnConflict>),
    Resolved(ConflictBehavior),
}

impl From<Option<OnConflict>> for EitherOnConflict {
    fn from(v: Option<OnConflict>) -> Self {
        Self::Ast(v)
    }
}

impl From<ConflictBehavior> for EitherOnConflict {
    fn from(v: ConflictBehavior) -> Self {
        Self::Resolved(v)
    }
}

impl EitherOnConflict {
    /// Resolves the conflict behavior based on the given information.
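    ///
    /// In short: an append-only table with a generated row id needs no conflict check;
    /// an append-only table with a user-defined primary key must use `DO NOTHING`;
    /// otherwise the user's choice applies, defaulting to `UPDATE FULL` (overwrite).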
    pub fn to_behavior(self, append_only: bool, row_id_as_pk: bool) -> Result<ConflictBehavior> {
        let conflict_behavior = match self {
            EitherOnConflict::Ast(on_conflict) => {
                if append_only {
                    if row_id_as_pk {
                        // The primary key will be generated, so no conflict check is needed.
                        ConflictBehavior::NoCheck
                    } else {
                        // User-defined PK on an append-only table: enforce `DO NOTHING`.
                        if let Some(on_conflict) = on_conflict
                            && on_conflict != OnConflict::Nothing
                        {
                            return Err(ErrorCode::InvalidInputSyntax(
                                "When a PRIMARY KEY constraint is applied to an APPEND ONLY table, \
                                     the ON CONFLICT behavior must be DO NOTHING."
                                    .to_owned(),
                            )
                            .into());
                        }
                        ConflictBehavior::IgnoreConflict
                    }
                } else {
                    // Default to `UPDATE FULL` for non-append-only tables.
                    match on_conflict.unwrap_or(OnConflict::UpdateFull) {
                        OnConflict::UpdateFull => ConflictBehavior::Overwrite,
                        OnConflict::Nothing => ConflictBehavior::IgnoreConflict,
                        OnConflict::UpdateIfNotNull => ConflictBehavior::DoUpdateIfNotNull,
                    }
                }
            }
            EitherOnConflict::Resolved(b) => b,
        };

        Ok(conflict_behavior)
    }
}

/// Arguments of the functions that generate a table plan, part 1.
///
/// Compared to [`CreateTableProps`], this struct contains fields that need some work of binding
/// or resolving based on the user input.
pub struct CreateTableInfo {
    pub columns: Vec<ColumnCatalog>,
    pub pk_column_ids: Vec<ColumnId>,
    pub row_id_index: Option<usize>,
    pub watermark_descs: Vec<WatermarkDesc>,
    pub source_catalog: Option<SourceCatalog>,
    pub version: TableVersion,
}

/// Arguments of the functions that generate a table plan, part 2.
///
/// Compared to [`CreateTableInfo`], this struct contains fields that can be (relatively) simply
/// obtained from the input or the context.
pub struct CreateTableProps {
    pub definition: String,
    pub append_only: bool,
    pub on_conflict: EitherOnConflict,
    pub with_version_column: Option<String>,
    pub webhook_info: Option<PbWebhookSourceInfo>,
    pub engine: Engine,
}

#[allow(clippy::too_many_arguments)]
fn gen_table_plan_inner(
    context: OptimizerContextRef,
    schema_name: Option<String>,
    table_name: String,
    info: CreateTableInfo,
    props: CreateTableProps,
) -> Result<(PlanRef, PbTable)> {
    let CreateTableInfo {
        ref columns,
        row_id_index,
        ref watermark_descs,
        ref source_catalog,
        ..
    } = info;
    let CreateTableProps { append_only, .. } = props;

    let (database_id, schema_id) = context
        .session_ctx()
        .get_database_and_schema_id_for_create(schema_name)?;

    let session = context.session_ctx().clone();
    let retention_seconds = context.with_options().retention_seconds();

    let source_node: PlanRef = LogicalSource::new(
        source_catalog.clone().map(Rc::new),
        columns.clone(),
        row_id_index,
        SourceNodeKind::CreateTable,
        context.clone(),
        None,
    )?
    .into();

    let required_cols = FixedBitSet::with_capacity(columns.len());
    let plan_root = PlanRoot::new_with_logical_plan(
        source_node,
        RequiredDist::Any,
        Order::any(),
        required_cols,
        vec![],
    );

    if !append_only && !watermark_descs.is_empty() {
        return Err(ErrorCode::NotSupported(
            "Defining watermarks on a table requires the table to be append-only.".to_owned(),
            "Use the keywords `APPEND ONLY`".to_owned(),
        )
        .into());
    }

    if !append_only && retention_seconds.is_some() {
        return Err(ErrorCode::NotSupported(
            "Defining retention seconds on a table requires the table to be append-only."
                .to_owned(),
            "Use the keywords `APPEND ONLY`".to_owned(),
        )
        .into());
    }

    let materialize =
        plan_root.gen_table_plan(context, table_name, database_id, schema_id, info, props)?;

    let mut table = materialize.table().to_prost();

    table.owner = session.user_id();
    Ok((materialize.into(), table))
}

/// Generates the stream plan for a cdc table based on a shared source.
///
/// In the replace workflow, `table_id` is the id of the table to be replaced; in the create
/// table workflow, `table_id` is a placeholder that will be filled in by the meta node.
#[allow(clippy::too_many_arguments)]
pub(crate) fn gen_create_table_plan_for_cdc_table(
    context: OptimizerContextRef,
    source: Arc<SourceCatalog>,
    external_table_name: String,
    column_defs: Vec<ColumnDef>,
    mut columns: Vec<ColumnCatalog>,
    pk_names: Vec<String>,
    cdc_with_options: WithOptionsSecResolved,
    mut col_id_gen: ColumnIdGenerator,
    on_conflict: Option<OnConflict>,
    with_version_column: Option<String>,
    include_column_options: IncludeOption,
    table_name: ObjectName,
    resolved_table_name: String, // table name without schema prefix
    database_id: DatabaseId,
    schema_id: SchemaId,
    table_id: TableId,
    engine: Engine,
) -> Result<(PlanRef, PbTable)> {
    let session = context.session_ctx().clone();

    // Append additional columns to the end.
    handle_addition_columns(
        None,
        &cdc_with_options,
        include_column_options,
        &mut columns,
        true,
    )?;

    for c in &mut columns {
        col_id_gen.generate(c)?;
    }

    let (mut columns, pk_column_ids, _row_id_index) =
        bind_pk_and_row_id_on_relation(columns, pk_names, true)?;

    // NOTE: In auto schema change, the default value is not provided in the column definition.
    bind_sql_column_constraints(
        context.session_ctx(),
        table_name.real_value(),
        &mut columns,
        column_defs,
        &pk_column_ids,
    )?;

    let definition = context.normalized_sql().to_owned();

    let pk_column_indices = {
        let mut id_to_idx = HashMap::new();
        columns.iter().enumerate().for_each(|(idx, c)| {
            id_to_idx.insert(c.column_id(), idx);
        });
        // Every pk column id must exist in the table columns.
        pk_column_ids
            .iter()
            .map(|c| id_to_idx.get(c).copied().unwrap())
            .collect_vec()
    };
    let table_pk = pk_column_indices
        .iter()
        .map(|idx| ColumnOrder::new(*idx, OrderType::ascending()))
        .collect();

    let (options, secret_refs) = cdc_with_options.into_parts();

    let non_generated_column_descs = columns
        .iter()
        .filter(|&c| !c.is_generated())
        .map(|c| c.column_desc.clone())
        .collect_vec();
    let non_generated_column_num = non_generated_column_descs.len();

    let cdc_table_desc = CdcTableDesc {
        table_id,
        source_id: source.id.into(), // id of the cdc source streaming job
        external_table_name: external_table_name.clone(),
        pk: table_pk,
        columns: non_generated_column_descs,
        stream_key: pk_column_indices,
        connect_properties: options,
        secret_refs,
    };

    tracing::debug!(?cdc_table_desc, "create cdc table");

    let options = CdcScanOptions::from_with_options(context.with_options())?;

    let logical_scan = LogicalCdcScan::create(
        external_table_name.clone(),
        Rc::new(cdc_table_desc),
        context.clone(),
        options,
    );

    let scan_node: PlanRef = logical_scan.into();
    let required_cols = FixedBitSet::with_capacity(non_generated_column_num);
    let plan_root = PlanRoot::new_with_logical_plan(
        scan_node,
        RequiredDist::Any,
        Order::any(),
        required_cols,
        vec![],
    );

    let cdc_table_id = build_cdc_table_id(source.id, &external_table_name);
    let materialize = plan_root.gen_table_plan(
        context,
        resolved_table_name,
        database_id,
        schema_id,
        CreateTableInfo {
            columns,
            pk_column_ids,
            row_id_index: None,
            watermark_descs: vec![],
            source_catalog: Some((*source).clone()),
            version: col_id_gen.into_version(),
        },
        CreateTableProps {
            definition,
            append_only: false,
            on_conflict: on_conflict.into(),
            with_version_column,
            webhook_info: None,
            engine,
        },
    )?;

    let mut table = materialize.table().to_prost();
    table.owner = session.user_id();
    table.cdc_table_id = Some(cdc_table_id);
    table.dependent_relations = vec![source.id];

    Ok((materialize.into(), table))
}

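/// Derives the cdc table's `WITH` options by resolving the external table name against the
/// upstream connector's naming scheme.
///
/// For example (an illustrative sketch): MySQL expects `database.table`, Postgres expects
/// `schema.table`, and SQL Server expects `database.schema.table`.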
fn derive_with_options_for_cdc_table(
    source_with_properties: &WithOptionsSecResolved,
    external_table_name: String,
) -> Result<WithOptionsSecResolved> {
    use source::cdc::{MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR, SQL_SERVER_CDC_CONNECTOR};
    // We should remove the prefix from `full_table_name`.
    let source_database_name: &str = source_with_properties
        .get("database.name")
        .ok_or_else(|| anyhow!("The source with properties does not contain 'database.name'"))?
        .as_str();
    let mut with_options = source_with_properties.clone();
    if let Some(connector) = source_with_properties.get(UPSTREAM_SOURCE_KEY) {
        match connector.as_str() {
            MYSQL_CDC_CONNECTOR => {
                // MySQL doesn't allow '.' in database names and table names, so we can split
                // the external table name by '.' to get the table name.
                let (db_name, table_name) = external_table_name.split_once('.').ok_or_else(|| {
                    anyhow!("The upstream table name must contain database name prefix, e.g. 'database.table'")
                })?;
                // We allow multiple database names in the source definition.
                if !source_database_name
                    .split(',')
                    .map(|s| s.trim())
                    .any(|name| name == db_name)
                {
                    return Err(anyhow!(
                        "The database name `{}` in the FROM clause is not included in the database name `{}` in the source definition",
                        db_name,
                        source_database_name
                    ).into());
                }
                with_options.insert(DATABASE_NAME_KEY.into(), db_name.into());
                with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
            }
            POSTGRES_CDC_CONNECTOR => {
                let (schema_name, table_name) = external_table_name
                    .split_once('.')
                    .ok_or_else(|| anyhow!("The upstream table name must contain schema name prefix, e.g. 'public.table'"))?;

                // Insert 'schema.name' into the connect properties.
                with_options.insert(SCHEMA_NAME_KEY.into(), schema_name.into());
                with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
            }
            SQL_SERVER_CDC_CONNECTOR => {
                // A SQL Server external table name follows the 'databaseName.schemaName.tableName'
                // pattern; we remove the database name prefix and then split the schema name and
                // table name.
                let (db_name, schema_table_name) =
                    external_table_name.split_once('.').ok_or_else(|| {
                        anyhow!("The upstream table name must be in 'database.schema.table' format")
                    })?;

                // Currently SQL Server only supports a single database name in the source definition.
                if db_name != source_database_name {
                    return Err(anyhow!(
                            "The database name `{}` in the FROM clause is not the same as the database name `{}` in the source definition",
                            db_name,
                            source_database_name
                        ).into());
                }

                let (schema_name, table_name) =
                    schema_table_name.split_once('.').ok_or_else(|| {
                        anyhow!("The table name must contain schema name prefix, e.g. 'dbo.table'")
                    })?;

                // Insert 'schema.name' into the connect properties.
                with_options.insert(SCHEMA_NAME_KEY.into(), schema_name.into());
                with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
            }
            _ => {
                return Err(RwError::from(anyhow!(
                    "connector {} is not supported for cdc table",
                    connector
                )));
            }
        };
    }
    Ok(with_options)
}

#[allow(clippy::too_many_arguments)]
pub(super) async fn handle_create_table_plan(
    handler_args: HandlerArgs,
    explain_options: ExplainOptions,
    format_encode: Option<FormatEncodeOptions>,
    cdc_table_info: Option<CdcTableInfo>,
    table_name: ObjectName,
    column_defs: Vec<ColumnDef>,
    wildcard_idx: Option<usize>,
    constraints: Vec<TableConstraint>,
    source_watermarks: Vec<SourceWatermark>,
    append_only: bool,
    on_conflict: Option<OnConflict>,
    with_version_column: Option<String>,
    include_column_options: IncludeOption,
    webhook_info: Option<WebhookSourceInfo>,
    engine: Engine,
) -> Result<(PlanRef, Option<PbSource>, PbTable, TableJobType)> {
    let col_id_gen = ColumnIdGenerator::new_initial();
    let format_encode = check_create_table_with_source(
        &handler_args.with_options,
        format_encode,
        &include_column_options,
        &cdc_table_info,
    )?;
    let webhook_info = webhook_info
        .map(|info| bind_webhook_info(&handler_args.session, &column_defs, info))
        .transpose()?;

    let props = CreateTableProps {
        definition: handler_args.normalized_sql.clone(),
        append_only,
        on_conflict: on_conflict.into(),
        with_version_column: with_version_column.clone(),
        webhook_info,
        engine,
    };

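    // Dispatch on the (FORMAT/ENCODE clause, cdc table info) pair: a connector-backed
    // table, a plain table, a table on a shared cdc source, or the invalid combination
    // of both.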
    let ((plan, source, table), job_type) = match (format_encode, cdc_table_info.as_ref()) {
        (Some(format_encode), None) => (
            gen_create_table_plan_with_source(
                handler_args,
                explain_options,
                table_name.clone(),
                column_defs,
                wildcard_idx,
                constraints,
                format_encode,
                source_watermarks,
                col_id_gen,
                include_column_options,
                props,
                SqlColumnStrategy::FollowChecked,
            )
            .await?,
            TableJobType::General,
        ),
        (None, None) => {
            let context = OptimizerContext::new(handler_args, explain_options);
            let (plan, table) = gen_create_table_plan(
                context,
                table_name.clone(),
                column_defs,
                constraints,
                col_id_gen,
                source_watermarks,
                props,
                false,
            )?;

            ((plan, None, table), TableJobType::General)
        }

        (None, Some(cdc_table)) => {
            sanity_check_for_table_on_cdc_source(
                append_only,
                &column_defs,
                &wildcard_idx,
                &constraints,
                &source_watermarks,
            )?;

            generated_columns_check_for_cdc_table(&column_defs)?;
            not_null_check_for_cdc_table(&wildcard_idx, &column_defs)?;

            let session = &handler_args.session;
            let db_name = &session.database();
            let user_name = &session.user_name();
            let search_path = session.config().search_path();
            let (schema_name, resolved_table_name) =
                Binder::resolve_schema_qualified_name(db_name, table_name.clone())?;
            let (database_id, schema_id) =
                session.get_database_and_schema_id_for_create(schema_name.clone())?;

            // A cdc table cannot be append-only.
            let (format_encode, source_name) =
                Binder::resolve_schema_qualified_name(db_name, cdc_table.source_name.clone())?;

            let source = {
                let catalog_reader = session.env().catalog_reader().read_guard();
                let schema_path =
                    SchemaPath::new(format_encode.as_deref(), &search_path, user_name);

                let (source, _) = catalog_reader.get_source_by_name(
                    db_name,
                    schema_path,
                    source_name.as_str(),
                )?;
                source.clone()
            };
            let cdc_with_options: WithOptionsSecResolved = derive_with_options_for_cdc_table(
                &source.with_properties,
                cdc_table.external_table_name.clone(),
            )?;

            let (columns, pk_names) = match wildcard_idx {
                Some(_) => bind_cdc_table_schema_externally(cdc_with_options.clone()).await?,
                None => {
                    for column_def in &column_defs {
                        for option_def in &column_def.options {
                            if let ColumnOption::DefaultValue(_)
                            | ColumnOption::DefaultValueInternal { .. } = option_def.option
                            {
                                return Err(ErrorCode::NotSupported(
                                    "Default value for columns defined on the table created from a CDC source".into(),
                                    "Remove the default value expression in the column definitions".into(),
                                )
                                .into());
                            }
                        }
                    }

                    let (columns, pk_names) =
                        bind_cdc_table_schema(&column_defs, &constraints, false)?;
                    // Read the default value definition from the external db.
                    let (options, secret_refs) = cdc_with_options.clone().into_parts();
                    let _config = ExternalTableConfig::try_from_btreemap(options, secret_refs)
                        .context("failed to extract external table config")?;

                    // NOTE: if the external table has a default column, we will only treat it as a normal column.
                    (columns, pk_names)
                }
            };

            let context: OptimizerContextRef =
                OptimizerContext::new(handler_args, explain_options).into();
            let (plan, table) = gen_create_table_plan_for_cdc_table(
                context,
                source,
                cdc_table.external_table_name.clone(),
                column_defs,
                columns,
                pk_names,
                cdc_with_options,
                col_id_gen,
                on_conflict,
                with_version_column,
                include_column_options,
                table_name,
                resolved_table_name,
                database_id,
                schema_id,
                TableId::placeholder(),
                engine,
            )?;

            ((plan, None, table), TableJobType::SharedCdcSource)
        }
        (Some(_), Some(_)) => {
            return Err(ErrorCode::NotSupported(
                "Data format and encoding format don't apply to the table created from a CDC source"
                    .into(),
                "Remove the FORMAT and ENCODE specification".into(),
            )
            .into());
        }
    };
    Ok((plan, source, table, job_type))
}

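// For example (an illustrative sketch), `(v1 INT, v2 INT AS v1 + 1)` passes the check below,
// while `(v2 INT AS v1 + 1, v1 INT)` is rejected because a non-generated column follows a
// generated one.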
// For both tables from a cdc source and tables with a cdc connector.
fn generated_columns_check_for_cdc_table(columns: &Vec<ColumnDef>) -> Result<()> {
    let mut found_generated_column = false;
    for column in columns {
        let mut is_generated = false;

        for option_def in &column.options {
            if let ColumnOption::GeneratedColumns(_) = option_def.option {
                is_generated = true;
                break;
            }
        }

        if is_generated {
            found_generated_column = true;
        } else if found_generated_column {
            return Err(ErrorCode::NotSupported(
                "Non-generated column found after a generated column.".into(),
                "Ensure that all generated columns appear at the end of the cdc table definition."
                    .into(),
            )
            .into());
        }
    }
    Ok(())
}

// For both tables from a cdc source and tables with a cdc connector.
fn not_null_check_for_cdc_table(
    wildcard_idx: &Option<usize>,
    column_defs: &Vec<ColumnDef>,
) -> Result<()> {
    if wildcard_idx.is_none()
        && column_defs.iter().any(|col| {
            col.options
                .iter()
                .any(|opt| matches!(opt.option, ColumnOption::NotNull))
        })
    {
        return Err(ErrorCode::NotSupported(
            "CDC table with NOT NULL constraint is not supported".to_owned(),
            "Please remove the NOT NULL constraint for columns".to_owned(),
        )
        .into());
    }
    Ok(())
}

// Only for tables from a cdc source.
fn sanity_check_for_table_on_cdc_source(
    append_only: bool,
    column_defs: &Vec<ColumnDef>,
    wildcard_idx: &Option<usize>,
    constraints: &Vec<TableConstraint>,
    source_watermarks: &Vec<SourceWatermark>,
) -> Result<()> {
    // A wildcard cannot be used together with column definitions.
    if wildcard_idx.is_some() && !column_defs.is_empty() {
        return Err(ErrorCode::NotSupported(
            "wildcard(*) and column definitions cannot be used together".to_owned(),
            "Remove the wildcard or column definitions".to_owned(),
        )
        .into());
    }

    // A cdc table must have a primary key constraint or a primary key column.
    if wildcard_idx.is_none()
        && !constraints.iter().any(|c| {
            matches!(
                c,
                TableConstraint::Unique {
                    is_primary: true,
                    ..
                }
            )
        })
        && !column_defs.iter().any(|col| {
            col.options
                .iter()
                .any(|opt| matches!(opt.option, ColumnOption::Unique { is_primary: true }))
        })
    {
        return Err(ErrorCode::NotSupported(
            "CDC table without primary key constraint is not supported".to_owned(),
            "Please define a primary key".to_owned(),
        )
        .into());
    }

    if append_only {
        return Err(ErrorCode::NotSupported(
            "append only modifier on the table created from a CDC source".into(),
            "Remove the APPEND ONLY clause".into(),
        )
        .into());
    }

    if !source_watermarks.is_empty() {
        return Err(ErrorCode::NotSupported(
            "watermark defined on the table created from a CDC source".into(),
            "Remove the Watermark definitions".into(),
        )
        .into());
    }

    Ok(())
}

/// Derives the schema for a cdc table when creating a new table or altering an existing one,
/// by connecting to the external database.
async fn bind_cdc_table_schema_externally(
    cdc_with_options: WithOptionsSecResolved,
) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
    // Read the cdc table schema from the external db, rather than parsing it from SQL definitions.
    Feature::CdcTableSchemaMap.check_available().map_err(
        |err: risingwave_common::license::FeatureNotAvailable| {
            ErrorCode::NotSupported(
                err.to_report_string(),
                "Please define the schema manually".to_owned(),
            )
        },
    )?;
    let (options, secret_refs) = cdc_with_options.into_parts();
    let config = ExternalTableConfig::try_from_btreemap(options, secret_refs)
        .context("failed to extract external table config")?;

    let table = ExternalTableImpl::connect(config)
        .await
        .context("failed to auto derive table schema")?;
    Ok((
        table
            .column_descs()
            .iter()
            .cloned()
            .map(|column_desc| ColumnCatalog {
                column_desc,
                is_hidden: false,
            })
            .collect(),
        table.pk_names().clone(),
    ))
}

/// Derives the schema for a cdc table when creating a new table or altering an existing one,
/// by binding the user-provided column definitions and constraints.
fn bind_cdc_table_schema(
    column_defs: &Vec<ColumnDef>,
    constraints: &Vec<TableConstraint>,
    is_for_replace_plan: bool,
) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
    let columns = bind_sql_columns(column_defs, is_for_replace_plan)?;

    let pk_names = bind_sql_pk_names(column_defs, bind_table_constraints(constraints)?)?;
    Ok((columns, pk_names))
}

#[allow(clippy::too_many_arguments)]
pub async fn handle_create_table(
    mut handler_args: HandlerArgs,
    table_name: ObjectName,
    column_defs: Vec<ColumnDef>,
    wildcard_idx: Option<usize>,
    constraints: Vec<TableConstraint>,
    if_not_exists: bool,
    format_encode: Option<FormatEncodeOptions>,
    source_watermarks: Vec<SourceWatermark>,
    append_only: bool,
    on_conflict: Option<OnConflict>,
    with_version_column: Option<String>,
    cdc_table_info: Option<CdcTableInfo>,
    include_column_options: IncludeOption,
    webhook_info: Option<WebhookSourceInfo>,
    ast_engine: risingwave_sqlparser::ast::Engine,
) -> Result<RwPgResponse> {
    let session = handler_args.session.clone();

    if append_only {
        session.notice_to_user("APPEND ONLY TABLE is currently an experimental feature.");
    }

    session.check_cluster_limits().await?;

    let engine = match ast_engine {
        risingwave_sqlparser::ast::Engine::Hummock => Engine::Hummock,
        risingwave_sqlparser::ast::Engine::Iceberg => Engine::Iceberg,
    };
    if engine == Engine::Iceberg && handler_args.with_options.get_connector().is_some() {
        // HACK: since we don't have atomic DDL, a table with a connector may lose data.
        // FIXME: remove this after https://github.com/risingwavelabs/risingwave/issues/21863
        if let Some(_rate_limit) = handler_args.with_options.insert(
            OverwriteOptions::SOURCE_RATE_LIMIT_KEY.to_owned(),
            "0".to_owned(),
        ) {
            // Prevent a user-specified rate limit.
            return Err(ErrorCode::NotSupported(
                "source_rate_limit for iceberg table engine during table creation".to_owned(),
                "Please remove source_rate_limit from WITH options.".to_owned(),
            )
            .into());
        }
    }

    if let Either::Right(resp) = session.check_relation_name_duplicated(
        table_name.clone(),
        StatementType::CREATE_TABLE,
        if_not_exists,
    )? {
        return Ok(resp);
    }

    let (graph, source, hummock_table, job_type) = {
        let (plan, source, table, job_type) = handle_create_table_plan(
            handler_args.clone(),
            ExplainOptions::default(),
            format_encode,
            cdc_table_info,
            table_name.clone(),
            column_defs.clone(),
            wildcard_idx,
            constraints.clone(),
            source_watermarks,
            append_only,
            on_conflict,
            with_version_column,
            include_column_options,
            webhook_info,
            engine,
        )
        .await?;
        tracing::trace!("table_plan: {:?}", plan.explain_to_string());

        let graph = build_graph(plan, Some(GraphJobType::Table))?;

        (graph, source, table, job_type)
    };

    tracing::trace!(
        "name={}, graph=\n{}",
        table_name,
        serde_json::to_string_pretty(&graph).unwrap()
    );

    // Handle the table engine.
    match engine {
        Engine::Hummock => {
            let catalog_writer = session.catalog_writer()?;
            catalog_writer
                .create_table(source, hummock_table, graph, job_type)
                .await?;
        }
        Engine::Iceberg => {
            create_iceberg_engine_table(
                session,
                handler_args,
                source,
                hummock_table,
                graph,
                table_name,
                job_type,
            )
            .await?;
        }
    }

    Ok(PgResponse::empty_result(StatementType::CREATE_TABLE))
}

/// An Iceberg engine table is composed of a Hummock table, an Iceberg sink, and an Iceberg source.
///
/// 1. Fetch the Iceberg engine options from the meta node, or use the Iceberg engine connection provided by the user.
/// 2. Create a Hummock table.
/// 3. Create an Iceberg sink.
/// 4. Create an Iceberg source.
1461///
1462/// See <https://github.com/risingwavelabs/risingwave/issues/21586> for an architecture diagram.
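///
/// A hedged usage sketch of the statement shape this handler serves; the
/// connection name `public.iceberg_conn` is a placeholder, not a real default:
///
/// ```sql
/// SET iceberg_engine_connection = 'public.iceberg_conn';
/// CREATE TABLE t (id INT PRIMARY KEY, v INT) ENGINE = iceberg;
/// ```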
1463#[allow(clippy::too_many_arguments)]
1464pub async fn create_iceberg_engine_table(
1465    session: Arc<SessionImpl>,
1466    handler_args: HandlerArgs,
1467    mut source: Option<PbSource>,
1468    table: PbTable,
1469    graph: StreamFragmentGraph,
1470    table_name: ObjectName,
1471    job_type: PbTableJobType,
1472) -> Result<()> {
1473    let meta_client = session.env().meta_client();
1474    let meta_store_endpoint = meta_client.get_meta_store_endpoint().await?;
1475
1476    let meta_store_endpoint = url::Url::parse(&meta_store_endpoint).map_err(|_| {
1477        ErrorCode::InternalError("failed to parse the meta store endpoint".to_owned())
1478    })?;
1479    let meta_store_backend = meta_store_endpoint.scheme().to_owned();
1480    let meta_store_user = meta_store_endpoint.username().to_owned();
1481    let meta_store_password = meta_store_endpoint
1482        .password()
1483        .ok_or_else(|| {
1484            ErrorCode::InternalError("failed to parse password from meta store endpoint".to_owned())
1485        })?
1486        .to_owned();
1487    let meta_store_host = meta_store_endpoint
1488        .host_str()
1489        .ok_or_else(|| {
1490            ErrorCode::InternalError("failed to parse host from meta store endpoint".to_owned())
1491        })?
1492        .to_owned();
1493    let meta_store_port = meta_store_endpoint.port().ok_or_else(|| {
1494        ErrorCode::InternalError("failed to parse port from meta store endpoint".to_owned())
1495    })?;
1496    let meta_store_database = meta_store_endpoint
1497        .path()
1498        .trim_start_matches('/')
1499        .to_owned();
1500
1501    let Ok(meta_backend) = MetaBackend::from_str(&meta_store_backend, true) else {
1502        bail!("failed to parse meta backend: {}", meta_store_backend);
1503    };
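    // For illustration only (all values are placeholders): an endpoint like
    // `postgres://rw:pass@meta-db:5432/metadata` parses into backend `postgres`,
    // user `rw`, password `pass`, host `meta-db`, port `5432`, and database `metadata`.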
1504
1505    let catalog_uri = match meta_backend {
        MetaBackend::Postgres => {
            format!(
                "jdbc:postgresql://{}:{}/{}",
                meta_store_host, meta_store_port, meta_store_database
            )
        }
        MetaBackend::Mysql => {
            format!(
                "jdbc:mysql://{}:{}/{}",
                meta_store_host, meta_store_port, meta_store_database
            )
        }
1522        MetaBackend::Sqlite | MetaBackend::Sql | MetaBackend::Mem => {
1523            bail!(
1524                "Unsupported meta backend for iceberg engine table: {}",
1525                meta_store_backend
1526            );
1527        }
1528    };
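    // e.g. the placeholder endpoint above yields
    // `catalog_uri = "jdbc:postgresql://meta-db:5432/metadata"`.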
1529
1530    let rw_db_name = session
1531        .env()
1532        .catalog_reader()
1533        .read_guard()
1534        .get_database_by_id(&table.database_id)?
1535        .name()
1536        .to_owned();
1537    let rw_schema_name = session
1538        .env()
1539        .catalog_reader()
1540        .read_guard()
1541        .get_schema_by_id(&table.database_id, &table.schema_id)?
1542        .name()
1543        .clone();
1544    let iceberg_catalog_name = rw_db_name.clone();
1545    let iceberg_database_name = rw_schema_name.clone();
1546    let iceberg_table_name = table_name.0.last().unwrap().real_value();
1547
1548    let iceberg_engine_connection: String = session.config().iceberg_engine_connection();
1549    let sink_decouple = session.config().sink_decouple();
1550    if matches!(sink_decouple, SinkDecouple::Disable) {
1551        bail!(
1552            "Iceberg engine table only supports with sink decouple, try `set sink_decouple = true` to resolve it"
1553        );
1554    }
1555
1556    let mut connection_ref = BTreeMap::new();
1557    let with_common = if iceberg_engine_connection.is_empty() {
1558        bail!("to use iceberg engine table, the variable `iceberg_engine_connection` must be set.");
1559    } else {
        let parts: Vec<&str> = iceberg_engine_connection.split('.').collect();
        if parts.len() != 2 {
            bail!(
                "invalid `iceberg_engine_connection`: expected the format `schema_name.connection_name`, got `{}`",
                iceberg_engine_connection
            );
        }
1562        let connection_catalog =
1563            session.get_connection_by_name(Some(parts[0].to_owned()), parts[1])?;
1564        if let ConnectionInfo::ConnectionParams(params) = &connection_catalog.info {
1565            if params.connection_type == ConnectionType::Iceberg as i32 {
1566                // With iceberg engine connection:
1567                connection_ref.insert(
1568                    "connection".to_owned(),
1569                    ConnectionRefValue {
1570                        connection_name: ObjectName::from(vec![
1571                            Ident::from(parts[0]),
1572                            Ident::from(parts[1]),
1573                        ]),
1574                    },
1575                );
1576
1577                let mut with_common = BTreeMap::new();
1578                with_common.insert("connector".to_owned(), "iceberg".to_owned());
1579                with_common.insert("database.name".to_owned(), iceberg_database_name.to_owned());
1580                with_common.insert("table.name".to_owned(), iceberg_table_name.to_owned());
1581
1582                if let Some(s) = params.properties.get("hosted_catalog") {
1583                    if s.eq_ignore_ascii_case("true") {
1584                        with_common.insert("catalog.type".to_owned(), "jdbc".to_owned());
1585                        with_common.insert("catalog.uri".to_owned(), catalog_uri.to_owned());
1586                        with_common
1587                            .insert("catalog.jdbc.user".to_owned(), meta_store_user.to_owned());
1588                        with_common.insert(
1589                            "catalog.jdbc.password".to_owned(),
1590                            meta_store_password.clone(),
1591                        );
1592                        with_common
1593                            .insert("catalog.name".to_owned(), iceberg_catalog_name.to_owned());
1594                    }
1595                }
1596
1597                with_common
1598            } else {
1599                return Err(RwError::from(ErrorCode::InvalidParameterValue(
1600                    "Only iceberg connection could be used in iceberg engine".to_owned(),
1601                )));
1602            }
1603        } else {
1604            return Err(RwError::from(ErrorCode::InvalidParameterValue(
1605                "Private Link Service has been deprecated. Please create a new connection instead."
1606                    .to_owned(),
1607            )));
1608        }
1609    };
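    // With a hosted catalog, `with_common` at this point holds roughly (values
    // shown for illustration only):
    //   connector             = 'iceberg'
    //   database.name         = <RisingWave schema name>
    //   table.name            = <table name>
    //   catalog.type          = 'jdbc'
    //   catalog.uri           = <catalog_uri>
    //   catalog.jdbc.user     = <meta store user>
    //   catalog.jdbc.password = <meta store password>
    //   catalog.name          = <RisingWave database name>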
1610
1611    let table_catalog = TableCatalog::from(table.clone());
1612
    // Iceberg sinks require a primary key; if none is provided, we fall back to the `_row_id` column.
    // Fetch the primary key from the columns.
1615    let mut pks = table_catalog
1616        .pk_column_names()
1617        .iter()
1618        .map(|c| c.to_string())
1619        .collect::<Vec<String>>();
1620
    // For a table without a primary key, use `_row_id` as the primary key.
1622    let sink_from = if pks.len() == 1 && pks[0].eq(ROW_ID_COLUMN_NAME) {
1623        pks = vec![RISINGWAVE_ICEBERG_ROW_ID.to_owned()];
1624        let [stmt]: [_; 1] = Parser::parse_sql(&format!(
1625            "select {} as {}, * from {}",
1626            ROW_ID_COLUMN_NAME, RISINGWAVE_ICEBERG_ROW_ID, table_name
1627        ))
1628        .context("unable to parse query")?
1629        .try_into()
1630        .unwrap();
1631
1632        let Statement::Query(query) = &stmt else {
1633            panic!("unexpected statement: {:?}", stmt);
1634        };
1635        CreateSink::AsQuery(query.clone())
1636    } else {
1637        CreateSink::From(table_name.clone())
1638    };
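    // For a table `t` without a user-defined primary key, the generated sink query is
    //   SELECT _row_id AS _risingwave_iceberg_row_id, * FROM t
    // (the alias shown here assumes the value of the `RISINGWAVE_ICEBERG_ROW_ID` constant).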
1639
1640    let mut sink_name = table_name.clone();
1641    *sink_name.0.last_mut().unwrap() = Ident::from(
1642        (ICEBERG_SINK_PREFIX.to_owned() + &sink_name.0.last().unwrap().real_value()).as_str(),
1643    );
1644    let create_sink_stmt = CreateSinkStatement {
1645        if_not_exists: false,
1646        sink_name,
1647        with_properties: WithProperties(vec![]),
1648        sink_from,
1649        columns: vec![],
1650        emit_mode: None,
1651        sink_schema: None,
1652        into_table_name: None,
1653    };
1654
1655    let mut sink_handler_args = handler_args.clone();
1656
1657    let mut sink_with = with_common.clone();
1658
1659    sink_with.insert("primary_key".to_owned(), pks.join(","));
1660    sink_with.insert("type".to_owned(), "upsert".to_owned());
1661    // sink_with.insert(SINK_SNAPSHOT_OPTION.to_owned(), "false".to_owned());
1662    //
    // Note: in theory, we don't need to backfill from the table to the sink,
    // but we don't have atomic DDL yet (https://github.com/risingwavelabs/risingwave/issues/21863),
    // so there is a potential data-loss problem around the first barrier.
    //
    // For a non-append-only table, we can always solve it with an initial sink backfill, since
    // the data will be present in the Hummock table.
    //
    // For an append-only table, we need to be more careful.
    //
    // The possible cases for a table:
    // - Table without a connector: it doesn't matter, since there's no data before the table is created.
    // - Table with a connector: we worked around it by setting SOURCE_RATE_LIMIT to 0.
    //   + If we ever support blocking DDL for tables with connectors, we need to be careful.
    // - Table with an upstream job, specifically a CDC table from a shared CDC source:
    //   + Data may come from both the upstream connector and the CDC table backfill, so we would need to pause both.
    //   + For now we don't support APPEND ONLY CDC tables, so it's safe.
1679    let commit_checkpoint_interval = handler_args
1680        .with_options
1681        .get(COMMIT_CHECKPOINT_INTERVAL)
1682        .map(|v| v.to_owned())
1683        .unwrap_or_else(|| "60".to_owned());
1684    let commit_checkpoint_interval = commit_checkpoint_interval.parse::<u32>().map_err(|_| {
1685        ErrorCode::InvalidInputSyntax(format!(
1686            "commit_checkpoint_interval must be a positive integer: {}",
1687            commit_checkpoint_interval
1688        ))
1689    })?;
1690
1691    if commit_checkpoint_interval == 0 {
1692        bail!("commit_checkpoint_interval must be a positive integer: 0");
1693    }
1694
    // Remove `commit_checkpoint_interval` from the source options; otherwise it would be
    // considered an unknown field.
    if let Some(source) = source.as_mut() {
        source.with_properties.remove(COMMIT_CHECKPOINT_INTERVAL);
    }
1699
1700    let sink_decouple = session.config().sink_decouple();
1701    if matches!(sink_decouple, SinkDecouple::Disable) && commit_checkpoint_interval > 1 {
1702        bail!(
1703            "config conflict: `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
1704        )
1705    }
1706
1707    sink_with.insert(
1708        COMMIT_CHECKPOINT_INTERVAL.to_owned(),
1709        commit_checkpoint_interval.to_string(),
1710    );
1711    sink_with.insert("create_table_if_not_exists".to_owned(), "true".to_owned());
1712
1713    sink_with.insert("is_exactly_once".to_owned(), "true".to_owned());
1714
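    // At this point the sink's effective WITH options are:
    //   connector                  = 'iceberg'   (from `with_common`)
    //   type                       = 'upsert'
    //   primary_key                = <pks joined by ','>
    //   commit_checkpoint_interval = <validated interval>
    //   create_table_if_not_exists = 'true'
    //   is_exactly_once            = 'true'
    // plus the catalog options from `with_common`, and `partition_by` if set below.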
1715    let partition_by = handler_args
1716        .with_options
1717        .get("partition_by")
1718        .map(|v| v.to_owned());
1719
1720    if let Some(partition_by) = &partition_by {
1721        let mut partition_columns = vec![];
1722        for (column, _) in parse_partition_by_exprs(partition_by.clone())? {
1723            table_catalog
1724                .columns()
1725                .iter()
1726                .find(|col| col.name().eq_ignore_ascii_case(&column))
1727                .ok_or_else(|| {
1728                    ErrorCode::InvalidInputSyntax(format!(
1729                        "Partition source column does not exist in schema: {}",
1730                        column
1731                    ))
1732                })?;
1733
1734            partition_columns.push(column.to_owned());
1735        }
1736
1737        ensure_partition_columns_are_prefix_of_primary_key(&partition_columns, &pks).map_err(
1738            |_| {
1739                ErrorCode::InvalidInputSyntax(
1740                    "The partition columns should be the prefix of the primary key".to_owned(),
1741                )
1742            },
1743        )?;
1744
1745        sink_with.insert("partition_by".to_owned(), partition_by.to_owned());
1746
        // Remove `partition_by` from the source options; otherwise it would be
        // considered an unknown field.
        if let Some(source) = source.as_mut() {
            source.with_properties.remove("partition_by");
        }
1751    }
1752
1753    sink_handler_args.with_options =
1754        WithOptions::new(sink_with, Default::default(), connection_ref.clone());
1755
1756    let mut source_name = table_name.clone();
1757    *source_name.0.last_mut().unwrap() = Ident::from(
1758        (ICEBERG_SOURCE_PREFIX.to_owned() + &source_name.0.last().unwrap().real_value()).as_str(),
1759    );
1760    let create_source_stmt = CreateSourceStatement {
1761        temporary: false,
1762        if_not_exists: false,
1763        columns: vec![],
1764        source_name,
1765        wildcard_idx: Some(0),
1766        constraints: vec![],
1767        with_properties: WithProperties(vec![]),
1768        format_encode: CompatibleFormatEncode::V2(FormatEncodeOptions::none()),
1769        source_watermarks: vec![],
1770        include_column_options: vec![],
1771    };
1772
1773    let mut source_handler_args = handler_args.clone();
1774    let source_with = with_common;
1775    source_handler_args.with_options =
1776        WithOptions::new(source_with, Default::default(), connection_ref);
1777
    // Before creating the table, ensure the JVM is initialized, since we currently use the JDBC catalog.
    // If the JVM fails to initialize, the non-atomic DDL would leave a partially created Iceberg engine table behind.
1780    let _ = JVM.get_or_init()?;
1781
1782    let catalog_writer = session.catalog_writer()?;
    // TODO(iceberg): make Iceberg engine table creation DDL atomic.
1784    let has_connector = source.is_some();
1785    catalog_writer
1786        .create_table(source, table, graph, job_type)
1787        .await?;
1788    let res = create_sink::handle_create_sink(sink_handler_args, create_sink_stmt, true).await;
1789    if res.is_err() {
        // Since we don't support DDL atomicity, drop the partially created table.
1791        handle_drop_table(handler_args.clone(), table_name.clone(), true, true).await?;
1792        res?;
1793    }
1794    let res = create_source::handle_create_source(source_handler_args, create_source_stmt).await;
1795    if res.is_err() {
        // Since we don't support DDL atomicity, drop the partially created table.
1797        handle_drop_table(handler_args.clone(), table_name.clone(), true, true).await?;
1798        res?;
1799    }
1800
1801    if has_connector {
1802        alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1803            handler_args,
1804            PbThrottleTarget::TableWithSource,
1805            table_name,
1806            -1,
1807        )
1808        .await?;
1809    }
1810
1811    Ok(())
1812}
1813
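/// Checks that the `WITH` options, `FORMAT ... ENCODE` clause, and `INCLUDE`
/// options of a `CREATE TABLE` statement are consistent: `INCLUDE` requires a
/// source connector, and a source connector requires a schema. CDC tables skip
/// the check.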
1814pub fn check_create_table_with_source(
1815    with_options: &WithOptions,
1816    format_encode: Option<FormatEncodeOptions>,
1817    include_column_options: &IncludeOption,
1818    cdc_table_info: &Option<CdcTableInfo>,
1819) -> Result<Option<FormatEncodeOptions>> {
    // Skip the check for CDC tables.
1821    if cdc_table_info.is_some() {
1822        return Ok(format_encode);
1823    }
1824    let defined_source = with_options.is_source_connector();
1825
1826    if !include_column_options.is_empty() && !defined_source {
1827        return Err(ErrorCode::InvalidInputSyntax(
1828            "INCLUDE should be used with a connector".to_owned(),
1829        )
1830        .into());
1831    }
1832    if defined_source {
1833        format_encode.as_ref().ok_or_else(|| {
1834            ErrorCode::InvalidInputSyntax("Please specify a source schema using FORMAT".to_owned())
1835        })?;
1836    }
1837    Ok(format_encode)
1838}
1839
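/// Checks that `partition_columns` is a prefix of `primary_key_columns`, e.g.
/// `["a"]` and `["a", "b"]` are valid prefixes of the primary key
/// `["a", "b", "c"]`, while `["b"]` and `["a", "c"]` are not.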
1840fn ensure_partition_columns_are_prefix_of_primary_key(
1841    partition_columns: &[String],
1842    primary_key_columns: &[String],
1843) -> std::result::Result<(), String> {
1844    if partition_columns.len() > primary_key_columns.len() {
1845        return Err("Partition columns cannot be longer than primary key columns.".to_owned());
1846    }
1847
1848    for (i, partition_col) in partition_columns.iter().enumerate() {
1849        if primary_key_columns.get(i) != Some(partition_col) {
1850            return Err(format!(
1851                "Partition column '{}' is not a prefix of the primary key.",
1852                partition_col
1853            ));
1854        }
1855    }
1856
1857    Ok(())
1858}
1859
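/// Re-plans a `CREATE TABLE` statement against an existing table's catalog,
/// e.g. for `ALTER TABLE`: it rebuilds the stream graph, rejects primary-key
/// changes, and carries over the original table ID and associated source ID.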
1860#[allow(clippy::too_many_arguments)]
1861pub async fn generate_stream_graph_for_replace_table(
1862    _session: &Arc<SessionImpl>,
1863    table_name: ObjectName,
1864    original_catalog: &Arc<TableCatalog>,
1865    handler_args: HandlerArgs,
1866    statement: Statement,
1867    col_id_gen: ColumnIdGenerator,
1868    sql_column_strategy: SqlColumnStrategy,
1869) -> Result<(StreamFragmentGraph, Table, Option<PbSource>, TableJobType)> {
1870    use risingwave_pb::catalog::table::OptionalAssociatedSourceId;
1871
1872    let Statement::CreateTable {
1873        columns,
1874        constraints,
1875        source_watermarks,
1876        append_only,
1877        on_conflict,
1878        with_version_column,
1879        wildcard_idx,
1880        cdc_table_info,
1881        format_encode,
1882        include_column_options,
1883        engine,
1884        with_options,
1885        ..
1886    } = statement
1887    else {
1888        panic!("unexpected statement type: {:?}", statement);
1889    };
1890
1891    let format_encode = format_encode
1892        .clone()
1893        .map(|format_encode| format_encode.into_v2_with_warning());
1894
1895    let engine = match engine {
1896        risingwave_sqlparser::ast::Engine::Hummock => Engine::Hummock,
1897        risingwave_sqlparser::ast::Engine::Iceberg => Engine::Iceberg,
1898    };
1899
1900    let is_drop_connector =
1901        original_catalog.associated_source_id().is_some() && format_encode.is_none();
1902    if is_drop_connector {
1903        debug_assert!(
1904            source_watermarks.is_empty()
1905                && include_column_options.is_empty()
1906                && with_options
1907                    .iter()
1908                    .all(|opt| opt.name.real_value().to_lowercase() != "connector")
1909        );
1910    }
1911
1912    let props = CreateTableProps {
1913        definition: handler_args.normalized_sql.clone(),
1914        append_only,
1915        on_conflict: on_conflict.into(),
1916        with_version_column: with_version_column.as_ref().map(|x| x.real_value()),
1917        webhook_info: original_catalog.webhook_info.clone(),
1918        engine,
1919    };
1920
1921    let ((plan, mut source, table), job_type) = match (format_encode, cdc_table_info.as_ref()) {
1922        (Some(format_encode), None) => (
1923            gen_create_table_plan_with_source(
1924                handler_args,
1925                ExplainOptions::default(),
1926                table_name,
1927                columns,
1928                wildcard_idx,
1929                constraints,
1930                format_encode,
1931                source_watermarks,
1932                col_id_gen,
1933                include_column_options,
1934                props,
1935                sql_column_strategy,
1936            )
1937            .await?,
1938            TableJobType::General,
1939        ),
1940        (None, None) => {
1941            let context = OptimizerContext::from_handler_args(handler_args);
1942            let (plan, table) = gen_create_table_plan(
1943                context,
1944                table_name,
1945                columns,
1946                constraints,
1947                col_id_gen,
1948                source_watermarks,
1949                props,
1950                true,
1951            )?;
1952            ((plan, None, table), TableJobType::General)
1953        }
1954        (None, Some(cdc_table)) => {
1955            let session = &handler_args.session;
1956            let (source, resolved_table_name, database_id, schema_id) =
1957                get_source_and_resolved_table_name(session, cdc_table.clone(), table_name.clone())?;
1958
1959            let cdc_with_options = derive_with_options_for_cdc_table(
1960                &source.with_properties,
1961                cdc_table.external_table_name.clone(),
1962            )?;
1963
1964            let (column_catalogs, pk_names) = bind_cdc_table_schema(&columns, &constraints, true)?;
1965
1966            let context: OptimizerContextRef =
1967                OptimizerContext::new(handler_args, ExplainOptions::default()).into();
1968            let (plan, table) = gen_create_table_plan_for_cdc_table(
1969                context,
1970                source,
1971                cdc_table.external_table_name.clone(),
1972                columns,
1973                column_catalogs,
1974                pk_names,
1975                cdc_with_options,
1976                col_id_gen,
1977                on_conflict,
1978                with_version_column.map(|x| x.real_value()),
1979                IncludeOption::default(),
1980                table_name,
1981                resolved_table_name,
1982                database_id,
1983                schema_id,
1984                original_catalog.id(),
1985                engine,
1986            )?;
1987
1988            ((plan, None, table), TableJobType::SharedCdcSource)
1989        }
1990        (Some(_), Some(_)) => {
1991            return Err(ErrorCode::NotSupported(
1992                "Data format and encoding format doesn't apply to table created from a CDC source"
1993                    .into(),
1994                "Remove the FORMAT and ENCODE specification".into(),
1995            )
1996            .into());
1997        }
1998    };
1999
2000    // TODO: avoid this backward conversion.
2001    if TableCatalog::from(&table).pk_column_ids() != original_catalog.pk_column_ids() {
2002        Err(ErrorCode::InvalidInputSyntax(
2003            "alter primary key of table is not supported".to_owned(),
2004        ))?
2005    }
2006
2007    let graph = build_graph(plan, Some(GraphJobType::Table))?;
2008
2009    // Fill the original table ID.
2010    let mut table = Table {
2011        id: original_catalog.id().table_id(),
2012        ..table
2013    };
2014    if !is_drop_connector && let Some(source_id) = original_catalog.associated_source_id() {
2015        table.optional_associated_source_id = Some(OptionalAssociatedSourceId::AssociatedSourceId(
2016            source_id.table_id,
2017        ));
2018        source.as_mut().unwrap().id = source_id.table_id;
2019        source.as_mut().unwrap().optional_associated_table_id =
2020            Some(OptionalAssociatedTableId::AssociatedTableId(table.id))
2021    }
2022
2023    Ok((graph, table, source, job_type))
2024}
2025
2026fn get_source_and_resolved_table_name(
2027    session: &Arc<SessionImpl>,
2028    cdc_table: CdcTableInfo,
2029    table_name: ObjectName,
2030) -> Result<(Arc<SourceCatalog>, String, DatabaseId, SchemaId)> {
2031    let db_name = &session.database();
2032    let (schema_name, resolved_table_name) =
2033        Binder::resolve_schema_qualified_name(db_name, table_name)?;
2034    let (database_id, schema_id) =
2035        session.get_database_and_schema_id_for_create(schema_name.clone())?;
2036
    let (source_schema_name, source_name) =
        Binder::resolve_schema_qualified_name(db_name, cdc_table.source_name.clone())?;
2039
2040    let source = {
2041        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_name = source_schema_name.unwrap_or(DEFAULT_SCHEMA_NAME.to_owned());
2043        let (source, _) = catalog_reader.get_source_by_name(
2044            db_name,
2045            SchemaPath::Name(schema_name.as_str()),
2046            source_name.as_str(),
2047        )?;
2048        source.clone()
2049    };
2050
2051    Ok((source, resolved_table_name, database_id, schema_id))
2052}
2053
/// Validates the `webhook_info` and binds it to its protobuf representation.
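///
/// A hedged sketch of the statement shape this validates (the connector,
/// secret, and header names are placeholders):
///
/// ```sql
/// CREATE TABLE wh (payload JSONB)
/// WITH (connector = 'webhook')
/// VALIDATE SECRET my_secret
///   AS secure_compare(headers->>'x-signature', my_secret);
/// ```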
2055fn bind_webhook_info(
2056    session: &Arc<SessionImpl>,
2057    columns_defs: &[ColumnDef],
2058    webhook_info: WebhookSourceInfo,
2059) -> Result<PbWebhookSourceInfo> {
2060    // validate columns
2061    if columns_defs.len() != 1 || columns_defs[0].data_type.as_ref().unwrap() != &AstDataType::Jsonb
2062    {
2063        return Err(ErrorCode::InvalidInputSyntax(
2064            "Table with webhook source should have exactly one JSONB column".to_owned(),
2065        )
2066        .into());
2067    }
2068
2069    let WebhookSourceInfo {
2070        secret_ref,
2071        signature_expr,
2072        wait_for_persistence,
2073    } = webhook_info;
2074
2075    // validate secret_ref
2076    let (pb_secret_ref, secret_name) = if let Some(secret_ref) = secret_ref {
2077        let db_name = &session.database();
2078        let (schema_name, secret_name) =
2079            Binder::resolve_schema_qualified_name(db_name, secret_ref.secret_name.clone())?;
2080        let secret_catalog = session.get_secret_by_name(schema_name, &secret_name)?;
2081        (
2082            Some(PbSecretRef {
2083                secret_id: secret_catalog.id.secret_id(),
2084                ref_as: match secret_ref.ref_as {
2085                    SecretRefAsType::Text => PbRefAsType::Text,
2086                    SecretRefAsType::File => PbRefAsType::File,
2087                }
2088                .into(),
2089            }),
2090            Some(secret_name),
2091        )
2092    } else {
2093        (None, None)
2094    };
2095
2096    let secure_compare_context = SecureCompareContext {
2097        column_name: columns_defs[0].name.real_value(),
2098        secret_name,
2099    };
2100    let mut binder = Binder::new_for_ddl_with_secure_compare(session, secure_compare_context);
2101    let expr = binder.bind_expr(signature_expr.clone())?;
2102
2103    // validate expr, ensuring it is SECURE_COMPARE()
2104    if expr.as_function_call().is_none()
2105        || expr.as_function_call().unwrap().func_type()
2106            != crate::optimizer::plan_node::generic::ExprType::SecureCompare
2107    {
2108        return Err(ErrorCode::InvalidInputSyntax(
2109            "The signature verification function must be SECURE_COMPARE()".to_owned(),
2110        )
2111        .into());
2112    }
2113
2114    let pb_webhook_info = PbWebhookSourceInfo {
2115        secret_ref: pb_secret_ref,
2116        signature_expr: Some(expr.to_expr_proto()),
2117        wait_for_persistence,
2118    };
2119
2120    Ok(pb_webhook_info)
2121}
2122
2123#[cfg(test)]
2124mod tests {
2125    use risingwave_common::catalog::{
2126        DEFAULT_DATABASE_NAME, ROW_ID_COLUMN_NAME, RW_TIMESTAMP_COLUMN_NAME,
2127    };
2128    use risingwave_common::types::{DataType, StructType};
2129
2130    use super::*;
2131    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};
2132
2133    #[tokio::test]
2134    async fn test_create_table_handler() {
2135        let sql =
2136            "create table t (v1 smallint, v2 struct<v3 bigint, v4 float, v5 double>) append only;";
2137        let frontend = LocalFrontend::new(Default::default()).await;
2138        frontend.run_sql(sql).await.unwrap();
2139
2140        let session = frontend.session_ref();
2141        let catalog_reader = session.env().catalog_reader().read_guard();
2142        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);
2143
2144        // Check table exists.
2145        let (table, _) = catalog_reader
2146            .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
2147            .unwrap();
2148        assert_eq!(table.name(), "t");
2149
2150        let columns = table
2151            .columns
2152            .iter()
2153            .map(|col| (col.name(), col.data_type().clone()))
2154            .collect::<HashMap<&str, DataType>>();
2155
2156        let expected_columns = maplit::hashmap! {
2157            ROW_ID_COLUMN_NAME => DataType::Serial,
2158            "v1" => DataType::Int16,
2159            "v2" => StructType::new(
2160                vec![("v3", DataType::Int64),("v4", DataType::Float64),("v5", DataType::Float64)],
2161            )
2162            .with_ids([3, 4, 5].map(ColumnId::new))
2163            .into(),
2164            RW_TIMESTAMP_COLUMN_NAME => DataType::Timestamptz,
2165        };
2166
2167        assert_eq!(columns, expected_columns, "{columns:#?}");
2168    }
2169
2170    #[test]
2171    fn test_bind_primary_key() {
        // Note: column ID 0 is reserved for the row ID column.
2173
2174        for (sql, expected) in [
2175            ("create table t (v1 int, v2 int)", Ok(&[0] as &[_])),
2176            ("create table t (v1 int primary key, v2 int)", Ok(&[1])),
2177            ("create table t (v1 int, v2 int primary key)", Ok(&[2])),
2178            (
2179                "create table t (v1 int primary key, v2 int primary key)",
2180                Err("multiple primary keys are not allowed"),
2181            ),
2182            (
2183                "create table t (v1 int primary key primary key, v2 int)",
2184                Err("multiple primary keys are not allowed"),
2185            ),
2186            (
2187                "create table t (v1 int, v2 int, primary key (v1))",
2188                Ok(&[1]),
2189            ),
2190            (
2191                "create table t (v1 int, primary key (v2), v2 int)",
2192                Ok(&[2]),
2193            ),
2194            (
2195                "create table t (primary key (v2, v1), v1 int, v2 int)",
2196                Ok(&[2, 1]),
2197            ),
2198            (
2199                "create table t (v1 int, primary key (v1), v2 int, primary key (v1))",
2200                Err("multiple primary keys are not allowed"),
2201            ),
2202            (
2203                "create table t (v1 int primary key, primary key (v1), v2 int)",
2204                Err("multiple primary keys are not allowed"),
2205            ),
2206            (
2207                "create table t (v1 int, primary key (V3), v2 int)",
2208                Err("column \"v3\" named in key does not exist"),
2209            ),
2210        ] {
2211            let mut ast = risingwave_sqlparser::parser::Parser::parse_sql(sql).unwrap();
2212            let risingwave_sqlparser::ast::Statement::CreateTable {
2213                columns: column_defs,
2214                constraints,
2215                ..
2216            } = ast.remove(0)
2217            else {
2218                panic!("test case should be create table")
2219            };
2220            let actual: Result<_> = (|| {
2221                let mut columns = bind_sql_columns(&column_defs, false)?;
2222                let mut col_id_gen = ColumnIdGenerator::new_initial();
2223                for c in &mut columns {
2224                    col_id_gen.generate(c)?;
2225                }
2226
2227                let pk_names =
2228                    bind_sql_pk_names(&column_defs, bind_table_constraints(&constraints)?)?;
2229                let (_, pk_column_ids, _) =
2230                    bind_pk_and_row_id_on_relation(columns, pk_names, true)?;
2231                Ok(pk_column_ids)
2232            })();
2233            match (expected, actual) {
2234                (Ok(expected), Ok(actual)) => assert_eq!(
2235                    expected.iter().copied().map(ColumnId::new).collect_vec(),
2236                    actual,
2237                    "sql: {sql}"
2238                ),
2239                (Ok(_), Err(actual)) => panic!("sql: {sql}\nunexpected error: {actual:?}"),
2240                (Err(_), Ok(actual)) => panic!("sql: {sql}\nexpects error but got: {actual:?}"),
2241                (Err(expected), Err(actual)) => assert!(
2242                    actual.to_string().contains(expected),
2243                    "sql: {sql}\nexpected: {expected:?}\nactual: {actual:?}"
2244                ),
2245            }
2246        }
2247    }
2248
2249    #[tokio::test]
2250    async fn test_duplicate_props_options() {
2251        let proto_file = create_proto_file(PROTO_FILE_DATA);
2252        let sql = format!(
2253            r#"CREATE TABLE t
2254    WITH (
2255        connector = 'kinesis',
2256        aws.region='user_test_topic',
2257        endpoint='172.10.1.1:9090,172.10.1.2:9090',
2258        aws.credentials.access_key_id = 'your_access_key_1',
2259        aws.credentials.secret_access_key = 'your_secret_key_1'
2260    )
2261    FORMAT PLAIN ENCODE PROTOBUF (
2262        message = '.test.TestRecord',
2263        aws.credentials.access_key_id = 'your_access_key_2',
2264        aws.credentials.secret_access_key = 'your_secret_key_2',
2265        schema.location = 'file://{}',
2266    )"#,
2267            proto_file.path().to_str().unwrap()
2268        );
2269        let frontend = LocalFrontend::new(Default::default()).await;
2270        frontend.run_sql(sql).await.unwrap();
2271
2272        let session = frontend.session_ref();
2273        let catalog_reader = session.env().catalog_reader().read_guard();
2274        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);
2275
2276        // Check source exists.
2277        let (source, _) = catalog_reader
2278            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
2279            .unwrap();
2280        assert_eq!(source.name, "t");
2281
2282        // AwsAuth params exist in options.
2283        assert_eq!(
2284            source
2285                .info
2286                .format_encode_options
2287                .get("aws.credentials.access_key_id")
2288                .unwrap(),
2289            "your_access_key_2"
2290        );
2291        assert_eq!(
2292            source
2293                .info
2294                .format_encode_options
2295                .get("aws.credentials.secret_access_key")
2296                .unwrap(),
2297            "your_secret_key_2"
2298        );
2299
2300        // AwsAuth params exist in props.
2301        assert_eq!(
2302            source
2303                .with_properties
2304                .get("aws.credentials.access_key_id")
2305                .unwrap(),
2306            "your_access_key_1"
2307        );
2308        assert_eq!(
2309            source
2310                .with_properties
2311                .get("aws.credentials.secret_access_key")
2312                .unwrap(),
2313            "your_secret_key_1"
2314        );
2315
2316        // Options are not merged into props.
2317        assert!(!source.with_properties.contains_key("schema.location"));
2318    }
2319}