1use std::collections::HashMap;
16
17use itertools::Itertools;
18use risingwave_common::bail_not_implemented;
19use risingwave_common::catalog::Schema;
20use risingwave_common::types::DataType;
21use risingwave_common::util::iter_util::ZipEqFast;
22use risingwave_common::util::sort_util::ColumnOrder;
23use risingwave_expr::ExprError;
24use risingwave_pb::plan_common::JoinType;
25
26use crate::OptimizerContextRef;
27use crate::binder::{BoundDistinct, BoundSelect};
28use crate::error::{ErrorCode, Result};
29use crate::expr::{
30 CorrelatedId, Expr, ExprImpl, ExprRewriter, ExprType, FunctionCall, InputRef, Subquery,
31 SubqueryKind,
32};
33pub use crate::optimizer::plan_node::LogicalFilter;
34use crate::optimizer::plan_node::generic::{Agg, GenericPlanRef, Project, ProjectBuilder};
35use crate::optimizer::plan_node::{
36 LogicalAgg, LogicalApply, LogicalDedup, LogicalJoin, LogicalOverWindow, LogicalProject,
37 LogicalProjectSet, LogicalTopN, LogicalValues, PlanAggCall, PlanRef,
38};
39use crate::optimizer::property::Order;
40use crate::planner::Planner;
41use crate::utils::{Condition, IndexSet};
42
43impl Planner {
44 pub(super) fn plan_select(
45 &mut self,
46 BoundSelect {
47 from,
48 where_clause,
49 mut select_items,
50 group_by,
51 mut having,
52 distinct,
53 ..
54 }: BoundSelect,
55 extra_order_exprs: Vec<ExprImpl>,
56 order: &[ColumnOrder],
57 ) -> Result<PlanRef> {
58 if distinct.is_distinct() && !extra_order_exprs.is_empty() {
60 return Err(ErrorCode::InvalidInputSyntax(
61 "for SELECT DISTINCT, ORDER BY expressions must appear in select list".into(),
62 )
63 .into());
64 }
65 select_items.extend(extra_order_exprs);
66 if let BoundDistinct::DistinctOn(exprs) = &distinct {
68 let mut distinct_on_exprs: HashMap<ExprImpl, bool> =
69 exprs.iter().map(|expr| (expr.clone(), false)).collect();
70 let mut uncovered_distinct_on_exprs_cnt = distinct_on_exprs.len();
71 let mut order_iter = order.iter().map(|o| &select_items[o.column_index]);
72 while uncovered_distinct_on_exprs_cnt > 0
73 && let Some(order_expr) = order_iter.next()
74 {
75 match distinct_on_exprs.get_mut(order_expr) {
76 Some(has_been_covered) => {
77 if !*has_been_covered {
78 *has_been_covered = true;
79 uncovered_distinct_on_exprs_cnt -= 1;
80 }
81 }
82 None => {
83 return Err(ErrorCode::InvalidInputSyntax(
84 "the SELECT DISTINCT ON expressions must match the leftmost ORDER BY expressions"
85 .into(),
86 )
87 .into());
88 }
89 }
90 }
91 }
92
93 let mut root = match from {
95 None => self.create_dummy_values(),
96 Some(t) => self.plan_relation(t)?,
97 };
98 if let Some(where_clause) = where_clause {
100 root = self.plan_where(root, where_clause)?;
101 }
102 let has_agg_call = select_items.iter().any(|expr| expr.has_agg_call());
105 if !group_by.is_empty() || having.is_some() || has_agg_call {
106 (root, select_items, having) =
107 LogicalAgg::create(select_items, group_by, having, root)?;
108 }
109
110 if let Some(having) = having {
111 root = self.plan_where(root, having)?;
112 }
113
114 if select_items.iter().any(|e| e.has_subquery()) {
115 (root, select_items) =
116 self.substitute_subqueries_in_cross_join_way(root, select_items)?;
117 }
118 if select_items.iter().any(|e| e.has_window_function()) {
119 (root, select_items) = LogicalOverWindow::create(root, select_items)?;
120 }
121
122 let original_select_items_len = select_items.len();
123
124 let mut distinct_list_index_to_select_items_index = vec![];
127 if let BoundDistinct::DistinctOn(distinct_list) = &distinct {
128 distinct_list_index_to_select_items_index.reserve(distinct_list.len());
129 let mut builder_index_to_select_items_index =
130 Vec::with_capacity(original_select_items_len);
131 let mut input_proj_builder = ProjectBuilder::default();
132 for (select_item_index, select_item) in select_items.iter().enumerate() {
133 let builder_index = input_proj_builder
134 .add_expr(select_item)
135 .map_err(|msg| ExprError::UnsupportedFunction(String::from(msg)))?;
136 if builder_index >= builder_index_to_select_items_index.len() {
137 debug_assert_eq!(builder_index, builder_index_to_select_items_index.len());
138 builder_index_to_select_items_index.push(select_item_index);
139 }
140 }
141 for distinct_expr in distinct_list {
142 let builder_index = input_proj_builder
143 .add_expr(distinct_expr)
144 .map_err(|msg| ExprError::UnsupportedFunction(String::from(msg)))?;
145 if builder_index >= builder_index_to_select_items_index.len() {
146 debug_assert_eq!(builder_index, builder_index_to_select_items_index.len());
147 select_items.push(distinct_expr.clone());
148 builder_index_to_select_items_index.push(select_items.len() - 1);
149 }
150 distinct_list_index_to_select_items_index
151 .push(builder_index_to_select_items_index[builder_index]);
152 }
153 }
154
155 let need_restore_select_items = select_items.len() > original_select_items_len;
156
157 root = LogicalProjectSet::create(root, select_items);
158
159 if matches!(&distinct, BoundDistinct::DistinctOn(_)) {
160 root = if order.is_empty() {
161 LogicalDedup::new(root, distinct_list_index_to_select_items_index).into()
164 } else {
165 LogicalTopN::new(
166 root,
167 1,
168 0,
169 false,
170 Order::new(order.to_vec()),
171 distinct_list_index_to_select_items_index,
172 )
173 .into()
174 };
175 }
176
177 if need_restore_select_items {
178 root = LogicalProject::with_core(Project::with_out_col_idx(
179 root,
180 0..original_select_items_len,
181 ))
182 .into();
183 }
184
185 if let BoundDistinct::Distinct = distinct {
186 let fields = root.schema().fields();
187 let group_key = if let Some(field) = fields.first()
188 && field.name == "projected_row_id"
189 {
190 (1..fields.len()).collect()
192 } else {
193 (0..fields.len()).collect()
194 };
195 root = Agg::new(vec![], group_key, root).into();
196 }
197
198 Ok(root)
199 }
200
201 fn create_dummy_values(&self) -> PlanRef {
204 LogicalValues::create(vec![vec![]], Schema::default(), self.ctx.clone())
205 }
206
207 fn create_exists(&self, input: PlanRef) -> Result<PlanRef> {
210 let count_star = Agg::new(vec![PlanAggCall::count_star()], IndexSet::empty(), input);
211 let ge = FunctionCall::new(
212 ExprType::GreaterThanOrEqual,
213 vec![
214 InputRef::new(0, DataType::Int64).into(),
215 ExprImpl::literal_int(1),
216 ],
217 )
218 .unwrap();
219 Ok(LogicalProject::create(count_star.into(), vec![ge.into()]))
220 }
221
222 pub(super) fn plan_where(
227 &mut self,
228 mut input: PlanRef,
229 where_clause: ExprImpl,
230 ) -> Result<PlanRef> {
231 if !where_clause.has_subquery() {
232 return Ok(LogicalFilter::create_with_expr(input, where_clause));
233 }
234 let (subquery_conjunctions, not_subquery_conjunctions, others) =
235 Condition::with_expr(where_clause)
236 .group_by::<_, 3>(|expr| match expr {
237 ExprImpl::Subquery(_) => 0,
238 ExprImpl::FunctionCall(func_call)
239 if func_call.func_type() == ExprType::Not
240 && matches!(func_call.inputs()[0], ExprImpl::Subquery(_)) =>
241 {
242 1
243 }
244 _ => 2,
245 })
246 .into_iter()
247 .next_tuple()
248 .unwrap();
249
250 for expr in subquery_conjunctions {
252 self.handle_exists_and_in(expr, false, &mut input)?;
253 }
254
255 for expr in not_subquery_conjunctions {
257 let not = expr.into_function_call().unwrap();
258 let (_, expr) = not.decompose_as_unary();
259 self.handle_exists_and_in(expr, true, &mut input)?;
260 }
261
262 if others.always_true() {
263 Ok(input)
264 } else {
265 let (input, others) =
266 self.substitute_subqueries_in_left_deep_tree_way(input, others.conjunctions)?;
267 Ok(LogicalFilter::create(
268 input,
269 Condition {
270 conjunctions: others,
271 },
272 ))
273 }
274 }
275
276 fn handle_exists_and_in(
281 &mut self,
282 expr: ExprImpl,
283 negated: bool,
284 input: &mut PlanRef,
285 ) -> Result<()> {
286 let join_type = if negated {
287 JoinType::LeftAnti
288 } else {
289 JoinType::LeftSemi
290 };
291 let correlated_id = self.ctx.next_correlated_id();
292 let mut subquery = expr.into_subquery().unwrap();
293 let mut correlated_indices = subquery
297 .query
298 .collect_correlated_indices_by_depth_and_assign_id(0, correlated_id);
299 correlated_indices.sort();
300 correlated_indices.dedup();
301 let output_column_type = subquery.query.data_types()[0].clone();
302 let right_plan = self.plan_query(subquery.query)?.into_unordered_subplan();
303 let on = match subquery.kind {
304 SubqueryKind::Existential => ExprImpl::literal_bool(true),
305 SubqueryKind::In(left_expr) => {
306 let right_expr = InputRef::new(input.schema().len(), output_column_type);
307 FunctionCall::new(ExprType::Equal, vec![left_expr, right_expr.into()])?.into()
308 }
309 kind => bail_not_implemented!(issue = 1343, "Not supported subquery kind: {:?}", kind),
310 };
311 *input = Self::create_apply(
312 correlated_id,
313 correlated_indices,
314 input.clone(),
315 right_plan,
316 on,
317 join_type,
318 false,
319 );
320 Ok(())
321 }
322
323 pub(super) fn substitute_subqueries_in_left_deep_tree_way(
347 &mut self,
348 mut root: PlanRef,
349 mut exprs: Vec<ExprImpl>,
350 ) -> Result<(PlanRef, Vec<ExprImpl>)> {
351 struct SubstituteSubQueries {
352 input_col_num: usize,
353 subqueries: Vec<Subquery>,
354 correlated_indices_collection: Vec<Vec<usize>>,
355 correlated_ids: Vec<CorrelatedId>,
356 ctx: OptimizerContextRef,
357 }
358
359 impl ExprRewriter for SubstituteSubQueries {
361 fn rewrite_subquery(&mut self, mut subquery: Subquery) -> ExprImpl {
362 let correlated_id = self.ctx.next_correlated_id();
363 self.correlated_ids.push(correlated_id);
364 let input_ref = InputRef::new(self.input_col_num, subquery.return_type()).into();
365 self.input_col_num += 1;
366 self.correlated_indices_collection.push(
367 subquery.collect_correlated_indices_by_depth_and_assign_id(0, correlated_id),
368 );
369 self.subqueries.push(subquery);
370 input_ref
371 }
372 }
373
374 let mut rewriter = SubstituteSubQueries {
375 input_col_num: root.schema().len(),
376 subqueries: vec![],
377 correlated_indices_collection: vec![],
378 correlated_ids: vec![],
379 ctx: self.ctx.clone(),
380 };
381 exprs = exprs
382 .into_iter()
383 .map(|e| rewriter.rewrite_expr(e))
384 .collect();
385
386 for ((subquery, correlated_indices), correlated_id) in rewriter
387 .subqueries
388 .into_iter()
389 .zip_eq_fast(rewriter.correlated_indices_collection)
390 .zip_eq_fast(rewriter.correlated_ids)
391 {
392 let return_type = subquery.return_type();
393 let subroot = self.plan_query(subquery.query)?;
394
395 let right = match subquery.kind {
396 SubqueryKind::Scalar => subroot.into_unordered_subplan(),
397 SubqueryKind::UpdateSet => {
398 let plan = subroot.into_unordered_subplan();
399
400 let all_input_refs = plan
402 .schema()
403 .data_types()
404 .into_iter()
405 .enumerate()
406 .map(|(i, data_type)| InputRef::new(i, data_type).into())
407 .collect::<Vec<_>>();
408 let call =
409 FunctionCall::new_unchecked(ExprType::Row, all_input_refs, return_type);
410
411 LogicalProject::create(plan, vec![call.into()])
412 }
413 SubqueryKind::Existential => {
414 self.create_exists(subroot.into_unordered_subplan())?
415 }
416 SubqueryKind::Array => subroot.into_array_agg()?,
417 _ => bail_not_implemented!(issue = 1343, "{:?}", subquery.kind),
418 };
419
420 root = Self::create_apply(
421 correlated_id,
422 correlated_indices,
423 root,
424 right,
425 ExprImpl::literal_bool(true),
426 JoinType::LeftOuter,
427 true,
428 );
429 }
430 Ok((root, exprs))
431 }
432
433 pub(super) fn substitute_subqueries_in_cross_join_way(
457 &mut self,
458 mut root: PlanRef,
459 mut exprs: Vec<ExprImpl>,
460 ) -> Result<(PlanRef, Vec<ExprImpl>)> {
461 struct SubstituteSubQueries {
462 input_col_num: usize,
463 subqueries: Vec<Subquery>,
464 correlated_id: Option<CorrelatedId>,
465 correlated_indices_collection: Vec<Vec<usize>>,
466 ctx: OptimizerContextRef,
467 }
468
469 impl ExprRewriter for SubstituteSubQueries {
470 fn rewrite_subquery(&mut self, mut subquery: Subquery) -> ExprImpl {
471 if self.correlated_id.is_none() {
472 self.correlated_id = Some(self.ctx.next_correlated_id());
473 }
474 let input_ref = InputRef::new(self.input_col_num, subquery.return_type()).into();
475 self.input_col_num += 1;
476 self.correlated_indices_collection.push(
477 subquery.collect_correlated_indices_by_depth_and_assign_id(
478 0,
479 self.correlated_id.unwrap(),
480 ),
481 );
482 self.subqueries.push(subquery);
483 input_ref
484 }
485 }
486
487 let mut rewriter = SubstituteSubQueries {
488 input_col_num: root.schema().len(),
489 subqueries: vec![],
490 correlated_id: None,
491 correlated_indices_collection: vec![],
492 ctx: self.ctx.clone(),
493 };
494 exprs = exprs
495 .into_iter()
496 .map(|e| rewriter.rewrite_expr(e))
497 .collect();
498
499 let mut right = None;
500
501 for subquery in rewriter.subqueries {
502 let return_type = subquery.return_type();
503 let subroot = self.plan_query(subquery.query)?;
504
505 let subplan = match subquery.kind {
506 SubqueryKind::Scalar => subroot.into_unordered_subplan(),
507 SubqueryKind::UpdateSet => {
508 let plan = subroot.into_unordered_subplan();
509
510 let all_input_refs = plan
512 .schema()
513 .data_types()
514 .into_iter()
515 .enumerate()
516 .map(|(i, data_type)| InputRef::new(i, data_type).into())
517 .collect::<Vec<_>>();
518 let call =
519 FunctionCall::new_unchecked(ExprType::Row, all_input_refs, return_type);
520
521 LogicalProject::create(plan, vec![call.into()])
522 }
523 SubqueryKind::Existential => {
524 self.create_exists(subroot.into_unordered_subplan())?
525 }
526 SubqueryKind::Array => subroot.into_array_agg()?,
527 _ => bail_not_implemented!(issue = 1343, "{:?}", subquery.kind),
528 };
529 if right.is_none() {
530 right = Some(subplan);
531 } else {
532 right = Some(LogicalJoin::create(
533 right.clone().unwrap(),
534 subplan,
535 JoinType::FullOuter,
536 ExprImpl::literal_bool(true),
537 ));
538 }
539 }
540
541 root = if let Some(right) = right {
542 let mut correlated_indices = rewriter
543 .correlated_indices_collection
544 .iter()
545 .flatten()
546 .cloned()
547 .collect::<Vec<_>>();
548 correlated_indices.sort();
549 correlated_indices.dedup();
550
551 Self::create_apply(
552 rewriter.correlated_id.expect("must have a correlated id"),
553 correlated_indices,
554 root,
555 right,
556 ExprImpl::literal_bool(true),
557 JoinType::LeftOuter,
558 true,
559 )
560 } else {
561 root
562 };
563
564 Ok((root, exprs))
565 }
566
567 fn create_apply(
568 correlated_id: CorrelatedId,
569 correlated_indices: Vec<usize>,
570 left: PlanRef,
571 right: PlanRef,
572 on: ExprImpl,
573 join_type: JoinType,
574 max_one_row: bool,
575 ) -> PlanRef {
576 LogicalApply::create(
577 left,
578 right,
579 join_type,
580 Condition::with_expr(on),
581 correlated_id,
582 correlated_indices,
583 max_one_row,
584 )
585 }
586}