1use std::collections::HashMap;
16
17use itertools::Itertools;
18use risingwave_common::bail_not_implemented;
19use risingwave_common::catalog::Schema;
20use risingwave_common::types::DataType;
21use risingwave_common::util::iter_util::ZipEqFast;
22use risingwave_common::util::sort_util::ColumnOrder;
23use risingwave_expr::ExprError;
24use risingwave_pb::plan_common::JoinType;
25
26use crate::OptimizerContextRef;
27use crate::binder::{BoundDistinct, BoundSelect};
28use crate::error::{ErrorCode, Result};
29use crate::expr::{
30 CorrelatedId, Expr, ExprImpl, ExprRewriter, ExprType, FunctionCall, InputRef, Subquery,
31 SubqueryKind,
32};
33pub use crate::optimizer::plan_node::LogicalFilter;
34use crate::optimizer::plan_node::generic::{Agg, GenericPlanRef, Project, ProjectBuilder};
35use crate::optimizer::plan_node::{
36 LogicalAgg, LogicalApply, LogicalDedup, LogicalJoin, LogicalOverWindow,
37 LogicalPlanRef as PlanRef, LogicalProject, LogicalProjectSet, LogicalTopN, LogicalValues,
38 PlanAggCall,
39};
40use crate::optimizer::property::Order;
41use crate::planner::Planner;
42use crate::utils::{Condition, IndexSet};
43
44impl Planner {
45 pub(super) fn plan_select(
46 &mut self,
47 BoundSelect {
48 from,
49 where_clause,
50 mut select_items,
51 group_by,
52 mut having,
53 distinct,
54 ..
55 }: BoundSelect,
56 extra_order_exprs: Vec<ExprImpl>,
57 order: &[ColumnOrder],
58 ) -> Result<PlanRef> {
59 if distinct.is_distinct() && !extra_order_exprs.is_empty() {
61 return Err(ErrorCode::InvalidInputSyntax(
62 "for SELECT DISTINCT, ORDER BY expressions must appear in select list".into(),
63 )
64 .into());
65 }
66 select_items.extend(extra_order_exprs);
67 if let BoundDistinct::DistinctOn(exprs) = &distinct {
69 let mut distinct_on_exprs: HashMap<ExprImpl, bool> =
70 exprs.iter().map(|expr| (expr.clone(), false)).collect();
71 let mut uncovered_distinct_on_exprs_cnt = distinct_on_exprs.len();
72 let mut order_iter = order.iter().map(|o| &select_items[o.column_index]);
73 while uncovered_distinct_on_exprs_cnt > 0
74 && let Some(order_expr) = order_iter.next()
75 {
76 match distinct_on_exprs.get_mut(order_expr) {
77 Some(has_been_covered) => {
78 if !*has_been_covered {
79 *has_been_covered = true;
80 uncovered_distinct_on_exprs_cnt -= 1;
81 }
82 }
83 None => {
84 return Err(ErrorCode::InvalidInputSyntax(
85 "the SELECT DISTINCT ON expressions must match the leftmost ORDER BY expressions"
86 .into(),
87 )
88 .into());
89 }
90 }
91 }
92 }
93
94 let mut root = match from {
96 None => self.create_dummy_values(),
97 Some(t) => self.plan_relation(t)?,
98 };
99 if let Some(where_clause) = where_clause {
101 root = self.plan_where(root, where_clause)?;
102 }
103 let has_agg_call = select_items.iter().any(|expr| expr.has_agg_call());
106 if !group_by.is_empty() || having.is_some() || has_agg_call {
107 (root, select_items, having) =
108 LogicalAgg::create(select_items, group_by, having, root)?;
109 }
110
111 if let Some(having) = having {
112 root = self.plan_where(root, having)?;
113 }
114
115 if select_items.iter().any(|e| e.has_subquery()) {
116 (root, select_items) =
117 self.substitute_subqueries_in_cross_join_way(root, select_items)?;
118 }
119 if select_items.iter().any(|e| e.has_window_function()) {
120 (root, select_items) = LogicalOverWindow::create(root, select_items)?;
121 }
122
123 let original_select_items_len = select_items.len();
124
125 let mut distinct_list_index_to_select_items_index = vec![];
128 if let BoundDistinct::DistinctOn(distinct_list) = &distinct {
129 distinct_list_index_to_select_items_index.reserve(distinct_list.len());
130 let mut builder_index_to_select_items_index =
131 Vec::with_capacity(original_select_items_len);
132 let mut input_proj_builder = ProjectBuilder::default();
133 for (select_item_index, select_item) in select_items.iter().enumerate() {
134 let builder_index = input_proj_builder
135 .add_expr(select_item)
136 .map_err(|msg| ExprError::UnsupportedFunction(String::from(msg)))?;
137 if builder_index >= builder_index_to_select_items_index.len() {
138 debug_assert_eq!(builder_index, builder_index_to_select_items_index.len());
139 builder_index_to_select_items_index.push(select_item_index);
140 }
141 }
142 for distinct_expr in distinct_list {
143 let builder_index = input_proj_builder
144 .add_expr(distinct_expr)
145 .map_err(|msg| ExprError::UnsupportedFunction(String::from(msg)))?;
146 if builder_index >= builder_index_to_select_items_index.len() {
147 debug_assert_eq!(builder_index, builder_index_to_select_items_index.len());
148 select_items.push(distinct_expr.clone());
149 builder_index_to_select_items_index.push(select_items.len() - 1);
150 }
151 distinct_list_index_to_select_items_index
152 .push(builder_index_to_select_items_index[builder_index]);
153 }
154 }
155
156 let need_restore_select_items = select_items.len() > original_select_items_len;
157
158 root = LogicalProjectSet::create(root, select_items);
159
160 if matches!(&distinct, BoundDistinct::DistinctOn(_)) {
161 root = if order.is_empty() {
162 LogicalDedup::new(root, distinct_list_index_to_select_items_index).into()
165 } else {
166 LogicalTopN::new(
167 root,
168 1,
169 0,
170 false,
171 Order::new(order.to_vec()),
172 distinct_list_index_to_select_items_index,
173 )
174 .into()
175 };
176 }
177
178 if need_restore_select_items {
179 root = LogicalProject::with_core(Project::with_out_col_idx(
180 root,
181 0..original_select_items_len,
182 ))
183 .into();
184 }
185
186 if let BoundDistinct::Distinct = distinct {
187 let fields = root.schema().fields();
188 let group_key = if let Some(field) = fields.first()
189 && field.name == "projected_row_id"
190 {
191 (1..fields.len()).collect()
193 } else {
194 (0..fields.len()).collect()
195 };
196 root = Agg::new(vec![], group_key, root).into();
197 }
198
199 Ok(root)
200 }
201
202 fn create_dummy_values(&self) -> PlanRef {
205 LogicalValues::create(vec![vec![]], Schema::default(), self.ctx.clone())
206 }
207
208 fn create_exists(&self, input: PlanRef) -> Result<PlanRef> {
211 let count_star = Agg::new(vec![PlanAggCall::count_star()], IndexSet::empty(), input);
212 let ge = FunctionCall::new(
213 ExprType::GreaterThanOrEqual,
214 vec![
215 InputRef::new(0, DataType::Int64).into(),
216 ExprImpl::literal_int(1),
217 ],
218 )
219 .unwrap();
220 Ok(LogicalProject::create(count_star.into(), vec![ge.into()]))
221 }
222
223 pub(super) fn plan_where(
228 &mut self,
229 mut input: PlanRef,
230 where_clause: ExprImpl,
231 ) -> Result<PlanRef> {
232 if !where_clause.has_subquery() {
233 return Ok(LogicalFilter::create_with_expr(input, where_clause));
234 }
235 let (subquery_conjunctions, not_subquery_conjunctions, others) =
236 Condition::with_expr(where_clause)
237 .group_by::<_, 3>(|expr| match expr {
238 ExprImpl::Subquery(_) => 0,
239 ExprImpl::FunctionCall(func_call)
240 if func_call.func_type() == ExprType::Not
241 && matches!(func_call.inputs()[0], ExprImpl::Subquery(_)) =>
242 {
243 1
244 }
245 _ => 2,
246 })
247 .into_iter()
248 .next_tuple()
249 .unwrap();
250
251 for expr in subquery_conjunctions {
253 self.handle_exists_and_in(expr, false, &mut input)?;
254 }
255
256 for expr in not_subquery_conjunctions {
258 let not = expr.into_function_call().unwrap();
259 let (_, expr) = not.decompose_as_unary();
260 self.handle_exists_and_in(expr, true, &mut input)?;
261 }
262
263 if others.always_true() {
264 Ok(input)
265 } else {
266 let (input, others) =
267 self.substitute_subqueries_in_left_deep_tree_way(input, others.conjunctions)?;
268 Ok(LogicalFilter::create(
269 input,
270 Condition {
271 conjunctions: others,
272 },
273 ))
274 }
275 }
276
277 fn handle_exists_and_in(
282 &mut self,
283 expr: ExprImpl,
284 negated: bool,
285 input: &mut PlanRef,
286 ) -> Result<()> {
287 let join_type = if negated {
288 JoinType::LeftAnti
289 } else {
290 JoinType::LeftSemi
291 };
292 let correlated_id = self.ctx.next_correlated_id();
293 let mut subquery = expr.into_subquery().unwrap();
294 let mut correlated_indices = subquery
298 .query
299 .collect_correlated_indices_by_depth_and_assign_id(0, correlated_id);
300 correlated_indices.sort();
301 correlated_indices.dedup();
302 let output_column_type = subquery.query.data_types()[0].clone();
303 let right_plan = self.plan_query(subquery.query)?.into_unordered_subplan();
304 let on = match subquery.kind {
305 SubqueryKind::Existential => ExprImpl::literal_bool(true),
306 SubqueryKind::In(left_expr) => {
307 let right_expr = InputRef::new(input.schema().len(), output_column_type);
308 FunctionCall::new(ExprType::Equal, vec![left_expr, right_expr.into()])?.into()
309 }
310 kind => bail_not_implemented!(issue = 1343, "Not supported subquery kind: {:?}", kind),
311 };
312 *input = Self::create_apply(
313 correlated_id,
314 correlated_indices,
315 input.clone(),
316 right_plan,
317 on,
318 join_type,
319 false,
320 );
321 Ok(())
322 }
323
324 pub(super) fn substitute_subqueries_in_left_deep_tree_way(
348 &mut self,
349 mut root: PlanRef,
350 mut exprs: Vec<ExprImpl>,
351 ) -> Result<(PlanRef, Vec<ExprImpl>)> {
352 struct SubstituteSubQueries {
353 input_col_num: usize,
354 subqueries: Vec<Subquery>,
355 correlated_indices_collection: Vec<Vec<usize>>,
356 correlated_ids: Vec<CorrelatedId>,
357 ctx: OptimizerContextRef,
358 }
359
360 impl ExprRewriter for SubstituteSubQueries {
362 fn rewrite_subquery(&mut self, mut subquery: Subquery) -> ExprImpl {
363 let correlated_id = self.ctx.next_correlated_id();
364 self.correlated_ids.push(correlated_id);
365 let input_ref = InputRef::new(self.input_col_num, subquery.return_type()).into();
366 self.input_col_num += 1;
367 self.correlated_indices_collection.push(
368 subquery.collect_correlated_indices_by_depth_and_assign_id(0, correlated_id),
369 );
370 self.subqueries.push(subquery);
371 input_ref
372 }
373 }
374
375 let mut rewriter = SubstituteSubQueries {
376 input_col_num: root.schema().len(),
377 subqueries: vec![],
378 correlated_indices_collection: vec![],
379 correlated_ids: vec![],
380 ctx: self.ctx.clone(),
381 };
382 exprs = exprs
383 .into_iter()
384 .map(|e| rewriter.rewrite_expr(e))
385 .collect();
386
387 for ((subquery, correlated_indices), correlated_id) in rewriter
388 .subqueries
389 .into_iter()
390 .zip_eq_fast(rewriter.correlated_indices_collection)
391 .zip_eq_fast(rewriter.correlated_ids)
392 {
393 let return_type = subquery.return_type();
394 let subroot = self.plan_query(subquery.query)?;
395
396 let right = match subquery.kind {
397 SubqueryKind::Scalar => subroot.into_unordered_subplan(),
398 SubqueryKind::UpdateSet => {
399 let plan = subroot.into_unordered_subplan();
400
401 let all_input_refs = plan
403 .schema()
404 .data_types()
405 .into_iter()
406 .enumerate()
407 .map(|(i, data_type)| InputRef::new(i, data_type).into())
408 .collect::<Vec<_>>();
409 let call =
410 FunctionCall::new_unchecked(ExprType::Row, all_input_refs, return_type);
411
412 LogicalProject::create(plan, vec![call.into()])
413 }
414 SubqueryKind::Existential => {
415 self.create_exists(subroot.into_unordered_subplan())?
416 }
417 SubqueryKind::Array => subroot.into_array_agg()?,
418 _ => bail_not_implemented!(issue = 1343, "{:?}", subquery.kind),
419 };
420
421 root = Self::create_apply(
422 correlated_id,
423 correlated_indices,
424 root,
425 right,
426 ExprImpl::literal_bool(true),
427 JoinType::LeftOuter,
428 true,
429 );
430 }
431 Ok((root, exprs))
432 }
433
434 pub(super) fn substitute_subqueries_in_cross_join_way(
458 &mut self,
459 mut root: PlanRef,
460 mut exprs: Vec<ExprImpl>,
461 ) -> Result<(PlanRef, Vec<ExprImpl>)> {
462 struct SubstituteSubQueries {
463 input_col_num: usize,
464 subqueries: Vec<Subquery>,
465 correlated_id: Option<CorrelatedId>,
466 correlated_indices_collection: Vec<Vec<usize>>,
467 ctx: OptimizerContextRef,
468 }
469
470 impl ExprRewriter for SubstituteSubQueries {
471 fn rewrite_subquery(&mut self, mut subquery: Subquery) -> ExprImpl {
472 if self.correlated_id.is_none() {
473 self.correlated_id = Some(self.ctx.next_correlated_id());
474 }
475 let input_ref = InputRef::new(self.input_col_num, subquery.return_type()).into();
476 self.input_col_num += 1;
477 self.correlated_indices_collection.push(
478 subquery.collect_correlated_indices_by_depth_and_assign_id(
479 0,
480 self.correlated_id.unwrap(),
481 ),
482 );
483 self.subqueries.push(subquery);
484 input_ref
485 }
486 }
487
488 let mut rewriter = SubstituteSubQueries {
489 input_col_num: root.schema().len(),
490 subqueries: vec![],
491 correlated_id: None,
492 correlated_indices_collection: vec![],
493 ctx: self.ctx.clone(),
494 };
495 exprs = exprs
496 .into_iter()
497 .map(|e| rewriter.rewrite_expr(e))
498 .collect();
499
500 let mut right = None;
501
502 for subquery in rewriter.subqueries {
503 let return_type = subquery.return_type();
504 let subroot = self.plan_query(subquery.query)?;
505
506 let subplan = match subquery.kind {
507 SubqueryKind::Scalar => subroot.into_unordered_subplan(),
508 SubqueryKind::UpdateSet => {
509 let plan = subroot.into_unordered_subplan();
510
511 let all_input_refs = plan
513 .schema()
514 .data_types()
515 .into_iter()
516 .enumerate()
517 .map(|(i, data_type)| InputRef::new(i, data_type).into())
518 .collect::<Vec<_>>();
519 let call =
520 FunctionCall::new_unchecked(ExprType::Row, all_input_refs, return_type);
521
522 LogicalProject::create(plan, vec![call.into()])
523 }
524 SubqueryKind::Existential => {
525 self.create_exists(subroot.into_unordered_subplan())?
526 }
527 SubqueryKind::Array => subroot.into_array_agg()?,
528 _ => bail_not_implemented!(issue = 1343, "{:?}", subquery.kind),
529 };
530 if right.is_none() {
531 right = Some(subplan);
532 } else {
533 right = Some(LogicalJoin::create(
534 right.clone().unwrap(),
535 subplan,
536 JoinType::FullOuter,
537 ExprImpl::literal_bool(true),
538 ));
539 }
540 }
541
542 root = if let Some(right) = right {
543 let mut correlated_indices = rewriter
544 .correlated_indices_collection
545 .iter()
546 .flatten()
547 .cloned()
548 .collect::<Vec<_>>();
549 correlated_indices.sort();
550 correlated_indices.dedup();
551
552 Self::create_apply(
553 rewriter.correlated_id.expect("must have a correlated id"),
554 correlated_indices,
555 root,
556 right,
557 ExprImpl::literal_bool(true),
558 JoinType::LeftOuter,
559 true,
560 )
561 } else {
562 root
563 };
564
565 Ok((root, exprs))
566 }
567
568 fn create_apply(
569 correlated_id: CorrelatedId,
570 correlated_indices: Vec<usize>,
571 left: PlanRef,
572 right: PlanRef,
573 on: ExprImpl,
574 join_type: JoinType,
575 max_one_row: bool,
576 ) -> PlanRef {
577 LogicalApply::create(
578 left,
579 right,
580 join_type,
581 Condition::with_expr(on),
582 correlated_id,
583 correlated_indices,
584 max_one_row,
585 )
586 }
587}