1use std::collections::{HashMap, HashSet};
16use std::fmt::Debug;
17
18use itertools::Itertools;
19use risingwave_common::catalog::{Field, Schema};
20use risingwave_common::types::ScalarImpl;
21use risingwave_common::util::iter_util::ZipEqFast;
22use risingwave_sqlparser::ast::{
23 DataType as AstDataType, Distinct, Expr, Select, SelectItem, Value,
24};
25
26use super::bind_context::{Clause, ColumnBinding};
27use super::statement::RewriteExprsRecursive;
28use super::{BoundShareInput, UNNAMED_COLUMN};
29use crate::binder::{Binder, Relation};
30use crate::catalog::check_column_name_not_reserved;
31use crate::error::{ErrorCode, Result, RwError};
32use crate::expr::{CorrelatedId, Depth, Expr as _, ExprImpl, ExprType, FunctionCall, InputRef};
33use crate::optimizer::plan_node::generic::CHANGELOG_OP;
34use crate::utils::group_by::GroupBy;
35
/// The bound form of a single `SELECT` produced by `Binder::bind_select`.
#[derive(Debug, Clone)]
pub struct BoundSelect {
    /// Bound `DISTINCT` / `DISTINCT ON` specification.
    pub distinct: BoundDistinct,
    /// Bound expressions of the select list, in output order.
    pub select_items: Vec<ExprImpl>,
    /// Output column names, one entry per element of `select_items`; `None` when no name
    /// could be derived for the item.
    pub aliases: Vec<Option<String>>,
    /// Bound `FROM` clause, if any.
    pub from: Option<Relation>,
    /// Bound `WHERE` predicate, if any.
    pub where_clause: Option<ExprImpl>,
    /// Bound `GROUP BY` clause (plain keys, grouping sets, rollup or cube).
    pub group_by: GroupBy,
    /// Bound `HAVING` predicate, if any.
    pub having: Option<ExprImpl>,
    /// Output schema: one field per select item, typed by the item's return type and named by
    /// its alias (or `UNNAMED_COLUMN` when the alias is `None`).
    pub schema: Schema,
}
47
48impl RewriteExprsRecursive for BoundSelect {
49 fn rewrite_exprs_recursive(&mut self, rewriter: &mut impl crate::expr::ExprRewriter) {
50 self.distinct.rewrite_exprs_recursive(rewriter);
51
52 let new_select_items = std::mem::take(&mut self.select_items)
53 .into_iter()
54 .map(|expr| rewriter.rewrite_expr(expr))
55 .collect::<Vec<_>>();
56 self.select_items = new_select_items;
57
58 if let Some(from) = &mut self.from {
59 from.rewrite_exprs_recursive(rewriter);
60 }
61
62 self.where_clause =
63 std::mem::take(&mut self.where_clause).map(|expr| rewriter.rewrite_expr(expr));
64
65 let new_group_by = match &mut self.group_by {
66 GroupBy::GroupKey(group_key) => GroupBy::GroupKey(
67 std::mem::take(group_key)
68 .into_iter()
69 .map(|expr| rewriter.rewrite_expr(expr))
70 .collect::<Vec<_>>(),
71 ),
72 GroupBy::GroupingSets(grouping_sets) => GroupBy::GroupingSets(
73 std::mem::take(grouping_sets)
74 .into_iter()
75 .map(|set| {
76 set.into_iter()
77 .map(|expr| rewriter.rewrite_expr(expr))
78 .collect()
79 })
80 .collect::<Vec<_>>(),
81 ),
82 GroupBy::Rollup(rollup) => GroupBy::Rollup(
83 std::mem::take(rollup)
84 .into_iter()
85 .map(|set| {
86 set.into_iter()
87 .map(|expr| rewriter.rewrite_expr(expr))
88 .collect()
89 })
90 .collect::<Vec<_>>(),
91 ),
92 GroupBy::Cube(cube) => GroupBy::Cube(
93 std::mem::take(cube)
94 .into_iter()
95 .map(|set| {
96 set.into_iter()
97 .map(|expr| rewriter.rewrite_expr(expr))
98 .collect()
99 })
100 .collect::<Vec<_>>(),
101 ),
102 };
103 self.group_by = new_group_by;
104
105 self.having = std::mem::take(&mut self.having).map(|expr| rewriter.rewrite_expr(expr));
106 }
107}
108
109impl BoundSelect {
110 pub fn schema(&self) -> &Schema {
112 &self.schema
113 }
114
115 pub fn exprs(&self) -> impl Iterator<Item = &ExprImpl> {
116 self.select_items
117 .iter()
118 .chain(self.group_by.iter())
119 .chain(self.where_clause.iter())
120 .chain(self.having.iter())
121 }
122
123 pub fn exprs_mut(&mut self) -> impl Iterator<Item = &mut ExprImpl> {
124 self.select_items
125 .iter_mut()
126 .chain(self.group_by.iter_mut())
127 .chain(self.where_clause.iter_mut())
128 .chain(self.having.iter_mut())
129 }
130
131 pub fn is_correlated(&self, depth: Depth) -> bool {
132 self.exprs()
133 .any(|expr| expr.has_correlated_input_ref_by_depth(depth))
134 || match self.from.as_ref() {
135 Some(relation) => relation.is_correlated(depth),
136 None => false,
137 }
138 }
139
140 pub fn collect_correlated_indices_by_depth_and_assign_id(
141 &mut self,
142 depth: Depth,
143 correlated_id: CorrelatedId,
144 ) -> Vec<usize> {
145 let mut correlated_indices = self
146 .exprs_mut()
147 .flat_map(|expr| {
148 expr.collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id)
149 })
150 .collect_vec();
151
152 if let Some(relation) = self.from.as_mut() {
153 correlated_indices.extend(
154 relation.collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id),
155 );
156 }
157
158 correlated_indices
159 }
160}
161
/// Bound form of the `DISTINCT` part of a `SELECT`, produced by `Binder::bind_distinct_on`.
#[derive(Debug, Clone)]
pub enum BoundDistinct {
    /// Bound from `Distinct::All`.
    All,
    /// Bound from `Distinct::Distinct`.
    Distinct,
    /// Bound from `Distinct::DistinctOn`; holds the bound `DISTINCT ON` expressions.
    DistinctOn(Vec<ExprImpl>),
}
168
169impl RewriteExprsRecursive for BoundDistinct {
170 fn rewrite_exprs_recursive(&mut self, rewriter: &mut impl crate::expr::ExprRewriter) {
171 if let Self::DistinctOn(exprs) = self {
172 let new_exprs = std::mem::take(exprs)
173 .into_iter()
174 .map(|expr| rewriter.rewrite_expr(expr))
175 .collect::<Vec<_>>();
176 exprs.extend(new_exprs);
177 }
178 }
179}
180
181impl BoundDistinct {
182 pub const fn is_all(&self) -> bool {
183 matches!(self, Self::All)
184 }
185
186 pub const fn is_distinct(&self) -> bool {
187 matches!(self, Self::Distinct)
188 }
189}
190
191impl Binder {
192 pub(super) fn bind_select(&mut self, select: Select) -> Result<BoundSelect> {
193 let from = self.bind_vec_table_with_joins(select.from)?;
195
196 let (select_items, aliases) = self.bind_select_list(select.projection)?;
198 let out_name_to_index = Self::build_name_to_index(aliases.iter().filter_map(Clone::clone));
199
200 let distinct = self.bind_distinct_on(select.distinct, &out_name_to_index, &select_items)?;
202
203 self.context.clause = Some(Clause::Where);
205 let selection = select
206 .selection
207 .map(|expr| {
208 self.bind_expr(expr)
209 .and_then(|expr| expr.enforce_bool_clause("WHERE"))
210 })
211 .transpose()?;
212 self.context.clause = None;
213
214 self.context.clause = Some(Clause::GroupBy);
216
217 let group_by = if select.group_by.len() == 1
219 && let Expr::GroupingSets(grouping_sets) = &select.group_by[0]
220 {
221 GroupBy::GroupingSets(self.bind_grouping_items_expr_in_select(
222 grouping_sets.clone(),
223 &out_name_to_index,
224 &select_items,
225 )?)
226 } else if select.group_by.len() == 1
227 && let Expr::Rollup(rollup) = &select.group_by[0]
228 {
229 GroupBy::Rollup(self.bind_grouping_items_expr_in_select(
230 rollup.clone(),
231 &out_name_to_index,
232 &select_items,
233 )?)
234 } else if select.group_by.len() == 1
235 && let Expr::Cube(cube) = &select.group_by[0]
236 {
237 GroupBy::Cube(self.bind_grouping_items_expr_in_select(
238 cube.clone(),
239 &out_name_to_index,
240 &select_items,
241 )?)
242 } else {
243 if select.group_by.iter().any(|expr| {
244 matches!(expr, Expr::GroupingSets(_))
245 || matches!(expr, Expr::Rollup(_))
246 || matches!(expr, Expr::Cube(_))
247 }) {
248 return Err(ErrorCode::BindError(
249 "Only support one grouping item in group by clause".to_owned(),
250 )
251 .into());
252 }
253 GroupBy::GroupKey(
254 select
255 .group_by
256 .into_iter()
257 .map(|expr| {
258 self.bind_group_by_expr_in_select(expr, &out_name_to_index, &select_items)
259 })
260 .try_collect()?,
261 )
262 };
263 self.context.clause = None;
264
265 self.context.clause = Some(Clause::Having);
267 let having = select
268 .having
269 .map(|expr| {
270 self.bind_expr(expr)
271 .and_then(|expr| expr.enforce_bool_clause("HAVING"))
272 })
273 .transpose()?;
274 self.context.clause = None;
275
276 let fields = select_items
278 .iter()
279 .zip_eq_fast(aliases.iter())
280 .map(|(s, a)| {
281 let name = a.clone().unwrap_or_else(|| UNNAMED_COLUMN.to_owned());
282 Ok(Field::with_name(s.return_type(), name))
283 })
284 .collect::<Result<Vec<Field>>>()?;
285
286 if let Some(Relation::Share(bound)) = &from {
287 if matches!(bound.input, BoundShareInput::ChangeLog(_))
288 && fields.iter().filter(|&x| x.name.eq(CHANGELOG_OP)).count() > 1
289 {
290 return Err(ErrorCode::BindError(
291 "The source table of changelog cannot have `changelog_op`, please rename it first".to_owned()
292 )
293 .into());
294 }
295 }
296
297 Ok(BoundSelect {
298 distinct,
299 select_items,
300 aliases,
301 from,
302 where_clause: selection,
303 group_by,
304 having,
305 schema: Schema { fields },
306 })
307 }
308
309 pub fn bind_select_list(
310 &mut self,
311 select_items: Vec<SelectItem>,
312 ) -> Result<(Vec<ExprImpl>, Vec<Option<String>>)> {
313 let mut select_list = vec![];
314 let mut aliases = vec![];
315 for item in select_items {
316 match item {
317 SelectItem::UnnamedExpr(expr) => {
318 let alias = derive_alias(&expr);
319 let bound = self.bind_expr(expr)?;
320 select_list.push(bound);
321 aliases.push(alias);
322 }
323 SelectItem::ExprWithAlias { expr, alias } => {
324 check_column_name_not_reserved(&alias.real_value())?;
325
326 let expr = self.bind_expr(expr)?;
327 select_list.push(expr);
328 aliases.push(Some(alias.real_value()));
329 }
330 SelectItem::QualifiedWildcard(obj_name, except) => {
331 let table_name = &obj_name.0.last().unwrap().real_value();
332 let except_indices = self.generate_except_indices(except)?;
333 let (begin, end) = self.context.range_of.get(table_name).ok_or_else(|| {
334 ErrorCode::ItemNotFound(format!("relation \"{}\"", table_name))
335 })?;
336 let (exprs, names) = Self::iter_bound_columns(
337 self.context.columns[*begin..*end]
338 .iter()
339 .filter(|c| !c.is_hidden && !except_indices.contains(&c.index)),
340 );
341 select_list.extend(exprs);
342 aliases.extend(names);
343 }
344 SelectItem::ExprQualifiedWildcard(expr, prefix) => {
345 let (exprs, names) = self.bind_wildcard_field_column(expr, prefix)?;
346 select_list.extend(exprs);
347 aliases.extend(names);
348 }
349 SelectItem::Wildcard(except) => {
350 if self.context.range_of.is_empty() {
351 return Err(ErrorCode::BindError(
352 "SELECT * with no tables specified is not valid".into(),
353 )
354 .into());
355 }
356
357 let (exprs, names) = self.iter_column_groups();
361 select_list.extend(exprs);
362 aliases.extend(names);
363
364 let except_indices = self.generate_except_indices(except)?;
365
366 let (exprs, names) =
368 Self::iter_bound_columns(self.context.columns[..].iter().filter(|c| {
369 !c.is_hidden
370 && !self
371 .context
372 .column_group_context
373 .mapping
374 .contains_key(&c.index)
375 && !except_indices.contains(&c.index)
376 }));
377
378 select_list.extend(exprs);
379 aliases.extend(names);
380 }
389 }
390 }
391 assert_eq!(select_list.len(), aliases.len());
392 Ok((select_list, aliases))
393 }
394
395 fn bind_group_by_expr_in_select(
417 &mut self,
418 expr: Expr,
419 name_to_index: &HashMap<String, usize>,
420 select_items: &[ExprImpl],
421 ) -> Result<ExprImpl> {
422 let name = match &expr {
423 Expr::Identifier(ident) => Some(ident.real_value()),
424 _ => None,
425 };
426 match self.bind_expr(expr) {
427 Ok(ExprImpl::Literal(lit)) => match lit.get_data() {
428 Some(ScalarImpl::Int32(idx)) => idx
429 .saturating_sub(1)
430 .try_into()
431 .ok()
432 .and_then(|i: usize| select_items.get(i).cloned())
433 .ok_or_else(|| {
434 ErrorCode::BindError(format!(
435 "GROUP BY position {idx} is not in select list"
436 ))
437 .into()
438 }),
439 _ => Err(ErrorCode::BindError("non-integer constant in GROUP BY".into()).into()),
440 },
441 Ok(e) => Ok(e),
442 Err(e) => match name {
443 None => Err(e),
444 Some(name) => match name_to_index.get(&name) {
445 None => Err(e),
446 Some(&usize::MAX) => Err(ErrorCode::BindError(format!(
447 "GROUP BY \"{name}\" is ambiguous"
448 ))
449 .into()),
450 Some(out_idx) => Ok(select_items[*out_idx].clone()),
451 },
452 },
453 }
454 }
455
456 fn bind_grouping_items_expr_in_select(
457 &mut self,
458 grouping_items: Vec<Vec<Expr>>,
459 name_to_index: &HashMap<String, usize>,
460 select_items: &[ExprImpl],
461 ) -> Result<Vec<Vec<ExprImpl>>> {
462 let mut result = vec![];
463 for set in grouping_items {
464 let mut set_exprs = vec![];
465 for expr in set {
466 let name = match &expr {
467 Expr::Identifier(ident) => Some(ident.real_value()),
468 _ => None,
469 };
470 let expr_impl = match self.bind_expr(expr) {
471 Ok(ExprImpl::Literal(lit)) => match lit.get_data() {
472 Some(ScalarImpl::Int32(idx)) => idx
473 .saturating_sub(1)
474 .try_into()
475 .ok()
476 .and_then(|i: usize| select_items.get(i).cloned())
477 .ok_or_else(|| {
478 ErrorCode::BindError(format!(
479 "GROUP BY position {idx} is not in select list"
480 ))
481 .into()
482 }),
483 _ => Err(
484 ErrorCode::BindError("non-integer constant in GROUP BY".into()).into(),
485 ),
486 },
487 Ok(e) => Ok(e),
488 Err(e) => match name {
489 None => Err(e),
490 Some(name) => match name_to_index.get(&name) {
491 None => Err(e),
492 Some(&usize::MAX) => Err(ErrorCode::BindError(format!(
493 "GROUP BY \"{name}\" is ambiguous"
494 ))
495 .into()),
496 Some(out_idx) => Ok(select_items[*out_idx].clone()),
497 },
498 },
499 };
500
501 set_exprs.push(expr_impl?);
502 }
503 result.push(set_exprs);
504 }
505 Ok(result)
506 }
507
508 pub fn bind_returning_list(
509 &mut self,
510 returning_items: Vec<SelectItem>,
511 ) -> Result<(Vec<ExprImpl>, Vec<Field>)> {
512 let (returning_list, aliases) = self.bind_select_list(returning_items)?;
513 if returning_list
514 .iter()
515 .any(|expr| expr.has_agg_call() || expr.has_window_function())
516 {
517 return Err(RwError::from(ErrorCode::BindError(
518 "should not have agg/window in the `RETURNING` list".to_owned(),
519 )));
520 }
521
522 let fields = returning_list
523 .iter()
524 .zip_eq_fast(aliases.iter())
525 .map(|(s, a)| {
526 let name = a.clone().unwrap_or_else(|| UNNAMED_COLUMN.to_owned());
527 Ok::<Field, RwError>(Field::with_name(s.return_type(), name))
528 })
529 .try_collect()?;
530 Ok((returning_list, fields))
531 }
532
533 pub fn iter_bound_columns<'a>(
534 column_binding: impl Iterator<Item = &'a ColumnBinding>,
535 ) -> (Vec<ExprImpl>, Vec<Option<String>>) {
536 column_binding
537 .map(|c| {
538 (
539 InputRef::new(c.index, c.field.data_type.clone()).into(),
540 Some(c.field.name.clone()),
541 )
542 })
543 .unzip()
544 }
545
546 pub fn iter_column_groups(&self) -> (Vec<ExprImpl>, Vec<Option<String>>) {
547 self.context
548 .column_group_context
549 .groups
550 .values()
551 .rev() .map(|g| {
553 if let Some(col) = &g.non_nullable_column {
554 let c = &self.context.columns[*col];
555 (
556 InputRef::new(c.index, c.field.data_type.clone()).into(),
557 Some(c.field.name.clone()),
558 )
559 } else {
560 let mut input_idxes = g.indices.iter().collect::<Vec<_>>();
561 input_idxes.sort();
562 let inputs = input_idxes
563 .into_iter()
564 .map(|index| {
565 let column = &self.context.columns[*index];
566 InputRef::new(column.index, column.field.data_type.clone()).into()
567 })
568 .collect::<Vec<_>>();
569 let c = &self.context.columns[*g.indices.iter().next().unwrap()];
570 (
571 FunctionCall::new(ExprType::Coalesce, inputs)
572 .expect("Failure binding COALESCE function call")
573 .into(),
574 Some(c.field.name.clone()),
575 )
576 }
577 })
578 .unzip()
579 }
580
581 fn bind_distinct_on(
596 &mut self,
597 distinct: Distinct,
598 name_to_index: &HashMap<String, usize>,
599 select_items: &[ExprImpl],
600 ) -> Result<BoundDistinct> {
601 Ok(match distinct {
602 Distinct::All => BoundDistinct::All,
603 Distinct::Distinct => BoundDistinct::Distinct,
604 Distinct::DistinctOn(exprs) => {
605 let mut bound_exprs = vec![];
606 for expr in exprs {
607 let expr_impl = match expr {
608 Expr::Identifier(name)
609 if let Some(index) = name_to_index.get(&name.real_value()) =>
610 {
611 match *index {
612 usize::MAX => {
613 return Err(ErrorCode::BindError(format!(
614 "DISTINCT ON \"{}\" is ambiguous",
615 name.real_value()
616 ))
617 .into());
618 }
619 _ => select_items[*index].clone(),
620 }
621 }
622 Expr::Value(Value::Number(number)) => match number.parse::<usize>() {
623 Ok(index) if 1 <= index && index <= select_items.len() => {
624 let idx_from_0 = index - 1;
625 select_items[idx_from_0].clone()
626 }
627 _ => {
628 return Err(ErrorCode::InvalidInputSyntax(format!(
629 "Invalid ordinal number in DISTINCT ON: {}",
630 number
631 ))
632 .into());
633 }
634 },
635 expr => self.bind_expr(expr)?,
636 };
637 bound_exprs.push(expr_impl);
638 }
639 BoundDistinct::DistinctOn(bound_exprs)
640 }
641 })
642 }
643
644 fn generate_except_indices(&mut self, except: Option<Vec<Expr>>) -> Result<HashSet<usize>> {
645 let mut except_indices: HashSet<usize> = HashSet::new();
646 if let Some(exprs) = except {
647 for expr in exprs {
648 let bound = self.bind_expr(expr)?;
649 match bound {
650 ExprImpl::InputRef(inner) => {
651 if !except_indices.insert(inner.index) {
652 return Err(ErrorCode::BindError(
653 "Duplicate entry in except list".into(),
654 )
655 .into());
656 }
657 }
658 _ => {
659 return Err(ErrorCode::BindError(
660 "Only support column name in except list".into(),
661 )
662 .into());
663 }
664 }
665 }
666 }
667 Ok(except_indices)
668 }
669}
670
671fn derive_alias(expr: &Expr) -> Option<String> {
672 match expr.clone() {
673 Expr::Identifier(ident) => Some(ident.real_value()),
674 Expr::CompoundIdentifier(idents) => idents.last().map(|ident| ident.real_value()),
675 Expr::FieldIdentifier(_, idents) => idents.last().map(|ident| ident.real_value()),
676 Expr::Function(func) => Some(func.name.real_value()),
677 Expr::Extract { .. } => Some("extract".to_owned()),
678 Expr::Case { .. } => Some("case".to_owned()),
679 Expr::Cast { expr, data_type } => {
680 derive_alias(&expr).or_else(|| data_type_to_alias(&data_type))
681 }
682 Expr::TypedString { data_type, .. } => data_type_to_alias(&data_type),
683 Expr::Value(Value::Interval { .. }) => Some("interval".to_owned()),
684 Expr::Row(_) => Some("row".to_owned()),
685 Expr::Array(_) => Some("array".to_owned()),
686 Expr::Index { obj, index: _ } => derive_alias(&obj),
687 _ => None,
688 }
689}
690
691fn data_type_to_alias(data_type: &AstDataType) -> Option<String> {
692 let alias = match data_type {
693 AstDataType::Char(_) => "bpchar".to_owned(),
694 AstDataType::Varchar => "varchar".to_owned(),
695 AstDataType::Uuid => "uuid".to_owned(),
696 AstDataType::Decimal(_, _) => "numeric".to_owned(),
697 AstDataType::Real | AstDataType::Float(Some(1..=24)) => "float4".to_owned(),
698 AstDataType::Double | AstDataType::Float(Some(25..=53) | None) => "float8".to_owned(),
699 AstDataType::Float(Some(0 | 54..)) => unreachable!(),
700 AstDataType::SmallInt => "int2".to_owned(),
701 AstDataType::Int => "int4".to_owned(),
702 AstDataType::BigInt => "int8".to_owned(),
703 AstDataType::Boolean => "bool".to_owned(),
704 AstDataType::Date => "date".to_owned(),
705 AstDataType::Time(tz) => format!("time{}", if *tz { "z" } else { "" }),
706 AstDataType::Timestamp(tz) => {
707 format!("timestamp{}", if *tz { "tz" } else { "" })
708 }
709 AstDataType::Interval => "interval".to_owned(),
710 AstDataType::Regclass => "regclass".to_owned(),
711 AstDataType::Regproc => "regproc".to_owned(),
712 AstDataType::Text => "text".to_owned(),
713 AstDataType::Bytea => "bytea".to_owned(),
714 AstDataType::Jsonb => "jsonb".to_owned(),
715 AstDataType::Array(ty) => return data_type_to_alias(ty),
716 AstDataType::Custom(ty) => format!("{}", ty),
717 AstDataType::Struct(_) | AstDataType::Map(_) => {
718 return None;
720 }
721 };
722
723 Some(alias)
724}