risingwave_frontend/binder/relation/
join.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use risingwave_pb::plan_common::JoinType;
16use risingwave_sqlparser::ast::{
17    BinaryOperator, Expr, Ident, JoinConstraint, JoinOperator, TableFactor, TableWithJoins, Value,
18};
19
20use crate::binder::bind_context::BindContext;
21use crate::binder::statement::RewriteExprsRecursive;
22use crate::binder::{Binder, COLUMN_GROUP_PREFIX, Clause, Relation};
23use crate::error::{ErrorCode, Result};
24use crate::expr::ExprImpl;
25
26#[derive(Debug, Clone)]
27pub struct BoundJoin {
28    pub join_type: JoinType,
29    pub left: Relation,
30    pub right: Relation,
31    pub cond: ExprImpl,
32}
33
34impl RewriteExprsRecursive for BoundJoin {
35    fn rewrite_exprs_recursive(&mut self, rewriter: &mut impl crate::expr::ExprRewriter) {
36        self.left.rewrite_exprs_recursive(rewriter);
37        self.right.rewrite_exprs_recursive(rewriter);
38        self.cond = rewriter.rewrite_expr(self.cond.take());
39    }
40}
41
42impl Binder {
43    pub(crate) fn bind_vec_table_with_joins(
44        &mut self,
45        from: Vec<TableWithJoins>,
46    ) -> Result<Option<Relation>> {
47        let mut from_iter = from.into_iter();
48        let first = match from_iter.next() {
49            Some(t) => t,
50            None => return Ok(None),
51        };
52        self.push_lateral_context();
53        let mut root = self.bind_table_with_joins(first)?;
54        self.pop_and_merge_lateral_context()?;
55        for t in from_iter {
56            self.push_lateral_context();
57            let right = self.bind_table_with_joins(t.clone())?;
58            self.pop_and_merge_lateral_context()?;
59
60            let is_lateral = match &right {
61                Relation::Subquery(subquery) if subquery.lateral => true,
62                Relation::TableFunction { .. } => true,
63                _ => false,
64            };
65
66            root = if is_lateral {
67                Relation::Apply(Box::new(BoundJoin {
68                    join_type: JoinType::Inner,
69                    left: root,
70                    right,
71                    cond: ExprImpl::literal_bool(true),
72                }))
73            } else {
74                Relation::Join(Box::new(BoundJoin {
75                    join_type: JoinType::Inner,
76                    left: root,
77                    right,
78                    cond: ExprImpl::literal_bool(true),
79                }))
80            }
81        }
82        Ok(Some(root))
83    }
84
85    pub(crate) fn bind_table_with_joins(&mut self, table: TableWithJoins) -> Result<Relation> {
86        let mut root = self.bind_table_factor(table.relation)?;
87        for join in table.joins {
88            let (constraint, join_type) = match join.join_operator {
89                JoinOperator::Inner(constraint) => (constraint, JoinType::Inner),
90                JoinOperator::LeftOuter(constraint) => (constraint, JoinType::LeftOuter),
91                JoinOperator::RightOuter(constraint) => (constraint, JoinType::RightOuter),
92                JoinOperator::FullOuter(constraint) => (constraint, JoinType::FullOuter),
93                // Cross join equals to inner join with with no constraint.
94                JoinOperator::CrossJoin => (JoinConstraint::None, JoinType::Inner),
95                JoinOperator::AsOfInner(constraint) => (constraint, JoinType::AsofInner),
96                JoinOperator::AsOfLeft(constraint) => (constraint, JoinType::AsofLeftOuter),
97            };
98            let right: Relation;
99            let cond: ExprImpl;
100            if matches!(
101                constraint.clone(),
102                JoinConstraint::Using(_) | JoinConstraint::Natural
103            ) {
104                let option_rel: Option<Relation>;
105                (cond, option_rel) =
106                    self.bind_join_constraint(constraint, Some(join.relation), join_type)?;
107                right = option_rel.unwrap();
108            } else {
109                right = self.bind_table_factor(join.relation.clone())?;
110                (cond, _) = self.bind_join_constraint(constraint, None, join_type)?;
111            }
112
113            let is_lateral = match &right {
114                Relation::Subquery(subquery) if subquery.lateral => true,
115                Relation::TableFunction { .. } => true,
116                _ => false,
117            };
118
119            root = if is_lateral {
120                match join_type {
121                    JoinType::Inner | JoinType::LeftOuter => {}
122                    _ => {
123                        return Err(ErrorCode::InvalidInputSyntax("The combining JOIN type must be INNER or LEFT for a LATERAL reference.".to_owned())
124                            .into());
125                    }
126                }
127
128                Relation::Apply(Box::new(BoundJoin {
129                    join_type,
130                    left: root,
131                    right,
132                    cond,
133                }))
134            } else {
135                Relation::Join(Box::new(BoundJoin {
136                    join_type,
137                    left: root,
138                    right,
139                    cond,
140                }))
141            };
142        }
143
144        Ok(root)
145    }
146
147    fn bind_join_constraint(
148        &mut self,
149        constraint: JoinConstraint,
150        table_factor: Option<TableFactor>,
151        join_type: JoinType,
152    ) -> Result<(ExprImpl, Option<Relation>)> {
153        Ok(match constraint {
154            JoinConstraint::None => (ExprImpl::literal_bool(true), None),
155            c @ JoinConstraint::Natural | c @ JoinConstraint::Using(_) => {
156                // First, we identify columns with the same name.
157                let old_context = self.context.clone();
158                let l_len = old_context.columns.len();
159                // Bind this table factor to an empty context
160                self.push_lateral_context();
161                let table_factor = table_factor.unwrap();
162                let relation = self.bind_table_factor(table_factor)?;
163
164                let using_columns = match c {
165                    JoinConstraint::Natural => None,
166                    JoinConstraint::Using(cols) => {
167                        // sanity check
168                        for col in &cols {
169                            if !old_context.indices_of.contains_key(&col.real_value()) {
170                                return Err(ErrorCode::ItemNotFound(format!("column \"{}\" specified in USING clause does not exist in left table", col.real_value())).into());
171                            }
172                            if !self.context.indices_of.contains_key(&col.real_value()) {
173                                return Err(ErrorCode::ItemNotFound(format!("column \"{}\" specified in USING clause does not exist in right table", col.real_value())).into());
174                            }
175                        }
176                        Some(cols)
177                    }
178                    _ => unreachable!(),
179                };
180
181                let mut columns = self
182                    .context
183                    .indices_of
184                    .iter()
185                    .filter(|(_, idxs)| idxs.iter().all(|i| !self.context.columns[*i].is_hidden))
186                    .map(|(s, idxes)| (Ident::new_unchecked(s.to_owned()), idxes))
187                    .collect::<Vec<_>>();
188                columns.sort_by(|a, b| a.0.real_value().cmp(&b.0.real_value()));
189
190                let mut col_indices = Vec::new();
191                let mut binary_expr = Expr::Value(Value::Boolean(true));
192
193                // Walk the RHS cols, checking to see if any share a name with any LHS cols
194                for (column, indices_r) in columns {
195                    // TODO: is it ok to ignore quote style?
196                    // If we have a `USING` constraint, we only bind the columns appearing in the
197                    // constraint.
198                    if let Some(cols) = &using_columns
199                        && !cols.contains(&column)
200                    {
201                        continue;
202                    }
203                    let indices_l = match old_context.get_unqualified_indices(&column.real_value())
204                    {
205                        Err(e) => {
206                            if let ErrorCode::ItemNotFound(_) = e {
207                                continue;
208                            } else {
209                                return Err(e.into());
210                            }
211                        }
212                        Ok(idxs) => idxs,
213                    };
214                    // Select at most one column from each natural column group from left and right
215                    col_indices.push((indices_l[0], indices_r[0] + l_len));
216                    let left_expr = Self::get_identifier_from_indices(
217                        &old_context,
218                        &indices_l,
219                        column.clone(),
220                    )?;
221                    let right_expr = Self::get_identifier_from_indices(
222                        &self.context,
223                        indices_r,
224                        column.clone(),
225                    )?;
226                    binary_expr = Expr::BinaryOp {
227                        left: Box::new(binary_expr),
228                        op: BinaryOperator::And,
229                        right: Box::new(Expr::BinaryOp {
230                            left: Box::new(left_expr),
231                            op: BinaryOperator::Eq,
232                            right: Box::new(right_expr),
233                        }),
234                    }
235                }
236                self.pop_and_merge_lateral_context()?;
237                // Bind the expression first, before allowing disambiguation of the columns involved
238                // in the join
239                let expr = self.bind_expr(binary_expr)?;
240                for (l, r) in col_indices {
241                    let non_nullable = match join_type {
242                        JoinType::LeftOuter | JoinType::Inner => Some(l),
243                        JoinType::RightOuter => Some(r),
244                        JoinType::FullOuter => None,
245                        _ => unreachable!(),
246                    };
247                    self.context.add_natural_columns(l, r, non_nullable);
248                }
249                (expr, Some(relation))
250            }
251            JoinConstraint::On(expr) => {
252                let clause = self.context.clause;
253                self.context.clause = Some(Clause::JoinOn);
254                let bound_expr: ExprImpl = self
255                    .bind_expr(expr)
256                    .and_then(|expr| expr.enforce_bool_clause("JOIN ON"))?;
257                self.context.clause = clause;
258                (bound_expr, None)
259            }
260        })
261    }
262
263    fn get_identifier_from_indices(
264        context: &BindContext,
265        indices: &[usize],
266        column: Ident,
267    ) -> Result<Expr> {
268        if indices.len() == 1 {
269            let right_table = context.columns[indices[0]].table_name.clone();
270            Ok(Expr::CompoundIdentifier(vec![
271                Ident::new_unchecked(right_table),
272                column,
273            ]))
274        } else if let Some(group_id) = context.column_group_context.mapping.get(&indices[0]) {
275            Ok(Expr::CompoundIdentifier(vec![
276                Ident::new_unchecked(format!("{COLUMN_GROUP_PREFIX}{}", group_id)),
277                column,
278            ]))
279        } else {
280            Err(
281                ErrorCode::InternalError(format!("Ambiguous column name: {}", column.real_value()))
282                    .into(),
283            )
284        }
285    }
286}