risingwave_frontend/catalog/
purify.rs1use anyhow::Context;
16use itertools::Itertools;
17use prost::Message as _;
18use risingwave_common::bail;
19use risingwave_common::catalog::{ColumnCatalog, ColumnId};
20use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
21use risingwave_sqlparser::ast::*;
22
23use crate::error::Result;
24use crate::session::current;
25use crate::utils::data_type::DataTypeToAst as _;
26
27mod pk_column {
28 use super::*;
29 pub trait PkColumn {
31 fn is(&self, column: &ColumnCatalog) -> bool;
32 }
33 impl PkColumn for &str {
35 fn is(&self, column: &ColumnCatalog) -> bool {
36 column.name() == *self
37 }
38 }
39 impl PkColumn for ColumnId {
41 fn is(&self, column: &ColumnCatalog) -> bool {
42 column.column_id() == *self
43 }
44 }
45}
46use pk_column::PkColumn;
47
48pub fn try_purify_table_source_create_sql_ast(
54 mut base: Statement,
55 columns: &[ColumnCatalog],
56 row_id_index: Option<usize>,
57 pk_column_ids: &[impl PkColumn],
58) -> Result<Statement> {
59 if let Some(config) = current::config()
60 && config.read().disable_purify_definition()
61 {
62 current::notice_to_user(
63 "purifying definition is disabled via session config, results may be inaccurate",
64 );
65 return Ok(base);
66 }
67
68 let (Statement::CreateTable {
69 columns: column_defs,
70 constraints,
71 wildcard_idx,
72 ..
73 }
74 | Statement::CreateSource {
75 stmt:
76 CreateSourceStatement {
77 columns: column_defs,
78 constraints,
79 wildcard_idx,
80 ..
81 },
82 }) = &mut base
83 else {
84 bail!("expect `CREATE TABLE` or `CREATE SOURCE` statement, found: `{base:?}`");
85 };
86
87 *wildcard_idx = None;
89
90 let defined_columns = columns.iter().filter(|c| c.is_user_defined());
92
93 let mut purified_column_defs = Vec::new();
95 for column in defined_columns {
96 let mut column_def = if let Some(existing) = column_defs
97 .iter()
98 .find(|c| c.name.real_value() == column.name())
99 {
100 existing.clone()
102 } else {
103 assert!(
104 !column.is_generated(),
105 "generated column must not be inferred"
106 );
107
108 ColumnDef {
110 name: Ident::from_real_value(column.name()),
111 data_type: Some(column.data_type().to_ast()),
112 collation: None,
113 options: Vec::new(), }
115 };
116
117 if let Some(c) = &column.column_desc.generated_or_default_column
119 && let GeneratedOrDefaultColumn::DefaultColumn(desc) = c
120 {
121 let persisted = desc.encode_to_vec().into_boxed_slice();
122
123 let default_value_option = column_def
124 .options
125 .extract_if(.., |o| {
126 matches!(
127 o.option,
128 ColumnOption::DefaultValue { .. }
129 | ColumnOption::DefaultValueInternal { .. }
130 )
131 })
132 .at_most_one()
133 .ok()
134 .context("multiple default value options found")?;
135
136 let expr = default_value_option.and_then(|o| match o.option {
137 ColumnOption::DefaultValue(expr) => Some(expr),
138 ColumnOption::DefaultValueInternal { expr, .. } => expr,
139 _ => unreachable!(),
140 });
141
142 column_def.options.push(ColumnOptionDef {
143 name: None,
144 option: ColumnOption::DefaultValueInternal { persisted, expr },
145 });
146 }
147
148 purified_column_defs.push(column_def);
149 }
150 *column_defs = purified_column_defs;
151
152 let has_pk_column_constraint = column_defs.iter().any(|c| {
154 c.options
155 .iter()
156 .any(|o| matches!(o.option, ColumnOption::Unique { is_primary: true }))
157 });
158 if !has_pk_column_constraint && row_id_index.is_none() {
159 let mut pk_columns = Vec::new();
160
161 for id in pk_column_ids {
162 let column = columns
163 .iter()
164 .find(|c| id.is(c))
165 .context("primary key column not found")?;
166 if !column.is_user_defined() {
167 bail !(
168 "primary key column \"{}\" is not user-defined",
169 column.name()
170 );
171 }
172 let name_ident = column_defs
174 .iter()
175 .find(|c| c.name.real_value() == column.name())
176 .unwrap()
177 .name
178 .clone();
179 pk_columns.push(name_ident);
180 }
181
182 let pk_constraint = TableConstraint::Unique {
183 name: None,
184 columns: pk_columns,
185 is_primary: true,
186 };
187
188 assert!(
190 constraints.len() <= 1
191 && constraints.iter().all(|c| matches!(
192 c,
193 TableConstraint::Unique {
194 is_primary: true,
195 ..
196 }
197 )),
198 "unexpected table constraints: {constraints:?}",
199 );
200
201 *constraints = vec![pk_constraint];
202 }
203
204 Ok(base)
205}