1use std::collections::{HashMap, HashSet};
16use std::fmt::Debug;
17
18use itertools::Itertools;
19use risingwave_common::catalog::{Field, Schema};
20use risingwave_common::types::ScalarImpl;
21use risingwave_common::util::iter_util::ZipEqFast;
22use risingwave_sqlparser::ast::{
23 DataType as AstDataType, Distinct, Expr, Select, SelectItem, Value, WindowSpec,
24};
25
26use super::bind_context::{Clause, ColumnBinding};
27use super::statement::RewriteExprsRecursive;
28use super::{BoundShareInput, UNNAMED_COLUMN};
29use crate::binder::{Binder, Relation};
30use crate::catalog::check_column_name_not_reserved;
31use crate::error::{ErrorCode, Result, RwError};
32use crate::expr::{CorrelatedId, Depth, Expr as _, ExprImpl, ExprType, FunctionCall, InputRef};
33use crate::optimizer::plan_node::generic::CHANGELOG_OP;
34use crate::utils::group_by::GroupBy;
35
/// A bound `SELECT` block, as produced by `Binder::bind_select`.
#[derive(Debug, Clone)]
pub struct BoundSelect {
    /// `ALL`, `DISTINCT`, or `DISTINCT ON (...)` together with its bound expressions.
    pub distinct: BoundDistinct,
    /// Bound select-list expressions, with `*` / `t.*` wildcards already expanded.
    pub select_items: Vec<ExprImpl>,
    /// Output name of each select item (same length and order as `select_items`); `None` when
    /// no alias was written and none could be derived from the expression.
    pub aliases: Vec<Option<String>>,
    /// Bound FROM clause.
    /// NOTE(review): presumably `None` when the query has no FROM clause — confirm against
    /// `bind_vec_table_with_joins`.
    pub from: Option<Relation>,
    /// WHERE predicate, after passing through `enforce_bool_clause("WHERE")`.
    pub where_clause: Option<ExprImpl>,
    /// Plain GROUP BY keys, or a single GROUPING SETS / ROLLUP / CUBE item.
    pub group_by: GroupBy,
    /// HAVING predicate, after passing through `enforce_bool_clause("HAVING")`.
    pub having: Option<ExprImpl>,
    /// Named windows from the WINDOW clause, keyed by window name (kept as AST specs).
    pub window: HashMap<String, WindowSpec>,
    /// Output schema: one field per select item, named by its alias or `UNNAMED_COLUMN`.
    pub schema: Schema,
}
48
49impl RewriteExprsRecursive for BoundSelect {
50 fn rewrite_exprs_recursive(&mut self, rewriter: &mut impl crate::expr::ExprRewriter) {
51 self.distinct.rewrite_exprs_recursive(rewriter);
52
53 let new_select_items = std::mem::take(&mut self.select_items)
54 .into_iter()
55 .map(|expr| rewriter.rewrite_expr(expr))
56 .collect::<Vec<_>>();
57 self.select_items = new_select_items;
58
59 if let Some(from) = &mut self.from {
60 from.rewrite_exprs_recursive(rewriter);
61 }
62
63 self.where_clause =
64 std::mem::take(&mut self.where_clause).map(|expr| rewriter.rewrite_expr(expr));
65
66 let new_group_by = match &mut self.group_by {
67 GroupBy::GroupKey(group_key) => GroupBy::GroupKey(
68 std::mem::take(group_key)
69 .into_iter()
70 .map(|expr| rewriter.rewrite_expr(expr))
71 .collect::<Vec<_>>(),
72 ),
73 GroupBy::GroupingSets(grouping_sets) => GroupBy::GroupingSets(
74 std::mem::take(grouping_sets)
75 .into_iter()
76 .map(|set| {
77 set.into_iter()
78 .map(|expr| rewriter.rewrite_expr(expr))
79 .collect()
80 })
81 .collect::<Vec<_>>(),
82 ),
83 GroupBy::Rollup(rollup) => GroupBy::Rollup(
84 std::mem::take(rollup)
85 .into_iter()
86 .map(|set| {
87 set.into_iter()
88 .map(|expr| rewriter.rewrite_expr(expr))
89 .collect()
90 })
91 .collect::<Vec<_>>(),
92 ),
93 GroupBy::Cube(cube) => GroupBy::Cube(
94 std::mem::take(cube)
95 .into_iter()
96 .map(|set| {
97 set.into_iter()
98 .map(|expr| rewriter.rewrite_expr(expr))
99 .collect()
100 })
101 .collect::<Vec<_>>(),
102 ),
103 };
104 self.group_by = new_group_by;
105
106 self.having = std::mem::take(&mut self.having).map(|expr| rewriter.rewrite_expr(expr));
107 }
108}
109
110impl BoundSelect {
111 pub fn schema(&self) -> &Schema {
113 &self.schema
114 }
115
116 pub fn exprs(&self) -> impl Iterator<Item = &ExprImpl> {
117 self.select_items
118 .iter()
119 .chain(self.group_by.iter())
120 .chain(self.where_clause.iter())
121 .chain(self.having.iter())
122 }
123
124 pub fn exprs_mut(&mut self) -> impl Iterator<Item = &mut ExprImpl> {
125 self.select_items
126 .iter_mut()
127 .chain(self.group_by.iter_mut())
128 .chain(self.where_clause.iter_mut())
129 .chain(self.having.iter_mut())
130 }
131
132 pub fn is_correlated_by_depth(&self, depth: Depth) -> bool {
133 self.exprs()
134 .any(|expr| expr.has_correlated_input_ref_by_depth(depth))
135 || match self.from.as_ref() {
136 Some(relation) => relation.is_correlated_by_depth(depth),
137 None => false,
138 }
139 }
140
141 pub fn is_correlated_by_correlated_id(&self, correlated_id: CorrelatedId) -> bool {
142 self.exprs()
143 .any(|expr| expr.has_correlated_input_ref_by_correlated_id(correlated_id))
144 || match self.from.as_ref() {
145 Some(relation) => relation.is_correlated_by_correlated_id(correlated_id),
146 None => false,
147 }
148 }
149
150 pub fn collect_correlated_indices_by_depth_and_assign_id(
151 &mut self,
152 depth: Depth,
153 correlated_id: CorrelatedId,
154 ) -> Vec<usize> {
155 let mut correlated_indices = self
156 .exprs_mut()
157 .flat_map(|expr| {
158 expr.collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id)
159 })
160 .collect_vec();
161
162 if let Some(relation) = self.from.as_mut() {
163 correlated_indices.extend(
164 relation.collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id),
165 );
166 }
167
168 correlated_indices
169 }
170}
171
/// The DISTINCT mode of a bound `SELECT`.
#[derive(Debug, Clone)]
pub enum BoundDistinct {
    /// Bound from `Distinct::All`: no de-duplication.
    All,
    /// Bound from `Distinct::Distinct`: de-duplicate whole output rows.
    Distinct,
    /// Bound from `Distinct::DistinctOn`. Output-name and ordinal references are replaced by
    /// clones of the referenced select items; anything else is bound with `bind_expr`.
    DistinctOn(Vec<ExprImpl>),
}
178
179impl RewriteExprsRecursive for BoundDistinct {
180 fn rewrite_exprs_recursive(&mut self, rewriter: &mut impl crate::expr::ExprRewriter) {
181 if let Self::DistinctOn(exprs) = self {
182 let new_exprs = std::mem::take(exprs)
183 .into_iter()
184 .map(|expr| rewriter.rewrite_expr(expr))
185 .collect::<Vec<_>>();
186 exprs.extend(new_exprs);
187 }
188 }
189}
190
191impl BoundDistinct {
192 pub const fn is_all(&self) -> bool {
193 matches!(self, Self::All)
194 }
195
196 pub const fn is_distinct(&self) -> bool {
197 matches!(self, Self::Distinct)
198 }
199}
200
201impl Binder {
202 pub(super) fn bind_select(&mut self, select: &Select) -> Result<BoundSelect> {
203 let from = self.bind_vec_table_with_joins(&select.from)?;
205
206 let mut named_windows = HashMap::new();
208 for named_window in &select.window {
209 let window_name = named_window.name.real_value();
210 if named_windows.contains_key(&window_name) {
211 return Err(ErrorCode::InvalidInputSyntax(format!(
212 "window \"{}\" is already defined",
213 window_name
214 ))
215 .into());
216 }
217 named_windows.insert(window_name, named_window.window_spec.clone());
218 }
219
220 self.context.named_windows = named_windows.clone();
222
223 let (select_items, aliases) = self.bind_select_list(&select.projection)?;
225 let out_name_to_index = Self::build_name_to_index(aliases.iter().filter_map(Clone::clone));
226
227 let distinct =
229 self.bind_distinct_on(&select.distinct, &out_name_to_index, &select_items)?;
230
231 self.context.clause = Some(Clause::Where);
233 let selection = select
234 .selection
235 .as_ref()
236 .map(|expr| {
237 self.bind_expr(expr)
238 .and_then(|expr| expr.enforce_bool_clause("WHERE"))
239 })
240 .transpose()?;
241 self.context.clause = None;
242
243 self.context.clause = Some(Clause::GroupBy);
245
246 let group_by = if select.group_by.len() == 1
248 && let Expr::GroupingSets(grouping_sets) = &select.group_by[0]
249 {
250 GroupBy::GroupingSets(self.bind_grouping_items_expr_in_select(
251 grouping_sets.clone(),
252 &out_name_to_index,
253 &select_items,
254 )?)
255 } else if select.group_by.len() == 1
256 && let Expr::Rollup(rollup) = &select.group_by[0]
257 {
258 GroupBy::Rollup(self.bind_grouping_items_expr_in_select(
259 rollup.clone(),
260 &out_name_to_index,
261 &select_items,
262 )?)
263 } else if select.group_by.len() == 1
264 && let Expr::Cube(cube) = &select.group_by[0]
265 {
266 GroupBy::Cube(self.bind_grouping_items_expr_in_select(
267 cube.clone(),
268 &out_name_to_index,
269 &select_items,
270 )?)
271 } else {
272 if select.group_by.iter().any(|expr| {
273 matches!(expr, Expr::GroupingSets(_))
274 || matches!(expr, Expr::Rollup(_))
275 || matches!(expr, Expr::Cube(_))
276 }) {
277 return Err(ErrorCode::BindError(
278 "Only support one grouping item in group by clause".to_owned(),
279 )
280 .into());
281 }
282 GroupBy::GroupKey(
283 select
284 .group_by
285 .iter()
286 .map(|expr| {
287 self.bind_group_by_expr_in_select(expr, &out_name_to_index, &select_items)
288 })
289 .try_collect()?,
290 )
291 };
292 self.context.clause = None;
293
294 self.context.clause = Some(Clause::Having);
296 let having = select
297 .having
298 .as_ref()
299 .map(|expr| {
300 self.bind_expr(expr)
301 .and_then(|expr| expr.enforce_bool_clause("HAVING"))
302 })
303 .transpose()?;
304 self.context.clause = None;
305
306 let fields = select_items
308 .iter()
309 .zip_eq_fast(aliases.iter())
310 .map(|(s, a)| {
311 let name = a.clone().unwrap_or_else(|| UNNAMED_COLUMN.to_owned());
312 Ok(Field::with_name(s.return_type(), name))
313 })
314 .collect::<Result<Vec<Field>>>()?;
315
316 if let Some(Relation::Share(bound)) = &from
317 && matches!(bound.input, BoundShareInput::ChangeLog(_))
318 && fields.iter().filter(|&x| x.name.eq(CHANGELOG_OP)).count() > 1
319 {
320 return Err(ErrorCode::BindError(
321 "The source table of changelog cannot have `changelog_op`, please rename it first"
322 .to_owned(),
323 )
324 .into());
325 }
326
327 Ok(BoundSelect {
328 distinct,
329 select_items,
330 aliases,
331 from,
332 where_clause: selection,
333 group_by,
334 having,
335 window: named_windows,
336 schema: Schema { fields },
337 })
338 }
339
340 pub fn bind_select_list(
341 &mut self,
342 select_items: &[SelectItem],
343 ) -> Result<(Vec<ExprImpl>, Vec<Option<String>>)> {
344 let mut select_list = vec![];
345 let mut aliases = vec![];
346 for item in select_items {
347 match item {
348 SelectItem::UnnamedExpr(expr) => {
349 let alias = derive_alias(expr);
350 let bound = self.bind_expr(expr)?;
351 select_list.push(bound);
352 aliases.push(alias);
353 }
354 SelectItem::ExprWithAlias { expr, alias } => {
355 check_column_name_not_reserved(&alias.real_value())?;
356
357 let expr = self.bind_expr(expr)?;
358 select_list.push(expr);
359 aliases.push(Some(alias.real_value()));
360 }
361 SelectItem::QualifiedWildcard(obj_name, except) => {
362 let (schema_name, table_name) =
363 Binder::resolve_schema_qualified_name(&self.db_name, obj_name)?;
364 let except_indices = self.generate_except_indices(except.as_deref())?;
365 let (begin, end) = self
366 .context
367 .range_of
368 .get(&(schema_name, table_name.clone()))
369 .ok_or_else(|| {
370 ErrorCode::ItemNotFound(format!("relation \"{}\"", table_name))
371 })?;
372 let (exprs, names) = Self::iter_bound_columns(
373 self.context.columns[*begin..*end]
374 .iter()
375 .filter(|c| !c.is_hidden && !except_indices.contains(&c.index)),
376 );
377 select_list.extend(exprs);
378 aliases.extend(names);
379 }
380 SelectItem::ExprQualifiedWildcard(expr, prefix) => {
381 let (exprs, names) = self.bind_wildcard_field_column(expr, prefix)?;
382 select_list.extend(exprs);
383 aliases.extend(names);
384 }
385 SelectItem::Wildcard(except) => {
386 if self.context.range_of.is_empty() {
387 return Err(ErrorCode::BindError(
388 "SELECT * with no tables specified is not valid".into(),
389 )
390 .into());
391 }
392
393 let (exprs, names) = self.iter_column_groups();
397 select_list.extend(exprs);
398 aliases.extend(names);
399
400 let except_indices = self.generate_except_indices(except.as_deref())?;
401
402 let (exprs, names) =
404 Self::iter_bound_columns(self.context.columns[..].iter().filter(|c| {
405 !c.is_hidden
406 && !self
407 .context
408 .column_group_context
409 .mapping
410 .contains_key(&c.index)
411 && !except_indices.contains(&c.index)
412 }));
413
414 select_list.extend(exprs);
415 aliases.extend(names);
416 }
425 }
426 }
427 assert_eq!(select_list.len(), aliases.len());
428 Ok((select_list, aliases))
429 }
430
431 fn bind_group_by_expr_in_select(
453 &mut self,
454 expr: &Expr,
455 name_to_index: &HashMap<String, usize>,
456 select_items: &[ExprImpl],
457 ) -> Result<ExprImpl> {
458 let name = match &expr {
459 Expr::Identifier(ident) => Some(ident.real_value()),
460 _ => None,
461 };
462 match self.bind_expr(expr) {
463 Ok(ExprImpl::Literal(lit)) => match lit.get_data() {
464 Some(ScalarImpl::Int32(idx)) => idx
465 .saturating_sub(1)
466 .try_into()
467 .ok()
468 .and_then(|i: usize| select_items.get(i).cloned())
469 .ok_or_else(|| {
470 ErrorCode::BindError(format!(
471 "GROUP BY position {idx} is not in select list"
472 ))
473 .into()
474 }),
475 _ => Err(ErrorCode::BindError("non-integer constant in GROUP BY".into()).into()),
476 },
477 Ok(e) => Ok(e),
478 Err(e) => match name {
479 None => Err(e),
480 Some(name) => match name_to_index.get(&name) {
481 None => Err(e),
482 Some(&usize::MAX) => Err(ErrorCode::BindError(format!(
483 "GROUP BY \"{name}\" is ambiguous"
484 ))
485 .into()),
486 Some(out_idx) => Ok(select_items[*out_idx].clone()),
487 },
488 },
489 }
490 }
491
492 fn bind_grouping_items_expr_in_select(
493 &mut self,
494 grouping_items: Vec<Vec<Expr>>,
495 name_to_index: &HashMap<String, usize>,
496 select_items: &[ExprImpl],
497 ) -> Result<Vec<Vec<ExprImpl>>> {
498 let mut result = vec![];
499 for set in grouping_items {
500 let mut set_exprs = vec![];
501 for expr in set {
502 let name = match &expr {
503 Expr::Identifier(ident) => Some(ident.real_value()),
504 _ => None,
505 };
506 let expr_impl = match self.bind_expr(&expr) {
507 Ok(ExprImpl::Literal(lit)) => match lit.get_data() {
508 Some(ScalarImpl::Int32(idx)) => idx
509 .saturating_sub(1)
510 .try_into()
511 .ok()
512 .and_then(|i: usize| select_items.get(i).cloned())
513 .ok_or_else(|| {
514 ErrorCode::BindError(format!(
515 "GROUP BY position {idx} is not in select list"
516 ))
517 .into()
518 }),
519 _ => Err(
520 ErrorCode::BindError("non-integer constant in GROUP BY".into()).into(),
521 ),
522 },
523 Ok(e) => Ok(e),
524 Err(e) => match name {
525 None => Err(e),
526 Some(name) => match name_to_index.get(&name) {
527 None => Err(e),
528 Some(&usize::MAX) => Err(ErrorCode::BindError(format!(
529 "GROUP BY \"{name}\" is ambiguous"
530 ))
531 .into()),
532 Some(out_idx) => Ok(select_items[*out_idx].clone()),
533 },
534 },
535 };
536
537 set_exprs.push(expr_impl?);
538 }
539 result.push(set_exprs);
540 }
541 Ok(result)
542 }
543
544 pub fn bind_returning_list(
545 &mut self,
546 returning_items: Vec<SelectItem>,
547 ) -> Result<(Vec<ExprImpl>, Vec<Field>)> {
548 let (returning_list, aliases) = self.bind_select_list(&returning_items)?;
549 if returning_list
550 .iter()
551 .any(|expr| expr.has_agg_call() || expr.has_window_function())
552 {
553 return Err(RwError::from(ErrorCode::BindError(
554 "should not have agg/window in the `RETURNING` list".to_owned(),
555 )));
556 }
557
558 let fields = returning_list
559 .iter()
560 .zip_eq_fast(aliases.iter())
561 .map(|(s, a)| {
562 let name = a.clone().unwrap_or_else(|| UNNAMED_COLUMN.to_owned());
563 Ok::<Field, RwError>(Field::with_name(s.return_type(), name))
564 })
565 .try_collect()?;
566 Ok((returning_list, fields))
567 }
568
569 pub fn iter_bound_columns<'a>(
570 column_binding: impl Iterator<Item = &'a ColumnBinding>,
571 ) -> (Vec<ExprImpl>, Vec<Option<String>>) {
572 column_binding
573 .map(|c| {
574 (
575 InputRef::new(c.index, c.field.data_type.clone()).into(),
576 Some(c.field.name.clone()),
577 )
578 })
579 .unzip()
580 }
581
582 pub fn iter_column_groups(&self) -> (Vec<ExprImpl>, Vec<Option<String>>) {
583 self.context
584 .column_group_context
585 .groups
586 .values()
587 .rev() .map(|g| {
589 if let Some(col) = &g.non_nullable_column {
590 let c = &self.context.columns[*col];
591 (
592 InputRef::new(c.index, c.field.data_type.clone()).into(),
593 Some(c.field.name.clone()),
594 )
595 } else {
596 let mut input_idxes = g.indices.iter().collect::<Vec<_>>();
597 input_idxes.sort();
598 let inputs = input_idxes
599 .into_iter()
600 .map(|index| {
601 let column = &self.context.columns[*index];
602 InputRef::new(column.index, column.field.data_type.clone()).into()
603 })
604 .collect::<Vec<_>>();
605 let c = &self.context.columns[*g.indices.iter().next().unwrap()];
606 (
607 FunctionCall::new(ExprType::Coalesce, inputs)
608 .expect("Failure binding COALESCE function call")
609 .into(),
610 Some(c.field.name.clone()),
611 )
612 }
613 })
614 .unzip()
615 }
616
617 fn bind_distinct_on(
632 &mut self,
633 distinct: &Distinct,
634 name_to_index: &HashMap<String, usize>,
635 select_items: &[ExprImpl],
636 ) -> Result<BoundDistinct> {
637 Ok(match distinct {
638 Distinct::All => BoundDistinct::All,
639 Distinct::Distinct => BoundDistinct::Distinct,
640 Distinct::DistinctOn(exprs) => {
641 let mut bound_exprs = vec![];
642 for expr in exprs {
643 let expr_impl = match expr {
644 Expr::Identifier(name)
645 if let Some(index) = name_to_index.get(&name.real_value()) =>
646 {
647 match *index {
648 usize::MAX => {
649 return Err(ErrorCode::BindError(format!(
650 "DISTINCT ON \"{}\" is ambiguous",
651 name.real_value()
652 ))
653 .into());
654 }
655 _ => select_items[*index].clone(),
656 }
657 }
658 Expr::Value(Value::Number(number)) => match number.parse::<usize>() {
659 Ok(index) if 1 <= index && index <= select_items.len() => {
660 let idx_from_0 = index - 1;
661 select_items[idx_from_0].clone()
662 }
663 _ => {
664 return Err(ErrorCode::InvalidInputSyntax(format!(
665 "Invalid ordinal number in DISTINCT ON: {}",
666 number
667 ))
668 .into());
669 }
670 },
671 expr => self.bind_expr(expr)?,
672 };
673 bound_exprs.push(expr_impl);
674 }
675 BoundDistinct::DistinctOn(bound_exprs)
676 }
677 })
678 }
679
680 fn generate_except_indices(&mut self, except: Option<&[Expr]>) -> Result<HashSet<usize>> {
681 let mut except_indices: HashSet<usize> = HashSet::new();
682 if let Some(exprs) = except {
683 for expr in exprs {
684 let bound = self.bind_expr(expr)?;
685 match bound {
686 ExprImpl::InputRef(inner) => {
687 if !except_indices.insert(inner.index) {
688 return Err(ErrorCode::BindError(
689 "Duplicate entry in except list".into(),
690 )
691 .into());
692 }
693 }
694 _ => {
695 return Err(ErrorCode::BindError(
696 "Only support column name in except list".into(),
697 )
698 .into());
699 }
700 }
701 }
702 }
703 Ok(except_indices)
704 }
705}
706
707fn derive_alias(expr: &Expr) -> Option<String> {
708 match expr.clone() {
709 Expr::Identifier(ident) => Some(ident.real_value()),
710 Expr::CompoundIdentifier(idents) => idents.last().map(|ident| ident.real_value()),
711 Expr::FieldIdentifier(_, idents) => idents.last().map(|ident| ident.real_value()),
712 Expr::Function(func) => Some(func.name.real_value()),
713 Expr::Extract { .. } => Some("extract".to_owned()),
714 Expr::Case { .. } => Some("case".to_owned()),
715 Expr::Cast { expr, data_type } => {
716 derive_alias(&expr).or_else(|| data_type_to_alias(&data_type))
717 }
718 Expr::TypedString { data_type, .. } => data_type_to_alias(&data_type),
719 Expr::Value(Value::Interval { .. }) => Some("interval".to_owned()),
720 Expr::Row(_) => Some("row".to_owned()),
721 Expr::Array(_) => Some("array".to_owned()),
722 Expr::Index { obj, index: _ } => derive_alias(&obj),
723 _ => None,
724 }
725}
726
727fn data_type_to_alias(data_type: &AstDataType) -> Option<String> {
728 let alias = match data_type {
729 AstDataType::Char(_) => "bpchar".to_owned(),
730 AstDataType::Varchar => "varchar".to_owned(),
731 AstDataType::Uuid => "uuid".to_owned(),
732 AstDataType::Decimal(_, _) => "numeric".to_owned(),
733 AstDataType::Real | AstDataType::Float(Some(1..=24)) => "float4".to_owned(),
734 AstDataType::Double | AstDataType::Float(Some(25..=53) | None) => "float8".to_owned(),
735 AstDataType::Float(Some(0 | 54..)) => unreachable!(),
736 AstDataType::SmallInt => "int2".to_owned(),
737 AstDataType::Int => "int4".to_owned(),
738 AstDataType::BigInt => "int8".to_owned(),
739 AstDataType::Boolean => "bool".to_owned(),
740 AstDataType::Date => "date".to_owned(),
741 AstDataType::Time(tz) => format!("time{}", if *tz { "z" } else { "" }),
742 AstDataType::Timestamp(tz) => {
743 format!("timestamp{}", if *tz { "tz" } else { "" })
744 }
745 AstDataType::Interval => "interval".to_owned(),
746 AstDataType::Regclass => "regclass".to_owned(),
747 AstDataType::Regproc => "regproc".to_owned(),
748 AstDataType::Text => "text".to_owned(),
749 AstDataType::Bytea => "bytea".to_owned(),
750 AstDataType::Jsonb => "jsonb".to_owned(),
751 AstDataType::Array(ty) => return data_type_to_alias(ty),
752 AstDataType::Custom(ty) => format!("{}", ty),
753 AstDataType::Vector(_) => "vector".to_owned(),
754 AstDataType::Struct(_) | AstDataType::Map(_) => {
755 return None;
757 }
758 };
759
760 Some(alias)
761}