risingwave_frontend/catalog/
purify.rs1use anyhow::Context;
16use itertools::Itertools;
17use prost::Message as _;
18use risingwave_common::bail;
19use risingwave_common::catalog::{ColumnCatalog, ColumnId};
20use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
21use risingwave_sqlparser::ast::*;
22
23use crate::error::Result;
24use crate::session::current;
25use crate::utils::data_type::DataTypeToAst as _;
26
27mod pk_column {
28 use super::*;
29 pub trait PkColumn {
31 fn is(&self, column: &ColumnCatalog) -> bool;
32 }
33 impl PkColumn for &str {
35 fn is(&self, column: &ColumnCatalog) -> bool {
36 column.name() == *self
37 }
38 }
39 impl PkColumn for ColumnId {
41 fn is(&self, column: &ColumnCatalog) -> bool {
42 column.column_id() == *self
43 }
44 }
45}
46use pk_column::PkColumn;
47
48pub fn try_purify_table_source_create_sql_ast(
54 mut base: Statement,
55 columns: &[ColumnCatalog],
56 row_id_index: Option<usize>,
57 pk_column_ids: &[impl PkColumn],
58) -> Result<Statement> {
59 if let Some(config) = current::config()
60 && config.read().disable_purify_definition()
61 {
62 current::notice_to_user(
63 "purifying definition is disabled via session config, results may be inaccurate",
64 );
65 return Ok(base);
66 }
67
68 let (Statement::CreateTable {
69 columns: column_defs,
70 constraints,
71 wildcard_idx,
72 include_column_options,
73 ..
74 }
75 | Statement::CreateSource {
76 stmt:
77 CreateSourceStatement {
78 columns: column_defs,
79 constraints,
80 wildcard_idx,
81 include_column_options,
82 ..
83 },
84 }) = &mut base
85 else {
86 bail!("expect `CREATE TABLE` or `CREATE SOURCE` statement, found: `{base:?}`");
87 };
88
89 *wildcard_idx = None;
91
92 let defined_columns = columns.iter().filter(|c| c.is_defined_in_columns_clause());
94
95 let mut purified_column_defs = Vec::new();
97 for column in defined_columns {
98 let mut column_def = if let Some(existing) = column_defs
99 .iter()
100 .find(|c| c.name.real_value() == column.name())
101 {
102 existing.clone()
104 } else {
105 assert!(
106 !column.is_generated(),
107 "generated column must not be inferred"
108 );
109
110 ColumnDef {
112 name: Ident::from_real_value(column.name()),
113 data_type: Some(column.data_type().to_ast()),
114 collation: None,
115 options: Vec::new(), }
117 };
118
119 if let Some(c) = &column.column_desc.generated_or_default_column
121 && let GeneratedOrDefaultColumn::DefaultColumn(desc) = c
122 {
123 let persisted = desc.encode_to_vec().into_boxed_slice();
124
125 let default_value_option = column_def
126 .options
127 .extract_if(.., |o| {
128 matches!(
129 o.option,
130 ColumnOption::DefaultValue { .. }
131 | ColumnOption::DefaultValueInternal { .. }
132 )
133 })
134 .at_most_one()
135 .ok()
136 .context("multiple default value options found")?;
137
138 let expr = default_value_option.and_then(|o| match o.option {
139 ColumnOption::DefaultValue(expr) => Some(expr),
140 ColumnOption::DefaultValueInternal { expr, .. } => expr,
141 _ => unreachable!(),
142 });
143
144 column_def.options.push(ColumnOptionDef {
145 name: None,
146 option: ColumnOption::DefaultValueInternal { persisted, expr },
147 });
148 }
149
150 purified_column_defs.push(column_def);
151 }
152 *column_defs = purified_column_defs;
153
154 let has_pk_column_constraint = column_defs.iter().any(|c| {
156 c.options
157 .iter()
158 .any(|o| matches!(o.option, ColumnOption::Unique { is_primary: true }))
159 });
160 if !has_pk_column_constraint && row_id_index.is_none() {
161 let mut pk_columns = Vec::new();
162
163 for id in pk_column_ids {
164 let column = columns
165 .iter()
166 .find(|c| id.is(c))
167 .context("primary key column not found")?;
168 if !(column.is_defined_in_columns_clause() || column.is_connector_additional_column()) {
170 bail !(
171 "primary key column \"{}\" is not user-defined",
172 column.name()
173 );
174 }
175
176 let name_ident = if let Some(col) = column_defs
178 .iter()
179 .find(|c| c.name.real_value() == column.name())
180 {
181 col.name.clone()
182 } else if let Some(alias) = include_column_options
183 .iter()
184 .filter_map(|c| c.column_alias.as_ref())
185 .find(|c| c.real_value() == column.name())
186 {
187 alias.clone()
188 } else {
189 Ident::from_real_value(column.name())
191 };
192
193 pk_columns.push(name_ident);
194 }
195
196 let pk_constraint = TableConstraint::Unique {
197 name: None,
198 columns: pk_columns,
199 is_primary: true,
200 };
201
202 assert!(
204 constraints.len() <= 1
205 && constraints.iter().all(|c| matches!(
206 c,
207 TableConstraint::Unique {
208 is_primary: true,
209 ..
210 }
211 )),
212 "unexpected table constraints: {constraints:?}",
213 );
214
215 *constraints = vec![pk_constraint];
216 }
217
218 Ok(base)
219}