risingwave_frontend/catalog/
purify.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::Context;
16use itertools::Itertools;
17use prost::Message as _;
18use risingwave_common::bail;
19use risingwave_common::catalog::{ColumnCatalog, ColumnId};
20use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
21use risingwave_sqlparser::ast::*;
22
23use crate::error::Result;
24use crate::session::current;
25use crate::utils::data_type::DataTypeToAst as _;
26
27mod pk_column {
28    use super::*;
29    // Identifies a primary key column...
30    pub trait PkColumn {
31        fn is(&self, column: &ColumnCatalog) -> bool;
32    }
33    // ...by column name.
34    impl PkColumn for &str {
35        fn is(&self, column: &ColumnCatalog) -> bool {
36            column.name() == *self
37        }
38    }
39    // ...by column ID.
40    impl PkColumn for ColumnId {
41        fn is(&self, column: &ColumnCatalog) -> bool {
42            column.column_id() == *self
43        }
44    }
45}
46use pk_column::PkColumn;
47
48/// Try to restore missing column definitions and constraints in the persisted table (or source)
49/// definition, if the schema is derived from external systems (like schema registry) or it's
50/// created by `CREATE TABLE AS`.
51///
52/// Returns error if restoring failed, or the persisted definition is invalid.
53pub fn try_purify_table_source_create_sql_ast(
54    mut base: Statement,
55    columns: &[ColumnCatalog],
56    row_id_index: Option<usize>,
57    pk_column_ids: &[impl PkColumn],
58) -> Result<Statement> {
59    if let Some(config) = current::config()
60        && config.read().disable_purify_definition()
61    {
62        current::notice_to_user(
63            "purifying definition is disabled via session config, results may be inaccurate",
64        );
65        return Ok(base);
66    }
67
68    let (Statement::CreateTable {
69        columns: column_defs,
70        constraints,
71        wildcard_idx,
72        include_column_options,
73        ..
74    }
75    | Statement::CreateSource {
76        stmt:
77            CreateSourceStatement {
78                columns: column_defs,
79                constraints,
80                wildcard_idx,
81                include_column_options,
82                ..
83            },
84    }) = &mut base
85    else {
86        bail!("expect `CREATE TABLE` or `CREATE SOURCE` statement, found: `{base:?}`");
87    };
88
89    // First, remove the wildcard from the definition.
90    *wildcard_idx = None;
91
92    // Filter out columns that are not defined by users in SQL.
93    let defined_columns = columns.iter().filter(|c| c.is_defined_in_columns_clause());
94
95    // Derive `ColumnDef` from `ColumnCatalog`.
96    let mut purified_column_defs = Vec::new();
97    for column in defined_columns {
98        let mut column_def = if let Some(existing) = column_defs
99            .iter()
100            .find(|c| c.name.real_value() == column.name())
101        {
102            // If the column is already defined in the persisted definition, retrieve it.
103            existing.clone()
104        } else {
105            assert!(
106                !column.is_generated(),
107                "generated column must not be inferred"
108            );
109
110            // Generate a new `ColumnDef` from the catalog.
111            ColumnDef {
112                name: Ident::from_real_value(column.name()),
113                data_type: Some(column.data_type().to_ast()),
114                collation: None,
115                options: Vec::new(), // pk will be specified with table constraints
116            }
117        };
118
119        // Fill in the persisted default value desc.
120        if let Some(c) = &column.column_desc.generated_or_default_column
121            && let GeneratedOrDefaultColumn::DefaultColumn(desc) = c
122        {
123            let persisted = desc.encode_to_vec().into_boxed_slice();
124
125            let default_value_option = column_def
126                .options
127                .extract_if(.., |o| {
128                    matches!(
129                        o.option,
130                        ColumnOption::DefaultValue { .. }
131                            | ColumnOption::DefaultValueInternal { .. }
132                    )
133                })
134                .at_most_one()
135                .ok()
136                .context("multiple default value options found")?;
137
138            let expr = default_value_option.and_then(|o| match o.option {
139                ColumnOption::DefaultValue(expr) => Some(expr),
140                ColumnOption::DefaultValueInternal { expr, .. } => expr,
141                _ => unreachable!(),
142            });
143
144            column_def.options.push(ColumnOptionDef {
145                name: None,
146                option: ColumnOption::DefaultValueInternal { persisted, expr },
147            });
148        }
149
150        purified_column_defs.push(column_def);
151    }
152    *column_defs = purified_column_defs;
153
154    // Specify user-defined primary key in table constraints.
155    let has_pk_column_constraint = column_defs.iter().any(|c| {
156        c.options
157            .iter()
158            .any(|o| matches!(o.option, ColumnOption::Unique { is_primary: true }))
159    });
160    if !has_pk_column_constraint && row_id_index.is_none() {
161        let mut pk_columns = Vec::new();
162
163        for id in pk_column_ids {
164            let column = columns
165                .iter()
166                .find(|c| id.is(c))
167                .context("primary key column not found")?;
168            // Primary key must refer to a column that is defined in the columns or `INCLUDE` clause.
169            if !(column.is_defined_in_columns_clause() || column.is_connector_additional_column()) {
170                bail /* unlikely */ !(
171                    "primary key column \"{}\" is not user-defined",
172                    column.name()
173                );
174            }
175
176            // Find the name in `Ident` form from `column_defs` to preserve quote style best.
177            let name_ident = if let Some(col) = column_defs
178                .iter()
179                .find(|c| c.name.real_value() == column.name())
180            {
181                col.name.clone()
182            } else if let Some(alias) = include_column_options
183                .iter()
184                .filter_map(|c| c.column_alias.as_ref())
185                .find(|c| c.real_value() == column.name())
186            {
187                alias.clone()
188            } else {
189                // Fallback to generate a quoted name.
190                Ident::from_real_value(column.name())
191            };
192
193            pk_columns.push(name_ident);
194        }
195
196        let pk_constraint = TableConstraint::Unique {
197            name: None,
198            columns: pk_columns,
199            is_primary: true,
200        };
201
202        // We don't support table constraints other than `PRIMARY KEY`, thus simply overwrite.
203        assert!(
204            constraints.len() <= 1
205                && constraints.iter().all(|c| matches!(
206                    c,
207                    TableConstraint::Unique {
208                        is_primary: true,
209                        ..
210                    }
211                )),
212            "unexpected table constraints: {constraints:?}",
213        );
214
215        *constraints = vec![pk_constraint];
216    }
217
218    Ok(base)
219}