risingwave_frontend/binder/expr/
column.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::types::DataType;
16use risingwave_sqlparser::ast::Ident;
17
18use crate::binder::{Binder, Clause};
19use crate::error::{ErrorCode, Result};
20use crate::expr::{CorrelatedInputRef, ExprImpl, ExprType, FunctionCall, InputRef, Literal};
21
22impl Binder {
23    pub fn bind_column(&mut self, idents: &[Ident]) -> Result<ExprImpl> {
24        // TODO: check quote style of `ident`.
25        let (_schema_name, table_name, column_name) = match idents {
26            [column] => (None, None, column.real_value()),
27            [table, column] => (None, Some(table.real_value()), column.real_value()),
28            [schema, table, column] => (
29                Some(schema.real_value()),
30                Some(table.real_value()),
31                column.real_value(),
32            ),
33            _ => {
34                return Err(
35                    ErrorCode::InternalError(format!("Too many idents: {:?}", idents)).into(),
36                );
37            }
38        };
39
40        // If we find `sql_udf_arguments` in the current context, it means we're binding an inline SQL UDF
41        // (without a layer of subquery). This only happens when the function body is a trivial `SELECT`
42        // statement without any `FROM` clause etc. In this case, the column must be a UDF parameter.
43        if self.is_binding_inline_sql_udf() {
44            return self.bind_sql_udf_parameter(&column_name);
45        }
46
47        match self
48            .context
49            .get_column_binding_indices(&table_name, &column_name)
50        {
51            Ok(mut indices) => {
52                match indices.len() {
53                    0 => unreachable!(),
54                    1 => {
55                        let index = indices[0];
56                        let column = &self.context.columns[index];
57                        return Ok(
58                            InputRef::new(column.index, column.field.data_type.clone()).into()
59                        );
60                    }
61                    _ => {
62                        indices.sort(); // make sure we have a consistent result
63                        let inputs = indices
64                            .iter()
65                            .map(|index| {
66                                let column = &self.context.columns[*index];
67                                InputRef::new(column.index, column.field.data_type.clone()).into()
68                            })
69                            .collect::<Vec<_>>();
70                        return Ok(FunctionCall::new(ExprType::Coalesce, inputs)?.into());
71                    }
72                }
73            }
74            Err(e) => {
75                // If the error message is not that the column is not found, throw the error
76                if let ErrorCode::ItemNotFound(_) = e {
77                } else {
78                    return Err(e.into());
79                }
80            }
81        }
82
83        // Try to find a correlated column in `upper_contexts`, starting from the innermost context.
84        let mut err = ErrorCode::ItemNotFound(format!("Invalid column: {}", column_name));
85
86        for (i, lateral_context) in self.lateral_contexts.iter().rev().enumerate() {
87            if lateral_context.is_visible {
88                let context = &lateral_context.context;
89                if matches!(context.clause, Some(Clause::Insert)) {
90                    continue;
91                }
92                // input ref from lateral context `depth` starts from 1.
93                let depth = i + 1;
94                match context.get_column_binding_index(&table_name, &column_name) {
95                    Ok(index) => {
96                        let column = &context.columns[index];
97                        return Ok(CorrelatedInputRef::new(
98                            column.index,
99                            column.field.data_type.clone(),
100                            depth,
101                        )
102                        .into());
103                    }
104                    Err(e) => {
105                        err = e;
106                    }
107                }
108            }
109        }
110
111        for (i, (context, lateral_contexts)) in
112            self.visible_upper_subquery_contexts_rev().enumerate()
113        {
114            if matches!(context.clause, Some(Clause::Insert)) {
115                continue;
116            }
117            // `depth` starts from 1.
118            let depth = i + 1;
119            match context.get_column_binding_index(&table_name, &column_name) {
120                Ok(index) => {
121                    let column = &context.columns[index];
122                    return Ok(CorrelatedInputRef::new(
123                        column.index,
124                        column.field.data_type.clone(),
125                        depth,
126                    )
127                    .into());
128                }
129                Err(e) => {
130                    err = e;
131                }
132            }
133
134            for (j, lateral_context) in lateral_contexts.iter().rev().enumerate() {
135                if lateral_context.is_visible {
136                    let context = &lateral_context.context;
137                    if matches!(context.clause, Some(Clause::Insert)) {
138                        continue;
139                    }
140                    // correlated input ref from lateral context `depth` starts from 1.
141                    let depth = i + j + 1;
142                    match context.get_column_binding_index(&table_name, &column_name) {
143                        Ok(index) => {
144                            let column = &context.columns[index];
145                            return Ok(CorrelatedInputRef::new(
146                                column.index,
147                                column.field.data_type.clone(),
148                                depth,
149                            )
150                            .into());
151                        }
152                        Err(e) => {
153                            err = e;
154                        }
155                    }
156                }
157            }
158        }
159
160        // `CTID` is a system column in postgres.
161        // https://www.postgresql.org/docs/current/ddl-system-columns.html
162        //
163        // We return an empty string here to support some tools such as DataGrip.
164        //
165        // FIXME: The type of `CTID` should be `tid`.
166        // FIXME: The `CTID` column should be unique, so literal may break something.
167        // FIXME: At least we should add a notice here.
168        if let ErrorCode::ItemNotFound(_) = err
169            && column_name == "ctid"
170        {
171            return Ok(Literal::new(Some("".into()), DataType::Varchar).into());
172        }
173
174        // Failed to resolve the column in current context. Now check if it's a sql udf parameter.
175        if let ErrorCode::ItemNotFound(_) = err
176            && self.is_binding_subquery_sql_udf()
177        {
178            return self.bind_sql_udf_parameter(&column_name);
179        }
180
181        Err(err.into())
182    }
183}