risingwave_frontend/binder/expr/column.rs
1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::types::DataType;
16use risingwave_sqlparser::ast::Ident;
17
18use crate::binder::{Binder, Clause};
19use crate::error::{ErrorCode, Result};
20use crate::expr::{CorrelatedInputRef, ExprImpl, ExprType, FunctionCall, InputRef, Literal};
21
22impl Binder {
23 pub fn bind_column(&mut self, idents: &[Ident]) -> Result<ExprImpl> {
24 // TODO: check quote style of `ident`.
25 let (_schema_name, table_name, column_name) = match idents {
26 [column] => (None, None, column.real_value()),
27 [table, column] => (None, Some(table.real_value()), column.real_value()),
28 [schema, table, column] => (
29 Some(schema.real_value()),
30 Some(table.real_value()),
31 column.real_value(),
32 ),
33 _ => {
34 return Err(
35 ErrorCode::InternalError(format!("Too many idents: {:?}", idents)).into(),
36 );
37 }
38 };
39
40 // If we find `sql_udf_arguments` in the current context, it means we're binding an inline SQL UDF
41 // (without a layer of subquery). This only happens when the function body is a trivial `SELECT`
42 // statement without any `FROM` clause etc. In this case, the column must be a UDF parameter.
43 if self.is_binding_inline_sql_udf() {
44 return self.bind_sql_udf_parameter(&column_name);
45 }
46
47 match self
48 .context
49 .get_column_binding_indices(&table_name, &column_name)
50 {
51 Ok(mut indices) => {
52 match indices.len() {
53 0 => unreachable!(),
54 1 => {
55 let index = indices[0];
56 let column = &self.context.columns[index];
57 return Ok(
58 InputRef::new(column.index, column.field.data_type.clone()).into()
59 );
60 }
61 _ => {
62 indices.sort(); // make sure we have a consistent result
63 let inputs = indices
64 .iter()
65 .map(|index| {
66 let column = &self.context.columns[*index];
67 InputRef::new(column.index, column.field.data_type.clone()).into()
68 })
69 .collect::<Vec<_>>();
70 return Ok(FunctionCall::new(ExprType::Coalesce, inputs)?.into());
71 }
72 }
73 }
74 Err(e) => {
75 // If the error message is not that the column is not found, throw the error
76 if let ErrorCode::ItemNotFound(_) = e {
77 } else {
78 return Err(e.into());
79 }
80 }
81 }
82
83 // Try to find a correlated column in `upper_contexts`, starting from the innermost context.
84 let mut err = ErrorCode::ItemNotFound(format!("Invalid column: {}", column_name));
85
86 for (i, lateral_context) in self.lateral_contexts.iter().rev().enumerate() {
87 if lateral_context.is_visible {
88 let context = &lateral_context.context;
89 if matches!(context.clause, Some(Clause::Insert)) {
90 continue;
91 }
92 // input ref from lateral context `depth` starts from 1.
93 let depth = i + 1;
94 match context.get_column_binding_index(&table_name, &column_name) {
95 Ok(index) => {
96 let column = &context.columns[index];
97 return Ok(CorrelatedInputRef::new(
98 column.index,
99 column.field.data_type.clone(),
100 depth,
101 )
102 .into());
103 }
104 Err(e) => {
105 err = e;
106 }
107 }
108 }
109 }
110
111 for (i, (context, lateral_contexts)) in
112 self.visible_upper_subquery_contexts_rev().enumerate()
113 {
114 if matches!(context.clause, Some(Clause::Insert)) {
115 continue;
116 }
117 // `depth` starts from 1.
118 let depth = i + 1;
119 match context.get_column_binding_index(&table_name, &column_name) {
120 Ok(index) => {
121 let column = &context.columns[index];
122 return Ok(CorrelatedInputRef::new(
123 column.index,
124 column.field.data_type.clone(),
125 depth,
126 )
127 .into());
128 }
129 Err(e) => {
130 err = e;
131 }
132 }
133
134 for (j, lateral_context) in lateral_contexts.iter().rev().enumerate() {
135 if lateral_context.is_visible {
136 let context = &lateral_context.context;
137 if matches!(context.clause, Some(Clause::Insert)) {
138 continue;
139 }
140 // correlated input ref from lateral context `depth` starts from 1.
141 let depth = i + j + 1;
142 match context.get_column_binding_index(&table_name, &column_name) {
143 Ok(index) => {
144 let column = &context.columns[index];
145 return Ok(CorrelatedInputRef::new(
146 column.index,
147 column.field.data_type.clone(),
148 depth,
149 )
150 .into());
151 }
152 Err(e) => {
153 err = e;
154 }
155 }
156 }
157 }
158 }
159
160 // `CTID` is a system column in postgres.
161 // https://www.postgresql.org/docs/current/ddl-system-columns.html
162 //
163 // We return an empty string here to support some tools such as DataGrip.
164 //
165 // FIXME: The type of `CTID` should be `tid`.
166 // FIXME: The `CTID` column should be unique, so literal may break something.
167 // FIXME: At least we should add a notice here.
168 if let ErrorCode::ItemNotFound(_) = err
169 && column_name == "ctid"
170 {
171 return Ok(Literal::new(Some("".into()), DataType::Varchar).into());
172 }
173
174 // Failed to resolve the column in current context. Now check if it's a sql udf parameter.
175 if let ErrorCode::ItemNotFound(_) = err
176 && self.is_binding_subquery_sql_udf()
177 {
178 return self.bind_sql_udf_parameter(&column_name);
179 }
180
181 Err(err.into())
182 }
183}