risingwave_frontend/catalog/
purify.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::Context;
16use itertools::Itertools;
17use prost::Message as _;
18use risingwave_common::bail;
19use risingwave_common::catalog::{ColumnCatalog, ColumnId};
20use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
21use risingwave_sqlparser::ast::*;
22
23use crate::error::Result;
24use crate::session::current;
25use crate::utils::data_type::DataTypeToAst as _;
26
27mod pk_column {
28    use super::*;
29    // Identifies a primary key column...
30    pub trait PkColumn {
31        fn is(&self, column: &ColumnCatalog) -> bool;
32    }
33    // ...by column name.
34    impl PkColumn for &str {
35        fn is(&self, column: &ColumnCatalog) -> bool {
36            column.name() == *self
37        }
38    }
39    // ...by column ID.
40    impl PkColumn for ColumnId {
41        fn is(&self, column: &ColumnCatalog) -> bool {
42            column.column_id() == *self
43        }
44    }
45}
46use pk_column::PkColumn;
47
48/// Try to restore missing column definitions and constraints in the persisted table (or source)
49/// definition, if the schema is derived from external systems (like schema registry) or it's
50/// created by `CREATE TABLE AS`.
51///
52/// Returns error if restoring failed, or the persisted definition is invalid.
53pub fn try_purify_table_source_create_sql_ast(
54    mut base: Statement,
55    columns: &[ColumnCatalog],
56    row_id_index: Option<usize>,
57    pk_column_ids: &[impl PkColumn],
58) -> Result<Statement> {
59    if let Some(config) = current::config()
60        && config.read().disable_purify_definition()
61    {
62        current::notice_to_user(
63            "purifying definition is disabled via session config, results may be inaccurate",
64        );
65        return Ok(base);
66    }
67
68    let (Statement::CreateTable {
69        columns: column_defs,
70        constraints,
71        wildcard_idx,
72        ..
73    }
74    | Statement::CreateSource {
75        stmt:
76            CreateSourceStatement {
77                columns: column_defs,
78                constraints,
79                wildcard_idx,
80                ..
81            },
82    }) = &mut base
83    else {
84        bail!("expect `CREATE TABLE` or `CREATE SOURCE` statement, found: `{base:?}`");
85    };
86
87    // First, remove the wildcard from the definition.
88    *wildcard_idx = None;
89
90    // Filter out columns that are not defined by users in SQL.
91    let defined_columns = columns.iter().filter(|c| c.is_user_defined());
92
93    // Derive `ColumnDef` from `ColumnCatalog`.
94    let mut purified_column_defs = Vec::new();
95    for column in defined_columns {
96        let mut column_def = if let Some(existing) = column_defs
97            .iter()
98            .find(|c| c.name.real_value() == column.name())
99        {
100            // If the column is already defined in the persisted definition, retrieve it.
101            existing.clone()
102        } else {
103            assert!(
104                !column.is_generated(),
105                "generated column must not be inferred"
106            );
107
108            // Generate a new `ColumnDef` from the catalog.
109            ColumnDef {
110                name: Ident::from_real_value(column.name()),
111                data_type: Some(column.data_type().to_ast()),
112                collation: None,
113                options: Vec::new(), // pk will be specified with table constraints
114            }
115        };
116
117        // Fill in the persisted default value desc.
118        if let Some(c) = &column.column_desc.generated_or_default_column
119            && let GeneratedOrDefaultColumn::DefaultColumn(desc) = c
120        {
121            let persisted = desc.encode_to_vec().into_boxed_slice();
122
123            let default_value_option = column_def
124                .options
125                .extract_if(.., |o| {
126                    matches!(
127                        o.option,
128                        ColumnOption::DefaultValue { .. }
129                            | ColumnOption::DefaultValueInternal { .. }
130                    )
131                })
132                .at_most_one()
133                .ok()
134                .context("multiple default value options found")?;
135
136            let expr = default_value_option.and_then(|o| match o.option {
137                ColumnOption::DefaultValue(expr) => Some(expr),
138                ColumnOption::DefaultValueInternal { expr, .. } => expr,
139                _ => unreachable!(),
140            });
141
142            column_def.options.push(ColumnOptionDef {
143                name: None,
144                option: ColumnOption::DefaultValueInternal { persisted, expr },
145            });
146        }
147
148        purified_column_defs.push(column_def);
149    }
150    *column_defs = purified_column_defs;
151
152    // Specify user-defined primary key in table constraints.
153    let has_pk_column_constraint = column_defs.iter().any(|c| {
154        c.options
155            .iter()
156            .any(|o| matches!(o.option, ColumnOption::Unique { is_primary: true }))
157    });
158    if !has_pk_column_constraint && row_id_index.is_none() {
159        let mut pk_columns = Vec::new();
160
161        for id in pk_column_ids {
162            let column = columns
163                .iter()
164                .find(|c| id.is(c))
165                .context("primary key column not found")?;
166            if !column.is_user_defined() {
167                bail /* unlikely */ !(
168                    "primary key column \"{}\" is not user-defined",
169                    column.name()
170                );
171            }
172            // Find the name in `Ident` form from `column_defs` to preserve quote style best.
173            let name_ident = column_defs
174                .iter()
175                .find(|c| c.name.real_value() == column.name())
176                .unwrap()
177                .name
178                .clone();
179            pk_columns.push(name_ident);
180        }
181
182        let pk_constraint = TableConstraint::Unique {
183            name: None,
184            columns: pk_columns,
185            is_primary: true,
186        };
187
188        // We don't support table constraints other than `PRIMARY KEY`, thus simply overwrite.
189        assert!(
190            constraints.len() <= 1
191                && constraints.iter().all(|c| matches!(
192                    c,
193                    TableConstraint::Unique {
194                        is_primary: true,
195                        ..
196                    }
197                )),
198            "unexpected table constraints: {constraints:?}",
199        );
200
201        *constraints = vec![pk_constraint];
202    }
203
204    Ok(base)
205}