risingwave_frontend/binder/relation/
join.rs1use risingwave_pb::plan_common::JoinType;
16use risingwave_sqlparser::ast::{
17 BinaryOperator, Expr, Ident, JoinConstraint, JoinOperator, TableFactor, TableWithJoins, Value,
18};
19
20use crate::binder::bind_context::BindContext;
21use crate::binder::statement::RewriteExprsRecursive;
22use crate::binder::{Binder, COLUMN_GROUP_PREFIX, Clause, Relation};
23use crate::error::{ErrorCode, Result};
24use crate::expr::ExprImpl;
25
26#[derive(Debug, Clone)]
27pub struct BoundJoin {
28 pub join_type: JoinType,
29 pub left: Relation,
30 pub right: Relation,
31 pub cond: ExprImpl,
32}
33
34impl RewriteExprsRecursive for BoundJoin {
35 fn rewrite_exprs_recursive(&mut self, rewriter: &mut impl crate::expr::ExprRewriter) {
36 self.left.rewrite_exprs_recursive(rewriter);
37 self.right.rewrite_exprs_recursive(rewriter);
38 self.cond = rewriter.rewrite_expr(self.cond.take());
39 }
40}
41
42impl Binder {
43 pub(crate) fn bind_vec_table_with_joins(
44 &mut self,
45 from: Vec<TableWithJoins>,
46 ) -> Result<Option<Relation>> {
47 let mut from_iter = from.into_iter();
48 let first = match from_iter.next() {
49 Some(t) => t,
50 None => return Ok(None),
51 };
52 self.push_lateral_context();
53 let mut root = self.bind_table_with_joins(first)?;
54 self.pop_and_merge_lateral_context()?;
55 for t in from_iter {
56 self.push_lateral_context();
57 let right = self.bind_table_with_joins(t.clone())?;
58 self.pop_and_merge_lateral_context()?;
59
60 let is_lateral = match &right {
61 Relation::Subquery(subquery) if subquery.lateral => true,
62 Relation::TableFunction { .. } => true,
63 _ => false,
64 };
65
66 root = if is_lateral {
67 Relation::Apply(Box::new(BoundJoin {
68 join_type: JoinType::Inner,
69 left: root,
70 right,
71 cond: ExprImpl::literal_bool(true),
72 }))
73 } else {
74 Relation::Join(Box::new(BoundJoin {
75 join_type: JoinType::Inner,
76 left: root,
77 right,
78 cond: ExprImpl::literal_bool(true),
79 }))
80 }
81 }
82 Ok(Some(root))
83 }
84
85 pub(crate) fn bind_table_with_joins(&mut self, table: TableWithJoins) -> Result<Relation> {
86 let mut root = self.bind_table_factor(table.relation)?;
87 for join in table.joins {
88 let (constraint, join_type) = match join.join_operator {
89 JoinOperator::Inner(constraint) => (constraint, JoinType::Inner),
90 JoinOperator::LeftOuter(constraint) => (constraint, JoinType::LeftOuter),
91 JoinOperator::RightOuter(constraint) => (constraint, JoinType::RightOuter),
92 JoinOperator::FullOuter(constraint) => (constraint, JoinType::FullOuter),
93 JoinOperator::CrossJoin => (JoinConstraint::None, JoinType::Inner),
95 JoinOperator::AsOfInner(constraint) => (constraint, JoinType::AsofInner),
96 JoinOperator::AsOfLeft(constraint) => (constraint, JoinType::AsofLeftOuter),
97 };
98 let right: Relation;
99 let cond: ExprImpl;
100 if matches!(
101 constraint.clone(),
102 JoinConstraint::Using(_) | JoinConstraint::Natural
103 ) {
104 let option_rel: Option<Relation>;
105 (cond, option_rel) =
106 self.bind_join_constraint(constraint, Some(join.relation), join_type)?;
107 right = option_rel.unwrap();
108 } else {
109 right = self.bind_table_factor(join.relation.clone())?;
110 (cond, _) = self.bind_join_constraint(constraint, None, join_type)?;
111 }
112
113 let is_lateral = match &right {
114 Relation::Subquery(subquery) if subquery.lateral => true,
115 Relation::TableFunction { .. } => true,
116 _ => false,
117 };
118
119 root = if is_lateral {
120 match join_type {
121 JoinType::Inner | JoinType::LeftOuter => {}
122 _ => {
123 return Err(ErrorCode::InvalidInputSyntax("The combining JOIN type must be INNER or LEFT for a LATERAL reference.".to_owned())
124 .into());
125 }
126 }
127
128 Relation::Apply(Box::new(BoundJoin {
129 join_type,
130 left: root,
131 right,
132 cond,
133 }))
134 } else {
135 Relation::Join(Box::new(BoundJoin {
136 join_type,
137 left: root,
138 right,
139 cond,
140 }))
141 };
142 }
143
144 Ok(root)
145 }
146
147 fn bind_join_constraint(
148 &mut self,
149 constraint: JoinConstraint,
150 table_factor: Option<TableFactor>,
151 join_type: JoinType,
152 ) -> Result<(ExprImpl, Option<Relation>)> {
153 Ok(match constraint {
154 JoinConstraint::None => (ExprImpl::literal_bool(true), None),
155 c @ JoinConstraint::Natural | c @ JoinConstraint::Using(_) => {
156 let old_context = self.context.clone();
158 let l_len = old_context.columns.len();
159 self.push_lateral_context();
161 let table_factor = table_factor.unwrap();
162 let relation = self.bind_table_factor(table_factor)?;
163
164 let using_columns = match c {
165 JoinConstraint::Natural => None,
166 JoinConstraint::Using(cols) => {
167 for col in &cols {
169 if !old_context.indices_of.contains_key(&col.real_value()) {
170 return Err(ErrorCode::ItemNotFound(format!("column \"{}\" specified in USING clause does not exist in left table", col.real_value())).into());
171 }
172 if !self.context.indices_of.contains_key(&col.real_value()) {
173 return Err(ErrorCode::ItemNotFound(format!("column \"{}\" specified in USING clause does not exist in right table", col.real_value())).into());
174 }
175 }
176 Some(cols)
177 }
178 _ => unreachable!(),
179 };
180
181 let mut columns = self
182 .context
183 .indices_of
184 .iter()
185 .filter(|(_, idxs)| idxs.iter().all(|i| !self.context.columns[*i].is_hidden))
186 .map(|(s, idxes)| (Ident::new_unchecked(s.to_owned()), idxes))
187 .collect::<Vec<_>>();
188 columns.sort_by(|a, b| a.0.real_value().cmp(&b.0.real_value()));
189
190 let mut col_indices = Vec::new();
191 let mut binary_expr = Expr::Value(Value::Boolean(true));
192
193 for (column, indices_r) in columns {
195 if let Some(cols) = &using_columns
199 && !cols.contains(&column)
200 {
201 continue;
202 }
203 let indices_l = match old_context.get_unqualified_indices(&column.real_value())
204 {
205 Err(e) => {
206 if let ErrorCode::ItemNotFound(_) = e {
207 continue;
208 } else {
209 return Err(e.into());
210 }
211 }
212 Ok(idxs) => idxs,
213 };
214 col_indices.push((indices_l[0], indices_r[0] + l_len));
216 let left_expr = Self::get_identifier_from_indices(
217 &old_context,
218 &indices_l,
219 column.clone(),
220 )?;
221 let right_expr = Self::get_identifier_from_indices(
222 &self.context,
223 indices_r,
224 column.clone(),
225 )?;
226 binary_expr = Expr::BinaryOp {
227 left: Box::new(binary_expr),
228 op: BinaryOperator::And,
229 right: Box::new(Expr::BinaryOp {
230 left: Box::new(left_expr),
231 op: BinaryOperator::Eq,
232 right: Box::new(right_expr),
233 }),
234 }
235 }
236 self.pop_and_merge_lateral_context()?;
237 let expr = self.bind_expr(binary_expr)?;
240 for (l, r) in col_indices {
241 let non_nullable = match join_type {
242 JoinType::LeftOuter | JoinType::Inner => Some(l),
243 JoinType::RightOuter => Some(r),
244 JoinType::FullOuter => None,
245 _ => unreachable!(),
246 };
247 self.context.add_natural_columns(l, r, non_nullable);
248 }
249 (expr, Some(relation))
250 }
251 JoinConstraint::On(expr) => {
252 let clause = self.context.clause;
253 self.context.clause = Some(Clause::JoinOn);
254 let bound_expr: ExprImpl = self
255 .bind_expr(expr)
256 .and_then(|expr| expr.enforce_bool_clause("JOIN ON"))?;
257 self.context.clause = clause;
258 (bound_expr, None)
259 }
260 })
261 }
262
263 fn get_identifier_from_indices(
264 context: &BindContext,
265 indices: &[usize],
266 column: Ident,
267 ) -> Result<Expr> {
268 if indices.len() == 1 {
269 let right_table = context.columns[indices[0]].table_name.clone();
270 Ok(Expr::CompoundIdentifier(vec![
271 Ident::new_unchecked(right_table),
272 column,
273 ]))
274 } else if let Some(group_id) = context.column_group_context.mapping.get(&indices[0]) {
275 Ok(Expr::CompoundIdentifier(vec![
276 Ident::new_unchecked(format!("{COLUMN_GROUP_PREFIX}{}", group_id)),
277 column,
278 ]))
279 } else {
280 Err(
281 ErrorCode::InternalError(format!("Ambiguous column name: {}", column.real_value()))
282 .into(),
283 )
284 }
285 }
286}