risingwave_frontend/binder/expr/column.rs
1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::types::DataType;
16use risingwave_sqlparser::ast::Ident;
17
18use crate::binder::{Binder, Clause};
19use crate::error::{ErrorCode, Result};
20use crate::expr::{CorrelatedInputRef, ExprImpl, ExprType, FunctionCall, InputRef, Literal};
21
22impl Binder {
23 pub fn bind_column(&mut self, idents: &[Ident]) -> Result<ExprImpl> {
24 // TODO: check quote style of `ident`.
25 let (schema_name, table_name, column_name) = match idents {
26 [column] => (None, None, column.real_value()),
27 [table, column] => (None, Some(table.real_value()), column.real_value()),
28 [schema, table, column] => (
29 Some(schema.real_value()),
30 Some(table.real_value()),
31 column.real_value(),
32 ),
33 _ => {
34 return Err(
35 ErrorCode::InternalError(format!("Too many idents: {:?}", idents)).into(),
36 );
37 }
38 };
39
40 // If we find `sql_udf_arguments` in the current context, it means we're binding an inline SQL UDF
41 // (without a layer of subquery). This only happens when the function body is a trivial `SELECT`
42 // statement without any `FROM` clause etc. In this case, the column must be a UDF parameter.
43 if self.is_binding_inline_sql_udf() {
44 return self.bind_sql_udf_parameter(&column_name);
45 }
46
47 match self
48 .context
49 .get_column_binding_indices(&schema_name, &table_name, &column_name)
50 {
51 Ok(mut indices) => {
52 match indices.len() {
53 0 => unreachable!(),
54 1 => {
55 let index = indices[0];
56 let column = &self.context.columns[index];
57 return Ok(
58 InputRef::new(column.index, column.field.data_type.clone()).into()
59 );
60 }
61 _ => {
62 indices.sort(); // make sure we have a consistent result
63 let inputs = indices
64 .iter()
65 .map(|index| {
66 let column = &self.context.columns[*index];
67 InputRef::new(column.index, column.field.data_type.clone()).into()
68 })
69 .collect::<Vec<_>>();
70 return Ok(FunctionCall::new(ExprType::Coalesce, inputs)?.into());
71 }
72 }
73 }
74 Err(e) => {
75 // If a column is referenced using three-level qualification and the table has an alias,
76 // prompt the user to use the table alias instead.
77 if let ErrorCode::ItemNotFound(_) = e {
78 if let (Some(schema), Some(table)) = (&schema_name, &table_name)
79 && let Some(index) =
80 self.context.get_table_alias(schema, table, &column_name)?
81 {
82 let column = &self.context.columns[index];
83 return Err(ErrorCode::InvalidReference(format!(
84 "missing FROM-clause entry for table \"{}\"\n\
85 HINT: Perhaps you meant to reference the table alias \"{}\".",
86 table, column.table_name
87 ))
88 .into());
89 };
90 } else {
91 // If the error message is not that the column is not found, throw the error
92 return Err(e.into());
93 }
94 }
95 }
96
97 // Try to find a correlated column in `upper_contexts`, starting from the innermost context.
98 let mut err = ErrorCode::ItemNotFound(format!("Invalid column: {}", column_name));
99
100 for (i, lateral_context) in self.lateral_contexts.iter().rev().enumerate() {
101 if lateral_context.is_visible {
102 let context = &lateral_context.context;
103 if matches!(context.clause, Some(Clause::Insert)) {
104 continue;
105 }
106 // input ref from lateral context `depth` starts from 1.
107 let depth = i + 1;
108 match context.get_column_binding_index(&schema_name, &table_name, &column_name) {
109 Ok(index) => {
110 let column = &context.columns[index];
111 return Ok(CorrelatedInputRef::new(
112 column.index,
113 column.field.data_type.clone(),
114 depth,
115 )
116 .into());
117 }
118 Err(e) => {
119 err = e;
120 }
121 }
122 }
123 }
124
125 for (i, (context, lateral_contexts)) in
126 self.visible_upper_subquery_contexts_rev().enumerate()
127 {
128 if matches!(context.clause, Some(Clause::Insert)) {
129 continue;
130 }
131 // `depth` starts from 1.
132 let depth = i + 1;
133 match context.get_column_binding_index(&schema_name, &table_name, &column_name) {
134 Ok(index) => {
135 let column = &context.columns[index];
136 return Ok(CorrelatedInputRef::new(
137 column.index,
138 column.field.data_type.clone(),
139 depth,
140 )
141 .into());
142 }
143 Err(e) => {
144 err = e;
145 }
146 }
147
148 for (j, lateral_context) in lateral_contexts.iter().rev().enumerate() {
149 if lateral_context.is_visible {
150 let context = &lateral_context.context;
151 if matches!(context.clause, Some(Clause::Insert)) {
152 continue;
153 }
154 // correlated input ref from lateral context `depth` starts from 1.
155 let depth = i + j + 1;
156 match context.get_column_binding_index(&schema_name, &table_name, &column_name)
157 {
158 Ok(index) => {
159 let column = &context.columns[index];
160 return Ok(CorrelatedInputRef::new(
161 column.index,
162 column.field.data_type.clone(),
163 depth,
164 )
165 .into());
166 }
167 Err(e) => {
168 err = e;
169 }
170 }
171 }
172 }
173 }
174
175 // `CTID` is a system column in postgres.
176 // https://www.postgresql.org/docs/current/ddl-system-columns.html
177 //
178 // We return an empty string here to support some tools such as DataGrip.
179 //
180 // FIXME: The type of `CTID` should be `tid`.
181 // FIXME: The `CTID` column should be unique, so literal may break something.
182 // FIXME: At least we should add a notice here.
183 if let ErrorCode::ItemNotFound(_) = err
184 && column_name == "ctid"
185 {
186 return Ok(Literal::new(Some("".into()), DataType::Varchar).into());
187 }
188
189 // Failed to resolve the column in current context. Now check if it's a sql udf parameter.
190 if let ErrorCode::ItemNotFound(_) = err
191 && self.is_binding_subquery_sql_udf()
192 {
193 return self.bind_sql_udf_parameter(&column_name);
194 }
195
196 Err(err.into())
197 }
198}