1use std::collections::HashMap;
16
17use itertools::Itertools;
18use risingwave_common::bail_not_implemented;
19use risingwave_common::catalog::Schema;
20use risingwave_common::types::DataType;
21use risingwave_common::util::iter_util::ZipEqFast;
22use risingwave_common::util::sort_util::ColumnOrder;
23use risingwave_expr::ExprError;
24use risingwave_pb::plan_common::JoinType;
25
26use crate::OptimizerContextRef;
27use crate::binder::{BoundDistinct, BoundSelect};
28use crate::error::{ErrorCode, Result};
29use crate::expr::{
30 CorrelatedId, Expr, ExprImpl, ExprRewriter, ExprType, FunctionCall, InputRef, Subquery,
31 SubqueryKind,
32};
33pub use crate::optimizer::plan_node::LogicalFilter;
34use crate::optimizer::plan_node::generic::{Agg, GenericPlanRef, Project, ProjectBuilder};
35use crate::optimizer::plan_node::{
36 LogicalAgg, LogicalApply, LogicalDedup, LogicalOverWindow, LogicalProject, LogicalProjectSet,
37 LogicalTopN, LogicalValues, PlanAggCall, PlanRef,
38};
39use crate::optimizer::property::Order;
40use crate::planner::Planner;
41use crate::utils::{Condition, IndexSet};
42
43impl Planner {
44 pub(super) fn plan_select(
45 &mut self,
46 BoundSelect {
47 from,
48 where_clause,
49 mut select_items,
50 group_by,
51 mut having,
52 distinct,
53 ..
54 }: BoundSelect,
55 extra_order_exprs: Vec<ExprImpl>,
56 order: &[ColumnOrder],
57 ) -> Result<PlanRef> {
58 if distinct.is_distinct() && !extra_order_exprs.is_empty() {
60 return Err(ErrorCode::InvalidInputSyntax(
61 "for SELECT DISTINCT, ORDER BY expressions must appear in select list".into(),
62 )
63 .into());
64 }
65 select_items.extend(extra_order_exprs);
66 if let BoundDistinct::DistinctOn(exprs) = &distinct {
68 let mut distinct_on_exprs: HashMap<ExprImpl, bool> =
69 exprs.iter().map(|expr| (expr.clone(), false)).collect();
70 let mut uncovered_distinct_on_exprs_cnt = distinct_on_exprs.len();
71 let mut order_iter = order.iter().map(|o| &select_items[o.column_index]);
72 while uncovered_distinct_on_exprs_cnt > 0
73 && let Some(order_expr) = order_iter.next()
74 {
75 match distinct_on_exprs.get_mut(order_expr) {
76 Some(has_been_covered) => {
77 if !*has_been_covered {
78 *has_been_covered = true;
79 uncovered_distinct_on_exprs_cnt -= 1;
80 }
81 }
82 None => {
83 return Err(ErrorCode::InvalidInputSyntax(
84 "the SELECT DISTINCT ON expressions must match the leftmost ORDER BY expressions"
85 .into(),
86 )
87 .into());
88 }
89 }
90 }
91 }
92
93 let mut root = match from {
95 None => self.create_dummy_values(),
96 Some(t) => self.plan_relation(t)?,
97 };
98 if let Some(where_clause) = where_clause {
100 root = self.plan_where(root, where_clause)?;
101 }
102 let has_agg_call = select_items.iter().any(|expr| expr.has_agg_call());
105 if !group_by.is_empty() || having.is_some() || has_agg_call {
106 (root, select_items, having) =
107 LogicalAgg::create(select_items, group_by, having, root)?;
108 }
109
110 if let Some(having) = having {
111 root = self.plan_where(root, having)?;
112 }
113
114 if select_items.iter().any(|e| e.has_subquery()) {
115 (root, select_items) = self.substitute_subqueries(root, select_items)?;
116 }
117 if select_items.iter().any(|e| e.has_window_function()) {
118 (root, select_items) = LogicalOverWindow::create(root, select_items)?;
119 }
120
121 let original_select_items_len = select_items.len();
122
123 let mut distinct_list_index_to_select_items_index = vec![];
126 if let BoundDistinct::DistinctOn(distinct_list) = &distinct {
127 distinct_list_index_to_select_items_index.reserve(distinct_list.len());
128 let mut builder_index_to_select_items_index =
129 Vec::with_capacity(original_select_items_len);
130 let mut input_proj_builder = ProjectBuilder::default();
131 for (select_item_index, select_item) in select_items.iter().enumerate() {
132 let builder_index = input_proj_builder
133 .add_expr(select_item)
134 .map_err(|msg| ExprError::UnsupportedFunction(String::from(msg)))?;
135 if builder_index >= builder_index_to_select_items_index.len() {
136 debug_assert_eq!(builder_index, builder_index_to_select_items_index.len());
137 builder_index_to_select_items_index.push(select_item_index);
138 }
139 }
140 for distinct_expr in distinct_list {
141 let builder_index = input_proj_builder
142 .add_expr(distinct_expr)
143 .map_err(|msg| ExprError::UnsupportedFunction(String::from(msg)))?;
144 if builder_index >= builder_index_to_select_items_index.len() {
145 debug_assert_eq!(builder_index, builder_index_to_select_items_index.len());
146 select_items.push(distinct_expr.clone());
147 builder_index_to_select_items_index.push(select_items.len() - 1);
148 }
149 distinct_list_index_to_select_items_index
150 .push(builder_index_to_select_items_index[builder_index]);
151 }
152 }
153
154 let need_restore_select_items = select_items.len() > original_select_items_len;
155
156 root = LogicalProjectSet::create(root, select_items);
157
158 if matches!(&distinct, BoundDistinct::DistinctOn(_)) {
159 root = if order.is_empty() {
160 LogicalDedup::new(root, distinct_list_index_to_select_items_index).into()
163 } else {
164 LogicalTopN::new(
165 root,
166 1,
167 0,
168 false,
169 Order::new(order.to_vec()),
170 distinct_list_index_to_select_items_index,
171 )
172 .into()
173 };
174 }
175
176 if need_restore_select_items {
177 root = LogicalProject::with_core(Project::with_out_col_idx(
178 root,
179 0..original_select_items_len,
180 ))
181 .into();
182 }
183
184 if let BoundDistinct::Distinct = distinct {
185 let fields = root.schema().fields();
186 let group_key = if let Some(field) = fields.first()
187 && field.name == "projected_row_id"
188 {
189 (1..fields.len()).collect()
191 } else {
192 (0..fields.len()).collect()
193 };
194 root = Agg::new(vec![], group_key, root).into();
195 }
196
197 Ok(root)
198 }
199
200 fn create_dummy_values(&self) -> PlanRef {
203 LogicalValues::create(vec![vec![]], Schema::default(), self.ctx.clone())
204 }
205
206 fn create_exists(&self, input: PlanRef) -> Result<PlanRef> {
209 let count_star = Agg::new(vec![PlanAggCall::count_star()], IndexSet::empty(), input);
210 let ge = FunctionCall::new(
211 ExprType::GreaterThanOrEqual,
212 vec![
213 InputRef::new(0, DataType::Int64).into(),
214 ExprImpl::literal_int(1),
215 ],
216 )
217 .unwrap();
218 Ok(LogicalProject::create(count_star.into(), vec![ge.into()]))
219 }
220
221 pub(super) fn plan_where(
226 &mut self,
227 mut input: PlanRef,
228 where_clause: ExprImpl,
229 ) -> Result<PlanRef> {
230 if !where_clause.has_subquery() {
231 return Ok(LogicalFilter::create_with_expr(input, where_clause));
232 }
233 let (subquery_conjunctions, not_subquery_conjunctions, others) =
234 Condition::with_expr(where_clause)
235 .group_by::<_, 3>(|expr| match expr {
236 ExprImpl::Subquery(_) => 0,
237 ExprImpl::FunctionCall(func_call)
238 if func_call.func_type() == ExprType::Not
239 && matches!(func_call.inputs()[0], ExprImpl::Subquery(_)) =>
240 {
241 1
242 }
243 _ => 2,
244 })
245 .into_iter()
246 .next_tuple()
247 .unwrap();
248
249 for expr in subquery_conjunctions {
251 self.handle_exists_and_in(expr, false, &mut input)?;
252 }
253
254 for expr in not_subquery_conjunctions {
256 let not = expr.into_function_call().unwrap();
257 let (_, expr) = not.decompose_as_unary();
258 self.handle_exists_and_in(expr, true, &mut input)?;
259 }
260
261 if others.always_true() {
262 Ok(input)
263 } else {
264 let (input, others) = self.substitute_subqueries(input, others.conjunctions)?;
265 Ok(LogicalFilter::create(
266 input,
267 Condition {
268 conjunctions: others,
269 },
270 ))
271 }
272 }
273
274 fn handle_exists_and_in(
279 &mut self,
280 expr: ExprImpl,
281 negated: bool,
282 input: &mut PlanRef,
283 ) -> Result<()> {
284 let join_type = if negated {
285 JoinType::LeftAnti
286 } else {
287 JoinType::LeftSemi
288 };
289 let correlated_id = self.ctx.next_correlated_id();
290 let mut subquery = expr.into_subquery().unwrap();
291 let correlated_indices =
292 subquery.collect_correlated_indices_by_depth_and_assign_id(0, correlated_id);
293 let output_column_type = subquery.query.data_types()[0].clone();
294 let right_plan = self.plan_query(subquery.query)?.into_unordered_subplan();
295 let on = match subquery.kind {
296 SubqueryKind::Existential => ExprImpl::literal_bool(true),
297 SubqueryKind::In(left_expr) => {
298 let right_expr = InputRef::new(input.schema().len(), output_column_type);
299 FunctionCall::new(ExprType::Equal, vec![left_expr, right_expr.into()])?.into()
300 }
301 kind => bail_not_implemented!(issue = 1343, "Not supported subquery kind: {:?}", kind),
302 };
303 *input = Self::create_apply(
304 correlated_id,
305 correlated_indices,
306 input.clone(),
307 right_plan,
308 on,
309 join_type,
310 false,
311 );
312 Ok(())
313 }
314
315 pub(super) fn substitute_subqueries(
324 &mut self,
325 mut root: PlanRef,
326 mut exprs: Vec<ExprImpl>,
327 ) -> Result<(PlanRef, Vec<ExprImpl>)> {
328 struct SubstituteSubQueries {
329 input_col_num: usize,
330 subqueries: Vec<Subquery>,
331 correlated_indices_collection: Vec<Vec<usize>>,
332 correlated_ids: Vec<CorrelatedId>,
333 ctx: OptimizerContextRef,
334 }
335
336 impl ExprRewriter for SubstituteSubQueries {
338 fn rewrite_subquery(&mut self, mut subquery: Subquery) -> ExprImpl {
339 let correlated_id = self.ctx.next_correlated_id();
340 self.correlated_ids.push(correlated_id);
341 let input_ref = InputRef::new(self.input_col_num, subquery.return_type()).into();
342 self.input_col_num += 1;
343 self.correlated_indices_collection.push(
344 subquery.collect_correlated_indices_by_depth_and_assign_id(0, correlated_id),
345 );
346 self.subqueries.push(subquery);
347 input_ref
348 }
349 }
350
351 let mut rewriter = SubstituteSubQueries {
352 input_col_num: root.schema().len(),
353 subqueries: vec![],
354 correlated_indices_collection: vec![],
355 correlated_ids: vec![],
356 ctx: self.ctx.clone(),
357 };
358 exprs = exprs
359 .into_iter()
360 .map(|e| rewriter.rewrite_expr(e))
361 .collect();
362
363 for ((subquery, correlated_indices), correlated_id) in rewriter
364 .subqueries
365 .into_iter()
366 .zip_eq_fast(rewriter.correlated_indices_collection)
367 .zip_eq_fast(rewriter.correlated_ids)
368 {
369 let return_type = subquery.return_type();
370 let subroot = self.plan_query(subquery.query)?;
371
372 let right = match subquery.kind {
373 SubqueryKind::Scalar => subroot.into_unordered_subplan(),
374 SubqueryKind::UpdateSet => {
375 let plan = subroot.into_unordered_subplan();
376
377 let all_input_refs = plan
379 .schema()
380 .data_types()
381 .into_iter()
382 .enumerate()
383 .map(|(i, data_type)| InputRef::new(i, data_type).into())
384 .collect::<Vec<_>>();
385 let call =
386 FunctionCall::new_unchecked(ExprType::Row, all_input_refs, return_type);
387
388 LogicalProject::create(plan, vec![call.into()])
389 }
390 SubqueryKind::Existential => {
391 self.create_exists(subroot.into_unordered_subplan())?
392 }
393 SubqueryKind::Array => subroot.into_array_agg()?,
394 _ => bail_not_implemented!(issue = 1343, "{:?}", subquery.kind),
395 };
396
397 root = Self::create_apply(
398 correlated_id,
399 correlated_indices,
400 root,
401 right,
402 ExprImpl::literal_bool(true),
403 JoinType::LeftOuter,
404 true,
405 );
406 }
407 Ok((root, exprs))
408 }
409
410 fn create_apply(
411 correlated_id: CorrelatedId,
412 correlated_indices: Vec<usize>,
413 left: PlanRef,
414 right: PlanRef,
415 on: ExprImpl,
416 join_type: JoinType,
417 max_one_row: bool,
418 ) -> PlanRef {
419 LogicalApply::create(
420 left,
421 right,
422 join_type,
423 Condition::with_expr(on),
424 correlated_id,
425 correlated_indices,
426 max_one_row,
427 )
428 }
429}