risingwave_frontend/handler/
create_table.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16use std::rc::Rc;
17use std::sync::Arc;
18
19use anyhow::{Context, anyhow};
20use clap::ValueEnum;
21use either::Either;
22use fixedbitset::FixedBitSet;
23use itertools::Itertools;
24use percent_encoding::percent_decode_str;
25use pgwire::pg_response::{PgResponse, StatementType};
26use prost::Message as _;
27use risingwave_common::catalog::{
28    CdcTableDesc, ColumnCatalog, ColumnDesc, ConflictBehavior, DEFAULT_SCHEMA_NAME, Engine,
29    RISINGWAVE_ICEBERG_ROW_ID, ROW_ID_COLUMN_NAME, TableId,
30};
31use risingwave_common::config::MetaBackend;
32use risingwave_common::session_config::sink_decouple::SinkDecouple;
33use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
34use risingwave_common::util::value_encoding::DatumToProtoExt;
35use risingwave_common::{bail, bail_not_implemented};
36use risingwave_connector::jvm_runtime::JVM;
37use risingwave_connector::sink::decouple_checkpoint_log_sink::COMMIT_CHECKPOINT_INTERVAL;
38use risingwave_connector::source::cdc::build_cdc_table_id;
39use risingwave_connector::source::cdc::external::{
40    DATABASE_NAME_KEY, ExternalTableConfig, ExternalTableImpl, SCHEMA_NAME_KEY, TABLE_NAME_KEY,
41};
42use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt, source};
43use risingwave_pb::catalog::connection::Info as ConnectionInfo;
44use risingwave_pb::catalog::connection_params::ConnectionType;
45use risingwave_pb::catalog::source::OptionalAssociatedTableId;
46use risingwave_pb::catalog::{PbSource, PbTable, PbWebhookSourceInfo, Table, WatermarkDesc};
47use risingwave_pb::ddl_service::{PbTableJobType, TableJobType};
48use risingwave_pb::meta::PbThrottleTarget;
49use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
50use risingwave_pb::plan_common::{
51    AdditionalColumn, ColumnDescVersion, DefaultColumnDesc, GeneratedColumnDesc,
52};
53use risingwave_pb::secret::PbSecretRef;
54use risingwave_pb::secret::secret_ref::PbRefAsType;
55use risingwave_pb::stream_plan::StreamFragmentGraph;
56use risingwave_sqlparser::ast::{
57    CdcTableInfo, ColumnDef, ColumnOption, CompatibleFormatEncode, ConnectionRefValue, CreateSink,
58    CreateSinkStatement, CreateSourceStatement, DataType as AstDataType, ExplainOptions, Format,
59    FormatEncodeOptions, Ident, ObjectName, OnConflict, SecretRefAsType, SourceWatermark,
60    Statement, TableConstraint, WebhookSourceInfo, WithProperties,
61};
62use risingwave_sqlparser::parser::{IncludeOption, Parser};
63
64use super::create_source::{CreateSourceType, SqlColumnStrategy, bind_columns_from_source};
65use super::{RwPgResponse, alter_streaming_rate_limit, create_sink, create_source};
66use crate::binder::{Clause, SecureCompareContext, bind_data_type};
67use crate::catalog::root_catalog::SchemaPath;
68use crate::catalog::source_catalog::SourceCatalog;
69use crate::catalog::table_catalog::{ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX, TableVersion};
70use crate::catalog::{ColumnId, DatabaseId, SchemaId, check_column_name_not_reserved};
71use crate::error::{ErrorCode, Result, RwError, bail_bind_error};
72use crate::expr::{Expr, ExprImpl, ExprRewriter};
73use crate::handler::HandlerArgs;
74use crate::handler::create_source::{
75    UPSTREAM_SOURCE_KEY, bind_connector_props, bind_create_source_or_table_with_connector,
76    bind_source_watermark, handle_addition_columns,
77};
78use crate::handler::util::SourceSchemaCompatExt;
79use crate::optimizer::plan_node::generic::{CdcScanOptions, SourceNodeKind};
80use crate::optimizer::plan_node::{LogicalCdcScan, LogicalSource};
81use crate::optimizer::property::{Order, RequiredDist};
82use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRef, PlanRoot};
83use crate::session::SessionImpl;
84use crate::session::current::notice_to_user;
85use crate::stream_fragmenter::{GraphJobType, build_graph};
86use crate::utils::OverwriteOptions;
87use crate::{Binder, Explain, TableCatalog, WithOptions};
88
89mod col_id_gen;
90pub use col_id_gen::*;
91use risingwave_connector::sink::iceberg::parse_partition_by_exprs;
92
93use crate::handler::drop_table::handle_drop_table;
94
95fn ensure_column_options_supported(c: &ColumnDef) -> Result<()> {
96    for option_def in &c.options {
97        match option_def.option {
98            ColumnOption::GeneratedColumns(_) => {}
99            ColumnOption::DefaultValue(_) => {}
100            ColumnOption::DefaultValueInternal { .. } => {}
101            ColumnOption::Unique { is_primary: true } => {}
102            ColumnOption::Null => {}
103            ColumnOption::NotNull => {}
104            _ => bail_not_implemented!("column constraints \"{}\"", option_def),
105        }
106    }
107    Ok(())
108}
109
110/// Binds the column schemas declared in the CREATE statement into `ColumnCatalog`s.
111/// Column-level `PRIMARY KEY` options are not handled here; they are collected later by
112/// [`bind_sql_pk_names`] and combined with the table-level constraints.
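///
/// An illustrative sketch (not a doctest; the `ColumnDef` values would come from the parser):
///
/// ```ignore
/// // CREATE TABLE t (v BIGINT NOT NULL)
/// let cols = bind_sql_columns(&column_defs, false)?;
/// assert_eq!(cols.len(), 1);
/// assert!(!cols[0].column_desc.nullable); // NOT NULL is reflected in the catalog
/// assert_eq!(cols[0].column_id(), ColumnId::placeholder()); // real ids are assigned later
/// ```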
113pub fn bind_sql_columns(
114    column_defs: &[ColumnDef],
115    is_for_drop_table_connector: bool,
116) -> Result<Vec<ColumnCatalog>> {
117    let mut columns = Vec::with_capacity(column_defs.len());
118
119    for column in column_defs {
120        ensure_column_options_supported(column)?;
121        // Destruct to make sure all fields are properly handled rather than ignored.
122        // Do NOT use `..` to ignore fields you do not want to deal with.
123        // Reject them with a clear NotImplemented error.
124        let ColumnDef {
125            name,
126            data_type,
127            collation,
128            options,
129            ..
130        } = column;
131
132        let data_type = data_type
133            .clone()
134            .ok_or_else(|| ErrorCode::InvalidInputSyntax("data type is not specified".into()))?;
135        if let Some(collation) = collation {
136            // PostgreSQL will limit the datatypes that collate can work on.
137            // https://www.postgresql.org/docs/16/collation.html#COLLATION-CONCEPTS
138            //   > The built-in collatable data types are `text`, `varchar`, and `char`.
139            //
140            // But we don't support real collation, we simply ignore it here.
141            if !["C", "POSIX"].contains(&collation.real_value().as_str()) {
142                bail_not_implemented!(
143                    "Collation other than `C` or `POSIX` is not implemented"
144                );
145            }
146
147            match data_type {
148                AstDataType::Text | AstDataType::Varchar | AstDataType::Char(_) => {}
149                _ => {
150                    return Err(ErrorCode::NotSupported(
151                        format!("{} is not a collatable data type", data_type),
152                        "The built-in collatable data types are `text`, `varchar`, and `char`, please check your type".into(),
153                    ).into());
154                }
155            }
156        }
157
158        if !is_for_drop_table_connector {
159            // Additional column names may have the `_rw` prefix.
160            // When dropping the connector from a table, the additional columns are converted to normal columns and keep their original names.
161            // In this case, we loosen the check for the `_rw` prefix.
162            check_column_name_not_reserved(&name.real_value())?;
163        }
164
165        let nullable: bool = !options
166            .iter()
167            .any(|def| matches!(def.option, ColumnOption::NotNull));
168
169        columns.push(ColumnCatalog {
170            column_desc: ColumnDesc {
171                data_type: bind_data_type(&data_type)?,
172                column_id: ColumnId::placeholder(),
173                name: name.real_value(),
174                generated_or_default_column: None,
175                description: None,
176                additional_column: AdditionalColumn { column_type: None },
177                version: ColumnDescVersion::LATEST,
178                system_column: None,
179                nullable,
180            },
181            is_hidden: false,
182        });
183    }
184
185    Ok(columns)
186}
187
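/// Checks constraints on a single generated column: it must not reference another generated
/// column, and if it is part of the primary key its expression must be pure.
///
/// For example (illustrative only), `CREATE TABLE t (a INT, b INT AS a + 1, c INT AS b + 1)`
/// is rejected here because the generated column `c` references the generated column `b`.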
188fn check_generated_column_constraints(
189    column_name: &String,
190    column_id: ColumnId,
191    expr: &ExprImpl,
192    column_catalogs: &[ColumnCatalog],
193    generated_column_names: &[String],
194    pk_column_ids: &[ColumnId],
195) -> Result<()> {
196    let input_refs = expr.collect_input_refs(column_catalogs.len());
197    for idx in input_refs.ones() {
198        let referred_generated_column = &column_catalogs[idx].column_desc.name;
199        if generated_column_names
200            .iter()
201            .any(|c| c == referred_generated_column)
202        {
203            return Err(ErrorCode::BindError(format!(
204                "A generated column cannot reference another generated column. \
205                But here, generated column \"{}\" references generated column \"{}\"",
206                column_name, referred_generated_column
207            ))
208            .into());
209        }
210    }
211
212    if pk_column_ids.contains(&column_id) && expr.is_impure() {
213        return Err(ErrorCode::BindError(format!(
214            "Generated columns with impure expressions should not be part of the primary key. \
215            Here column \"{}\" is defined as part of the primary key.",
216            column_name
217        ))
218        .into());
219    }
220
221    Ok(())
222}
223
224/// Binds constraints that can be only specified in column definitions,
225/// currently generated columns and default columns.
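///
/// An illustrative sketch (assuming a statement like
/// `CREATE TABLE t (v INT, v2 INT AS v + 1, v3 INT DEFAULT 0)`): for `v2` this fills
/// `generated_or_default_column` with a bound `GeneratedColumnDesc`, and for `v3` with a
/// `DefaultColumnDesc` carrying both the constant-folded snapshot value and the original
/// expression.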
226pub fn bind_sql_column_constraints(
227    session: &SessionImpl,
228    table_name: String,
229    column_catalogs: &mut [ColumnCatalog],
230    columns: Vec<ColumnDef>,
231    pk_column_ids: &[ColumnId],
232) -> Result<()> {
233    let generated_column_names = {
234        let mut names = vec![];
235        for column in &columns {
236            for option_def in &column.options {
237                if let ColumnOption::GeneratedColumns(_) = option_def.option {
238                    names.push(column.name.real_value());
239                    break;
240                }
241            }
242        }
243        names
244    };
245
246    let mut binder = Binder::new_for_ddl(session);
247    binder.bind_columns_to_context(table_name.clone(), column_catalogs)?;
248
249    for column in columns {
250        let Some(idx) = column_catalogs
251            .iter()
252            .position(|c| c.name() == column.name.real_value())
253        else {
254            // It's possible that we don't follow the user defined columns in SQL but take the
255            // ones resolved from the source, thus missing some columns. Simply ignore them.
256            continue;
257        };
258
259        for option_def in column.options {
260            match option_def.option {
261                ColumnOption::GeneratedColumns(expr) => {
262                    binder.set_clause(Some(Clause::GeneratedColumn));
263
264                    let expr_impl = binder.bind_expr(expr).with_context(|| {
265                        format!(
266                            "fail to bind expression in generated column \"{}\"",
267                            column.name.real_value()
268                        )
269                    })?;
270
271                    check_generated_column_constraints(
272                        &column.name.real_value(),
273                        column_catalogs[idx].column_id(),
274                        &expr_impl,
275                        column_catalogs,
276                        &generated_column_names,
277                        pk_column_ids,
278                    )?;
279
280                    column_catalogs[idx].column_desc.generated_or_default_column = Some(
281                        GeneratedOrDefaultColumn::GeneratedColumn(GeneratedColumnDesc {
282                            expr: Some(expr_impl.to_expr_proto()),
283                        }),
284                    );
285                    binder.set_clause(None);
286                }
287                ColumnOption::DefaultValue(expr) => {
288                    let expr_impl = binder
289                        .bind_expr(expr)?
290                        .cast_assign(column_catalogs[idx].data_type().clone())?;
291
292                    // Rewrite expressions to evaluate a snapshot value, used for missing values in the case of
293                    // schema change.
294                    //
295                    // TODO: Currently we don't support impure expressions other than `now()` (like `random()`),
296                    // so the rewritten expression should almost always be pure and we directly call `fold_const`
297                    // here. Actually, we do not require purity of the expression here, since we only need
298                    // to obtain a snapshot value.
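                    //
                    // For example (illustrative), `DEFAULT now()` stores the timestamp evaluated at
                    // DDL time as `snapshot_value`, while the original `now()` expression is still
                    // persisted in `expr` below.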
299                    let rewritten_expr_impl = session
300                        .pinned_snapshot()
301                        .inline_now_proc_time()
302                        .rewrite_expr(expr_impl.clone());
303
304                    if let Some(snapshot_value) = rewritten_expr_impl.try_fold_const() {
305                        let snapshot_value = snapshot_value?;
306
307                        column_catalogs[idx].column_desc.generated_or_default_column =
308                            Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
309                                snapshot_value: Some(snapshot_value.to_protobuf()),
310                                expr: Some(expr_impl.to_expr_proto()),
311                                // persist the original expression
312                            }));
313                    } else {
314                        return Err(ErrorCode::BindError(format!(
315                            "Default expression used in column `{}` cannot be evaluated. \
316                            Use generated columns instead if you mean to reference other columns.",
317                            column.name
318                        ))
319                        .into());
320                    }
321                }
322                ColumnOption::DefaultValueInternal { persisted, expr: _ } => {
323                    // When a `DEFAULT INTERNAL` is used internally for schema change, the persisted value
324                    // should already be set during purification. So if we encounter an empty value here, it
325                    // means the user has specified it explicitly in the SQL statement, typically by
326                    // directly copying the result of `SHOW CREATE TABLE` and executing it.
327                    if persisted.is_empty() {
328                        bail_bind_error!(
329                            "DEFAULT INTERNAL is only used for internal purposes, \
330                             please specify a concrete default value"
331                        );
332                    }
333
334                    let desc = DefaultColumnDesc::decode(&*persisted)
335                        .expect("failed to decode persisted `DefaultColumnDesc`");
336
337                    column_catalogs[idx].column_desc.generated_or_default_column =
338                        Some(GeneratedOrDefaultColumn::DefaultColumn(desc));
339                }
340                _ => {}
341            }
342        }
343    }
344    Ok(())
345}
346
347/// Currently we only support the PRIMARY KEY table constraint, so just return the pk column names if they exist.
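///
/// For example (illustrative only), `PRIMARY KEY (a, b)` yields `["a", "b"]`, an empty
/// constraint list yields an empty vector, and any other table constraint bails with a
/// not-implemented error.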
348pub fn bind_table_constraints(table_constraints: &[TableConstraint]) -> Result<Vec<String>> {
349    let mut pk_column_names = vec![];
350
351    for constraint in table_constraints {
352        match constraint {
353            TableConstraint::Unique {
354                name: _,
355                columns,
356                is_primary: true,
357            } => {
358                if !pk_column_names.is_empty() {
359                    return Err(multiple_pk_definition_err());
360                }
361                pk_column_names = columns.iter().map(|c| c.real_value()).collect_vec();
362            }
363            _ => bail_not_implemented!("table constraint \"{}\"", constraint),
364        }
365    }
366    Ok(pk_column_names)
367}
368
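/// Collects primary-key column names from column-level `PRIMARY KEY` options and merges them
/// with the names already extracted from table-level constraints, rejecting multiple primary
/// key definitions.
///
/// An illustrative sketch (not a doctest):
///
/// ```ignore
/// // CREATE TABLE t (a INT PRIMARY KEY, b INT)
/// let pk = bind_sql_pk_names(&column_defs, bind_table_constraints(&constraints)?)?;
/// assert_eq!(pk, vec!["a".to_owned()]);
/// ```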
369pub fn bind_sql_pk_names(
370    columns_defs: &[ColumnDef],
371    pk_names_from_table_constraints: Vec<String>,
372) -> Result<Vec<String>> {
373    let mut pk_column_names = pk_names_from_table_constraints;
374
375    for column in columns_defs {
376        for option_def in &column.options {
377            if let ColumnOption::Unique { is_primary: true } = option_def.option {
378                if !pk_column_names.is_empty() {
379                    return Err(multiple_pk_definition_err());
380                }
381                pk_column_names.push(column.name.real_value());
382            };
383        }
384    }
385
386    Ok(pk_column_names)
387}
388
389fn multiple_pk_definition_err() -> RwError {
390    ErrorCode::BindError("multiple primary keys are not allowed".into()).into()
391}
392
393/// Binds primary keys defined in SQL.
394///
395/// It returns the columns together with `pk_column_ids`, and an optional row id column index if
396/// added.
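///
/// An illustrative sketch (not a doctest): with no primary key declared and `must_need_pk`
/// set to `true`, a hidden `_row_id` column is appended and used as the primary key.
///
/// ```ignore
/// let (columns, pk_ids, row_id_index) = bind_pk_and_row_id_on_relation(columns, vec![], true)?;
/// assert_eq!(row_id_index, Some(columns.len() - 1)); // `_row_id` was appended at the end
/// assert_eq!(pk_ids, vec![columns.last().unwrap().column_id()]);
/// ```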
397pub fn bind_pk_and_row_id_on_relation(
398    mut columns: Vec<ColumnCatalog>,
399    pk_names: Vec<String>,
400    must_need_pk: bool,
401) -> Result<(Vec<ColumnCatalog>, Vec<ColumnId>, Option<usize>)> {
402    for c in &columns {
403        assert!(c.column_id() != ColumnId::placeholder());
404    }
405
406    // Mapping from column name to column id.
407    let name_to_id = columns
408        .iter()
409        .map(|c| (c.name(), c.column_id()))
410        .collect::<HashMap<_, _>>();
411
412    let mut pk_column_ids: Vec<_> = pk_names
413        .iter()
414        .map(|name| {
415            name_to_id.get(name.as_str()).copied().ok_or_else(|| {
416                ErrorCode::BindError(format!("column \"{name}\" named in key does not exist"))
417            })
418        })
419        .try_collect()?;
420
421    // Add the `_row_id` column if `pk_column_ids` is empty and `must_need_pk` is true.
422    let need_row_id = pk_column_ids.is_empty() && must_need_pk;
423
424    let row_id_index = need_row_id.then(|| {
425        let column = ColumnCatalog::row_id_column();
426        let index = columns.len();
427        pk_column_ids = vec![column.column_id()];
428        columns.push(column);
429        index
430    });
431
432    if let Some(col) = columns.iter().map(|c| c.name()).duplicates().next() {
433        Err(ErrorCode::InvalidInputSyntax(format!(
434            "column \"{col}\" specified more than once"
435        )))?;
436    }
437
438    Ok((columns, pk_column_ids, row_id_index))
439}
440
441/// `gen_create_table_plan_with_source` generates the plan for creating a table with an external
442/// stream source.
443#[allow(clippy::too_many_arguments)]
444pub(crate) async fn gen_create_table_plan_with_source(
445    mut handler_args: HandlerArgs,
446    explain_options: ExplainOptions,
447    table_name: ObjectName,
448    column_defs: Vec<ColumnDef>,
449    wildcard_idx: Option<usize>,
450    constraints: Vec<TableConstraint>,
451    format_encode: FormatEncodeOptions,
452    source_watermarks: Vec<SourceWatermark>,
453    mut col_id_gen: ColumnIdGenerator,
454    include_column_options: IncludeOption,
455    props: CreateTableProps,
456    sql_column_strategy: SqlColumnStrategy,
457) -> Result<(PlanRef, Option<PbSource>, PbTable)> {
458    if props.append_only
459        && format_encode.format != Format::Plain
460        && format_encode.format != Format::Native
461    {
462        return Err(ErrorCode::BindError(format!(
463            "Append only table does not support format {}.",
464            format_encode.format
465        ))
466        .into());
467    }
468
469    let session = &handler_args.session;
470    let with_properties = bind_connector_props(&handler_args, &format_encode, false)?;
471    if with_properties.is_shareable_cdc_connector() {
472        generated_columns_check_for_cdc_table(&column_defs)?;
473        not_null_check_for_cdc_table(&wildcard_idx, &column_defs)?;
474    } else if column_defs.iter().any(|col| {
475        col.options
476            .iter()
477            .any(|def| matches!(def.option, ColumnOption::NotNull))
478    }) {
479        // if non-cdc source
480        notice_to_user(
481            "The table contains columns with NOT NULL constraints. Any rows from upstream violating the constraints will be ignored silently.",
482        );
483    }
484
485    let db_name: &str = &session.database();
486    let (schema_name, _) = Binder::resolve_schema_qualified_name(db_name, table_name.clone())?;
487
488    // TODO: omit this step if `sql_column_strategy` is `Follow`.
489    let (columns_from_resolve_source, source_info) = bind_columns_from_source(
490        session,
491        &format_encode,
492        Either::Left(&with_properties),
493        CreateSourceType::Table,
494    )
495    .await?;
496
497    let overwrite_options = OverwriteOptions::new(&mut handler_args);
498    let rate_limit = overwrite_options.source_rate_limit;
499    let source_catalog = bind_create_source_or_table_with_connector(
500        handler_args.clone(),
501        table_name,
502        format_encode,
503        with_properties,
504        &column_defs,
505        constraints,
506        wildcard_idx,
507        source_watermarks,
508        columns_from_resolve_source,
509        source_info,
510        include_column_options,
511        &mut col_id_gen,
512        CreateSourceType::Table,
513        rate_limit,
514        sql_column_strategy,
515    )
516    .await?;
517
518    let pb_source = source_catalog.to_prost();
519
520    let context = OptimizerContext::new(handler_args, explain_options);
521
522    let (plan, table) = gen_table_plan_with_source(
523        context.into(),
524        schema_name,
525        source_catalog,
526        col_id_gen.into_version(),
527        props,
528    )?;
529
530    Ok((plan, Some(pb_source), table))
531}
532
533/// `gen_create_table_plan` generates the plan for creating a table without an external stream
534/// source.
535#[allow(clippy::too_many_arguments)]
536pub(crate) fn gen_create_table_plan(
537    context: OptimizerContext,
538    table_name: ObjectName,
539    column_defs: Vec<ColumnDef>,
540    constraints: Vec<TableConstraint>,
541    mut col_id_gen: ColumnIdGenerator,
542    source_watermarks: Vec<SourceWatermark>,
543    props: CreateTableProps,
544    is_for_replace_plan: bool,
545) -> Result<(PlanRef, PbTable)> {
546    let mut columns = bind_sql_columns(&column_defs, is_for_replace_plan)?;
547    for c in &mut columns {
548        col_id_gen.generate(c)?;
549    }
550
551    let (_, secret_refs, connection_refs) = context.with_options().clone().into_parts();
552    if !secret_refs.is_empty() || !connection_refs.is_empty() {
553        return Err(crate::error::ErrorCode::InvalidParameterValue("Secret reference and Connection reference are not allowed in options when creating table without external source".to_owned()).into());
554    }
555
556    gen_create_table_plan_without_source(
557        context,
558        table_name,
559        columns,
560        column_defs,
561        constraints,
562        source_watermarks,
563        col_id_gen.into_version(),
564        props,
565    )
566}
567
568#[allow(clippy::too_many_arguments)]
569pub(crate) fn gen_create_table_plan_without_source(
570    context: OptimizerContext,
571    table_name: ObjectName,
572    columns: Vec<ColumnCatalog>,
573    column_defs: Vec<ColumnDef>,
574    constraints: Vec<TableConstraint>,
575    source_watermarks: Vec<SourceWatermark>,
576    version: TableVersion,
577    props: CreateTableProps,
578) -> Result<(PlanRef, PbTable)> {
579    // XXX: Why not bind outside?
580    let pk_names = bind_sql_pk_names(&column_defs, bind_table_constraints(&constraints)?)?;
581    let (mut columns, pk_column_ids, row_id_index) =
582        bind_pk_and_row_id_on_relation(columns, pk_names, true)?;
583
584    let watermark_descs: Vec<WatermarkDesc> = bind_source_watermark(
585        context.session_ctx(),
586        table_name.real_value(),
587        source_watermarks,
588        &columns,
589    )?;
590
591    bind_sql_column_constraints(
592        context.session_ctx(),
593        table_name.real_value(),
594        &mut columns,
595        column_defs,
596        &pk_column_ids,
597    )?;
598    let session = context.session_ctx().clone();
599
600    let db_name = &session.database();
601    let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, table_name)?;
602
603    let info = CreateTableInfo {
604        columns,
605        pk_column_ids,
606        row_id_index,
607        watermark_descs,
608        source_catalog: None,
609        version,
610    };
611
612    gen_table_plan_inner(context.into(), schema_name, table_name, info, props)
613}
614
615fn gen_table_plan_with_source(
616    context: OptimizerContextRef,
617    schema_name: Option<String>,
618    source_catalog: SourceCatalog,
619    version: TableVersion,
620    props: CreateTableProps,
621) -> Result<(PlanRef, PbTable)> {
622    let table_name = source_catalog.name.clone();
623
624    let info = CreateTableInfo {
625        columns: source_catalog.columns.clone(),
626        pk_column_ids: source_catalog.pk_col_ids.clone(),
627        row_id_index: source_catalog.row_id_index,
628        watermark_descs: source_catalog.watermark_descs.clone(),
629        source_catalog: Some(source_catalog),
630        version,
631    };
632
633    gen_table_plan_inner(context, schema_name, table_name, info, props)
634}
635
636/// On-conflict behavior either from user input or existing table catalog.
637#[derive(Clone, Copy)]
638pub enum EitherOnConflict {
639    Ast(Option<OnConflict>),
640    Resolved(ConflictBehavior),
641}
642
643impl From<Option<OnConflict>> for EitherOnConflict {
644    fn from(v: Option<OnConflict>) -> Self {
645        Self::Ast(v)
646    }
647}
648
649impl From<ConflictBehavior> for EitherOnConflict {
650    fn from(v: ConflictBehavior) -> Self {
651        Self::Resolved(v)
652    }
653}
654
655impl EitherOnConflict {
656    /// Resolves the conflict behavior based on the given information.
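    ///
    /// An illustrative sketch (not a doctest) of the defaulting rules:
    ///
    /// ```ignore
    /// // Non-append-only table without an explicit ON CONFLICT clause: overwrite (UPDATE FULL).
    /// assert_eq!(
    ///     EitherOnConflict::Ast(None).to_behavior(false, false)?,
    ///     ConflictBehavior::Overwrite
    /// );
    /// // Append-only table whose primary key is the generated row id: no conflict check at all.
    /// assert_eq!(
    ///     EitherOnConflict::Ast(None).to_behavior(true, true)?,
    ///     ConflictBehavior::NoCheck
    /// );
    /// ```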
657    pub fn to_behavior(self, append_only: bool, row_id_as_pk: bool) -> Result<ConflictBehavior> {
658        let conflict_behavior = match self {
659            EitherOnConflict::Ast(on_conflict) => {
660                if append_only {
661                    if row_id_as_pk {
662                        // Primary key will be generated, no conflict check needed.
663                        ConflictBehavior::NoCheck
664                    } else {
665                        // User defined PK on append-only table, enforce `DO NOTHING`.
666                        if let Some(on_conflict) = on_conflict
667                            && on_conflict != OnConflict::Nothing
668                        {
669                            return Err(ErrorCode::InvalidInputSyntax(
670                                "When PRIMARY KEY constraint applied to an APPEND ONLY table, \
671                                     the ON CONFLICT behavior must be DO NOTHING."
672                                    .to_owned(),
673                            )
674                            .into());
675                        }
676                        ConflictBehavior::IgnoreConflict
677                    }
678                } else {
679                    // Default to `UPDATE FULL` for non-append-only tables.
680                    match on_conflict.unwrap_or(OnConflict::UpdateFull) {
681                        OnConflict::UpdateFull => ConflictBehavior::Overwrite,
682                        OnConflict::Nothing => ConflictBehavior::IgnoreConflict,
683                        OnConflict::UpdateIfNotNull => ConflictBehavior::DoUpdateIfNotNull,
684                    }
685                }
686            }
687            EitherOnConflict::Resolved(b) => b,
688        };
689
690        Ok(conflict_behavior)
691    }
692}
693
694/// Arguments of the functions that generate a table plan, part 1.
695///
696/// Compared to [`CreateTableProps`], this struct contains fields that need some work of binding
697/// or resolving based on the user input.
698pub struct CreateTableInfo {
699    pub columns: Vec<ColumnCatalog>,
700    pub pk_column_ids: Vec<ColumnId>,
701    pub row_id_index: Option<usize>,
702    pub watermark_descs: Vec<WatermarkDesc>,
703    pub source_catalog: Option<SourceCatalog>,
704    pub version: TableVersion,
705}
706
707/// Arguments of the functions that generate a table plan, part 2.
708///
709/// Compared to [`CreateTableInfo`], this struct contains fields that can be (relatively) simply
710/// obtained from the input or the context.
711pub struct CreateTableProps {
712    pub definition: String,
713    pub append_only: bool,
714    pub on_conflict: EitherOnConflict,
715    pub with_version_column: Option<String>,
716    pub webhook_info: Option<PbWebhookSourceInfo>,
717    pub engine: Engine,
718}
719
720#[allow(clippy::too_many_arguments)]
721fn gen_table_plan_inner(
722    context: OptimizerContextRef,
723    schema_name: Option<String>,
724    table_name: String,
725    info: CreateTableInfo,
726    props: CreateTableProps,
727) -> Result<(PlanRef, PbTable)> {
728    let CreateTableInfo {
729        ref columns,
730        row_id_index,
731        ref watermark_descs,
732        ref source_catalog,
733        ..
734    } = info;
735    let CreateTableProps { append_only, .. } = props;
736
737    let (database_id, schema_id) = context
738        .session_ctx()
739        .get_database_and_schema_id_for_create(schema_name)?;
740
741    let session = context.session_ctx().clone();
742    let retention_seconds = context.with_options().retention_seconds();
743
744    let source_node: PlanRef = LogicalSource::new(
745        source_catalog.clone().map(Rc::new),
746        columns.clone(),
747        row_id_index,
748        SourceNodeKind::CreateTable,
749        context.clone(),
750        None,
751    )?
752    .into();
753
754    let required_cols = FixedBitSet::with_capacity(columns.len());
755    let plan_root = PlanRoot::new_with_logical_plan(
756        source_node,
757        RequiredDist::Any,
758        Order::any(),
759        required_cols,
760        vec![],
761    );
762
763    if !append_only && !watermark_descs.is_empty() {
764        return Err(ErrorCode::NotSupported(
765            "Defining watermarks on table requires the table to be append only.".to_owned(),
766            "Use the key words `APPEND ONLY`".to_owned(),
767        )
768        .into());
769    }
770
771    if !append_only && retention_seconds.is_some() {
772        return Err(ErrorCode::NotSupported(
773            "Defining retention seconds on table requires the table to be append only.".to_owned(),
774            "Use the key words `APPEND ONLY`".to_owned(),
775        )
776        .into());
777    }
778
779    let materialize =
780        plan_root.gen_table_plan(context, table_name, database_id, schema_id, info, props)?;
781
782    let mut table = materialize.table().to_prost();
783
784    table.owner = session.user_id();
785    Ok((materialize.into(), table))
786}
787
788/// Generate stream plan for cdc table based on shared source.
789/// In the replace workflow, `table_id` is the id of the table to be replaced;
790/// in the create-table workflow, `table_id` is a placeholder that will be filled in by the meta node.
791#[allow(clippy::too_many_arguments)]
792pub(crate) fn gen_create_table_plan_for_cdc_table(
793    context: OptimizerContextRef,
794    source: Arc<SourceCatalog>,
795    external_table_name: String,
796    column_defs: Vec<ColumnDef>,
797    mut columns: Vec<ColumnCatalog>,
798    pk_names: Vec<String>,
799    cdc_with_options: WithOptionsSecResolved,
800    mut col_id_gen: ColumnIdGenerator,
801    on_conflict: Option<OnConflict>,
802    with_version_column: Option<String>,
803    include_column_options: IncludeOption,
804    table_name: ObjectName,
805    resolved_table_name: String, // table name without schema prefix
806    database_id: DatabaseId,
807    schema_id: SchemaId,
808    table_id: TableId,
809    engine: Engine,
810) -> Result<(PlanRef, PbTable)> {
811    let session = context.session_ctx().clone();
812
813    // append additional columns to the end
814    handle_addition_columns(
815        None,
816        &cdc_with_options,
817        include_column_options,
818        &mut columns,
819        true,
820    )?;
821
822    for c in &mut columns {
823        col_id_gen.generate(c)?;
824    }
825
826    let (mut columns, pk_column_ids, _row_id_index) =
827        bind_pk_and_row_id_on_relation(columns, pk_names, true)?;
828
829    // NOTE: In auto schema change, default values are not provided in the column definitions.
830    bind_sql_column_constraints(
831        context.session_ctx(),
832        table_name.real_value(),
833        &mut columns,
834        column_defs,
835        &pk_column_ids,
836    )?;
837
838    let definition = context.normalized_sql().to_owned();
839
840    let pk_column_indices = {
841        let mut id_to_idx = HashMap::new();
842        columns.iter().enumerate().for_each(|(idx, c)| {
843            id_to_idx.insert(c.column_id(), idx);
844        });
845        // pk column id must exist in table columns.
846        pk_column_ids
847            .iter()
848            .map(|c| id_to_idx.get(c).copied().unwrap())
849            .collect_vec()
850    };
851    let table_pk = pk_column_indices
852        .iter()
853        .map(|idx| ColumnOrder::new(*idx, OrderType::ascending()))
854        .collect();
855
856    let (options, secret_refs) = cdc_with_options.into_parts();
857
858    let non_generated_column_descs = columns
859        .iter()
860        .filter(|&c| (!c.is_generated()))
861        .map(|c| c.column_desc.clone())
862        .collect_vec();
863    let non_generated_column_num = non_generated_column_descs.len();
864
865    let cdc_table_desc = CdcTableDesc {
866        table_id,
867        source_id: source.id.into(), // id of cdc source streaming job
868        external_table_name: external_table_name.clone(),
869        pk: table_pk,
870        columns: non_generated_column_descs,
871        stream_key: pk_column_indices,
872        connect_properties: options,
873        secret_refs,
874    };
875
876    tracing::debug!(?cdc_table_desc, "create cdc table");
877
878    let options = CdcScanOptions::from_with_options(context.with_options())?;
879
880    let logical_scan = LogicalCdcScan::create(
881        external_table_name.clone(),
882        Rc::new(cdc_table_desc),
883        context.clone(),
884        options,
885    );
886
887    let scan_node: PlanRef = logical_scan.into();
888    let required_cols = FixedBitSet::with_capacity(non_generated_column_num);
889    let plan_root = PlanRoot::new_with_logical_plan(
890        scan_node,
891        RequiredDist::Any,
892        Order::any(),
893        required_cols,
894        vec![],
895    );
896
897    let cdc_table_id = build_cdc_table_id(source.id, &external_table_name);
898    let materialize = plan_root.gen_table_plan(
899        context,
900        resolved_table_name,
901        database_id,
902        schema_id,
903        CreateTableInfo {
904            columns,
905            pk_column_ids,
906            row_id_index: None,
907            watermark_descs: vec![],
908            source_catalog: Some((*source).clone()),
909            version: col_id_gen.into_version(),
910        },
911        CreateTableProps {
912            definition,
913            append_only: false,
914            on_conflict: on_conflict.into(),
915            with_version_column,
916            webhook_info: None,
917            engine,
918        },
919    )?;
920
921    let mut table = materialize.table().to_prost();
922    table.owner = session.user_id();
923    table.cdc_table_id = Some(cdc_table_id);
924    table.dependent_relations = vec![source.id];
925
926    Ok((materialize.into(), table))
927}
928
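/// Derives the `WITH` options for a cdc table from its shared cdc source, splitting the
/// external table name according to the connector's naming scheme.
///
/// An illustrative sketch (property values are made up): for a MySQL cdc source created with
/// `database.name = 'mydb'`, passing `external_table_name = "mydb.orders"` adds
/// `database.name = "mydb"` and `table.name = "orders"`; a Postgres source with
/// `"public.orders"` adds `schema.name = "public"` and `table.name = "orders"`; a SQL Server
/// source expects the three-part form `"mydb.dbo.orders"`.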
929fn derive_with_options_for_cdc_table(
930    source_with_properties: &WithOptionsSecResolved,
931    external_table_name: String,
932) -> Result<WithOptionsSecResolved> {
933    use source::cdc::{MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR, SQL_SERVER_CDC_CONNECTOR};
934    // we need to strip the database/schema prefix from the external table name below
935    let source_database_name: &str = source_with_properties
936        .get("database.name")
937        .ok_or_else(|| anyhow!("The source with properties does not contain 'database.name'"))?
938        .as_str();
939    let mut with_options = source_with_properties.clone();
940    if let Some(connector) = source_with_properties.get(UPSTREAM_SOURCE_KEY) {
941        match connector.as_str() {
942            MYSQL_CDC_CONNECTOR => {
943                // MySQL doesn't allow '.' in database name and table name, so we can split the
944                // external table name by '.' to get the table name
945                let (db_name, table_name) = external_table_name.split_once('.').ok_or_else(|| {
946                    anyhow!("The upstream table name must contain database name prefix, e.g. 'database.table'")
947                })?;
948                // We allow multiple database names in the source definition
949                if !source_database_name
950                    .split(',')
951                    .map(|s| s.trim())
952                    .any(|name| name == db_name)
953                {
954                    return Err(anyhow!(
955                        "The database name `{}` in the FROM clause is not included in the database name `{}` in source definition",
956                        db_name,
957                        source_database_name
958                    ).into());
959                }
960                with_options.insert(DATABASE_NAME_KEY.into(), db_name.into());
961                with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
962            }
963            POSTGRES_CDC_CONNECTOR => {
964                let (schema_name, table_name) = external_table_name
965                    .split_once('.')
966                    .ok_or_else(|| anyhow!("The upstream table name must contain schema name prefix, e.g. 'public.table'"))?;
967
968                // insert 'schema.name' and 'table.name' into connect properties
969                with_options.insert(SCHEMA_NAME_KEY.into(), schema_name.into());
970                with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
971            }
972            SQL_SERVER_CDC_CONNECTOR => {
973                // SQL Server external table name is in 'databaseName.schemaName.tableName' pattern,
974                // we remove the database name prefix and split the schema name and table name
975                let (db_name, schema_table_name) =
976                    external_table_name.split_once('.').ok_or_else(|| {
977                        anyhow!("The upstream table name must be in 'database.schema.table' format")
978                    })?;
979
980                // Currently SQL Server only supports single database name in the source definition
981                if db_name != source_database_name {
982                    return Err(anyhow!(
983                            "The database name `{}` in the FROM clause is not the same as the database name `{}` in source definition",
984                            db_name,
985                            source_database_name
986                        ).into());
987                }
988
989                let (schema_name, table_name) =
990                    schema_table_name.split_once('.').ok_or_else(|| {
991                        anyhow!("The table name must contain schema name prefix, e.g. 'dbo.table'")
992                    })?;
993
994                // insert 'schema.name' and 'table.name' into connect properties
995                with_options.insert(SCHEMA_NAME_KEY.into(), schema_name.into());
996                with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
997            }
998            _ => {
999                return Err(RwError::from(anyhow!(
1000                    "connector {} is not supported for cdc table",
1001                    connector
1002                )));
1003            }
1004        };
1005    }
1006    Ok(with_options)
1007}
1008
1009#[allow(clippy::too_many_arguments)]
1010pub(super) async fn handle_create_table_plan(
1011    handler_args: HandlerArgs,
1012    explain_options: ExplainOptions,
1013    format_encode: Option<FormatEncodeOptions>,
1014    cdc_table_info: Option<CdcTableInfo>,
1015    table_name: ObjectName,
1016    column_defs: Vec<ColumnDef>,
1017    wildcard_idx: Option<usize>,
1018    constraints: Vec<TableConstraint>,
1019    source_watermarks: Vec<SourceWatermark>,
1020    append_only: bool,
1021    on_conflict: Option<OnConflict>,
1022    with_version_column: Option<String>,
1023    include_column_options: IncludeOption,
1024    webhook_info: Option<WebhookSourceInfo>,
1025    engine: Engine,
1026) -> Result<(PlanRef, Option<PbSource>, PbTable, TableJobType)> {
1027    let col_id_gen = ColumnIdGenerator::new_initial();
1028    let format_encode = check_create_table_with_source(
1029        &handler_args.with_options,
1030        format_encode,
1031        &include_column_options,
1032        &cdc_table_info,
1033    )?;
1034    let webhook_info = webhook_info
1035        .map(|info| bind_webhook_info(&handler_args.session, &column_defs, info))
1036        .transpose()?;
1037
1038    let props = CreateTableProps {
1039        definition: handler_args.normalized_sql.clone(),
1040        append_only,
1041        on_conflict: on_conflict.into(),
1042        with_version_column: with_version_column.clone(),
1043        webhook_info,
1044        engine,
1045    };
1046
1047    let ((plan, source, table), job_type) = match (format_encode, cdc_table_info.as_ref()) {
1048        (Some(format_encode), None) => (
1049            gen_create_table_plan_with_source(
1050                handler_args,
1051                explain_options,
1052                table_name.clone(),
1053                column_defs,
1054                wildcard_idx,
1055                constraints,
1056                format_encode,
1057                source_watermarks,
1058                col_id_gen,
1059                include_column_options,
1060                props,
1061                SqlColumnStrategy::FollowChecked,
1062            )
1063            .await?,
1064            TableJobType::General,
1065        ),
1066        (None, None) => {
1067            let context = OptimizerContext::new(handler_args, explain_options);
1068            let (plan, table) = gen_create_table_plan(
1069                context,
1070                table_name.clone(),
1071                column_defs,
1072                constraints,
1073                col_id_gen,
1074                source_watermarks,
1075                props,
1076                false,
1077            )?;
1078
1079            ((plan, None, table), TableJobType::General)
1080        }
1081
1082        (None, Some(cdc_table)) => {
1083            sanity_check_for_table_on_cdc_source(
1084                append_only,
1085                &column_defs,
1086                &wildcard_idx,
1087                &constraints,
1088                &source_watermarks,
1089            )?;
1090
1091            generated_columns_check_for_cdc_table(&column_defs)?;
1092            not_null_check_for_cdc_table(&wildcard_idx, &column_defs)?;
1093
1094            let session = &handler_args.session;
1095            let db_name = &session.database();
1096            let user_name = &session.user_name();
1097            let search_path = session.config().search_path();
1098            let (schema_name, resolved_table_name) =
1099                Binder::resolve_schema_qualified_name(db_name, table_name.clone())?;
1100            let (database_id, schema_id) =
1101                session.get_database_and_schema_id_for_create(schema_name.clone())?;
1102
1103            // cdc table cannot be append-only
1104            let (format_encode, source_name) =
1105                Binder::resolve_schema_qualified_name(db_name, cdc_table.source_name.clone())?;
1106
1107            let source = {
1108                let catalog_reader = session.env().catalog_reader().read_guard();
1109                let schema_path =
1110                    SchemaPath::new(format_encode.as_deref(), &search_path, user_name);
1111
1112                let (source, _) = catalog_reader.get_source_by_name(
1113                    db_name,
1114                    schema_path,
1115                    source_name.as_str(),
1116                )?;
1117                source.clone()
1118            };
1119            let cdc_with_options: WithOptionsSecResolved = derive_with_options_for_cdc_table(
1120                &source.with_properties,
1121                cdc_table.external_table_name.clone(),
1122            )?;
1123
1124            let (columns, pk_names) = match wildcard_idx {
1125                Some(_) => bind_cdc_table_schema_externally(cdc_with_options.clone()).await?,
1126                None => {
1127                    for column_def in &column_defs {
1128                        for option_def in &column_def.options {
1129                            if let ColumnOption::DefaultValue(_)
1130                            | ColumnOption::DefaultValueInternal { .. } = option_def.option
1131                            {
1132                                return Err(ErrorCode::NotSupported(
1133                                            "Default value for columns defined on the table created from a CDC source".into(),
1134                                            "Remove the default value expression in the column definitions".into(),
1135                                        )
1136                                            .into());
1137                            }
1138                        }
1139                    }
1140
1141                    let (columns, pk_names) =
1142                        bind_cdc_table_schema(&column_defs, &constraints, false)?;
1143                    // read default value definition from external db
1144                    let (options, secret_refs) = cdc_with_options.clone().into_parts();
1145                    let _config = ExternalTableConfig::try_from_btreemap(options, secret_refs)
1146                        .context("failed to extract external table config")?;
1147
1148                    // NOTE: if the external table has a default column, we will only treat it as a normal column.
1149                    (columns, pk_names)
1150                }
1151            };
1152
1153            let context: OptimizerContextRef =
1154                OptimizerContext::new(handler_args, explain_options).into();
1155            let (plan, table) = gen_create_table_plan_for_cdc_table(
1156                context,
1157                source,
1158                cdc_table.external_table_name.clone(),
1159                column_defs,
1160                columns,
1161                pk_names,
1162                cdc_with_options,
1163                col_id_gen,
1164                on_conflict,
1165                with_version_column,
1166                include_column_options,
1167                table_name,
1168                resolved_table_name,
1169                database_id,
1170                schema_id,
1171                TableId::placeholder(),
1172                engine,
1173            )?;
1174
1175            ((plan, None, table), TableJobType::SharedCdcSource)
1176        }
1177        (Some(_), Some(_)) => {
1178            return Err(ErrorCode::NotSupported(
1179                "Data format and encoding format doesn't apply to table created from a CDC source"
1180                    .into(),
1181                "Remove the FORMAT and ENCODE specification".into(),
1182            )
1183            .into());
1184        }
1185    };
1186    Ok((plan, source, table, job_type))
1187}
1188
1189// For both table from cdc source and table with cdc connector
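// Generated columns must all appear at the end of the definition, e.g. (illustrative only)
// `(id INT PRIMARY KEY, v INT, v2 INT AS v + 1)` is accepted, while
// `(id INT PRIMARY KEY, v2 INT AS id + 1, v INT)` is rejected.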
1190fn generated_columns_check_for_cdc_table(columns: &Vec<ColumnDef>) -> Result<()> {
1191    let mut found_generated_column = false;
1192    for column in columns {
1193        let mut is_generated = false;
1194
1195        for option_def in &column.options {
1196            if let ColumnOption::GeneratedColumns(_) = option_def.option {
1197                is_generated = true;
1198                break;
1199            }
1200        }
1201
1202        if is_generated {
1203            found_generated_column = true;
1204        } else if found_generated_column {
1205            return Err(ErrorCode::NotSupported(
1206                "Non-generated column found after a generated column.".into(),
1207                "Ensure that all generated columns appear at the end of the cdc table definition."
1208                    .into(),
1209            )
1210            .into());
1211        }
1212    }
1213    Ok(())
1214}
1215
1216// For both table from cdc source and table with cdc connector
1217fn not_null_check_for_cdc_table(
1218    wildcard_idx: &Option<usize>,
1219    column_defs: &Vec<ColumnDef>,
1220) -> Result<()> {
1221    if wildcard_idx.is_none()
1222        && column_defs.iter().any(|col| {
1223            col.options
1224                .iter()
1225                .any(|opt| matches!(opt.option, ColumnOption::NotNull))
1226        })
1227    {
1228        return Err(ErrorCode::NotSupported(
1229            "CDC table with NOT NULL constraint is not supported".to_owned(),
1230            "Please remove the NOT NULL constraint for columns".to_owned(),
1231        )
1232        .into());
1233    }
1234    Ok(())
1235}
1236
1237// Only for table from cdc source
1238fn sanity_check_for_table_on_cdc_source(
1239    append_only: bool,
1240    column_defs: &Vec<ColumnDef>,
1241    wildcard_idx: &Option<usize>,
1242    constraints: &Vec<TableConstraint>,
1243    source_watermarks: &Vec<SourceWatermark>,
1244) -> Result<()> {
1245    // wildcard cannot be used with column definitions
1246    if wildcard_idx.is_some() && !column_defs.is_empty() {
1247        return Err(ErrorCode::NotSupported(
1248            "wildcard(*) and column definitions cannot be used together".to_owned(),
1249            "Remove the wildcard or column definitions".to_owned(),
1250        )
1251        .into());
1252    }
1253
1254    // cdc table must have primary key constraint or primary key column
1255    if wildcard_idx.is_none()
1256        && !constraints.iter().any(|c| {
1257            matches!(
1258                c,
1259                TableConstraint::Unique {
1260                    is_primary: true,
1261                    ..
1262                }
1263            )
1264        })
1265        && !column_defs.iter().any(|col| {
1266            col.options
1267                .iter()
1268                .any(|opt| matches!(opt.option, ColumnOption::Unique { is_primary: true }))
1269        })
1270    {
1271        return Err(ErrorCode::NotSupported(
1272            "CDC table without primary key constraint is not supported".to_owned(),
1273            "Please define a primary key".to_owned(),
1274        )
1275        .into());
1276    }
1277
1278    if append_only {
1279        return Err(ErrorCode::NotSupported(
1280            "append only modifier on the table created from a CDC source".into(),
1281            "Remove the APPEND ONLY clause".into(),
1282        )
1283        .into());
1284    }
1285
1286    if !source_watermarks.is_empty() {
1287        return Err(ErrorCode::NotSupported(
1288            "watermark defined on the table created from a CDC source".into(),
1289            "Remove the Watermark definitions".into(),
1290        )
1291        .into());
1292    }
1293
1294    Ok(())
1295}
1296
1297/// Derives the cdc table schema by connecting to the external database, when creating a new table or altering an existing one.
1298async fn bind_cdc_table_schema_externally(
1299    cdc_with_options: WithOptionsSecResolved,
1300) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
1301    // read the cdc table schema from the external database
1302    let (options, secret_refs) = cdc_with_options.into_parts();
1303    let config = ExternalTableConfig::try_from_btreemap(options, secret_refs)
1304        .context("failed to extract external table config")?;
1305
1306    let table = ExternalTableImpl::connect(config)
1307        .await
1308        .context("failed to auto derive table schema")?;
1309    Ok((
1310        table
1311            .column_descs()
1312            .iter()
1313            .cloned()
1314            .map(|column_desc| ColumnCatalog {
1315                column_desc,
1316                is_hidden: false,
1317            })
1318            .collect(),
1319        table.pk_names().clone(),
1320    ))
1321}
1322
1323/// Derives the cdc table schema from the SQL column definitions, when creating a new table or altering an existing one.
1324fn bind_cdc_table_schema(
1325    column_defs: &Vec<ColumnDef>,
1326    constraints: &Vec<TableConstraint>,
1327    is_for_replace_plan: bool,
1328) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
1329    let columns = bind_sql_columns(column_defs, is_for_replace_plan)?;
1330
1331    let pk_names = bind_sql_pk_names(column_defs, bind_table_constraints(constraints)?)?;
1332    Ok((columns, pk_names))
1333}
1334
1335#[allow(clippy::too_many_arguments)]
1336pub async fn handle_create_table(
1337    mut handler_args: HandlerArgs,
1338    table_name: ObjectName,
1339    column_defs: Vec<ColumnDef>,
1340    wildcard_idx: Option<usize>,
1341    constraints: Vec<TableConstraint>,
1342    if_not_exists: bool,
1343    format_encode: Option<FormatEncodeOptions>,
1344    source_watermarks: Vec<SourceWatermark>,
1345    append_only: bool,
1346    on_conflict: Option<OnConflict>,
1347    with_version_column: Option<String>,
1348    cdc_table_info: Option<CdcTableInfo>,
1349    include_column_options: IncludeOption,
1350    webhook_info: Option<WebhookSourceInfo>,
1351    ast_engine: risingwave_sqlparser::ast::Engine,
1352) -> Result<RwPgResponse> {
1353    let session = handler_args.session.clone();
1354
1355    if append_only {
1356        session.notice_to_user("APPEND ONLY TABLE is currently an experimental feature.");
1357    }
1358
1359    session.check_cluster_limits().await?;
1360
1361    let engine = match ast_engine {
1362        risingwave_sqlparser::ast::Engine::Hummock => Engine::Hummock,
1363        risingwave_sqlparser::ast::Engine::Iceberg => Engine::Iceberg,
1364    };
1365    if engine == Engine::Iceberg && handler_args.with_options.get_connector().is_some() {
1366        // HACK: since we don't have atomic DDL, table with connector may lose data.
1367        // FIXME: remove this after https://github.com/risingwavelabs/risingwave/issues/21863
1368        if let Some(_rate_limit) = handler_args.with_options.insert(
1369            OverwriteOptions::SOURCE_RATE_LIMIT_KEY.to_owned(),
1370            "0".to_owned(),
1371        ) {
1372            // prevent user specified rate limit
1373            return Err(ErrorCode::NotSupported(
1374                "source_rate_limit for iceberg table engine during table creation".to_owned(),
1375                "Please remove source_rate_limit from WITH options.".to_owned(),
1376            )
1377            .into());
1378        }
1379    }
1380
1381    if let Either::Right(resp) = session.check_relation_name_duplicated(
1382        table_name.clone(),
1383        StatementType::CREATE_TABLE,
1384        if_not_exists,
1385    )? {
1386        return Ok(resp);
1387    }
1388
1389    let (graph, source, hummock_table, job_type) = {
1390        let (plan, source, table, job_type) = handle_create_table_plan(
1391            handler_args.clone(),
1392            ExplainOptions::default(),
1393            format_encode,
1394            cdc_table_info,
1395            table_name.clone(),
1396            column_defs.clone(),
1397            wildcard_idx,
1398            constraints.clone(),
1399            source_watermarks,
1400            append_only,
1401            on_conflict,
1402            with_version_column,
1403            include_column_options,
1404            webhook_info,
1405            engine,
1406        )
1407        .await?;
1408        tracing::trace!("table_plan: {:?}", plan.explain_to_string());
1409
1410        let graph = build_graph(plan, Some(GraphJobType::Table))?;
1411
1412        (graph, source, table, job_type)
1413    };
1414
1415    tracing::trace!(
1416        "name={}, graph=\n{}",
1417        table_name,
1418        serde_json::to_string_pretty(&graph).unwrap()
1419    );
1420
1421    // Handle engine
1422    match engine {
1423        Engine::Hummock => {
1424            let catalog_writer = session.catalog_writer()?;
1425            catalog_writer
1426                .create_table(source, hummock_table, graph, job_type, if_not_exists)
1427                .await?;
1428        }
1429        Engine::Iceberg => {
1430            create_iceberg_engine_table(
1431                session,
1432                handler_args,
1433                source,
1434                hummock_table,
1435                graph,
1436                table_name,
1437                job_type,
1438                if_not_exists,
1439            )
1440            .await?;
1441        }
1442    }
1443
1444    Ok(PgResponse::empty_result(StatementType::CREATE_TABLE))
1445}
1446
1447/// An iceberg engine table is composed of a hummock table, an iceberg sink, and an iceberg source.
1448///
1449/// 1. fetch the iceberg engine options from the meta node, or use the iceberg engine connection provided by the user.
1450/// 2. create a hummock table
1451/// 3. create an iceberg sink
1452/// 4. create an iceberg source
1453///
1454/// See <https://github.com/risingwavelabs/risingwave/issues/21586> for an architecture diagram.
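///
/// For example (illustrative), `CREATE TABLE t (...) ENGINE = iceberg` results in:
/// - the table `t` itself, backed by hummock as usual,
/// - an internal iceberg sink whose name is `t` prefixed with [`ICEBERG_SINK_PREFIX`], and
/// - an internal iceberg source whose name is `t` prefixed with [`ICEBERG_SOURCE_PREFIX`], which serves reads over the iceberg table.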
1455#[allow(clippy::too_many_arguments)]
1456pub async fn create_iceberg_engine_table(
1457    session: Arc<SessionImpl>,
1458    handler_args: HandlerArgs,
1459    mut source: Option<PbSource>,
1460    table: PbTable,
1461    graph: StreamFragmentGraph,
1462    table_name: ObjectName,
1463    job_type: PbTableJobType,
1464    if_not_exists: bool,
1465) -> Result<()> {
1466    let meta_client = session.env().meta_client();
1467    let meta_store_endpoint = meta_client.get_meta_store_endpoint().await?;
1468
1469    let meta_store_endpoint = url::Url::parse(&meta_store_endpoint).map_err(|_| {
1470        ErrorCode::InternalError("failed to parse the meta store endpoint".to_owned())
1471    })?;
1472    let meta_store_backend = meta_store_endpoint.scheme().to_owned();
1473    let meta_store_user = meta_store_endpoint.username().to_owned();
1474    let meta_store_password = meta_store_endpoint
1475        .password()
1476        .and_then(|p| percent_decode_str(p).decode_utf8().ok())
1477        .ok_or_else(|| {
1478            ErrorCode::InternalError("failed to parse password from meta store endpoint".to_owned())
1479        })?
1480        .into_owned();
1481    let meta_store_host = meta_store_endpoint
1482        .host_str()
1483        .ok_or_else(|| {
1484            ErrorCode::InternalError("failed to parse host from meta store endpoint".to_owned())
1485        })?
1486        .to_owned();
1487    let meta_store_port = meta_store_endpoint.port().ok_or_else(|| {
1488        ErrorCode::InternalError("failed to parse port from meta store endpoint".to_owned())
1489    })?;
1490    let meta_store_database = meta_store_endpoint
1491        .path()
1492        .trim_start_matches('/')
1493        .to_owned();
1494
1495    let Ok(meta_backend) = MetaBackend::from_str(&meta_store_backend, true) else {
1496        bail!("failed to parse meta backend: {}", meta_store_backend);
1497    };
1498
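    // The meta store connection doubles as the JDBC catalog backing iceberg tables when the
    // connection sets `hosted_catalog = true` (see below), so build its JDBC URI here.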
1499    let catalog_uri = match meta_backend {
1500        MetaBackend::Postgres => {
1501            format!(
1502                "jdbc:postgresql://{}:{}/{}",
1503                meta_store_host.clone(),
1504                meta_store_port.clone(),
1505                meta_store_database.clone()
1506            )
1507        }
1508        MetaBackend::Mysql => {
1509            format!(
1510                "jdbc:mysql://{}:{}/{}",
1511                meta_store_host.clone(),
1512                meta_store_port.clone(),
1513                meta_store_database.clone()
1514            )
1515        }
1516        MetaBackend::Sqlite | MetaBackend::Sql | MetaBackend::Mem => {
1517            bail!(
1518                "Unsupported meta backend for iceberg engine table: {}",
1519                meta_store_backend
1520            );
1521        }
1522    };
1523
1524    let rw_db_name = session
1525        .env()
1526        .catalog_reader()
1527        .read_guard()
1528        .get_database_by_id(&table.database_id)?
1529        .name()
1530        .to_owned();
1531    let rw_schema_name = session
1532        .env()
1533        .catalog_reader()
1534        .read_guard()
1535        .get_schema_by_id(&table.database_id, &table.schema_id)?
1536        .name()
1537        .clone();
1538    let iceberg_catalog_name = rw_db_name.clone();
1539    let iceberg_database_name = rw_schema_name.clone();
1540    let iceberg_table_name = table_name.0.last().unwrap().real_value();
1541
1542    let iceberg_engine_connection: String = session.config().iceberg_engine_connection();
1543    let sink_decouple = session.config().sink_decouple();
1544    if matches!(sink_decouple, SinkDecouple::Disable) {
1545        bail!(
1546            "Iceberg engine table only supports with sink decouple, try `set sink_decouple = true` to resolve it"
1547        );
1548    }
1549
1550    let mut connection_ref = BTreeMap::new();
1551    let with_common = if iceberg_engine_connection.is_empty() {
1552        bail!("to use iceberg engine table, the variable `iceberg_engine_connection` must be set.");
1553    } else {
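        // `iceberg_engine_connection` is expected to be of the form `<schema>.<connection_name>`.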
1554        let parts: Vec<&str> = iceberg_engine_connection.split('.').collect();
1555        assert_eq!(parts.len(), 2);
1556        let connection_catalog =
1557            session.get_connection_by_name(Some(parts[0].to_owned()), parts[1])?;
1558        if let ConnectionInfo::ConnectionParams(params) = &connection_catalog.info {
1559            if params.connection_type == ConnectionType::Iceberg as i32 {
1560                // Reference the user's iceberg connection in both the internal sink and source.
1561                connection_ref.insert(
1562                    "connection".to_owned(),
1563                    ConnectionRefValue {
1564                        connection_name: ObjectName::from(vec![
1565                            Ident::from(parts[0]),
1566                            Ident::from(parts[1]),
1567                        ]),
1568                    },
1569                );
1570
1571                let mut with_common = BTreeMap::new();
1572                with_common.insert("connector".to_owned(), "iceberg".to_owned());
1573                with_common.insert("database.name".to_owned(), iceberg_database_name.to_owned());
1574                with_common.insert("table.name".to_owned(), iceberg_table_name.to_owned());
1575
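                // `hosted_catalog = true` means RisingWave hosts the iceberg catalog itself,
                // using the meta store as a JDBC catalog.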
1576                if let Some(s) = params.properties.get("hosted_catalog") {
1577                    if s.eq_ignore_ascii_case("true") {
1578                        with_common.insert("catalog.type".to_owned(), "jdbc".to_owned());
1579                        with_common.insert("catalog.uri".to_owned(), catalog_uri.to_owned());
1580                        with_common
1581                            .insert("catalog.jdbc.user".to_owned(), meta_store_user.to_owned());
1582                        with_common.insert(
1583                            "catalog.jdbc.password".to_owned(),
1584                            meta_store_password.clone(),
1585                        );
1586                        with_common
1587                            .insert("catalog.name".to_owned(), iceberg_catalog_name.to_owned());
1588                    }
1589                }
1590
1591                with_common
1592            } else {
1593                return Err(RwError::from(ErrorCode::InvalidParameterValue(
1594                    "Only iceberg connection could be used in iceberg engine".to_owned(),
1595                )));
1596            }
1597        } else {
1598            return Err(RwError::from(ErrorCode::InvalidParameterValue(
1599                "Private Link Service has been deprecated. Please create a new connection instead."
1600                    .to_owned(),
1601            )));
1602        }
1603    };
1604
1605    let table_catalog = TableCatalog::from(table.clone());
1606
1607    // Iceberg sinks require a primary key; if none is provided, we will use the hidden `_row_id` column.
1608    // Fetch the primary key from the table's columns.
1609    let mut pks = table_catalog
1610        .pk_column_names()
1611        .iter()
1612        .map(|c| c.to_string())
1613        .collect::<Vec<String>>();
1614
1615    // For a table without a primary key, use `_row_id` (exposed as `RISINGWAVE_ICEBERG_ROW_ID`) as the primary key.
1616    let sink_from = if pks.len() == 1 && pks[0].eq(ROW_ID_COLUMN_NAME) {
1617        pks = vec![RISINGWAVE_ICEBERG_ROW_ID.to_owned()];
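        // Sink from a query that re-exposes the hidden row id column under a visible alias, so the
        // iceberg sink can use it as its primary key.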
1618        let [stmt]: [_; 1] = Parser::parse_sql(&format!(
1619            "select {} as {}, * from {}",
1620            ROW_ID_COLUMN_NAME, RISINGWAVE_ICEBERG_ROW_ID, table_name
1621        ))
1622        .context("unable to parse query")?
1623        .try_into()
1624        .unwrap();
1625
1626        let Statement::Query(query) = &stmt else {
1627            panic!("unexpected statement: {:?}", stmt);
1628        };
1629        CreateSink::AsQuery(query.clone())
1630    } else {
1631        CreateSink::From(table_name.clone())
1632    };
1633
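    // The internal iceberg sink is named after the table, prefixed with `ICEBERG_SINK_PREFIX`.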
1634    let mut sink_name = table_name.clone();
1635    *sink_name.0.last_mut().unwrap() = Ident::from(
1636        (ICEBERG_SINK_PREFIX.to_owned() + &sink_name.0.last().unwrap().real_value()).as_str(),
1637    );
1638    let create_sink_stmt = CreateSinkStatement {
1639        if_not_exists: false,
1640        sink_name,
1641        with_properties: WithProperties(vec![]),
1642        sink_from,
1643        columns: vec![],
1644        emit_mode: None,
1645        sink_schema: None,
1646        into_table_name: None,
1647    };
1648
1649    let mut sink_handler_args = handler_args.clone();
1650
1651    let mut sink_with = with_common.clone();
1652
1653    sink_with.insert("primary_key".to_owned(), pks.join(","));
1654    sink_with.insert("type".to_owned(), "upsert".to_owned());
1655    // sink_with.insert(SINK_SNAPSHOT_OPTION.to_owned(), "false".to_owned());
1656    //
1657    // Note: in theory, we don't need to backfill from the table to the sink,
1658    // but we don't have atomic DDL yet (https://github.com/risingwavelabs/risingwave/issues/21863),
1659    // so there is a potential data-loss problem on the first barrier.
1660    //
1661    // For a non-append-only table, the initial sink backfill always covers it, since the data
1662    // will already be present in the hummock table.
1663    //
1664    // For an append-only table, we need to be more careful.
1665    //
1666    // The possible cases for a table:
1667    // - For a table without a connector: it doesn't matter, since there's no data before the table is created.
1668    // - For a table with a connector: we work around it by setting SOURCE_RATE_LIMIT to 0 during creation.
1669    //   + If we ever support blocking DDL for tables with connectors, we need to be careful here.
1670    // - For a table with an upstream job, specifically a CDC table from a shared CDC source:
1671    //   + Data may come from both the upstream connector and the CDC table backfill, so both would need to be paused.
1672    //   + For now we don't support APPEND ONLY CDC tables, so this is safe.
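    // The iceberg sink commits once every `commit_checkpoint_interval` checkpoints; default to 60
    // when the user doesn't specify it.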
1673    let commit_checkpoint_interval = handler_args
1674        .with_options
1675        .get(COMMIT_CHECKPOINT_INTERVAL)
1676        .map(|v| v.to_owned())
1677        .unwrap_or_else(|| "60".to_owned());
1678    let commit_checkpoint_interval = commit_checkpoint_interval.parse::<u32>().map_err(|_| {
1679        ErrorCode::InvalidInputSyntax(format!(
1680            "commit_checkpoint_interval must be a positive integer: {}",
1681            commit_checkpoint_interval
1682        ))
1683    })?;
1684
1685    if commit_checkpoint_interval == 0 {
1686        bail!("commit_checkpoint_interval must be a positive integer: 0");
1687    }
1688
1689    // Remove commit_checkpoint_interval from the source options, otherwise it will be considered an unknown field.
1690    source
1691        .as_mut()
1692        .map(|x| x.with_properties.remove(COMMIT_CHECKPOINT_INTERVAL));
1693
1694    let sink_decouple = session.config().sink_decouple();
1695    if matches!(sink_decouple, SinkDecouple::Disable) && commit_checkpoint_interval > 1 {
1696        bail!(
1697            "config conflict: `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
1698        )
1699    }
1700
1701    sink_with.insert(
1702        COMMIT_CHECKPOINT_INTERVAL.to_owned(),
1703        commit_checkpoint_interval.to_string(),
1704    );
1705    sink_with.insert("create_table_if_not_exists".to_owned(), "true".to_owned());
1706
1707    sink_with.insert("is_exactly_once".to_owned(), "true".to_owned());
1708
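    // Iceberg table maintenance options. Compaction and snapshot expiration are licensed
    // features; when not specified explicitly, they default to whatever the license allows.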
1709    const ENABLE_COMPACTION: &str = "enable_compaction";
1710    const COMPACTION_INTERVAL_SEC: &str = "compaction_interval_sec";
1711    const ENABLE_SNAPSHOT_EXPIRATION: &str = "enable_snapshot_expiration";
1712
1713    if let Some(enable_compaction) = handler_args.with_options.get(ENABLE_COMPACTION) {
1714        match enable_compaction.to_lowercase().as_str() {
1715            "true" => {
1716                risingwave_common::license::Feature::IcebergCompaction
1717                    .check_available()
1718                    .map_err(|e| anyhow::anyhow!(e))?;
1719
1720                sink_with.insert(ENABLE_COMPACTION.to_owned(), "true".to_owned());
1721            }
1722            "false" => {
1723                sink_with.insert(ENABLE_COMPACTION.to_owned(), "false".to_owned());
1724            }
1725            _ => {
1726                return Err(ErrorCode::InvalidInputSyntax(format!(
1727                    "enable_compaction must be true or false: {}",
1728                    enable_compaction
1729                ))
1730                .into());
1731            }
1732        }
1733
1734        // Remove enable_compaction from the source options, otherwise it will be considered an unknown field.
1735        source
1736            .as_mut()
1737            .map(|x| x.with_properties.remove("enable_compaction"));
1738    } else {
1739        sink_with.insert(
1740            ENABLE_COMPACTION.to_owned(),
1741            risingwave_common::license::Feature::IcebergCompaction
1742                .check_available()
1743                .is_ok()
1744                .to_string(),
1745        );
1746    }
1747
1748    if let Some(compaction_interval_sec) = handler_args.with_options.get(COMPACTION_INTERVAL_SEC) {
1749        let compaction_interval_sec = compaction_interval_sec.parse::<u64>().map_err(|_| {
1750            ErrorCode::InvalidInputSyntax(format!(
1751                "compaction_interval_sec must be a positive integer: {}",
1752                compaction_interval_sec
1753            ))
1754        })?;
1755        if compaction_interval_sec == 0 {
1756            bail!("compaction_interval_sec must be a positive integer: 0");
1757        }
1758        sink_with.insert(
1759            "compaction_interval_sec".to_owned(),
1760            compaction_interval_sec.to_string(),
1761        );
1762        // Remove compaction_interval_sec from the source options, otherwise it will be considered an unknown field.
1763        source
1764            .as_mut()
1765            .map(|x| x.with_properties.remove(COMPACTION_INTERVAL_SEC));
1766    }
1767
1768    if let Some(enable_snapshot_expiration) =
1769        handler_args.with_options.get(ENABLE_SNAPSHOT_EXPIRATION)
1770    {
1771        match enable_snapshot_expiration.to_lowercase().as_str() {
1772            "true" => {
1773                risingwave_common::license::Feature::IcebergCompaction
1774                    .check_available()
1775                    .map_err(|e| anyhow::anyhow!(e))?;
1776                sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "true".to_owned());
1777            }
1778            "false" => {
1779                sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "false".to_owned());
1780            }
1781            _ => {
1782                return Err(ErrorCode::InvalidInputSyntax(format!(
1783                    "enable_snapshot_expiration must be true or false: {}",
1784                    enable_snapshot_expiration
1785                ))
1786                .into());
1787            }
1788        }
1789
1790        // Remove enable_snapshot_expiration from the source options, otherwise it will be considered an unknown field.
1791        source
1792            .as_mut()
1793            .map(|x| x.with_properties.remove(ENABLE_SNAPSHOT_EXPIRATION));
1794    } else {
1795        sink_with.insert(
1796            ENABLE_SNAPSHOT_EXPIRATION.to_owned(),
1797            risingwave_common::license::Feature::IcebergCompaction
1798                .check_available()
1799                .is_ok()
1800                .to_string(),
1801        );
1802    }
1803
1804    let partition_by = handler_args
1805        .with_options
1806        .get("partition_by")
1807        .map(|v| v.to_owned());
1808
1809    if let Some(partition_by) = &partition_by {
1810        let mut partition_columns = vec![];
1811        for (column, _) in parse_partition_by_exprs(partition_by.clone())? {
1812            table_catalog
1813                .columns()
1814                .iter()
1815                .find(|col| col.name().eq_ignore_ascii_case(&column))
1816                .ok_or_else(|| {
1817                    ErrorCode::InvalidInputSyntax(format!(
1818                        "Partition source column does not exist in schema: {}",
1819                        column
1820                    ))
1821                })?;
1822
1823            partition_columns.push(column.to_owned());
1824        }
1825
1826        ensure_partition_columns_are_prefix_of_primary_key(&partition_columns, &pks).map_err(
1827            |_| {
1828                ErrorCode::InvalidInputSyntax(
1829                    "The partition columns should be the prefix of the primary key".to_owned(),
1830                )
1831            },
1832        )?;
1833
1834        sink_with.insert("partition_by".to_owned(), partition_by.to_owned());
1835
1836        // Remove partition_by from the source options, otherwise it will be considered an unknown field.
1837        source
1838            .as_mut()
1839            .map(|x| x.with_properties.remove("partition_by"));
1840    }
1841
1842    sink_handler_args.with_options =
1843        WithOptions::new(sink_with, Default::default(), connection_ref.clone());
1844
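    // The internal iceberg source is named after the table, prefixed with `ICEBERG_SOURCE_PREFIX`;
    // it serves reads against the iceberg table.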
1845    let mut source_name = table_name.clone();
1846    *source_name.0.last_mut().unwrap() = Ident::from(
1847        (ICEBERG_SOURCE_PREFIX.to_owned() + &source_name.0.last().unwrap().real_value()).as_str(),
1848    );
1849    let create_source_stmt = CreateSourceStatement {
1850        temporary: false,
1851        if_not_exists: false,
1852        columns: vec![],
1853        source_name,
1854        wildcard_idx: Some(0),
1855        constraints: vec![],
1856        with_properties: WithProperties(vec![]),
1857        format_encode: CompatibleFormatEncode::V2(FormatEncodeOptions::none()),
1858        source_watermarks: vec![],
1859        include_column_options: vec![],
1860    };
1861
1862    let mut source_handler_args = handler_args.clone();
1863    let source_with = with_common;
1864    source_handler_args.with_options =
1865        WithOptions::new(source_with, Default::default(), connection_ref);
1866
1867    // Before creating the table, ensure the JVM is initialized, as we rely on the JDBC catalog right now.
1868    // If the JVM fails to initialize, the non-atomic DDL below would leave a partially created iceberg engine table behind.
1869    let _ = JVM.get_or_init()?;
1870
1871    let catalog_writer = session.catalog_writer()?;
1872    // TODO(iceberg): make iceberg engine table creation ddl atomic
1873    let has_connector = source.is_some();
1874    catalog_writer
1875        .create_table(source, table, graph, job_type, if_not_exists)
1876        .await?;
1877    let res = create_sink::handle_create_sink(sink_handler_args, create_sink_stmt, true).await;
1878    if res.is_err() {
1879        // Since we don't support ddl atomicity, we need to drop the partial created table.
1880        handle_drop_table(handler_args.clone(), table_name.clone(), true, true).await?;
1881        res?;
1882    }
1883    let res = create_source::handle_create_source(source_handler_args, create_source_stmt).await;
1884    if res.is_err() {
1885        // Since we don't support ddl atomicity, we need to drop the partial created table.
1886        handle_drop_table(handler_args.clone(), table_name.clone(), true, true).await?;
1887        res?;
1888    }
1889
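    // The table's source was created with `source_rate_limit = 0` (see `handle_create_table`);
    // now that the sink and source exist, reset the rate limit to the default (-1, i.e. no limit).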
1890    if has_connector {
1891        alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1892            handler_args,
1893            PbThrottleTarget::TableWithSource,
1894            table_name,
1895            -1,
1896        )
1897        .await?;
1898    }
1899
1900    Ok(())
1901}
1902
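/// Validate the combination of connector, `FORMAT ... ENCODE ...`, and `INCLUDE` options for
/// `CREATE TABLE`, returning the format/encode options to use (if any).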
1903pub fn check_create_table_with_source(
1904    with_options: &WithOptions,
1905    format_encode: Option<FormatEncodeOptions>,
1906    include_column_options: &IncludeOption,
1907    cdc_table_info: &Option<CdcTableInfo>,
1908) -> Result<Option<FormatEncodeOptions>> {
1909    // skip check for cdc table
1910    if cdc_table_info.is_some() {
1911        return Ok(format_encode);
1912    }
1913    let defined_source = with_options.is_source_connector();
1914
1915    if !include_column_options.is_empty() && !defined_source {
1916        return Err(ErrorCode::InvalidInputSyntax(
1917            "INCLUDE should be used with a connector".to_owned(),
1918        )
1919        .into());
1920    }
1921    if defined_source {
1922        format_encode.as_ref().ok_or_else(|| {
1923            ErrorCode::InvalidInputSyntax("Please specify a source schema using FORMAT".to_owned())
1924        })?;
1925    }
1926    Ok(format_encode)
1927}
1928
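/// Check that the partition columns form a prefix of the primary key columns, in order.
/// For example, partitioning by `(a, b)` is allowed for primary key `(a, b, c)`, but
/// partitioning by `(b)` or `(a, c)` is not.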
1929fn ensure_partition_columns_are_prefix_of_primary_key(
1930    partition_columns: &[String],
1931    primary_key_columns: &[String],
1932) -> std::result::Result<(), String> {
1933    if partition_columns.len() > primary_key_columns.len() {
1934        return Err("Partition columns cannot be longer than primary key columns.".to_owned());
1935    }
1936
1937    for (i, partition_col) in partition_columns.iter().enumerate() {
1938        if primary_key_columns.get(i) != Some(partition_col) {
1939            return Err(format!(
1940                "Partition column '{}' is not a prefix of the primary key.",
1941                partition_col
1942            ));
1943        }
1944    }
1945
1946    Ok(())
1947}
1948
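/// Generate the stream fragment graph and the new table/source catalogs used to replace an
/// existing table's definition (e.g. when altering a table), reusing the original table id and
/// validating that the primary key is unchanged.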
1949#[allow(clippy::too_many_arguments)]
1950pub async fn generate_stream_graph_for_replace_table(
1951    _session: &Arc<SessionImpl>,
1952    table_name: ObjectName,
1953    original_catalog: &Arc<TableCatalog>,
1954    handler_args: HandlerArgs,
1955    statement: Statement,
1956    col_id_gen: ColumnIdGenerator,
1957    sql_column_strategy: SqlColumnStrategy,
1958) -> Result<(StreamFragmentGraph, Table, Option<PbSource>, TableJobType)> {
1959    use risingwave_pb::catalog::table::OptionalAssociatedSourceId;
1960
1961    let Statement::CreateTable {
1962        columns,
1963        constraints,
1964        source_watermarks,
1965        append_only,
1966        on_conflict,
1967        with_version_column,
1968        wildcard_idx,
1969        cdc_table_info,
1970        format_encode,
1971        include_column_options,
1972        engine,
1973        with_options,
1974        ..
1975    } = statement
1976    else {
1977        panic!("unexpected statement type: {:?}", statement);
1978    };
1979
1980    let format_encode = format_encode
1981        .clone()
1982        .map(|format_encode| format_encode.into_v2_with_warning());
1983
1984    let engine = match engine {
1985        risingwave_sqlparser::ast::Engine::Hummock => Engine::Hummock,
1986        risingwave_sqlparser::ast::Engine::Iceberg => Engine::Iceberg,
1987    };
1988
1989    let is_drop_connector =
1990        original_catalog.associated_source_id().is_some() && format_encode.is_none();
1991    if is_drop_connector {
1992        debug_assert!(
1993            source_watermarks.is_empty()
1994                && include_column_options.is_empty()
1995                && with_options
1996                    .iter()
1997                    .all(|opt| opt.name.real_value().to_lowercase() != "connector")
1998        );
1999    }
2000
2001    let props = CreateTableProps {
2002        definition: handler_args.normalized_sql.clone(),
2003        append_only,
2004        on_conflict: on_conflict.into(),
2005        with_version_column: with_version_column.as_ref().map(|x| x.real_value()),
2006        webhook_info: original_catalog.webhook_info.clone(),
2007        engine,
2008    };
2009
2010    let ((plan, mut source, table), job_type) = match (format_encode, cdc_table_info.as_ref()) {
2011        (Some(format_encode), None) => (
2012            gen_create_table_plan_with_source(
2013                handler_args,
2014                ExplainOptions::default(),
2015                table_name,
2016                columns,
2017                wildcard_idx,
2018                constraints,
2019                format_encode,
2020                source_watermarks,
2021                col_id_gen,
2022                include_column_options,
2023                props,
2024                sql_column_strategy,
2025            )
2026            .await?,
2027            TableJobType::General,
2028        ),
2029        (None, None) => {
2030            let context = OptimizerContext::from_handler_args(handler_args);
2031            let (plan, table) = gen_create_table_plan(
2032                context,
2033                table_name,
2034                columns,
2035                constraints,
2036                col_id_gen,
2037                source_watermarks,
2038                props,
2039                true,
2040            )?;
2041            ((plan, None, table), TableJobType::General)
2042        }
2043        (None, Some(cdc_table)) => {
2044            let session = &handler_args.session;
2045            let (source, resolved_table_name, database_id, schema_id) =
2046                get_source_and_resolved_table_name(session, cdc_table.clone(), table_name.clone())?;
2047
2048            let cdc_with_options = derive_with_options_for_cdc_table(
2049                &source.with_properties,
2050                cdc_table.external_table_name.clone(),
2051            )?;
2052
2053            let (column_catalogs, pk_names) = bind_cdc_table_schema(&columns, &constraints, true)?;
2054
2055            let context: OptimizerContextRef =
2056                OptimizerContext::new(handler_args, ExplainOptions::default()).into();
2057            let (plan, table) = gen_create_table_plan_for_cdc_table(
2058                context,
2059                source,
2060                cdc_table.external_table_name.clone(),
2061                columns,
2062                column_catalogs,
2063                pk_names,
2064                cdc_with_options,
2065                col_id_gen,
2066                on_conflict,
2067                with_version_column.map(|x| x.real_value()),
2068                IncludeOption::default(),
2069                table_name,
2070                resolved_table_name,
2071                database_id,
2072                schema_id,
2073                original_catalog.id(),
2074                engine,
2075            )?;
2076
2077            ((plan, None, table), TableJobType::SharedCdcSource)
2078        }
2079        (Some(_), Some(_)) => {
2080            return Err(ErrorCode::NotSupported(
2081                "Data format and encoding format doesn't apply to table created from a CDC source"
2082                    .into(),
2083                "Remove the FORMAT and ENCODE specification".into(),
2084            )
2085            .into());
2086        }
2087    };
2088
2089    // TODO: avoid this backward conversion.
2090    if TableCatalog::from(&table).pk_column_ids() != original_catalog.pk_column_ids() {
2091        Err(ErrorCode::InvalidInputSyntax(
2092            "alter primary key of table is not supported".to_owned(),
2093        ))?
2094    }
2095
2096    let graph = build_graph(plan, Some(GraphJobType::Table))?;
2097
2098    // Fill the original table ID.
2099    let mut table = Table {
2100        id: original_catalog.id().table_id(),
2101        ..table
2102    };
2103    if !is_drop_connector && let Some(source_id) = original_catalog.associated_source_id() {
2104        table.optional_associated_source_id = Some(OptionalAssociatedSourceId::AssociatedSourceId(
2105            source_id.table_id,
2106        ));
2107        source.as_mut().unwrap().id = source_id.table_id;
2108        source.as_mut().unwrap().optional_associated_table_id =
2109            Some(OptionalAssociatedTableId::AssociatedTableId(table.id))
2110    }
2111
2112    Ok((graph, table, source, job_type))
2113}
2114
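/// Resolve the shared CDC source referenced by a CDC table, together with the resolved table
/// name and the database/schema ids the new table will be created in.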
2115fn get_source_and_resolved_table_name(
2116    session: &Arc<SessionImpl>,
2117    cdc_table: CdcTableInfo,
2118    table_name: ObjectName,
2119) -> Result<(Arc<SourceCatalog>, String, DatabaseId, SchemaId)> {
2120    let db_name = &session.database();
2121    let (schema_name, resolved_table_name) =
2122        Binder::resolve_schema_qualified_name(db_name, table_name)?;
2123    let (database_id, schema_id) =
2124        session.get_database_and_schema_id_for_create(schema_name.clone())?;
2125
2126    let (source_schema_name, source_name) =
2127        Binder::resolve_schema_qualified_name(db_name, cdc_table.source_name.clone())?;
2128
2129    let source = {
2130        let catalog_reader = session.env().catalog_reader().read_guard();
2131        let schema_name = source_schema_name.unwrap_or(DEFAULT_SCHEMA_NAME.to_owned());
2132        let (source, _) = catalog_reader.get_source_by_name(
2133            db_name,
2134            SchemaPath::Name(schema_name.as_str()),
2135            source_name.as_str(),
2136        )?;
2137        source.clone()
2138    };
2139
2140    Ok((source, resolved_table_name, database_id, schema_id))
2141}
2142
2143/// Validate the `webhook_info` and bind it into its protobuf representation.
2144fn bind_webhook_info(
2145    session: &Arc<SessionImpl>,
2146    columns_defs: &[ColumnDef],
2147    webhook_info: WebhookSourceInfo,
2148) -> Result<PbWebhookSourceInfo> {
2149    // validate columns
2150    if columns_defs.len() != 1 || columns_defs[0].data_type.as_ref().unwrap() != &AstDataType::Jsonb
2151    {
2152        return Err(ErrorCode::InvalidInputSyntax(
2153            "Table with webhook source should have exactly one JSONB column".to_owned(),
2154        )
2155        .into());
2156    }
2157
2158    let WebhookSourceInfo {
2159        secret_ref,
2160        signature_expr,
2161        wait_for_persistence,
2162        is_batched,
2163    } = webhook_info;
2164
2165    // validate secret_ref
2166    let (pb_secret_ref, secret_name) = if let Some(secret_ref) = secret_ref {
2167        let db_name = &session.database();
2168        let (schema_name, secret_name) =
2169            Binder::resolve_schema_qualified_name(db_name, secret_ref.secret_name.clone())?;
2170        let secret_catalog = session.get_secret_by_name(schema_name, &secret_name)?;
2171        (
2172            Some(PbSecretRef {
2173                secret_id: secret_catalog.id.secret_id(),
2174                ref_as: match secret_ref.ref_as {
2175                    SecretRefAsType::Text => PbRefAsType::Text,
2176                    SecretRefAsType::File => PbRefAsType::File,
2177                }
2178                .into(),
2179            }),
2180            Some(secret_name),
2181        )
2182    } else {
2183        (None, None)
2184    };
2185
2186    let secure_compare_context = SecureCompareContext {
2187        column_name: columns_defs[0].name.real_value(),
2188        secret_name,
2189    };
2190    let mut binder = Binder::new_for_ddl_with_secure_compare(session, secure_compare_context);
2191    let expr = binder.bind_expr(signature_expr.clone())?;
2192
2193    // validate expr, ensuring it is SECURE_COMPARE()
2194    if expr.as_function_call().is_none()
2195        || expr.as_function_call().unwrap().func_type()
2196            != crate::optimizer::plan_node::generic::ExprType::SecureCompare
2197    {
2198        return Err(ErrorCode::InvalidInputSyntax(
2199            "The signature verification function must be SECURE_COMPARE()".to_owned(),
2200        )
2201        .into());
2202    }
2203
2204    let pb_webhook_info = PbWebhookSourceInfo {
2205        secret_ref: pb_secret_ref,
2206        signature_expr: Some(expr.to_expr_proto()),
2207        wait_for_persistence,
2208        is_batched,
2209    };
2210
2211    Ok(pb_webhook_info)
2212}
2213
2214#[cfg(test)]
2215mod tests {
2216    use risingwave_common::catalog::{
2217        DEFAULT_DATABASE_NAME, ROW_ID_COLUMN_NAME, RW_TIMESTAMP_COLUMN_NAME,
2218    };
2219    use risingwave_common::types::{DataType, StructType};
2220
2221    use super::*;
2222    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};
2223
2224    #[tokio::test]
2225    async fn test_create_table_handler() {
2226        let sql =
2227            "create table t (v1 smallint, v2 struct<v3 bigint, v4 float, v5 double>) append only;";
2228        let frontend = LocalFrontend::new(Default::default()).await;
2229        frontend.run_sql(sql).await.unwrap();
2230
2231        let session = frontend.session_ref();
2232        let catalog_reader = session.env().catalog_reader().read_guard();
2233        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);
2234
2235        // Check table exists.
2236        let (table, _) = catalog_reader
2237            .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
2238            .unwrap();
2239        assert_eq!(table.name(), "t");
2240
2241        let columns = table
2242            .columns
2243            .iter()
2244            .map(|col| (col.name(), col.data_type().clone()))
2245            .collect::<HashMap<&str, DataType>>();
2246
2247        let expected_columns = maplit::hashmap! {
2248            ROW_ID_COLUMN_NAME => DataType::Serial,
2249            "v1" => DataType::Int16,
2250            "v2" => StructType::new(
2251                vec![("v3", DataType::Int64),("v4", DataType::Float64),("v5", DataType::Float64)],
2252            )
2253            .with_ids([3, 4, 5].map(ColumnId::new))
2254            .into(),
2255            RW_TIMESTAMP_COLUMN_NAME => DataType::Timestamptz,
2256        };
2257
2258        assert_eq!(columns, expected_columns, "{columns:#?}");
2259    }
2260
2261    #[test]
2262    fn test_bind_primary_key() {
2263        // Note: Column ID 0 is reserved for row ID column.
2264
2265        for (sql, expected) in [
2266            ("create table t (v1 int, v2 int)", Ok(&[0] as &[_])),
2267            ("create table t (v1 int primary key, v2 int)", Ok(&[1])),
2268            ("create table t (v1 int, v2 int primary key)", Ok(&[2])),
2269            (
2270                "create table t (v1 int primary key, v2 int primary key)",
2271                Err("multiple primary keys are not allowed"),
2272            ),
2273            (
2274                "create table t (v1 int primary key primary key, v2 int)",
2275                Err("multiple primary keys are not allowed"),
2276            ),
2277            (
2278                "create table t (v1 int, v2 int, primary key (v1))",
2279                Ok(&[1]),
2280            ),
2281            (
2282                "create table t (v1 int, primary key (v2), v2 int)",
2283                Ok(&[2]),
2284            ),
2285            (
2286                "create table t (primary key (v2, v1), v1 int, v2 int)",
2287                Ok(&[2, 1]),
2288            ),
2289            (
2290                "create table t (v1 int, primary key (v1), v2 int, primary key (v1))",
2291                Err("multiple primary keys are not allowed"),
2292            ),
2293            (
2294                "create table t (v1 int primary key, primary key (v1), v2 int)",
2295                Err("multiple primary keys are not allowed"),
2296            ),
2297            (
2298                "create table t (v1 int, primary key (V3), v2 int)",
2299                Err("column \"v3\" named in key does not exist"),
2300            ),
2301        ] {
2302            let mut ast = risingwave_sqlparser::parser::Parser::parse_sql(sql).unwrap();
2303            let risingwave_sqlparser::ast::Statement::CreateTable {
2304                columns: column_defs,
2305                constraints,
2306                ..
2307            } = ast.remove(0)
2308            else {
2309                panic!("test case should be create table")
2310            };
2311            let actual: Result<_> = (|| {
2312                let mut columns = bind_sql_columns(&column_defs, false)?;
2313                let mut col_id_gen = ColumnIdGenerator::new_initial();
2314                for c in &mut columns {
2315                    col_id_gen.generate(c)?;
2316                }
2317
2318                let pk_names =
2319                    bind_sql_pk_names(&column_defs, bind_table_constraints(&constraints)?)?;
2320                let (_, pk_column_ids, _) =
2321                    bind_pk_and_row_id_on_relation(columns, pk_names, true)?;
2322                Ok(pk_column_ids)
2323            })();
2324            match (expected, actual) {
2325                (Ok(expected), Ok(actual)) => assert_eq!(
2326                    expected.iter().copied().map(ColumnId::new).collect_vec(),
2327                    actual,
2328                    "sql: {sql}"
2329                ),
2330                (Ok(_), Err(actual)) => panic!("sql: {sql}\nunexpected error: {actual:?}"),
2331                (Err(_), Ok(actual)) => panic!("sql: {sql}\nexpects error but got: {actual:?}"),
2332                (Err(expected), Err(actual)) => assert!(
2333                    actual.to_string().contains(expected),
2334                    "sql: {sql}\nexpected: {expected:?}\nactual: {actual:?}"
2335                ),
2336            }
2337        }
2338    }
2339
2340    #[tokio::test]
2341    async fn test_duplicate_props_options() {
2342        let proto_file = create_proto_file(PROTO_FILE_DATA);
2343        let sql = format!(
2344            r#"CREATE TABLE t
2345    WITH (
2346        connector = 'kinesis',
2347        aws.region='user_test_topic',
2348        endpoint='172.10.1.1:9090,172.10.1.2:9090',
2349        aws.credentials.access_key_id = 'your_access_key_1',
2350        aws.credentials.secret_access_key = 'your_secret_key_1'
2351    )
2352    FORMAT PLAIN ENCODE PROTOBUF (
2353        message = '.test.TestRecord',
2354        aws.credentials.access_key_id = 'your_access_key_2',
2355        aws.credentials.secret_access_key = 'your_secret_key_2',
2356        schema.location = 'file://{}',
2357    )"#,
2358            proto_file.path().to_str().unwrap()
2359        );
2360        let frontend = LocalFrontend::new(Default::default()).await;
2361        frontend.run_sql(sql).await.unwrap();
2362
2363        let session = frontend.session_ref();
2364        let catalog_reader = session.env().catalog_reader().read_guard();
2365        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);
2366
2367        // Check source exists.
2368        let (source, _) = catalog_reader
2369            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
2370            .unwrap();
2371        assert_eq!(source.name, "t");
2372
2373        // AwsAuth params exist in options.
2374        assert_eq!(
2375            source
2376                .info
2377                .format_encode_options
2378                .get("aws.credentials.access_key_id")
2379                .unwrap(),
2380            "your_access_key_2"
2381        );
2382        assert_eq!(
2383            source
2384                .info
2385                .format_encode_options
2386                .get("aws.credentials.secret_access_key")
2387                .unwrap(),
2388            "your_secret_key_2"
2389        );
2390
2391        // AwsAuth params exist in props.
2392        assert_eq!(
2393            source
2394                .with_properties
2395                .get("aws.credentials.access_key_id")
2396                .unwrap(),
2397            "your_access_key_1"
2398        );
2399        assert_eq!(
2400            source
2401                .with_properties
2402                .get("aws.credentials.secret_access_key")
2403                .unwrap(),
2404            "your_secret_key_1"
2405        );
2406
2407        // Options are not merged into props.
2408        assert!(!source.with_properties.contains_key("schema.location"));
2409    }
2410}