1use std::collections::HashMap;
16
17use itertools::Itertools;
18use risingwave_common::bail_not_implemented;
19use risingwave_common::catalog::PROJECTED_ROW_ID_COLUMN_NAME;
20use risingwave_common::types::DataType;
21use risingwave_common::util::iter_util::ZipEqFast;
22use risingwave_common::util::sort_util::ColumnOrder;
23use risingwave_expr::ExprError;
24use risingwave_pb::plan_common::JoinType;
25
26use crate::OptimizerContextRef;
27use crate::binder::{BoundDistinct, BoundSelect};
28use crate::error::{ErrorCode, Result};
29use crate::expr::{
30 CorrelatedId, Expr, ExprImpl, ExprRewriter, ExprType, FunctionCall, InputRef, Subquery,
31 SubqueryKind,
32};
33pub use crate::optimizer::plan_node::LogicalFilter;
34use crate::optimizer::plan_node::generic::{Agg, GenericPlanRef, Project, ProjectBuilder};
35use crate::optimizer::plan_node::{
36 LogicalAgg, LogicalApply, LogicalDedup, LogicalJoin, LogicalOverWindow,
37 LogicalPlanRef as PlanRef, LogicalProject, LogicalProjectSet, LogicalTopN, LogicalValues,
38 PlanAggCall,
39};
40use crate::optimizer::property::Order;
41use crate::planner::Planner;
42use crate::utils::{Condition, IndexSet};
43
44impl Planner {
45 pub(super) fn plan_select(
46 &mut self,
47 BoundSelect {
48 from,
49 where_clause,
50 mut select_items,
51 group_by,
52 mut having,
53 mut distinct,
54 ..
55 }: BoundSelect,
56 extra_order_exprs: Vec<ExprImpl>,
57 order: &[ColumnOrder],
58 ) -> Result<PlanRef> {
59 if distinct.is_distinct() && !extra_order_exprs.is_empty() {
61 return Err(ErrorCode::InvalidInputSyntax(
62 "for SELECT DISTINCT, ORDER BY expressions must appear in select list".into(),
63 )
64 .into());
65 }
66 select_items.extend(extra_order_exprs);
67 if let BoundDistinct::DistinctOn(exprs) = &distinct {
69 let mut distinct_on_exprs: HashMap<ExprImpl, bool> =
70 exprs.iter().map(|expr| (expr.clone(), false)).collect();
71 let mut uncovered_distinct_on_exprs_cnt = distinct_on_exprs.len();
72 let mut order_iter = order.iter().map(|o| &select_items[o.column_index]);
73 while uncovered_distinct_on_exprs_cnt > 0
74 && let Some(order_expr) = order_iter.next()
75 {
76 match distinct_on_exprs.get_mut(order_expr) {
77 Some(has_been_covered) => {
78 if !*has_been_covered {
79 *has_been_covered = true;
80 uncovered_distinct_on_exprs_cnt -= 1;
81 }
82 }
83 None => {
84 return Err(ErrorCode::InvalidInputSyntax(
85 "the SELECT DISTINCT ON expressions must match the leftmost ORDER BY expressions"
86 .into(),
87 )
88 .into());
89 }
90 }
91 }
92 }
93
94 let mut root = match from {
96 None => self.create_dummy_values(),
97 Some(t) => self.plan_relation(t)?,
98 };
99 if let Some(where_clause) = where_clause {
101 root = self.plan_where(root, where_clause)?;
102 }
103 let has_agg_call = select_items.iter().any(|expr| expr.has_agg_call());
106 if !group_by.is_empty() || having.is_some() || has_agg_call {
107 let n_distinct_on = if let BoundDistinct::DistinctOn(exprs) = &distinct {
113 select_items.extend(exprs.iter().cloned());
114 exprs.len()
115 } else {
116 0
117 };
118
119 (root, select_items, having) =
120 LogicalAgg::create(select_items, group_by, having, root)?;
121
122 if n_distinct_on > 0 {
123 let rewritten = select_items.split_off(select_items.len() - n_distinct_on);
124 distinct = BoundDistinct::DistinctOn(rewritten);
125 }
126 }
127
128 if let Some(having) = having {
129 root = self.plan_where(root, having)?;
130 }
131
132 if select_items.iter().any(|e| e.has_subquery()) {
133 (root, select_items) =
134 self.substitute_subqueries_in_cross_join_way(root, select_items)?;
135 }
136 if select_items.iter().any(|e| e.has_window_function()) {
137 (root, select_items) = LogicalOverWindow::create(root, select_items)?;
138 }
139
140 let original_select_items_len = select_items.len();
141
142 let mut distinct_list_index_to_select_items_index = vec![];
145 if let BoundDistinct::DistinctOn(distinct_list) = &distinct {
146 distinct_list_index_to_select_items_index.reserve(distinct_list.len());
147 let mut builder_index_to_select_items_index =
148 Vec::with_capacity(original_select_items_len);
149 let mut input_proj_builder = ProjectBuilder::default();
150 for (select_item_index, select_item) in select_items.iter().enumerate() {
151 let builder_index = input_proj_builder
152 .add_expr(select_item)
153 .map_err(|msg| ExprError::UnsupportedFunction(String::from(msg)))?;
154 if builder_index >= builder_index_to_select_items_index.len() {
155 debug_assert_eq!(builder_index, builder_index_to_select_items_index.len());
156 builder_index_to_select_items_index.push(select_item_index);
157 }
158 }
159 for distinct_expr in distinct_list {
160 let builder_index = input_proj_builder
161 .add_expr(distinct_expr)
162 .map_err(|msg| ExprError::UnsupportedFunction(String::from(msg)))?;
163 if builder_index >= builder_index_to_select_items_index.len() {
164 debug_assert_eq!(builder_index, builder_index_to_select_items_index.len());
165 select_items.push(distinct_expr.clone());
166 builder_index_to_select_items_index.push(select_items.len() - 1);
167 }
168 distinct_list_index_to_select_items_index
169 .push(builder_index_to_select_items_index[builder_index]);
170 }
171 }
172
173 let need_restore_select_items = select_items.len() > original_select_items_len;
174
175 root = LogicalProjectSet::create(root, select_items);
176
177 if matches!(&distinct, BoundDistinct::DistinctOn(_)) {
178 root = if order.is_empty() {
179 LogicalDedup::new(root, distinct_list_index_to_select_items_index).into()
182 } else {
183 LogicalTopN::new(
184 root,
185 1,
186 0,
187 false,
188 Order::new(order.to_vec()),
189 distinct_list_index_to_select_items_index,
190 )
191 .into()
192 };
193 }
194
195 if need_restore_select_items {
196 root = LogicalProject::with_core(Project::with_out_col_idx(
197 root,
198 0..original_select_items_len,
199 ))
200 .into();
201 }
202
203 if let BoundDistinct::Distinct = distinct {
204 let fields = root.schema().fields();
205 let group_key = if let Some(field) = fields.first()
206 && field.name == PROJECTED_ROW_ID_COLUMN_NAME
207 {
208 (1..fields.len()).collect()
210 } else {
211 (0..fields.len()).collect()
212 };
213 root = Agg::new(vec![], group_key, root).into();
214 }
215
216 Ok(root)
217 }
218
219 fn create_dummy_values(&self) -> PlanRef {
222 LogicalValues::create_empty_scalar(self.ctx.clone())
223 }
224
225 fn create_exists(&self, input: PlanRef) -> Result<PlanRef> {
228 let count_star = Agg::new(vec![PlanAggCall::count_star()], IndexSet::empty(), input);
229 let ge = FunctionCall::new(
230 ExprType::GreaterThanOrEqual,
231 vec![
232 InputRef::new(0, DataType::Int64).into(),
233 ExprImpl::literal_int(1),
234 ],
235 )
236 .unwrap();
237 Ok(LogicalProject::create(count_star.into(), vec![ge.into()]))
238 }
239
240 pub(super) fn plan_where(
245 &mut self,
246 mut input: PlanRef,
247 where_clause: ExprImpl,
248 ) -> Result<PlanRef> {
249 if !where_clause.has_subquery() {
250 return Ok(LogicalFilter::create_with_expr(input, where_clause));
251 }
252 let (subquery_conjunctions, not_subquery_conjunctions, others) =
253 Condition::with_expr(where_clause)
254 .group_by::<_, 3>(|expr| match expr {
255 ExprImpl::Subquery(_) => 0,
256 ExprImpl::FunctionCall(func_call)
257 if func_call.func_type() == ExprType::Not
258 && matches!(func_call.inputs()[0], ExprImpl::Subquery(_)) =>
259 {
260 1
261 }
262 _ => 2,
263 })
264 .into_iter()
265 .next_tuple()
266 .unwrap();
267
268 for expr in subquery_conjunctions {
270 self.handle_exists_and_in(expr, false, &mut input)?;
271 }
272
273 for expr in not_subquery_conjunctions {
275 let not = expr.into_function_call().unwrap();
276 let (_, expr) = not.decompose_as_unary();
277 self.handle_exists_and_in(expr, true, &mut input)?;
278 }
279
280 if others.always_true() {
281 Ok(input)
282 } else {
283 let (input, others) =
284 self.substitute_subqueries_in_left_deep_tree_way(input, others.conjunctions)?;
285 Ok(LogicalFilter::create(
286 input,
287 Condition {
288 conjunctions: others,
289 },
290 ))
291 }
292 }
293
294 fn handle_exists_and_in(
299 &mut self,
300 expr: ExprImpl,
301 negated: bool,
302 input: &mut PlanRef,
303 ) -> Result<()> {
304 let join_type = if negated {
305 JoinType::LeftAnti
306 } else {
307 JoinType::LeftSemi
308 };
309 let correlated_id = self.ctx.next_correlated_id();
310 let mut subquery = expr.into_subquery().unwrap();
311 let mut correlated_indices = subquery
315 .query
316 .collect_correlated_indices_by_depth_and_assign_id(0, correlated_id);
317 correlated_indices.sort();
318 correlated_indices.dedup();
319 let output_column_type = subquery.query.data_types()[0].clone();
320 let right_plan = self.plan_query(subquery.query)?.into_unordered_subplan();
321 let on = match subquery.kind {
322 SubqueryKind::Existential => ExprImpl::literal_bool(true),
323 SubqueryKind::In(left_expr) => {
324 let right_expr = InputRef::new(input.schema().len(), output_column_type);
325 FunctionCall::new(ExprType::Equal, vec![left_expr, right_expr.into()])?.into()
326 }
327 kind => bail_not_implemented!(issue = 1343, "Not supported subquery kind: {:?}", kind),
328 };
329 *input = Self::create_apply(
330 correlated_id,
331 correlated_indices,
332 input.clone(),
333 right_plan,
334 on,
335 join_type,
336 false,
337 );
338 Ok(())
339 }
340
341 pub(super) fn substitute_subqueries_in_left_deep_tree_way(
365 &mut self,
366 mut root: PlanRef,
367 mut exprs: Vec<ExprImpl>,
368 ) -> Result<(PlanRef, Vec<ExprImpl>)> {
369 struct SubstituteSubQueries {
370 input_col_num: usize,
371 subqueries: Vec<Subquery>,
372 correlated_indices_collection: Vec<Vec<usize>>,
373 correlated_ids: Vec<CorrelatedId>,
374 ctx: OptimizerContextRef,
375 }
376
377 impl ExprRewriter for SubstituteSubQueries {
379 fn rewrite_subquery(&mut self, mut subquery: Subquery) -> ExprImpl {
380 let correlated_id = self.ctx.next_correlated_id();
381 self.correlated_ids.push(correlated_id);
382 let input_ref = InputRef::new(self.input_col_num, subquery.return_type()).into();
383 self.input_col_num += 1;
384 self.correlated_indices_collection.push(
385 subquery.collect_correlated_indices_by_depth_and_assign_id(0, correlated_id),
386 );
387 self.subqueries.push(subquery);
388 input_ref
389 }
390 }
391
392 let mut rewriter = SubstituteSubQueries {
393 input_col_num: root.schema().len(),
394 subqueries: vec![],
395 correlated_indices_collection: vec![],
396 correlated_ids: vec![],
397 ctx: self.ctx.clone(),
398 };
399 exprs = exprs
400 .into_iter()
401 .map(|e| rewriter.rewrite_expr(e))
402 .collect();
403
404 for ((subquery, correlated_indices), correlated_id) in rewriter
405 .subqueries
406 .into_iter()
407 .zip_eq_fast(rewriter.correlated_indices_collection)
408 .zip_eq_fast(rewriter.correlated_ids)
409 {
410 let return_type = subquery.return_type();
411 let subroot = self.plan_query(subquery.query)?;
412
413 let right = match subquery.kind {
414 SubqueryKind::Scalar => subroot.into_unordered_subplan(),
415 SubqueryKind::UpdateSet => {
416 let plan = subroot.into_unordered_subplan();
417
418 let all_input_refs = plan
420 .schema()
421 .data_types()
422 .into_iter()
423 .enumerate()
424 .map(|(i, data_type)| InputRef::new(i, data_type).into())
425 .collect::<Vec<_>>();
426 let call =
427 FunctionCall::new_unchecked(ExprType::Row, all_input_refs, return_type);
428
429 LogicalProject::create(plan, vec![call.into()])
430 }
431 SubqueryKind::Existential => {
432 self.create_exists(subroot.into_unordered_subplan())?
433 }
434 SubqueryKind::Array => subroot.into_array_agg()?,
435 _ => bail_not_implemented!(issue = 1343, "{:?}", subquery.kind),
436 };
437
438 root = Self::create_apply(
439 correlated_id,
440 correlated_indices,
441 root,
442 right,
443 ExprImpl::literal_bool(true),
444 JoinType::LeftOuter,
445 true,
446 );
447 }
448 Ok((root, exprs))
449 }
450
451 pub(super) fn substitute_subqueries_in_cross_join_way(
475 &mut self,
476 mut root: PlanRef,
477 mut exprs: Vec<ExprImpl>,
478 ) -> Result<(PlanRef, Vec<ExprImpl>)> {
479 struct SubstituteSubQueries {
480 input_col_num: usize,
481 subqueries: Vec<Subquery>,
482 correlated_id: Option<CorrelatedId>,
483 correlated_indices_collection: Vec<Vec<usize>>,
484 ctx: OptimizerContextRef,
485 }
486
487 impl ExprRewriter for SubstituteSubQueries {
488 fn rewrite_subquery(&mut self, mut subquery: Subquery) -> ExprImpl {
489 if self.correlated_id.is_none() {
490 self.correlated_id = Some(self.ctx.next_correlated_id());
491 }
492 let input_ref = InputRef::new(self.input_col_num, subquery.return_type()).into();
493 self.input_col_num += 1;
494 self.correlated_indices_collection.push(
495 subquery.collect_correlated_indices_by_depth_and_assign_id(
496 0,
497 self.correlated_id.unwrap(),
498 ),
499 );
500 self.subqueries.push(subquery);
501 input_ref
502 }
503 }
504
505 let mut rewriter = SubstituteSubQueries {
506 input_col_num: root.schema().len(),
507 subqueries: vec![],
508 correlated_id: None,
509 correlated_indices_collection: vec![],
510 ctx: self.ctx.clone(),
511 };
512 exprs = exprs
513 .into_iter()
514 .map(|e| rewriter.rewrite_expr(e))
515 .collect();
516
517 let mut right = None;
518
519 for subquery in rewriter.subqueries {
520 let return_type = subquery.return_type();
521 let subroot = self.plan_query(subquery.query)?;
522
523 let subplan = match subquery.kind {
524 SubqueryKind::Scalar => subroot.into_unordered_subplan(),
525 SubqueryKind::UpdateSet => {
526 let plan = subroot.into_unordered_subplan();
527
528 let all_input_refs = plan
530 .schema()
531 .data_types()
532 .into_iter()
533 .enumerate()
534 .map(|(i, data_type)| InputRef::new(i, data_type).into())
535 .collect::<Vec<_>>();
536 let call =
537 FunctionCall::new_unchecked(ExprType::Row, all_input_refs, return_type);
538
539 LogicalProject::create(plan, vec![call.into()])
540 }
541 SubqueryKind::Existential => {
542 self.create_exists(subroot.into_unordered_subplan())?
543 }
544 SubqueryKind::Array => subroot.into_array_agg()?,
545 _ => bail_not_implemented!(issue = 1343, "{:?}", subquery.kind),
546 };
547 if right.is_none() {
548 right = Some(subplan);
549 } else {
550 right = Some(LogicalJoin::create(
551 right.clone().unwrap(),
552 subplan,
553 JoinType::FullOuter,
554 ExprImpl::literal_bool(true),
555 ));
556 }
557 }
558
559 root = if let Some(right) = right {
560 let mut correlated_indices = rewriter
561 .correlated_indices_collection
562 .iter()
563 .flatten()
564 .cloned()
565 .collect::<Vec<_>>();
566 correlated_indices.sort();
567 correlated_indices.dedup();
568
569 Self::create_apply(
570 rewriter.correlated_id.expect("must have a correlated id"),
571 correlated_indices,
572 root,
573 right,
574 ExprImpl::literal_bool(true),
575 JoinType::LeftOuter,
576 true,
577 )
578 } else {
579 root
580 };
581
582 Ok((root, exprs))
583 }
584
585 fn create_apply(
586 correlated_id: CorrelatedId,
587 correlated_indices: Vec<usize>,
588 left: PlanRef,
589 right: PlanRef,
590 on: ExprImpl,
591 join_type: JoinType,
592 max_one_row: bool,
593 ) -> PlanRef {
594 LogicalApply::create(
595 left,
596 right,
597 join_type,
598 Condition::with_expr(on),
599 correlated_id,
600 correlated_indices,
601 max_one_row,
602 )
603 }
604}