risingwave_frontend/handler/create_table.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::rc::Rc;
use std::sync::Arc;

use anyhow::{Context, anyhow};
use clap::ValueEnum;
use either::Either;
use fixedbitset::FixedBitSet;
use itertools::Itertools;
use percent_encoding::percent_decode_str;
use pgwire::pg_response::{PgResponse, StatementType};
use prost::Message as _;
use risingwave_common::catalog::{
    CdcTableDesc, ColumnCatalog, ColumnDesc, ConflictBehavior, DEFAULT_SCHEMA_NAME, Engine,
    ObjectId, RISINGWAVE_ICEBERG_ROW_ID, ROW_ID_COLUMN_NAME, TableId,
};
use risingwave_common::config::MetaBackend;
use risingwave_common::global_jvm::JVM;
use risingwave_common::session_config::sink_decouple::SinkDecouple;
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_common::util::value_encoding::DatumToProtoExt;
use risingwave_common::{bail, bail_not_implemented};
use risingwave_connector::sink::decouple_checkpoint_log_sink::COMMIT_CHECKPOINT_INTERVAL;
use risingwave_connector::source::cdc::build_cdc_table_id;
use risingwave_connector::source::cdc::external::{
    CdcTableType, DATABASE_NAME_KEY, ExternalTableConfig, ExternalTableImpl, SCHEMA_NAME_KEY,
    TABLE_NAME_KEY,
};
use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt, source};
use risingwave_pb::catalog::connection::Info as ConnectionInfo;
use risingwave_pb::catalog::connection_params::ConnectionType;
use risingwave_pb::catalog::{PbSource, PbWebhookSourceInfo, WatermarkDesc};
use risingwave_pb::ddl_service::{PbTableJobType, TableJobType};
use risingwave_pb::meta::PbThrottleTarget;
use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
use risingwave_pb::plan_common::{
    AdditionalColumn, ColumnDescVersion, DefaultColumnDesc, GeneratedColumnDesc,
};
use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::secret::secret_ref::PbRefAsType;
use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_sqlparser::ast::{
    CdcTableInfo, ColumnDef, ColumnOption, CompatibleFormatEncode, ConnectionRefValue, CreateSink,
    CreateSinkStatement, CreateSourceStatement, DataType as AstDataType, ExplainOptions, Format,
    FormatEncodeOptions, Ident, ObjectName, OnConflict, SecretRefAsType, SourceWatermark,
    Statement, TableConstraint, WebhookSourceInfo, WithProperties,
};
use risingwave_sqlparser::parser::{IncludeOption, Parser};

use super::create_source::{CreateSourceType, SqlColumnStrategy, bind_columns_from_source};
use super::{RwPgResponse, alter_streaming_rate_limit, create_sink, create_source};
use crate::binder::{Clause, SecureCompareContext, bind_data_type};
use crate::catalog::root_catalog::SchemaPath;
use crate::catalog::source_catalog::SourceCatalog;
use crate::catalog::table_catalog::{ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX, TableVersion};
use crate::catalog::{ColumnId, DatabaseId, SchemaId, SourceId, check_column_name_not_reserved};
use crate::error::{ErrorCode, Result, RwError, bail_bind_error};
use crate::expr::{Expr, ExprImpl, ExprRewriter};
use crate::handler::HandlerArgs;
use crate::handler::create_source::{
    UPSTREAM_SOURCE_KEY, bind_connector_props, bind_create_source_or_table_with_connector,
    bind_source_watermark, handle_addition_columns,
};
use crate::handler::util::SourceSchemaCompatExt;
use crate::optimizer::plan_node::generic::{SourceNodeKind, build_cdc_scan_options_with_options};
use crate::optimizer::plan_node::{
    LogicalCdcScan, LogicalPlanRef, LogicalSource, StreamPlanRef as PlanRef,
};
use crate::optimizer::property::{Order, RequiredDist};
use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRoot};
use crate::session::SessionImpl;
use crate::session::current::notice_to_user;
use crate::stream_fragmenter::{GraphJobType, build_graph};
use crate::utils::OverwriteOptions;
use crate::{Binder, Explain, TableCatalog, WithOptions};

mod col_id_gen;
pub use col_id_gen::*;
use risingwave_connector::sink::iceberg::{
    COMPACTION_INTERVAL_SEC, ENABLE_COMPACTION, ENABLE_SNAPSHOT_EXPIRATION,
    ICEBERG_WRITE_MODE_COPY_ON_WRITE, ICEBERG_WRITE_MODE_MERGE_ON_READ, WRITE_MODE,
    parse_partition_by_exprs,
};

use crate::handler::drop_table::handle_drop_table;

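/// Checks that every option on a column definition is one we support, rejecting the rest
/// with a `NotImplemented` error.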
fn ensure_column_options_supported(c: &ColumnDef) -> Result<()> {
    for option_def in &c.options {
        match option_def.option {
            ColumnOption::GeneratedColumns(_) => {}
            ColumnOption::DefaultValue(_) => {}
            ColumnOption::DefaultValueInternal { .. } => {}
            ColumnOption::Unique { is_primary: true } => {}
            ColumnOption::Null => {}
            ColumnOption::NotNull => {}
            _ => bail_not_implemented!("column constraints \"{}\"", option_def),
        }
    }
    Ok(())
}

/// Binds the column schemas declared in the CREATE statement into `ColumnCatalog`s.
///
/// Note that primary key columns are not resolved here; they are bound separately by
/// [`bind_sql_pk_names`] and combined with table constraints afterwards.
pub fn bind_sql_columns(
    column_defs: &[ColumnDef],
    is_for_drop_table_connector: bool,
) -> Result<Vec<ColumnCatalog>> {
    let mut columns = Vec::with_capacity(column_defs.len());

    for column in column_defs {
        ensure_column_options_supported(column)?;
        // Destruct to make sure all fields are properly handled rather than ignored.
        // Do NOT use `..` to ignore fields you do not want to deal with.
        // Reject them with a clear NotImplemented error.
        let ColumnDef {
            name,
            data_type,
            collation,
            options,
            ..
        } = column;

        let data_type = data_type
            .clone()
            .ok_or_else(|| ErrorCode::InvalidInputSyntax("data type is not specified".into()))?;
        if let Some(collation) = collation {
            // PostgreSQL limits the data types that `COLLATE` can work on.
            // https://www.postgresql.org/docs/16/collation.html#COLLATION-CONCEPTS
            //   > The built-in collatable data types are `text`, `varchar`, and `char`.
            //
            // However, we don't support real collation; we simply ignore it here.
            if !["C", "POSIX"].contains(&collation.real_value().as_str()) {
                bail_not_implemented!(
                    "collation other than `C` or `POSIX` is not implemented"
                );
            }

            match data_type {
                AstDataType::Text | AstDataType::Varchar | AstDataType::Char(_) => {}
                _ => {
                    return Err(ErrorCode::NotSupported(
                        format!("{} is not a collatable data type", data_type),
                        "The built-in collatable data types are `text`, `varchar`, and `char`; please check your type".into(),
                    ).into());
                }
            }
        }

        if !is_for_drop_table_connector {
            // Additional column names may have the `_rw` prefix. When dropping the connector
            // from a table, the additional columns are converted to normal columns and keep
            // their original names, so we loosen the reserved-prefix check in that case.
            check_column_name_not_reserved(&name.real_value())?;
        }

        let nullable: bool = !options
            .iter()
            .any(|def| matches!(def.option, ColumnOption::NotNull));

        columns.push(ColumnCatalog {
            column_desc: ColumnDesc {
                data_type: bind_data_type(&data_type)?,
                column_id: ColumnId::placeholder(),
                name: name.real_value(),
                generated_or_default_column: None,
                description: None,
                additional_column: AdditionalColumn { column_type: None },
                version: ColumnDescVersion::LATEST,
                system_column: None,
                nullable,
            },
            is_hidden: false,
        });
    }

    Ok(columns)
}

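/// Validates a generated column expression: it must not reference another generated
/// column, and if the column is part of the primary key, the expression must be pure
/// (e.g. not `now()`).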
fn check_generated_column_constraints(
    column_name: &str,
    column_id: ColumnId,
    expr: &ExprImpl,
    column_catalogs: &[ColumnCatalog],
    generated_column_names: &[String],
    pk_column_ids: &[ColumnId],
) -> Result<()> {
    let input_refs = expr.collect_input_refs(column_catalogs.len());
    for idx in input_refs.ones() {
        let referred_generated_column = &column_catalogs[idx].column_desc.name;
        if generated_column_names
            .iter()
            .any(|c| c == referred_generated_column)
        {
            return Err(ErrorCode::BindError(format!(
                "A generated column cannot reference another generated column. \
                But generated column \"{}\" references generated column \"{}\"",
                column_name, referred_generated_column
            ))
            .into());
        }
    }

    if pk_column_ids.contains(&column_id) && expr.is_impure() {
        return Err(ErrorCode::BindError(format!(
            "Generated columns with impure expressions should not be part of the primary key. \
            Here column \"{}\" is defined as part of the primary key.",
            column_name
        ))
        .into());
    }

    Ok(())
}

/// Binds constraints that can only be specified in column definitions,
/// currently generated columns and default columns.
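///
/// For example (illustrative only), in
///
/// ```sql
/// CREATE TABLE t (v INT, v2 INT AS v + 1, v3 INT DEFAULT 0);
/// ```
///
/// the expressions `v + 1` and `0` are bound and stored into the catalogs of the
/// generated column `v2` and the default column `v3` respectively.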
pub fn bind_sql_column_constraints(
    session: &SessionImpl,
    table_name: String,
    column_catalogs: &mut [ColumnCatalog],
    columns: &[ColumnDef],
    pk_column_ids: &[ColumnId],
) -> Result<()> {
    let generated_column_names = {
        let mut names = vec![];
        for column in columns {
            for option_def in &column.options {
                if let ColumnOption::GeneratedColumns(_) = option_def.option {
                    names.push(column.name.real_value());
                    break;
                }
            }
        }
        names
    };

    let mut binder = Binder::new_for_ddl(session);
    binder.bind_columns_to_context(table_name.clone(), column_catalogs)?;

    for column in columns {
        let Some(idx) = column_catalogs
            .iter()
            .position(|c| c.name() == column.name.real_value())
        else {
            // It's possible that we don't follow the user defined columns in SQL but take the
            // ones resolved from the source, thus missing some columns. Simply ignore them.
            continue;
        };

        for option_def in &column.options {
            match &option_def.option {
                ColumnOption::GeneratedColumns(expr) => {
                    binder.set_clause(Some(Clause::GeneratedColumn));

                    let expr_impl = binder.bind_expr(expr).with_context(|| {
                        format!(
                            "fail to bind expression in generated column \"{}\"",
                            column.name.real_value()
                        )
                    })?;

                    check_generated_column_constraints(
                        &column.name.real_value(),
                        column_catalogs[idx].column_id(),
                        &expr_impl,
                        column_catalogs,
                        &generated_column_names,
                        pk_column_ids,
                    )?;

                    column_catalogs[idx].column_desc.generated_or_default_column = Some(
                        GeneratedOrDefaultColumn::GeneratedColumn(GeneratedColumnDesc {
                            expr: Some(expr_impl.to_expr_proto()),
                        }),
                    );
                    binder.set_clause(None);
                }
                ColumnOption::DefaultValue(expr) => {
                    let expr_impl = binder
                        .bind_expr(expr)?
                        .cast_assign(column_catalogs[idx].data_type())?;

                    // Rewrite expressions to evaluate a snapshot value, used for missing values in the case of
                    // schema change.
                    //
                    // TODO: Currently we don't support impure expressions other than `now()` (like `random()`),
                    // so the rewritten expression should almost always be pure and we directly call `fold_const`
                    // here. Actually we do not require purity of the expression here since we're only to get a
                    // snapshot value.
                    let rewritten_expr_impl = session
                        .pinned_snapshot()
                        .inline_now_proc_time()
                        .rewrite_expr(expr_impl.clone());

                    if let Some(snapshot_value) = rewritten_expr_impl.try_fold_const() {
                        let snapshot_value = snapshot_value?;

                        column_catalogs[idx].column_desc.generated_or_default_column =
                            Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
                                snapshot_value: Some(snapshot_value.to_protobuf()),
                                expr: Some(expr_impl.to_expr_proto()),
                                // persist the original expression
                            }));
                    } else {
                        return Err(ErrorCode::BindError(format!(
                            "Default expression used in column `{}` cannot be evaluated. \
                            Use generated columns instead if you mean to reference other columns.",
                            column.name
                        ))
                        .into());
                    }
                }
                ColumnOption::DefaultValueInternal { persisted, expr: _ } => {
                    // When a `DEFAULT INTERNAL` is used internally for schema change, the persisted value
                    // should already be set during purification. So if we encounter an empty value here, it
                    // means the user has specified it explicitly in the SQL statement, typically by
                    // directly copying the result of `SHOW CREATE TABLE` and executing it.
                    if persisted.is_empty() {
                        bail_bind_error!(
                            "DEFAULT INTERNAL is only used for internal purposes, \
                             please specify a concrete default value"
                        );
                    }

                    let desc = DefaultColumnDesc::decode(&**persisted)
                        .expect("failed to decode persisted `DefaultColumnDesc`");

                    column_catalogs[idx].column_desc.generated_or_default_column =
                        Some(GeneratedOrDefaultColumn::DefaultColumn(desc));
                }
                _ => {}
            }
        }
    }
    Ok(())
}

/// Currently we only support the PRIMARY KEY table constraint, so we just return the PK column names if present.
pub fn bind_table_constraints(table_constraints: &[TableConstraint]) -> Result<Vec<String>> {
    let mut pk_column_names = vec![];

    for constraint in table_constraints {
        match constraint {
            TableConstraint::Unique {
                name: _,
                columns,
                is_primary: true,
            } => {
                if !pk_column_names.is_empty() {
                    return Err(multiple_pk_definition_err());
                }
                pk_column_names = columns.iter().map(|c| c.real_value()).collect_vec();
            }
            _ => bail_not_implemented!("table constraint \"{}\"", constraint),
        }
    }
    Ok(pk_column_names)
}

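/// Collects primary key column names from column-level options, merged with those already
/// gathered from table-level constraints, and errors if the primary key is defined twice.
///
/// For example (illustrative only), both of the following declare `a` as the primary key:
///
/// ```sql
/// CREATE TABLE t (a INT PRIMARY KEY, b INT);
/// CREATE TABLE t (a INT, b INT, PRIMARY KEY (a));
/// ```
///
/// while combining both forms in one statement is rejected.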
pub fn bind_sql_pk_names(
    columns_defs: &[ColumnDef],
    pk_names_from_table_constraints: Vec<String>,
) -> Result<Vec<String>> {
    let mut pk_column_names = pk_names_from_table_constraints;

    for column in columns_defs {
        for option_def in &column.options {
            if let ColumnOption::Unique { is_primary: true } = option_def.option {
                if !pk_column_names.is_empty() {
                    return Err(multiple_pk_definition_err());
                }
                pk_column_names.push(column.name.real_value());
            }
        }
    }

    Ok(pk_column_names)
}

fn multiple_pk_definition_err() -> RwError {
    ErrorCode::BindError("multiple primary keys are not allowed".into()).into()
}

/// Binds primary keys defined in SQL.
///
/// It returns the columns together with `pk_column_ids`, and an optional row id column index if
/// added.
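///
/// If no primary key is specified and `must_need_pk` is set, a hidden `_row_id` column is
/// appended to the columns and used as the primary key.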
pub fn bind_pk_and_row_id_on_relation(
    mut columns: Vec<ColumnCatalog>,
    pk_names: Vec<String>,
    must_need_pk: bool,
) -> Result<(Vec<ColumnCatalog>, Vec<ColumnId>, Option<usize>)> {
    for c in &columns {
        assert!(c.column_id() != ColumnId::placeholder());
    }

    // Mapping from column name to column id.
    let name_to_id = columns
        .iter()
        .map(|c| (c.name(), c.column_id()))
        .collect::<HashMap<_, _>>();

    let mut pk_column_ids: Vec<_> = pk_names
        .iter()
        .map(|name| {
            name_to_id.get(name.as_str()).copied().ok_or_else(|| {
                ErrorCode::BindError(format!("column \"{name}\" named in key does not exist"))
            })
        })
        .try_collect()?;

    // Add the `_row_id` column if `pk_column_ids` is empty and `must_need_pk` is set.
    let need_row_id = pk_column_ids.is_empty() && must_need_pk;

    let row_id_index = need_row_id.then(|| {
        let column = ColumnCatalog::row_id_column();
        let index = columns.len();
        pk_column_ids = vec![column.column_id()];
        columns.push(column);
        index
    });

    if let Some(col) = columns.iter().map(|c| c.name()).duplicates().next() {
        Err(ErrorCode::InvalidInputSyntax(format!(
            "column \"{col}\" specified more than once"
        )))?;
    }

    Ok((columns, pk_column_ids, row_id_index))
}

/// `gen_create_table_plan_with_source` generates the plan for creating a table with an external
/// stream source.
#[allow(clippy::too_many_arguments)]
pub(crate) async fn gen_create_table_plan_with_source(
    mut handler_args: HandlerArgs,
    explain_options: ExplainOptions,
    table_name: ObjectName,
    column_defs: Vec<ColumnDef>,
    wildcard_idx: Option<usize>,
    constraints: Vec<TableConstraint>,
    format_encode: FormatEncodeOptions,
    source_watermarks: Vec<SourceWatermark>,
    mut col_id_gen: ColumnIdGenerator,
    include_column_options: IncludeOption,
    props: CreateTableProps,
    sql_column_strategy: SqlColumnStrategy,
) -> Result<(PlanRef, Option<SourceCatalog>, TableCatalog)> {
    if props.append_only
        && format_encode.format != Format::Plain
        && format_encode.format != Format::Native
    {
        return Err(ErrorCode::BindError(format!(
            "Append only table does not support format {}.",
            format_encode.format
        ))
        .into());
    }

    let session = &handler_args.session;
    let with_properties = bind_connector_props(&handler_args, &format_encode, false)?;
    if with_properties.is_shareable_cdc_connector() {
        generated_columns_check_for_cdc_table(&column_defs)?;
        not_null_check_for_cdc_table(&wildcard_idx, &column_defs)?;
    } else if column_defs.iter().any(|col| {
        col.options
            .iter()
            .any(|def| matches!(def.option, ColumnOption::NotNull))
    }) {
        // if non-cdc source
        notice_to_user(
            "The table contains columns with NOT NULL constraints. Any rows from upstream violating the constraints will be ignored silently.",
        );
    }

    let db_name: &str = &session.database();
    let (schema_name, _) = Binder::resolve_schema_qualified_name(db_name, &table_name)?;

    // TODO: omit this step if `sql_column_strategy` is `Follow`.
    let (columns_from_resolve_source, source_info) = bind_columns_from_source(
        session,
        &format_encode,
        Either::Left(&with_properties),
        CreateSourceType::Table,
    )
    .await?;

    let overwrite_options = OverwriteOptions::new(&mut handler_args);
    let rate_limit = overwrite_options.source_rate_limit;
    let source = bind_create_source_or_table_with_connector(
        handler_args.clone(),
        table_name,
        format_encode,
        with_properties,
        &column_defs,
        constraints,
        wildcard_idx,
        source_watermarks,
        columns_from_resolve_source,
        source_info,
        include_column_options,
        &mut col_id_gen,
        CreateSourceType::Table,
        rate_limit,
        sql_column_strategy,
    )
    .await?;

    let context = OptimizerContext::new(handler_args, explain_options);

    let (plan, table) = gen_table_plan_with_source(
        context.into(),
        schema_name,
        source.clone(),
        col_id_gen.into_version(),
        props,
    )?;

    Ok((plan, Some(source), table))
}

/// `gen_create_table_plan` generates the plan for creating a table without an external stream
/// source.
#[allow(clippy::too_many_arguments)]
pub(crate) fn gen_create_table_plan(
    context: OptimizerContext,
    table_name: ObjectName,
    column_defs: Vec<ColumnDef>,
    constraints: Vec<TableConstraint>,
    mut col_id_gen: ColumnIdGenerator,
    source_watermarks: Vec<SourceWatermark>,
    props: CreateTableProps,
    is_for_replace_plan: bool,
) -> Result<(PlanRef, TableCatalog)> {
    let mut columns = bind_sql_columns(&column_defs, is_for_replace_plan)?;
    for c in &mut columns {
        col_id_gen.generate(c)?;
    }

    let (_, secret_refs, connection_refs) = context.with_options().clone().into_parts();
    if !secret_refs.is_empty() || !connection_refs.is_empty() {
        return Err(ErrorCode::InvalidParameterValue(
            "Secret reference and Connection reference are not allowed in options when creating table without external source"
                .to_owned(),
        )
        .into());
    }

    gen_create_table_plan_without_source(
        context,
        table_name,
        columns,
        column_defs,
        constraints,
        source_watermarks,
        col_id_gen.into_version(),
        props,
    )
}

#[allow(clippy::too_many_arguments)]
pub(crate) fn gen_create_table_plan_without_source(
    context: OptimizerContext,
    table_name: ObjectName,
    columns: Vec<ColumnCatalog>,
    column_defs: Vec<ColumnDef>,
    constraints: Vec<TableConstraint>,
    source_watermarks: Vec<SourceWatermark>,
    version: TableVersion,
    props: CreateTableProps,
) -> Result<(PlanRef, TableCatalog)> {
    // XXX: Why not bind outside?
    let pk_names = bind_sql_pk_names(&column_defs, bind_table_constraints(&constraints)?)?;
    let (mut columns, pk_column_ids, row_id_index) =
        bind_pk_and_row_id_on_relation(columns, pk_names, true)?;

    let watermark_descs: Vec<WatermarkDesc> = bind_source_watermark(
        context.session_ctx(),
        table_name.real_value(),
        source_watermarks,
        &columns,
    )?;

    bind_sql_column_constraints(
        context.session_ctx(),
        table_name.real_value(),
        &mut columns,
        &column_defs,
        &pk_column_ids,
    )?;
    let session = context.session_ctx().clone();

    let db_name = &session.database();
    let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, &table_name)?;

    let info = CreateTableInfo {
        columns,
        pk_column_ids,
        row_id_index,
        watermark_descs,
        source_catalog: None,
        version,
    };

    gen_table_plan_inner(context.into(), schema_name, table_name, info, props)
}

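/// Generates the table plan from an already-bound source catalog, reusing the columns,
/// primary key, row id index and watermarks resolved on the source.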
fn gen_table_plan_with_source(
    context: OptimizerContextRef,
    schema_name: Option<String>,
    source_catalog: SourceCatalog,
    version: TableVersion,
    props: CreateTableProps,
) -> Result<(PlanRef, TableCatalog)> {
    let table_name = source_catalog.name.clone();

    let info = CreateTableInfo {
        columns: source_catalog.columns.clone(),
        pk_column_ids: source_catalog.pk_col_ids.clone(),
        row_id_index: source_catalog.row_id_index,
        watermark_descs: source_catalog.watermark_descs.clone(),
        source_catalog: Some(source_catalog),
        version,
    };

    gen_table_plan_inner(context, schema_name, table_name, info, props)
}

/// On-conflict behavior either from user input or existing table catalog.
#[derive(Clone, Copy)]
pub enum EitherOnConflict {
    Ast(Option<OnConflict>),
    Resolved(ConflictBehavior),
}

impl From<Option<OnConflict>> for EitherOnConflict {
    fn from(v: Option<OnConflict>) -> Self {
        Self::Ast(v)
    }
}

impl From<ConflictBehavior> for EitherOnConflict {
    fn from(v: ConflictBehavior) -> Self {
        Self::Resolved(v)
    }
}

impl EitherOnConflict {
    /// Resolves the conflict behavior based on the given information.
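    ///
    /// For instance (illustrative summary of the rules below): an append-only table with a
    /// user-defined primary key only accepts `ON CONFLICT DO NOTHING`, while a regular table
    /// defaults to overwrite-on-conflict (`ON CONFLICT DO UPDATE FULL`).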
    pub fn to_behavior(self, append_only: bool, row_id_as_pk: bool) -> Result<ConflictBehavior> {
        let conflict_behavior = match self {
            EitherOnConflict::Ast(on_conflict) => {
                if append_only {
                    if row_id_as_pk {
                        // Primary key will be generated, no conflict check needed.
                        ConflictBehavior::NoCheck
                    } else {
                        // User defined PK on append-only table, enforce `DO NOTHING`.
                        if let Some(on_conflict) = on_conflict
                            && on_conflict != OnConflict::Nothing
                        {
                            return Err(ErrorCode::InvalidInputSyntax(
                                "When a PRIMARY KEY constraint is applied to an APPEND ONLY table, \
                                     the ON CONFLICT behavior must be DO NOTHING."
                                    .to_owned(),
                            )
                            .into());
                        }
                        ConflictBehavior::IgnoreConflict
                    }
                } else {
                    // Default to `UPDATE FULL` for non-append-only tables.
                    match on_conflict.unwrap_or(OnConflict::UpdateFull) {
                        OnConflict::UpdateFull => ConflictBehavior::Overwrite,
                        OnConflict::Nothing => ConflictBehavior::IgnoreConflict,
                        OnConflict::UpdateIfNotNull => ConflictBehavior::DoUpdateIfNotNull,
                    }
                }
            }
            EitherOnConflict::Resolved(b) => b,
        };

        Ok(conflict_behavior)
    }
}

/// Arguments of the functions that generate a table plan, part 1.
///
/// Compared to [`CreateTableProps`], this struct contains fields that need some work of binding
/// or resolving based on the user input.
pub struct CreateTableInfo {
    pub columns: Vec<ColumnCatalog>,
    pub pk_column_ids: Vec<ColumnId>,
    pub row_id_index: Option<usize>,
    pub watermark_descs: Vec<WatermarkDesc>,
    pub source_catalog: Option<SourceCatalog>,
    pub version: TableVersion,
}

/// Arguments of the functions that generate a table plan, part 2.
///
/// Compared to [`CreateTableInfo`], this struct contains fields that can be (relatively) simply
/// obtained from the input or the context.
pub struct CreateTableProps {
    pub definition: String,
    pub append_only: bool,
    pub on_conflict: EitherOnConflict,
    pub with_version_column: Option<String>,
    pub webhook_info: Option<PbWebhookSourceInfo>,
    pub engine: Engine,
}

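/// Generates the table plan shared by the creation paths with and without a connector:
/// plans a `LogicalSource` scan and materializes it into the table.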
#[allow(clippy::too_many_arguments)]
fn gen_table_plan_inner(
    context: OptimizerContextRef,
    schema_name: Option<String>,
    table_name: String,
    info: CreateTableInfo,
    props: CreateTableProps,
) -> Result<(PlanRef, TableCatalog)> {
    let CreateTableInfo {
        ref columns,
        row_id_index,
        ref watermark_descs,
        ref source_catalog,
        ..
    } = info;
    let CreateTableProps { append_only, .. } = props;

    let (database_id, schema_id) = context
        .session_ctx()
        .get_database_and_schema_id_for_create(schema_name)?;

    let session = context.session_ctx().clone();
    let retention_seconds = context.with_options().retention_seconds();

    let source_node: LogicalPlanRef = LogicalSource::new(
        source_catalog.clone().map(Rc::new),
        columns.clone(),
        row_id_index,
        SourceNodeKind::CreateTable,
        context.clone(),
        None,
    )?
    .into();

    let required_cols = FixedBitSet::with_capacity(columns.len());
    let plan_root = PlanRoot::new_with_logical_plan(
        source_node,
        RequiredDist::Any,
        Order::any(),
        required_cols,
        vec![],
    );

    if !append_only && !watermark_descs.is_empty() {
        return Err(ErrorCode::NotSupported(
            "Defining watermarks on a table requires the table to be append only.".to_owned(),
            "Use the keywords `APPEND ONLY`.".to_owned(),
        )
        .into());
    }

    if !append_only && retention_seconds.is_some() {
        return Err(ErrorCode::NotSupported(
            "Defining retention seconds on a table requires the table to be append only.".to_owned(),
            "Use the keywords `APPEND ONLY`.".to_owned(),
        )
        .into());
    }

    let materialize =
        plan_root.gen_table_plan(context, table_name, database_id, schema_id, info, props)?;

    let mut table = materialize.table().clone();
    table.owner = session.user_id();

    Ok((materialize.into(), table))
}

/// Generates the stream plan for a CDC table based on a shared source.
///
/// In the replace workflow, `table_id` is the id of the table to be replaced; in the create
/// table workflow, `table_id` is a placeholder that will be filled in by the meta service.
#[allow(clippy::too_many_arguments)]
pub(crate) fn gen_create_table_plan_for_cdc_table(
    context: OptimizerContextRef,
    source: Arc<SourceCatalog>,
    external_table_name: String,
    column_defs: Vec<ColumnDef>,
    mut columns: Vec<ColumnCatalog>,
    pk_names: Vec<String>,
    cdc_with_options: WithOptionsSecResolved,
    mut col_id_gen: ColumnIdGenerator,
    on_conflict: Option<OnConflict>,
    with_version_column: Option<String>,
    include_column_options: IncludeOption,
    table_name: ObjectName,
    resolved_table_name: String, // table name without schema prefix
    database_id: DatabaseId,
    schema_id: SchemaId,
    table_id: TableId,
    engine: Engine,
) -> Result<(PlanRef, TableCatalog)> {
    let session = context.session_ctx().clone();

    // Append additional columns to the end.
    handle_addition_columns(
        None,
        &cdc_with_options,
        include_column_options,
        &mut columns,
        true,
    )?;

    for c in &mut columns {
        col_id_gen.generate(c)?;
    }

    let (mut columns, pk_column_ids, _row_id_index) =
        bind_pk_and_row_id_on_relation(columns, pk_names, true)?;

    // NOTE: in auto schema change, default values are not provided in the column definitions.
    bind_sql_column_constraints(
        context.session_ctx(),
        table_name.real_value(),
        &mut columns,
        &column_defs,
        &pk_column_ids,
    )?;

    let definition = context.normalized_sql().to_owned();

    let pk_column_indices = {
        let mut id_to_idx = HashMap::new();
        columns.iter().enumerate().for_each(|(idx, c)| {
            id_to_idx.insert(c.column_id(), idx);
        });
        // Every pk column id must exist in the table columns.
        pk_column_ids
            .iter()
            .map(|c| id_to_idx.get(c).copied().unwrap())
            .collect_vec()
    };
    let table_pk = pk_column_indices
        .iter()
        .map(|idx| ColumnOrder::new(*idx, OrderType::ascending()))
        .collect();

    let (options, secret_refs) = cdc_with_options.into_parts();

    let non_generated_column_descs = columns
        .iter()
        .filter(|&c| !c.is_generated())
        .map(|c| c.column_desc.clone())
        .collect_vec();
    let non_generated_column_num = non_generated_column_descs.len();
    let cdc_table_type = CdcTableType::from_properties(&options);
    let cdc_table_desc = CdcTableDesc {
        table_id,
        source_id: source.id.into(), // id of the cdc source streaming job
        external_table_name: external_table_name.clone(),
        pk: table_pk,
        columns: non_generated_column_descs,
        stream_key: pk_column_indices,
        connect_properties: options,
        secret_refs,
    };

    tracing::debug!(?cdc_table_desc, "create cdc table");
    let options = build_cdc_scan_options_with_options(context.with_options(), cdc_table_type)?;

    let logical_scan = LogicalCdcScan::create(
        external_table_name.clone(),
        Rc::new(cdc_table_desc),
        context.clone(),
        options,
    );

    let scan_node: LogicalPlanRef = logical_scan.into();
    let required_cols = FixedBitSet::with_capacity(non_generated_column_num);
    let plan_root = PlanRoot::new_with_logical_plan(
        scan_node,
        RequiredDist::Any,
        Order::any(),
        required_cols,
        vec![],
    );

    let cdc_table_id = build_cdc_table_id(source.id, &external_table_name);
    let materialize = plan_root.gen_table_plan(
        context,
        resolved_table_name,
        database_id,
        schema_id,
        CreateTableInfo {
            columns,
            pk_column_ids,
            row_id_index: None,
            watermark_descs: vec![],
            source_catalog: Some((*source).clone()),
            version: col_id_gen.into_version(),
        },
        CreateTableProps {
            definition,
            append_only: false,
            on_conflict: on_conflict.into(),
            with_version_column,
            webhook_info: None,
            engine,
        },
    )?;

    let mut table = materialize.table().clone();
    table.owner = session.user_id();
    table.cdc_table_id = Some(cdc_table_id);

    Ok((materialize.into(), table))
}

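/// Derives the `WITH` options of a CDC table from those of its upstream CDC source, filling
/// in the connector-specific database/schema/table name keys.
///
/// For example (illustrative only), given a MySQL CDC source and `FROM ... TABLE 'mydb.orders'`,
/// this inserts `database.name = 'mydb'` and `table.name = 'orders'` into the options.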
fn derive_with_options_for_cdc_table(
    source_with_properties: &WithOptionsSecResolved,
    external_table_name: String,
) -> Result<WithOptionsSecResolved> {
    use source::cdc::{MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR, SQL_SERVER_CDC_CONNECTOR};
    // We need to strip the database/schema prefix from `external_table_name` below.
    let source_database_name: &str = source_with_properties
        .get("database.name")
        .ok_or_else(|| anyhow!("The source with properties does not contain 'database.name'"))?
        .as_str();
    let mut with_options = source_with_properties.clone();
    if let Some(connector) = source_with_properties.get(UPSTREAM_SOURCE_KEY) {
        match connector.as_str() {
            MYSQL_CDC_CONNECTOR => {
                // MySQL doesn't allow '.' in database name and table name, so we can split the
                // external table name by '.' to get the table name
                let (db_name, table_name) = external_table_name.split_once('.').ok_or_else(|| {
                    anyhow!("The upstream table name must contain database name prefix, e.g. 'database.table'")
                })?;
                // We allow multiple database names in the source definition
                if !source_database_name
                    .split(',')
                    .map(|s| s.trim())
                    .any(|name| name == db_name)
                {
                    return Err(anyhow!(
                        "The database name `{}` in the FROM clause is not included in the database name `{}` in source definition",
                        db_name,
                        source_database_name
                    ).into());
                }
                with_options.insert(DATABASE_NAME_KEY.into(), db_name.into());
                with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
            }
            POSTGRES_CDC_CONNECTOR => {
                let (schema_name, table_name) = external_table_name
                    .split_once('.')
                    .ok_or_else(|| anyhow!("The upstream table name must contain schema name prefix, e.g. 'public.table'"))?;

                // insert 'schema.name' into connect properties
                with_options.insert(SCHEMA_NAME_KEY.into(), schema_name.into());
                with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
            }
            SQL_SERVER_CDC_CONNECTOR => {
                // SQL Server external table name can be in different formats:
                // 1. 'databaseName.schemaName.tableName' (full format)
                // 2. 'schemaName.tableName' (schema and table only)
                // 3. 'tableName' (table only, will use default schema 'dbo')
                // We will auto-fill missing parts from source configuration
                let parts: Vec<&str> = external_table_name.split('.').collect();
                let (_, schema_name, table_name) = match parts.len() {
                    3 => {
                        // Full format: database.schema.table
                        let db_name = parts[0];
                        let schema_name = parts[1];
                        let table_name = parts[2];

                        // Verify database name matches source configuration
                        if db_name != source_database_name {
                            return Err(anyhow!(
                                "The database name `{}` in the FROM clause is not the same as the database name `{}` in source definition",
                                db_name,
                                source_database_name
                            ).into());
                        }
                        (db_name, schema_name, table_name)
                    }
                    2 => {
                        // Schema and table only: schema.table
                        let schema_name = parts[0];
                        let table_name = parts[1];
                        (source_database_name, schema_name, table_name)
                    }
                    1 => {
                        // Table only: table (use default schema 'dbo')
                        let table_name = parts[0];
                        (source_database_name, "dbo", table_name)
                    }
                    _ => {
                        return Err(anyhow!(
                            "The upstream table name must be in one of these formats: 'database.schema.table', 'schema.table', or 'table'"
                        ).into());
                    }
                };

                // insert 'schema.name' into connect properties
                with_options.insert(SCHEMA_NAME_KEY.into(), schema_name.into());
                with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
            }
            _ => {
                return Err(RwError::from(anyhow!(
                    "connector {} is not supported for cdc table",
                    connector
                )));
            }
        };
    }
    Ok(with_options)
}

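/// Dispatches `CREATE TABLE` planning to one of three paths: a table with a connector
/// (`FORMAT ... ENCODE ...`), a plain table, or a table on top of a shared CDC source.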
#[allow(clippy::too_many_arguments)]
pub(super) async fn handle_create_table_plan(
    handler_args: HandlerArgs,
    explain_options: ExplainOptions,
    format_encode: Option<FormatEncodeOptions>,
    cdc_table_info: Option<CdcTableInfo>,
    table_name: &ObjectName,
    column_defs: Vec<ColumnDef>,
    wildcard_idx: Option<usize>,
    constraints: Vec<TableConstraint>,
    source_watermarks: Vec<SourceWatermark>,
    append_only: bool,
    on_conflict: Option<OnConflict>,
    with_version_column: Option<String>,
    include_column_options: IncludeOption,
    webhook_info: Option<WebhookSourceInfo>,
    engine: Engine,
) -> Result<(
    PlanRef,
    Option<SourceCatalog>,
    TableCatalog,
    TableJobType,
    Option<SourceId>,
)> {
    let col_id_gen = ColumnIdGenerator::new_initial();
    let format_encode = check_create_table_with_source(
        &handler_args.with_options,
        format_encode,
        &include_column_options,
        &cdc_table_info,
    )?;
    let webhook_info = webhook_info
        .map(|info| bind_webhook_info(&handler_args.session, &column_defs, info))
        .transpose()?;

    let props = CreateTableProps {
        definition: handler_args.normalized_sql.clone(),
        append_only,
        on_conflict: on_conflict.into(),
        with_version_column: with_version_column.clone(),
        webhook_info,
        engine,
    };

    let ((plan, source, table), job_type, shared_source_id) = match (
        format_encode,
        cdc_table_info.as_ref(),
    ) {
        (Some(format_encode), None) => (
            gen_create_table_plan_with_source(
                handler_args,
                explain_options,
                table_name.clone(),
                column_defs,
                wildcard_idx,
                constraints,
                format_encode,
                source_watermarks,
                col_id_gen,
                include_column_options,
                props,
                SqlColumnStrategy::FollowChecked,
            )
            .await?,
            TableJobType::General,
            None,
        ),
        (None, None) => {
            let context = OptimizerContext::new(handler_args, explain_options);
            let (plan, table) = gen_create_table_plan(
                context,
                table_name.clone(),
                column_defs,
                constraints,
                col_id_gen,
                source_watermarks,
                props,
                false,
            )?;

            ((plan, None, table), TableJobType::General, None)
        }

        (None, Some(cdc_table)) => {
            sanity_check_for_table_on_cdc_source(
                append_only,
                &column_defs,
                &wildcard_idx,
                &constraints,
                &source_watermarks,
            )?;

            generated_columns_check_for_cdc_table(&column_defs)?;
            not_null_check_for_cdc_table(&wildcard_idx, &column_defs)?;

            let session = &handler_args.session;
            let db_name = &session.database();
            let user_name = &session.user_name();
            let search_path = session.config().search_path();
            let (schema_name, resolved_table_name) =
                Binder::resolve_schema_qualified_name(db_name, table_name)?;
            let (database_id, schema_id) =
                session.get_database_and_schema_id_for_create(schema_name.clone())?;

            // Resolve the shared CDC source that this table is created on.
            let (source_schema_name, source_name) =
                Binder::resolve_schema_qualified_name(db_name, &cdc_table.source_name)?;

            let source = {
                let catalog_reader = session.env().catalog_reader().read_guard();
                let schema_path =
                    SchemaPath::new(source_schema_name.as_deref(), &search_path, user_name);

                let (source, _) = catalog_reader.get_source_by_name(
                    db_name,
                    schema_path,
                    source_name.as_str(),
                )?;
                source.clone()
            };
            let cdc_with_options: WithOptionsSecResolved = derive_with_options_for_cdc_table(
                &source.with_properties,
                cdc_table.external_table_name.clone(),
            )?;

            let (columns, pk_names) = match wildcard_idx {
                Some(_) => bind_cdc_table_schema_externally(cdc_with_options.clone()).await?,
                None => {
                    for column_def in &column_defs {
                        for option_def in &column_def.options {
                            if let ColumnOption::DefaultValue(_)
                            | ColumnOption::DefaultValueInternal { .. } = option_def.option
                            {
                                return Err(ErrorCode::NotSupported(
                                    "Default value for columns defined on the table created from a CDC source".into(),
                                    "Remove the default value expression in the column definitions".into(),
                                )
                                .into());
                            }
                        }
                    }

                    let (columns, pk_names) =
                        bind_cdc_table_schema(&column_defs, &constraints, false)?;
                    // read default value definition from external db
                    let (options, secret_refs) = cdc_with_options.clone().into_parts();
                    let _config = ExternalTableConfig::try_from_btreemap(options, secret_refs)
                        .context("failed to extract external table config")?;

                    // NOTE: if the external table has a default column, we will only treat it as a normal column.
                    (columns, pk_names)
                }
            };

            let context: OptimizerContextRef =
                OptimizerContext::new(handler_args, explain_options).into();
            let shared_source_id = source.id;
            let (plan, table) = gen_create_table_plan_for_cdc_table(
                context,
                source,
                cdc_table.external_table_name.clone(),
                column_defs,
                columns,
                pk_names,
                cdc_with_options,
                col_id_gen,
                on_conflict,
                with_version_column,
                include_column_options,
                table_name.clone(),
                resolved_table_name,
                database_id,
                schema_id,
                TableId::placeholder(),
                engine,
            )?;

            (
                (plan, None, table),
                TableJobType::SharedCdcSource,
                Some(shared_source_id),
            )
        }
        (Some(_), Some(_)) => {
            return Err(ErrorCode::NotSupported(
                "Data format and encoding format doesn't apply to table created from a CDC source"
                    .into(),
                "Remove the FORMAT and ENCODE specification".into(),
            )
            .into());
        }
    };
    Ok((plan, source, table, job_type, shared_source_id))
}

// For both tables created from a CDC source and tables with a CDC connector.
fn generated_columns_check_for_cdc_table(columns: &[ColumnDef]) -> Result<()> {
    let mut found_generated_column = false;
    for column in columns {
        let mut is_generated = false;

        for option_def in &column.options {
            if let ColumnOption::GeneratedColumns(_) = option_def.option {
                is_generated = true;
                break;
            }
        }

        if is_generated {
            found_generated_column = true;
        } else if found_generated_column {
            return Err(ErrorCode::NotSupported(
                "Non-generated column found after a generated column.".into(),
                "Ensure that all generated columns appear at the end of the cdc table definition."
                    .into(),
            )
            .into());
        }
    }
    Ok(())
}

// For both tables created from a CDC source and tables with a CDC connector.
fn not_null_check_for_cdc_table(
    wildcard_idx: &Option<usize>,
    column_defs: &[ColumnDef],
) -> Result<()> {
    if wildcard_idx.is_none()
        && column_defs.iter().any(|col| {
            col.options
                .iter()
                .any(|opt| matches!(opt.option, ColumnOption::NotNull))
        })
    {
        return Err(ErrorCode::NotSupported(
            "CDC table with NOT NULL constraint is not supported".to_owned(),
            "Please remove the NOT NULL constraint for columns".to_owned(),
        )
        .into());
    }
    Ok(())
}

// Only for tables created from a CDC source.
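// For example (illustrative only): `CREATE TABLE t (id INT PRIMARY KEY, v INT) FROM my_cdc_source TABLE 'mydb.t';`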
fn sanity_check_for_table_on_cdc_source(
    append_only: bool,
    column_defs: &[ColumnDef],
    wildcard_idx: &Option<usize>,
    constraints: &[TableConstraint],
    source_watermarks: &[SourceWatermark],
) -> Result<()> {
    // A wildcard cannot be used together with column definitions.
    if wildcard_idx.is_some() && !column_defs.is_empty() {
        return Err(ErrorCode::NotSupported(
            "wildcard(*) and column definitions cannot be used together".to_owned(),
            "Remove the wildcard or column definitions".to_owned(),
        )
        .into());
    }

    // A CDC table must have a primary key constraint or a primary key column.
    if wildcard_idx.is_none()
        && !constraints.iter().any(|c| {
            matches!(
                c,
                TableConstraint::Unique {
                    is_primary: true,
                    ..
                }
            )
        })
        && !column_defs.iter().any(|col| {
            col.options
                .iter()
                .any(|opt| matches!(opt.option, ColumnOption::Unique { is_primary: true }))
        })
    {
        return Err(ErrorCode::NotSupported(
            "CDC table without primary key constraint is not supported".to_owned(),
            "Please define a primary key".to_owned(),
        )
        .into());
    }

    if append_only {
        return Err(ErrorCode::NotSupported(
            "append only modifier on the table created from a CDC source".into(),
            "Remove the APPEND ONLY clause".into(),
        )
        .into());
    }

    if !source_watermarks.is_empty() {
        return Err(ErrorCode::NotSupported(
            "watermark defined on the table created from a CDC source".into(),
            "Remove the Watermark definitions".into(),
        )
        .into());
    }

    Ok(())
}

/// Derives the schema of a CDC table by connecting to the external database.
async fn bind_cdc_table_schema_externally(
    cdc_with_options: WithOptionsSecResolved,
) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
    // Read the CDC table schema from the external database.
    let (options, secret_refs) = cdc_with_options.into_parts();
    let config = ExternalTableConfig::try_from_btreemap(options, secret_refs)
        .context("failed to extract external table config")?;

    let table = ExternalTableImpl::connect(config)
        .await
        .context("failed to auto derive table schema")?;
    Ok((
        table
            .column_descs()
            .iter()
            .cloned()
            .map(|column_desc| ColumnCatalog {
                column_desc,
                is_hidden: false,
            })
            .collect(),
        table.pk_names().clone(),
    ))
}

/// Derives the schema of a CDC table from the column definitions and constraints in the
/// SQL statement, used both when creating a new table and when replacing an existing one.
fn bind_cdc_table_schema(
    column_defs: &[ColumnDef],
    constraints: &[TableConstraint],
    is_for_replace_plan: bool,
) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
    let columns = bind_sql_columns(column_defs, is_for_replace_plan)?;

    let pk_names = bind_sql_pk_names(column_defs, bind_table_constraints(constraints)?)?;
    Ok((columns, pk_names))
}

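/// Entry point of `CREATE TABLE` handling: plans the table, builds the stream fragment
/// graph, and submits the catalog change, dispatching on the table engine (Hummock or
/// Iceberg).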
1372#[allow(clippy::too_many_arguments)]
1373pub async fn handle_create_table(
1374    mut handler_args: HandlerArgs,
1375    table_name: ObjectName,
1376    column_defs: Vec<ColumnDef>,
1377    wildcard_idx: Option<usize>,
1378    constraints: Vec<TableConstraint>,
1379    if_not_exists: bool,
1380    format_encode: Option<FormatEncodeOptions>,
1381    source_watermarks: Vec<SourceWatermark>,
1382    append_only: bool,
1383    on_conflict: Option<OnConflict>,
1384    with_version_column: Option<String>,
1385    cdc_table_info: Option<CdcTableInfo>,
1386    include_column_options: IncludeOption,
1387    webhook_info: Option<WebhookSourceInfo>,
1388    ast_engine: risingwave_sqlparser::ast::Engine,
1389) -> Result<RwPgResponse> {
1390    let session = handler_args.session.clone();
1391
1392    if append_only {
1393        session.notice_to_user("APPEND ONLY TABLE is currently an experimental feature.");
1394    }
1395
1396    session.check_cluster_limits().await?;
1397
1398    let engine = match ast_engine {
1399        risingwave_sqlparser::ast::Engine::Hummock => Engine::Hummock,
1400        risingwave_sqlparser::ast::Engine::Iceberg => Engine::Iceberg,
1401    };
1402    if engine == Engine::Iceberg && handler_args.with_options.get_connector().is_some() {
        // HACK: since we don't have atomic DDL, a table with a connector may lose data.
1404        // FIXME: remove this after https://github.com/risingwavelabs/risingwave/issues/21863
1405        if let Some(_rate_limit) = handler_args.with_options.insert(
1406            OverwriteOptions::SOURCE_RATE_LIMIT_KEY.to_owned(),
1407            "0".to_owned(),
1408        ) {
            // The user had already specified a rate limit; reject it.
1410            return Err(ErrorCode::NotSupported(
1411                "source_rate_limit for iceberg table engine during table creation".to_owned(),
1412                "Please remove source_rate_limit from WITH options.".to_owned(),
1413            )
1414            .into());
1415        }
1416    }
1417
1418    if let Either::Right(resp) = session.check_relation_name_duplicated(
1419        table_name.clone(),
1420        StatementType::CREATE_TABLE,
1421        if_not_exists,
1422    )? {
1423        return Ok(resp);
1424    }
1425
1426    let (graph, source, hummock_table, job_type, shared_source_id) = {
1427        let (plan, source, table, job_type, shared_source_id) = handle_create_table_plan(
1428            handler_args.clone(),
1429            ExplainOptions::default(),
1430            format_encode,
1431            cdc_table_info,
1432            &table_name,
1433            column_defs.clone(),
1434            wildcard_idx,
1435            constraints.clone(),
1436            source_watermarks,
1437            append_only,
1438            on_conflict,
1439            with_version_column,
1440            include_column_options,
1441            webhook_info,
1442            engine,
1443        )
1444        .await?;
1445        tracing::trace!("table_plan: {:?}", plan.explain_to_string());
1446
1447        let graph = build_graph(plan, Some(GraphJobType::Table))?;
1448
1449        (graph, source, table, job_type, shared_source_id)
1450    };
1451
1452    tracing::trace!(
1453        "name={}, graph=\n{}",
1454        table_name,
1455        serde_json::to_string_pretty(&graph).unwrap()
1456    );
1457
1458    let dependencies = shared_source_id
1459        .map(|id| HashSet::from([id as ObjectId]))
1460        .unwrap_or_default();
1461
1462    // Handle engine
1463    match engine {
1464        Engine::Hummock => {
1465            let catalog_writer = session.catalog_writer()?;
1466            catalog_writer
1467                .create_table(
1468                    source.map(|s| s.to_prost()),
1469                    hummock_table.to_prost(),
1470                    graph,
1471                    job_type,
1472                    if_not_exists,
1473                    dependencies,
1474                )
1475                .await?;
1476        }
1477        Engine::Iceberg => {
1478            create_iceberg_engine_table(
1479                session,
1480                handler_args,
1481                source.map(|s| s.to_prost()),
1482                hummock_table,
1483                graph,
1484                table_name,
1485                job_type,
1486                if_not_exists,
1487            )
1488            .await?;
1489        }
1490    }
1491
1492    Ok(PgResponse::empty_result(StatementType::CREATE_TABLE))
1493}
1494
/// The iceberg table engine is composed of a hummock table, an iceberg sink, and an iceberg source.
1496///
/// 1. Fetch the iceberg engine options from the meta node, or use the iceberg engine connection provided by the user.
/// 2. Create a hummock table.
/// 3. Create an iceberg sink.
/// 4. Create an iceberg source.
1501///
1502/// See <https://github.com/risingwavelabs/risingwave/issues/21586> for an architecture diagram.
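///
/// For example, a statement like the following (a sketch; the schema and any extra WITH
/// options are up to the user) fans out into the three objects above:
///
/// ```sql
/// CREATE TABLE t (id INT PRIMARY KEY, v INT) ENGINE = iceberg;
/// ```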
1503#[allow(clippy::too_many_arguments)]
1504pub async fn create_iceberg_engine_table(
1505    session: Arc<SessionImpl>,
1506    handler_args: HandlerArgs,
1507    mut source: Option<PbSource>,
1508    table: TableCatalog,
1509    graph: StreamFragmentGraph,
1510    table_name: ObjectName,
1511    job_type: PbTableJobType,
1512    if_not_exists: bool,
1513) -> Result<()> {
1514    let meta_client = session.env().meta_client();
1515    let meta_store_endpoint = meta_client.get_meta_store_endpoint().await?;
1516
1517    let meta_store_endpoint = url::Url::parse(&meta_store_endpoint).map_err(|_| {
1518        ErrorCode::InternalError("failed to parse the meta store endpoint".to_owned())
1519    })?;
1520    let meta_store_backend = meta_store_endpoint.scheme().to_owned();
1521    let meta_store_user = meta_store_endpoint.username().to_owned();
1522    let meta_store_password = match meta_store_endpoint.password() {
1523        Some(password) => percent_decode_str(password)
1524            .decode_utf8()
1525            .map_err(|_| {
1526                ErrorCode::InternalError(
1527                    "failed to parse password from meta store endpoint".to_owned(),
1528                )
1529            })?
1530            .into_owned(),
1531        None => "".to_owned(),
1532    };
1533    let meta_store_host = meta_store_endpoint
1534        .host_str()
1535        .ok_or_else(|| {
1536            ErrorCode::InternalError("failed to parse host from meta store endpoint".to_owned())
1537        })?
1538        .to_owned();
1539    let meta_store_port = meta_store_endpoint.port().ok_or_else(|| {
1540        ErrorCode::InternalError("failed to parse port from meta store endpoint".to_owned())
1541    })?;
1542    let meta_store_database = meta_store_endpoint
1543        .path()
1544        .trim_start_matches('/')
1545        .to_owned();
1546
1547    let Ok(meta_backend) = MetaBackend::from_str(&meta_store_backend, true) else {
1548        bail!("failed to parse meta backend: {}", meta_store_backend);
1549    };
1550
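    // Build the JDBC URI for the hosted iceberg catalog from the meta store endpoint.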
1551    let catalog_uri = match meta_backend {
1552        MetaBackend::Postgres => {
            format!(
                "jdbc:postgresql://{}:{}/{}",
                meta_store_host, meta_store_port, meta_store_database
            )
1559        }
1560        MetaBackend::Mysql => {
            format!(
                "jdbc:mysql://{}:{}/{}",
                meta_store_host, meta_store_port, meta_store_database
            )
1567        }
1568        MetaBackend::Sqlite | MetaBackend::Sql | MetaBackend::Mem => {
1569            bail!(
1570                "Unsupported meta backend for iceberg engine table: {}",
1571                meta_store_backend
1572            );
1573        }
1574    };
1575
1576    let rw_db_name = session
1577        .env()
1578        .catalog_reader()
1579        .read_guard()
1580        .get_database_by_id(&table.database_id)?
1581        .name()
1582        .to_owned();
1583    let rw_schema_name = session
1584        .env()
1585        .catalog_reader()
1586        .read_guard()
1587        .get_schema_by_id(&table.database_id, &table.schema_id)?
1588        .name()
1589        .clone();
1590    let iceberg_catalog_name = rw_db_name.clone();
1591    let iceberg_database_name = rw_schema_name.clone();
1592    let iceberg_table_name = table_name.0.last().unwrap().real_value();
1593
1594    let iceberg_engine_connection: String = session.config().iceberg_engine_connection();
1595    let sink_decouple = session.config().sink_decouple();
1596    if matches!(sink_decouple, SinkDecouple::Disable) {
1597        bail!(
1598            "Iceberg engine table only supports with sink decouple, try `set sink_decouple = true` to resolve it"
1599        );
1600    }
1601
1602    let mut connection_ref = BTreeMap::new();
1603    let with_common = if iceberg_engine_connection.is_empty() {
1604        bail!("to use iceberg engine table, the variable `iceberg_engine_connection` must be set.");
1605    } else {
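        // `iceberg_engine_connection` is expected in `schema_name.connection_name` form.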
1606        let parts: Vec<&str> = iceberg_engine_connection.split('.').collect();
        if parts.len() != 2 {
            bail!("invalid `iceberg_engine_connection`: expected `schema_name.connection_name`");
        }
1608        let connection_catalog =
1609            session.get_connection_by_name(Some(parts[0].to_owned()), parts[1])?;
1610        if let ConnectionInfo::ConnectionParams(params) = &connection_catalog.info {
1611            if params.connection_type == ConnectionType::Iceberg as i32 {
                // Reference the user-provided iceberg connection in the WITH clause.
1613                connection_ref.insert(
1614                    "connection".to_owned(),
1615                    ConnectionRefValue {
1616                        connection_name: ObjectName::from(vec![
1617                            Ident::from(parts[0]),
1618                            Ident::from(parts[1]),
1619                        ]),
1620                    },
1621                );
1622
1623                let mut with_common = BTreeMap::new();
1624                with_common.insert("connector".to_owned(), "iceberg".to_owned());
1625                with_common.insert("database.name".to_owned(), iceberg_database_name.to_owned());
1626                with_common.insert("table.name".to_owned(), iceberg_table_name.to_owned());
1627
1628                if let Some(s) = params.properties.get("hosted_catalog")
1629                    && s.eq_ignore_ascii_case("true")
1630                {
1631                    with_common.insert("catalog.type".to_owned(), "jdbc".to_owned());
1632                    with_common.insert("catalog.uri".to_owned(), catalog_uri.to_owned());
1633                    with_common.insert("catalog.jdbc.user".to_owned(), meta_store_user.to_owned());
1634                    with_common.insert(
1635                        "catalog.jdbc.password".to_owned(),
1636                        meta_store_password.clone(),
1637                    );
1638                    with_common.insert("catalog.name".to_owned(), iceberg_catalog_name.to_owned());
1639                }
1640
1641                with_common
1642            } else {
1643                return Err(RwError::from(ErrorCode::InvalidParameterValue(
1644                    "Only iceberg connection could be used in iceberg engine".to_owned(),
1645                )));
1646            }
1647        } else {
1648            return Err(RwError::from(ErrorCode::InvalidParameterValue(
1649                "Private Link Service has been deprecated. Please create a new connection instead."
1650                    .to_owned(),
1651            )));
1652        }
1653    };
1654
    // Iceberg sinks require a primary key. If none is provided, we fall back to the
    // hidden `_row_id` column. First, fetch the primary key from the table's columns.
1657    let mut pks = table
1658        .pk_column_names()
1659        .iter()
1660        .map(|c| c.to_string())
1661        .collect::<Vec<String>>();
1662
    // For a table without a primary key, use the hidden `_row_id` column instead.
1664    let sink_from = if pks.len() == 1 && pks[0].eq(ROW_ID_COLUMN_NAME) {
1665        pks = vec![RISINGWAVE_ICEBERG_ROW_ID.to_owned()];
1666        let [stmt]: [_; 1] = Parser::parse_sql(&format!(
1667            "select {} as {}, * from {}",
1668            ROW_ID_COLUMN_NAME, RISINGWAVE_ICEBERG_ROW_ID, table_name
1669        ))
1670        .context("unable to parse query")?
1671        .try_into()
1672        .unwrap();
1673
1674        let Statement::Query(query) = &stmt else {
1675            panic!("unexpected statement: {:?}", stmt);
1676        };
1677        CreateSink::AsQuery(query.clone())
1678    } else {
1679        CreateSink::From(table_name.clone())
1680    };
1681
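    // Name the internal sink by prefixing the table name with `ICEBERG_SINK_PREFIX`.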
1682    let mut sink_name = table_name.clone();
1683    *sink_name.0.last_mut().unwrap() = Ident::from(
1684        (ICEBERG_SINK_PREFIX.to_owned() + &sink_name.0.last().unwrap().real_value()).as_str(),
1685    );
1686    let create_sink_stmt = CreateSinkStatement {
1687        if_not_exists: false,
1688        sink_name,
1689        with_properties: WithProperties(vec![]),
1690        sink_from,
1691        columns: vec![],
1692        emit_mode: None,
1693        sink_schema: None,
1694        into_table_name: None,
1695    };
1696
1697    let mut sink_handler_args = handler_args.clone();
1698
1699    let mut sink_with = with_common.clone();
1700
1701    if table.append_only {
1702        sink_with.insert("type".to_owned(), "append-only".to_owned());
1703    } else {
1704        sink_with.insert("primary_key".to_owned(), pks.join(","));
1705        sink_with.insert("type".to_owned(), "upsert".to_owned());
1706    }
    // TODO: once DDL is atomic, skip the initial backfill by setting:
    // sink_with.insert(SINK_SNAPSHOT_OPTION.to_owned(), "false".to_owned());
    //
1709    // Note: in theory, we don't need to backfill from the table to the sink,
1710    // but we don't have atomic DDL now https://github.com/risingwavelabs/risingwave/issues/21863
1711    // so it may have potential data loss problem on the first barrier.
1712    //
1713    // For non-append-only table, we can always solve it by the initial sink with backfill, since
1714    // data will be present in hummock table.
1715    //
1716    // For append-only table, we need to be more careful.
1717    //
    // The possible cases for a table:
    // - Table without a connector: it doesn't matter, since there's no data before the table is created.
    // - Table with a connector: we worked around it by setting SOURCE_RATE_LIMIT to 0.
    //   + If we ever support blocking DDL for tables with connectors, we need to be careful.
    // - Table with an upstream job, specifically a CDC table from a shared CDC source:
    //   + Data may come from both the upstream connector and the CDC table backfill, so we would need to pause both of them.
    //   + For now we don't support APPEND ONLY CDC tables, so it's safe.
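    // `commit_checkpoint_interval` controls how many checkpoints elapse between iceberg commits; it defaults to 60 if unspecified.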
1725    let commit_checkpoint_interval = handler_args
1726        .with_options
1727        .get(COMMIT_CHECKPOINT_INTERVAL)
1728        .map(|v| v.to_owned())
1729        .unwrap_or_else(|| "60".to_owned());
1730    let commit_checkpoint_interval = commit_checkpoint_interval.parse::<u32>().map_err(|_| {
1731        ErrorCode::InvalidInputSyntax(format!(
1732            "commit_checkpoint_interval must be a positive integer: {}",
1733            commit_checkpoint_interval
1734        ))
1735    })?;
1736
1737    if commit_checkpoint_interval == 0 {
1738        bail!("commit_checkpoint_interval must be a positive integer: 0");
1739    }
1740
1741    // remove commit_checkpoint_interval from source options, otherwise it will be considered as an unknown field.
    if let Some(source) = source.as_mut() {
        source.with_properties.remove(COMMIT_CHECKPOINT_INTERVAL);
    }
1745
1746    let sink_decouple = session.config().sink_decouple();
1747    if matches!(sink_decouple, SinkDecouple::Disable) && commit_checkpoint_interval > 1 {
1748        bail!(
1749            "config conflict: `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
1750        )
1751    }
1752
1753    sink_with.insert(
1754        COMMIT_CHECKPOINT_INTERVAL.to_owned(),
1755        commit_checkpoint_interval.to_string(),
1756    );
1757    sink_with.insert("create_table_if_not_exists".to_owned(), "true".to_owned());
1758
1759    sink_with.insert("is_exactly_once".to_owned(), "true".to_owned());
1760
1761    if let Some(enable_compaction) = handler_args.with_options.get(ENABLE_COMPACTION) {
1762        match enable_compaction.to_lowercase().as_str() {
1763            "true" => {
1764                risingwave_common::license::Feature::IcebergCompaction.check_available()?;
1765
1766                sink_with.insert(ENABLE_COMPACTION.to_owned(), "true".to_owned());
1767            }
1768            "false" => {
1769                sink_with.insert(ENABLE_COMPACTION.to_owned(), "false".to_owned());
1770            }
1771            _ => {
1772                return Err(ErrorCode::InvalidInputSyntax(format!(
1773                    "enable_compaction must be true or false: {}",
1774                    enable_compaction
1775                ))
1776                .into());
1777            }
1778        }
1779
1780        // remove enable_compaction from source options, otherwise it will be considered as an unknown field.
        if let Some(source) = source.as_mut() {
            source.with_properties.remove(ENABLE_COMPACTION);
        }
1784    } else {
1785        sink_with.insert(
1786            ENABLE_COMPACTION.to_owned(),
1787            risingwave_common::license::Feature::IcebergCompaction
1788                .check_available()
1789                .is_ok()
1790                .to_string(),
1791        );
1792    }
1793
1794    if let Some(compaction_interval_sec) = handler_args.with_options.get(COMPACTION_INTERVAL_SEC) {
        let compaction_interval_sec = compaction_interval_sec.parse::<u64>().map_err(|_| {
            ErrorCode::InvalidInputSyntax(format!(
                "compaction_interval_sec must be a positive integer: {}",
                compaction_interval_sec
            ))
        })?;
1801        if compaction_interval_sec == 0 {
1802            bail!("compaction_interval_sec must be a positive integer: 0");
1803        }
1804        sink_with.insert(
1805            "compaction_interval_sec".to_owned(),
1806            compaction_interval_sec.to_string(),
1807        );
1808        // remove compaction_interval_sec from source options, otherwise it will be considered as an unknown field.
        if let Some(source) = source.as_mut() {
            source.with_properties.remove(COMPACTION_INTERVAL_SEC);
        }
1812    }
1813
1814    if let Some(enable_snapshot_expiration) =
1815        handler_args.with_options.get(ENABLE_SNAPSHOT_EXPIRATION)
1816    {
1817        match enable_snapshot_expiration.to_lowercase().as_str() {
1818            "true" => {
1819                risingwave_common::license::Feature::IcebergCompaction.check_available()?;
1820                sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "true".to_owned());
1821            }
1822            "false" => {
1823                sink_with.insert(ENABLE_SNAPSHOT_EXPIRATION.to_owned(), "false".to_owned());
1824            }
1825            _ => {
1826                return Err(ErrorCode::InvalidInputSyntax(format!(
1827                    "enable_snapshot_expiration must be true or false: {}",
1828                    enable_snapshot_expiration
1829                ))
1830                .into());
1831            }
1832        }
1833
1834        // remove enable_snapshot_expiration from source options, otherwise it will be considered as an unknown field.
        if let Some(source) = source.as_mut() {
            source.with_properties.remove(ENABLE_SNAPSHOT_EXPIRATION);
        }
1838    } else {
1839        sink_with.insert(
1840            ENABLE_SNAPSHOT_EXPIRATION.to_owned(),
1841            risingwave_common::license::Feature::IcebergCompaction
1842                .check_available()
1843                .is_ok()
1844                .to_string(),
1845        );
1846    }
1847
1848    if let Some(write_mode) = handler_args.with_options.get(WRITE_MODE) {
1849        match write_mode.to_lowercase().as_str() {
1850            ICEBERG_WRITE_MODE_MERGE_ON_READ => {
1851                sink_with.insert(
1852                    WRITE_MODE.to_owned(),
1853                    ICEBERG_WRITE_MODE_MERGE_ON_READ.to_owned(),
1854                );
1855            }
1856
1857            ICEBERG_WRITE_MODE_COPY_ON_WRITE => {
1858                risingwave_common::license::Feature::IcebergCompaction.check_available()?;
1859
1860                if table.append_only {
1861                    return Err(ErrorCode::NotSupported(
1862                        "COPY ON WRITE is not supported for append-only iceberg table".to_owned(),
1863                        "Please use MERGE ON READ instead".to_owned(),
1864                    )
1865                    .into());
1866                }
1867
1868                sink_with.insert(
1869                    WRITE_MODE.to_owned(),
1870                    ICEBERG_WRITE_MODE_COPY_ON_WRITE.to_owned(),
1871                );
1872            }
1873
1874            _ => {
1875                return Err(ErrorCode::InvalidInputSyntax(format!(
1876                    "write_mode must be one of: {}, {}",
1877                    ICEBERG_WRITE_MODE_MERGE_ON_READ, ICEBERG_WRITE_MODE_COPY_ON_WRITE
1878                ))
1879                .into());
1880            }
1881        }
1882
1883        // remove write_mode from source options, otherwise it will be considered as an unknown field.
        if let Some(source) = source.as_mut() {
            source.with_properties.remove(WRITE_MODE);
        }
1887    } else {
1888        sink_with.insert(
1889            WRITE_MODE.to_owned(),
1890            ICEBERG_WRITE_MODE_MERGE_ON_READ.to_owned(),
1891        );
1892    }
1893
1894    let partition_by = handler_args
1895        .with_options
1896        .get("partition_by")
1897        .map(|v| v.to_owned());
1898
1899    if let Some(partition_by) = &partition_by {
1900        let mut partition_columns = vec![];
1901        for (column, _) in parse_partition_by_exprs(partition_by.clone())? {
1902            table
1903                .columns()
1904                .iter()
1905                .find(|col| col.name().eq_ignore_ascii_case(&column))
1906                .ok_or_else(|| {
1907                    ErrorCode::InvalidInputSyntax(format!(
1908                        "Partition source column does not exist in schema: {}",
1909                        column
1910                    ))
1911                })?;
1912
1913            partition_columns.push(column.to_owned());
1914        }
1915
1916        ensure_partition_columns_are_prefix_of_primary_key(&partition_columns, &pks).map_err(
1917            |_| {
1918                ErrorCode::InvalidInputSyntax(
1919                    "The partition columns should be the prefix of the primary key".to_owned(),
1920                )
1921            },
1922        )?;
1923
1924        sink_with.insert("partition_by".to_owned(), partition_by.to_owned());
1925
1926        // remove partition_by from source options, otherwise it will be considered as an unknown field.
        if let Some(source) = source.as_mut() {
            source.with_properties.remove("partition_by");
        }
1930    }
1931
1932    sink_handler_args.with_options =
1933        WithOptions::new(sink_with, Default::default(), connection_ref.clone());
1934
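    // Likewise, name the internal iceberg source with `ICEBERG_SOURCE_PREFIX`.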
1935    let mut source_name = table_name.clone();
1936    *source_name.0.last_mut().unwrap() = Ident::from(
1937        (ICEBERG_SOURCE_PREFIX.to_owned() + &source_name.0.last().unwrap().real_value()).as_str(),
1938    );
1939    let create_source_stmt = CreateSourceStatement {
1940        temporary: false,
1941        if_not_exists: false,
1942        columns: vec![],
1943        source_name,
1944        wildcard_idx: Some(0),
1945        constraints: vec![],
1946        with_properties: WithProperties(vec![]),
1947        format_encode: CompatibleFormatEncode::V2(FormatEncodeOptions::none()),
1948        source_watermarks: vec![],
1949        include_column_options: vec![],
1950    };
1951
1952    let mut source_handler_args = handler_args.clone();
1953    let source_with = with_common;
1954    source_handler_args.with_options =
1955        WithOptions::new(source_with, Default::default(), connection_ref);
1956
    // Before we create the table, ensure the JVM is initialized, since we currently rely on the JDBC catalog.
    // If the JVM fails to initialize, the non-atomic DDL would leave behind a partially created iceberg engine table.
1959    let _ = JVM.get_or_init()?;
1960
1961    let catalog_writer = session.catalog_writer()?;
1962    // TODO(iceberg): make iceberg engine table creation ddl atomic
1963    let has_connector = source.is_some();
1964    catalog_writer
1965        .create_table(
1966            source,
1967            table.to_prost(),
1968            graph,
1969            job_type,
1970            if_not_exists,
1971            HashSet::default(),
1972        )
1973        .await?;
1974    let res = create_sink::handle_create_sink(sink_handler_args, create_sink_stmt, true).await;
1975    if res.is_err() {
        // Since DDL is not atomic, we need to drop the partially created table.
1977        handle_drop_table(handler_args.clone(), table_name.clone(), true, true).await?;
1978        res?;
1979    }
1980    let res = create_source::handle_create_source(source_handler_args, create_source_stmt).await;
1981    if res.is_err() {
        // Since DDL is not atomic, we need to drop the partially created table.
1983        handle_drop_table(handler_args.clone(), table_name.clone(), true, true).await?;
1984        res?;
1985    }
1986
1987    if has_connector {
1988        alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
1989            handler_args,
1990            PbThrottleTarget::TableWithSource,
1991            table_name,
1992            -1,
1993        )
1994        .await?;
1995    }
1996
1997    Ok(())
1998}
1999
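/// Check the connector-related clauses of `CREATE TABLE`: a table with a connector must
/// specify `FORMAT ... ENCODE ...`, and `INCLUDE` options require a connector. CDC tables
/// are exempt from this check.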
2000pub fn check_create_table_with_source(
2001    with_options: &WithOptions,
2002    format_encode: Option<FormatEncodeOptions>,
2003    include_column_options: &IncludeOption,
2004    cdc_table_info: &Option<CdcTableInfo>,
2005) -> Result<Option<FormatEncodeOptions>> {
2006    // skip check for cdc table
2007    if cdc_table_info.is_some() {
2008        return Ok(format_encode);
2009    }
2010    let defined_source = with_options.is_source_connector();
2011
2012    if !include_column_options.is_empty() && !defined_source {
2013        return Err(ErrorCode::InvalidInputSyntax(
2014            "INCLUDE should be used with a connector".to_owned(),
2015        )
2016        .into());
2017    }
2018    if defined_source {
2019        format_encode.as_ref().ok_or_else(|| {
2020            ErrorCode::InvalidInputSyntax("Please specify a source schema using FORMAT".to_owned())
2021        })?;
2022    }
2023    Ok(format_encode)
2024}
2025
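/// Check that the partition columns form a prefix of the primary key columns, e.g.
/// partitioning by `(a, b)` is allowed for a primary key `(a, b, c)`, while `(a, c)` is not.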
2026fn ensure_partition_columns_are_prefix_of_primary_key(
2027    partition_columns: &[String],
2028    primary_key_columns: &[String],
2029) -> std::result::Result<(), String> {
2030    if partition_columns.len() > primary_key_columns.len() {
2031        return Err("Partition columns cannot be longer than primary key columns.".to_owned());
2032    }
2033
2034    for (i, partition_col) in partition_columns.iter().enumerate() {
2035        if primary_key_columns.get(i) != Some(partition_col) {
2036            return Err(format!(
2037                "Partition column '{}' is not a prefix of the primary key.",
2038                partition_col
2039            ));
2040        }
2041    }
2042
2043    Ok(())
2044}
2045
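/// Generate the stream graph for replacing an existing table (e.g. for `ALTER TABLE`),
/// reusing the original table ID and, unless the connector is being dropped, the
/// associated source ID.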
2046#[allow(clippy::too_many_arguments)]
2047pub async fn generate_stream_graph_for_replace_table(
2048    _session: &Arc<SessionImpl>,
2049    table_name: ObjectName,
2050    original_catalog: &Arc<TableCatalog>,
2051    handler_args: HandlerArgs,
2052    statement: Statement,
2053    col_id_gen: ColumnIdGenerator,
2054    sql_column_strategy: SqlColumnStrategy,
2055) -> Result<(
2056    StreamFragmentGraph,
2057    TableCatalog,
2058    Option<SourceCatalog>,
2059    TableJobType,
2060)> {
2061    let Statement::CreateTable {
2062        columns,
2063        constraints,
2064        source_watermarks,
2065        append_only,
2066        on_conflict,
2067        with_version_column,
2068        wildcard_idx,
2069        cdc_table_info,
2070        format_encode,
2071        include_column_options,
2072        engine,
2073        with_options,
2074        ..
2075    } = statement
2076    else {
2077        panic!("unexpected statement type: {:?}", statement);
2078    };
2079
2080    let format_encode = format_encode
2081        .clone()
2082        .map(|format_encode| format_encode.into_v2_with_warning());
2083
2084    let engine = match engine {
2085        risingwave_sqlparser::ast::Engine::Hummock => Engine::Hummock,
2086        risingwave_sqlparser::ast::Engine::Iceberg => Engine::Iceberg,
2087    };
2088
2089    let is_drop_connector =
2090        original_catalog.associated_source_id().is_some() && format_encode.is_none();
2091    if is_drop_connector {
2092        debug_assert!(
2093            source_watermarks.is_empty()
2094                && include_column_options.is_empty()
2095                && with_options
2096                    .iter()
2097                    .all(|opt| opt.name.real_value().to_lowercase() != "connector")
2098        );
2099    }
2100
2101    let props = CreateTableProps {
2102        definition: handler_args.normalized_sql.clone(),
2103        append_only,
2104        on_conflict: on_conflict.into(),
2105        with_version_column: with_version_column.as_ref().map(|x| x.real_value()),
2106        webhook_info: original_catalog.webhook_info.clone(),
2107        engine,
2108    };
2109
2110    let ((plan, mut source, mut table), job_type) = match (format_encode, cdc_table_info.as_ref()) {
2111        (Some(format_encode), None) => (
2112            gen_create_table_plan_with_source(
2113                handler_args,
2114                ExplainOptions::default(),
2115                table_name,
2116                columns,
2117                wildcard_idx,
2118                constraints,
2119                format_encode,
2120                source_watermarks,
2121                col_id_gen,
2122                include_column_options,
2123                props,
2124                sql_column_strategy,
2125            )
2126            .await?,
2127            TableJobType::General,
2128        ),
2129        (None, None) => {
2130            let context = OptimizerContext::from_handler_args(handler_args);
2131            let (plan, table) = gen_create_table_plan(
2132                context,
2133                table_name,
2134                columns,
2135                constraints,
2136                col_id_gen,
2137                source_watermarks,
2138                props,
2139                true,
2140            )?;
2141            ((plan, None, table), TableJobType::General)
2142        }
2143        (None, Some(cdc_table)) => {
2144            let session = &handler_args.session;
2145            let (source, resolved_table_name, database_id, schema_id) =
2146                get_source_and_resolved_table_name(session, cdc_table.clone(), table_name.clone())?;
2147
2148            let cdc_with_options = derive_with_options_for_cdc_table(
2149                &source.with_properties,
2150                cdc_table.external_table_name.clone(),
2151            )?;
2152
2153            let (column_catalogs, pk_names) = bind_cdc_table_schema(&columns, &constraints, true)?;
2154
2155            let context: OptimizerContextRef =
2156                OptimizerContext::new(handler_args, ExplainOptions::default()).into();
2157            let (plan, table) = gen_create_table_plan_for_cdc_table(
2158                context,
2159                source,
2160                cdc_table.external_table_name.clone(),
2161                columns,
2162                column_catalogs,
2163                pk_names,
2164                cdc_with_options,
2165                col_id_gen,
2166                on_conflict,
2167                with_version_column.map(|x| x.real_value()),
2168                include_column_options,
2169                table_name,
2170                resolved_table_name,
2171                database_id,
2172                schema_id,
2173                original_catalog.id(),
2174                engine,
2175            )?;
2176
2177            ((plan, None, table), TableJobType::SharedCdcSource)
2178        }
2179        (Some(_), Some(_)) => {
2180            return Err(ErrorCode::NotSupported(
2181                "Data format and encoding format doesn't apply to table created from a CDC source"
2182                    .into(),
2183                "Remove the FORMAT and ENCODE specification".into(),
2184            )
2185            .into());
2186        }
2187    };
2188
2189    if table.pk_column_ids() != original_catalog.pk_column_ids() {
2190        Err(ErrorCode::InvalidInputSyntax(
2191            "alter primary key of table is not supported".to_owned(),
2192        ))?
2193    }
2194
2195    let graph = build_graph(plan, Some(GraphJobType::Table))?;
2196
2197    // Fill the original table ID.
2198    table.id = original_catalog.id();
2199    if !is_drop_connector && let Some(source_id) = original_catalog.associated_source_id() {
2200        table.associated_source_id = Some(source_id);
2201
2202        let source = source.as_mut().unwrap();
2203        source.id = source_id.table_id;
2204        source.associated_table_id = Some(table.id());
2205    }
2206
2207    Ok((graph, table, source, job_type))
2208}
2209
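/// Resolve the CDC source referenced by `cdc_table`, together with the resolved table
/// name and the database/schema IDs under which the new table will be created.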
2210fn get_source_and_resolved_table_name(
2211    session: &Arc<SessionImpl>,
2212    cdc_table: CdcTableInfo,
2213    table_name: ObjectName,
2214) -> Result<(Arc<SourceCatalog>, String, DatabaseId, SchemaId)> {
2215    let db_name = &session.database();
2216    let (schema_name, resolved_table_name) =
2217        Binder::resolve_schema_qualified_name(db_name, &table_name)?;
2218    let (database_id, schema_id) =
2219        session.get_database_and_schema_id_for_create(schema_name.clone())?;
2220
    let (source_schema_name, source_name) =
        Binder::resolve_schema_qualified_name(db_name, &cdc_table.source_name)?;
2223
2224    let source = {
2225        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_name = source_schema_name.unwrap_or(DEFAULT_SCHEMA_NAME.to_owned());
2227        let (source, _) = catalog_reader.get_source_by_name(
2228            db_name,
2229            SchemaPath::Name(schema_name.as_str()),
2230            source_name.as_str(),
2231        )?;
2232        source.clone()
2233    };
2234
2235    Ok((source, resolved_table_name, database_id, schema_id))
2236}
2237
// Validate the webhook info and bind it to protobuf.
2239fn bind_webhook_info(
2240    session: &Arc<SessionImpl>,
2241    columns_defs: &[ColumnDef],
2242    webhook_info: WebhookSourceInfo,
2243) -> Result<PbWebhookSourceInfo> {
2244    // validate columns
    if columns_defs.len() != 1
        || columns_defs[0].data_type.as_ref() != Some(&AstDataType::Jsonb)
    {
2247        return Err(ErrorCode::InvalidInputSyntax(
2248            "Table with webhook source should have exactly one JSONB column".to_owned(),
2249        )
2250        .into());
2251    }
2252
2253    let WebhookSourceInfo {
2254        secret_ref,
2255        signature_expr,
2256        wait_for_persistence,
2257        is_batched,
2258    } = webhook_info;
2259
2260    // validate secret_ref
2261    let (pb_secret_ref, secret_name) = if let Some(secret_ref) = secret_ref {
2262        let db_name = &session.database();
2263        let (schema_name, secret_name) =
2264            Binder::resolve_schema_qualified_name(db_name, &secret_ref.secret_name)?;
2265        let secret_catalog = session.get_secret_by_name(schema_name, &secret_name)?;
2266        (
2267            Some(PbSecretRef {
2268                secret_id: secret_catalog.id.secret_id(),
2269                ref_as: match secret_ref.ref_as {
2270                    SecretRefAsType::Text => PbRefAsType::Text,
2271                    SecretRefAsType::File => PbRefAsType::File,
2272                }
2273                .into(),
2274            }),
2275            Some(secret_name),
2276        )
2277    } else {
2278        (None, None)
2279    };
2280
2281    let secure_compare_context = SecureCompareContext {
2282        column_name: columns_defs[0].name.real_value(),
2283        secret_name,
2284    };
2285    let mut binder = Binder::new_for_ddl_with_secure_compare(session, secure_compare_context);
2286    let expr = binder.bind_expr(&signature_expr)?;
2287
2288    // validate expr, ensuring it is SECURE_COMPARE()
2289    if expr.as_function_call().is_none()
2290        || expr.as_function_call().unwrap().func_type()
2291            != crate::optimizer::plan_node::generic::ExprType::SecureCompare
2292    {
2293        return Err(ErrorCode::InvalidInputSyntax(
2294            "The signature verification function must be SECURE_COMPARE()".to_owned(),
2295        )
2296        .into());
2297    }
2298
2299    let pb_webhook_info = PbWebhookSourceInfo {
2300        secret_ref: pb_secret_ref,
2301        signature_expr: Some(expr.to_expr_proto()),
2302        wait_for_persistence,
2303        is_batched,
2304    };
2305
2306    Ok(pb_webhook_info)
2307}
2308
2309#[cfg(test)]
2310mod tests {
2311    use risingwave_common::catalog::{
2312        DEFAULT_DATABASE_NAME, ROW_ID_COLUMN_NAME, RW_TIMESTAMP_COLUMN_NAME,
2313    };
2314    use risingwave_common::types::{DataType, StructType};
2315
2316    use super::*;
2317    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};
2318
2319    #[tokio::test]
2320    async fn test_create_table_handler() {
2321        let sql =
2322            "create table t (v1 smallint, v2 struct<v3 bigint, v4 float, v5 double>) append only;";
2323        let frontend = LocalFrontend::new(Default::default()).await;
2324        frontend.run_sql(sql).await.unwrap();
2325
2326        let session = frontend.session_ref();
2327        let catalog_reader = session.env().catalog_reader().read_guard();
2328        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);
2329
2330        // Check table exists.
2331        let (table, _) = catalog_reader
2332            .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
2333            .unwrap();
2334        assert_eq!(table.name(), "t");
2335
2336        let columns = table
2337            .columns
2338            .iter()
2339            .map(|col| (col.name(), col.data_type().clone()))
2340            .collect::<HashMap<&str, DataType>>();
2341
2342        let expected_columns = maplit::hashmap! {
2343            ROW_ID_COLUMN_NAME => DataType::Serial,
2344            "v1" => DataType::Int16,
2345            "v2" => StructType::new(
2346                vec![("v3", DataType::Int64),("v4", DataType::Float64),("v5", DataType::Float64)],
2347            )
2348            .with_ids([3, 4, 5].map(ColumnId::new))
2349            .into(),
2350            RW_TIMESTAMP_COLUMN_NAME => DataType::Timestamptz,
2351        };
2352
2353        assert_eq!(columns, expected_columns, "{columns:#?}");
2354    }
2355
2356    #[test]
2357    fn test_bind_primary_key() {
        // Note: Column ID 0 is reserved for the row ID column.
2359
2360        for (sql, expected) in [
2361            ("create table t (v1 int, v2 int)", Ok(&[0] as &[_])),
2362            ("create table t (v1 int primary key, v2 int)", Ok(&[1])),
2363            ("create table t (v1 int, v2 int primary key)", Ok(&[2])),
2364            (
2365                "create table t (v1 int primary key, v2 int primary key)",
2366                Err("multiple primary keys are not allowed"),
2367            ),
2368            (
2369                "create table t (v1 int primary key primary key, v2 int)",
2370                Err("multiple primary keys are not allowed"),
2371            ),
2372            (
2373                "create table t (v1 int, v2 int, primary key (v1))",
2374                Ok(&[1]),
2375            ),
2376            (
2377                "create table t (v1 int, primary key (v2), v2 int)",
2378                Ok(&[2]),
2379            ),
2380            (
2381                "create table t (primary key (v2, v1), v1 int, v2 int)",
2382                Ok(&[2, 1]),
2383            ),
2384            (
2385                "create table t (v1 int, primary key (v1), v2 int, primary key (v1))",
2386                Err("multiple primary keys are not allowed"),
2387            ),
2388            (
2389                "create table t (v1 int primary key, primary key (v1), v2 int)",
2390                Err("multiple primary keys are not allowed"),
2391            ),
2392            (
2393                "create table t (v1 int, primary key (V3), v2 int)",
2394                Err("column \"v3\" named in key does not exist"),
2395            ),
2396        ] {
2397            let mut ast = risingwave_sqlparser::parser::Parser::parse_sql(sql).unwrap();
2398            let risingwave_sqlparser::ast::Statement::CreateTable {
2399                columns: column_defs,
2400                constraints,
2401                ..
2402            } = ast.remove(0)
2403            else {
2404                panic!("test case should be create table")
2405            };
2406            let actual: Result<_> = (|| {
2407                let mut columns = bind_sql_columns(&column_defs, false)?;
2408                let mut col_id_gen = ColumnIdGenerator::new_initial();
2409                for c in &mut columns {
2410                    col_id_gen.generate(c)?;
2411                }
2412
2413                let pk_names =
2414                    bind_sql_pk_names(&column_defs, bind_table_constraints(&constraints)?)?;
2415                let (_, pk_column_ids, _) =
2416                    bind_pk_and_row_id_on_relation(columns, pk_names, true)?;
2417                Ok(pk_column_ids)
2418            })();
2419            match (expected, actual) {
2420                (Ok(expected), Ok(actual)) => assert_eq!(
2421                    expected.iter().copied().map(ColumnId::new).collect_vec(),
2422                    actual,
2423                    "sql: {sql}"
2424                ),
2425                (Ok(_), Err(actual)) => panic!("sql: {sql}\nunexpected error: {actual:?}"),
2426                (Err(_), Ok(actual)) => panic!("sql: {sql}\nexpects error but got: {actual:?}"),
2427                (Err(expected), Err(actual)) => assert!(
2428                    actual.to_string().contains(expected),
2429                    "sql: {sql}\nexpected: {expected:?}\nactual: {actual:?}"
2430                ),
2431            }
2432        }
2433    }
2434
2435    #[tokio::test]
2436    async fn test_duplicate_props_options() {
2437        let proto_file = create_proto_file(PROTO_FILE_DATA);
2438        let sql = format!(
2439            r#"CREATE TABLE t
2440    WITH (
2441        connector = 'kinesis',
2442        aws.region='user_test_topic',
2443        endpoint='172.10.1.1:9090,172.10.1.2:9090',
2444        aws.credentials.access_key_id = 'your_access_key_1',
2445        aws.credentials.secret_access_key = 'your_secret_key_1'
2446    )
2447    FORMAT PLAIN ENCODE PROTOBUF (
2448        message = '.test.TestRecord',
2449        aws.credentials.access_key_id = 'your_access_key_2',
2450        aws.credentials.secret_access_key = 'your_secret_key_2',
2451        schema.location = 'file://{}',
2452    )"#,
2453            proto_file.path().to_str().unwrap()
2454        );
2455        let frontend = LocalFrontend::new(Default::default()).await;
2456        frontend.run_sql(sql).await.unwrap();
2457
2458        let session = frontend.session_ref();
2459        let catalog_reader = session.env().catalog_reader().read_guard();
2460        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);
2461
2462        // Check source exists.
2463        let (source, _) = catalog_reader
2464            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
2465            .unwrap();
2466        assert_eq!(source.name, "t");
2467
2468        // AwsAuth params exist in options.
2469        assert_eq!(
2470            source
2471                .info
2472                .format_encode_options
2473                .get("aws.credentials.access_key_id")
2474                .unwrap(),
2475            "your_access_key_2"
2476        );
2477        assert_eq!(
2478            source
2479                .info
2480                .format_encode_options
2481                .get("aws.credentials.secret_access_key")
2482                .unwrap(),
2483            "your_secret_key_2"
2484        );
2485
2486        // AwsAuth params exist in props.
2487        assert_eq!(
2488            source
2489                .with_properties
2490                .get("aws.credentials.access_key_id")
2491                .unwrap(),
2492            "your_access_key_1"
2493        );
2494        assert_eq!(
2495            source
2496                .with_properties
2497                .get("aws.credentials.secret_access_key")
2498                .unwrap(),
2499            "your_secret_key_1"
2500        );
2501
2502        // Options are not merged into props.
2503        assert!(!source.with_properties.contains_key("schema.location"));
2504    }
2505}