1use std::cell::RefCell;
16use std::collections::HashMap;
17use std::rc::Rc;
18
19use risingwave_common::catalog::Schema;
20use risingwave_common::types::DataType;
21use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
22use risingwave_sqlparser::ast::{
23 Corresponding, Cte, CteInner, Expr, Fetch, OrderByExpr, Query, SetExpr, SetOperator, Value,
24 With,
25};
26use thiserror_ext::AsReport;
27
28use super::BoundValues;
29use super::bind_context::BindingCteState;
30use super::statement::RewriteExprsRecursive;
31use crate::binder::bind_context::{BindingCte, RecursiveUnion};
32use crate::binder::{Binder, BoundSetExpr};
33use crate::error::{ErrorCode, Result, RwError};
34use crate::expr::{CorrelatedId, Depth, ExprImpl, ExprRewriter};
35
/// A bound query: a bound set expression together with the `ORDER BY`, `LIMIT`/`FETCH` and
/// `OFFSET` information that was attached to it.
#[derive(Debug, Clone)]
pub struct BoundQuery {
    /// The bound set expression that forms the body of the query.
    pub body: BoundSetExpr,
    /// The `ORDER BY` columns. An index smaller than the number of fields in `body`'s schema
    /// refers to an output column of `body`; an index at or beyond that number refers to the
    /// entry of `extra_order_exprs` at `index - <number of body fields>`.
    pub order: Vec<ColumnOrder>,
    /// Row limit taken from `LIMIT` or `FETCH`. `None` when neither clause is present;
    /// `u64::MAX` when the limit expression evaluates to `NULL`.
    pub limit: Option<u64>,
    /// Number of rows to skip, taken from `OFFSET`; `None` when the clause is absent.
    pub offset: Option<u64>,
    /// The `with_ties` flag of the `FETCH` clause; `false` when there is no `FETCH` clause.
    pub with_ties: bool,
    /// Bound `ORDER BY` expressions that could not be mapped onto an output column of `body`.
    pub extra_order_exprs: Vec<ExprImpl>,
}
47
48impl BoundQuery {
49 pub fn schema(&self) -> std::borrow::Cow<'_, Schema> {
51 self.body.schema()
52 }
53
54 pub fn data_types(&self) -> Vec<DataType> {
56 self.schema().data_types()
57 }
58
59 pub fn is_correlated_by_depth(&self, depth: Depth) -> bool {
90 self.body.is_correlated_by_depth(depth + 1)
91 || self
92 .extra_order_exprs
93 .iter()
94 .any(|e| e.has_correlated_input_ref_by_depth(depth + 1))
95 }
96
97 pub fn is_correlated_by_correlated_id(&self, correlated_id: CorrelatedId) -> bool {
98 self.body.is_correlated_by_correlated_id(correlated_id)
99 || self
100 .extra_order_exprs
101 .iter()
102 .any(|e| e.has_correlated_input_ref_by_correlated_id(correlated_id))
103 }
104
105 pub fn collect_correlated_indices_by_depth_and_assign_id(
106 &mut self,
107 depth: Depth,
108 correlated_id: CorrelatedId,
109 ) -> Vec<usize> {
110 let mut correlated_indices = vec![];
111
112 correlated_indices.extend(
113 self.body
114 .collect_correlated_indices_by_depth_and_assign_id(depth + 1, correlated_id),
115 );
116
117 correlated_indices.extend(self.extra_order_exprs.iter_mut().flat_map(|expr| {
118 expr.collect_correlated_indices_by_depth_and_assign_id(depth + 1, correlated_id)
119 }));
120 correlated_indices
121 }
122
123 pub fn with_values(values: BoundValues) -> Self {
125 BoundQuery {
126 body: BoundSetExpr::Values(values.into()),
127 order: vec![],
128 limit: None,
129 offset: None,
130 with_ties: false,
131 extra_order_exprs: vec![],
132 }
133 }
134}
135
136impl RewriteExprsRecursive for BoundQuery {
137 fn rewrite_exprs_recursive(&mut self, rewriter: &mut impl ExprRewriter) {
138 let new_extra_order_exprs = std::mem::take(&mut self.extra_order_exprs)
139 .into_iter()
140 .map(|expr| rewriter.rewrite_expr(expr))
141 .collect::<Vec<_>>();
142 self.extra_order_exprs = new_extra_order_exprs;
143
144 self.body.rewrite_exprs_recursive(rewriter);
145 }
146}
147
148impl Binder {
149 pub fn bind_query(&mut self, query: &Query) -> Result<BoundQuery> {
156 self.push_context();
157 let result = self.bind_query_inner(query);
158 self.pop_context()?;
159 result
160 }
161
162 pub fn bind_query_for_view(&mut self, query: &Query) -> Result<BoundQuery> {
165 self.push_context();
166 self.context.disable_security_invoker = true;
167 let result = self.bind_query_inner(query);
168 self.pop_context()?;
169 result
170 }
171
172 pub(super) fn bind_query_inner(
174 &mut self,
175 Query {
176 with,
177 body,
178 order_by,
179 limit,
180 offset,
181 fetch,
182 }: &Query,
183 ) -> Result<BoundQuery> {
184 let mut with_ties = false;
185 let limit = match (limit, fetch) {
186 (None, None) => None,
187 (
188 None,
189 Some(Fetch {
190 with_ties: fetch_with_ties,
191 quantity,
192 }),
193 ) => {
194 with_ties = *fetch_with_ties;
195 match quantity {
196 Some(v) => Some(Expr::Value(Value::Number(v.clone()))),
197 None => Some(Expr::Value(Value::Number("1".to_owned()))),
198 }
199 }
200 (Some(limit), None) => Some(limit.clone()),
201 (Some(_), Some(_)) => unreachable!(), };
203 let limit_expr = limit.map(|expr| self.bind_expr(&expr)).transpose()?;
204 let limit = if let Some(limit_expr) = limit_expr {
205 let limit_cast_to_bigint = limit_expr.cast_assign(&DataType::Int64).map_err(|_| {
207 RwError::from(ErrorCode::ExprError(
208 "expects an integer or expression that can be evaluated to an integer after LIMIT"
209 .into(),
210 ))
211 })?;
212 let limit = match limit_cast_to_bigint.try_fold_const() {
213 Some(Ok(Some(datum))) => {
214 let value = datum.as_int64();
215 if *value < 0 {
216 return Err(ErrorCode::ExprError(
217 format!("LIMIT must not be negative, but found: {}", *value).into(),
218 )
219 .into());
220 }
221 *value as u64
222 }
223 Some(Ok(None)) => {
225 u64::MAX
226 }
227 None => return Err(ErrorCode::ExprError(
229 "expects an integer or expression that can be evaluated to an integer after LIMIT, but found non-const expression"
230 .into(),
231 ).into()),
232 Some(Err(e)) => {
234 return Err(ErrorCode::ExprError(
235 format!("expects an integer or expression that can be evaluated to an integer after LIMIT,\nbut the evaluation of the expression returns error:{}", e.as_report()
236 ).into(),
237 ).into())
238 }
239 };
240 Some(limit)
241 } else {
242 None
243 };
244
245 let offset = offset
246 .as_ref()
247 .map(|s| parse_non_negative_i64("OFFSET", s))
248 .transpose()?
249 .map(|v| v as u64);
250
251 if let Some(with) = with {
252 self.bind_with(with)?;
253 }
254 let body = self.bind_set_expr(body)?;
255 let name_to_index =
256 Self::build_name_to_index(body.schema().fields().iter().map(|f| f.name.clone()));
257 let mut extra_order_exprs = vec![];
258 let visible_output_num = body.schema().len();
259 let order = order_by
260 .iter()
261 .map(|order_by_expr| {
262 self.bind_order_by_expr_in_query(
263 order_by_expr,
264 &body,
265 &name_to_index,
266 &mut extra_order_exprs,
267 visible_output_num,
268 )
269 })
270 .collect::<Result<_>>()?;
271 Ok(BoundQuery {
272 body,
273 order,
274 limit,
275 offset,
276 with_ties,
277 extra_order_exprs,
278 })
279 }
280
/// Maps each name to the position at which it occurs in `names`.
///
/// A name that occurs more than once is mapped to `usize::MAX` instead of a position, which
/// marks it as ambiguous.
pub fn build_name_to_index(names: impl Iterator<Item = String>) -> HashMap<String, usize> {
    let mut name_to_index = HashMap::new();
    for (position, name) in names.enumerate() {
        match name_to_index.entry(name) {
            // Seen before: overwrite whatever is stored with the ambiguity marker.
            std::collections::hash_map::Entry::Occupied(mut seen) => {
                *seen.get_mut() = usize::MAX;
            }
            std::collections::hash_map::Entry::Vacant(slot) => {
                slot.insert(position);
            }
        }
    }
    name_to_index
}
292
293 fn bind_order_by_expr_in_query(
306 &mut self,
307 OrderByExpr {
308 expr,
309 asc,
310 nulls_first,
311 }: &OrderByExpr,
312 body: &BoundSetExpr,
313 name_to_index: &HashMap<String, usize>,
314 extra_order_exprs: &mut Vec<ExprImpl>,
315 visible_output_num: usize,
316 ) -> Result<ColumnOrder> {
317 let order_type = OrderType::from_bools(*asc, *nulls_first);
318
319 let select_items_for_match = match body {
325 BoundSetExpr::Select(s) => Some(&s.select_items[..]),
326 _ => None,
327 };
328
329 let column_index = match expr {
330 Expr::Identifier(name) if let Some(index) = name_to_index.get(&name.real_value()) => {
331 match *index != usize::MAX {
332 true => *index,
333 false => {
334 return Err(ErrorCode::BindError(format!(
335 "ORDER BY \"{}\" is ambiguous",
336 name.real_value()
337 ))
338 .into());
339 }
340 }
341 }
342 Expr::Value(Value::Number(number)) => match number.parse::<usize>() {
343 Ok(index) if 1 <= index && index <= visible_output_num => index - 1,
344 _ => {
345 return Err(ErrorCode::InvalidInputSyntax(format!(
346 "Invalid ordinal number in ORDER BY: {}",
347 number
348 ))
349 .into());
350 }
351 },
352 expr => {
353 let bound_expr = self.bind_expr(expr)?;
354
355 if bound_expr.is_pure()
356 && let Some(select_items) = select_items_for_match
357 && let Some(existing_idx) = select_items.iter().position(|e| e == &bound_expr)
358 {
359 existing_idx
360 } else {
361 extra_order_exprs.push(bound_expr);
362 visible_output_num + extra_order_exprs.len() - 1
363 }
364 }
365 };
366 Ok(ColumnOrder::new(column_index, order_type))
367 }
368
369 fn bind_with(&mut self, with: &With) -> Result<()> {
370 for cte_table in &with.cte_tables {
371 let share_id = self.next_share_id();
373 let Cte { alias, cte_inner } = cte_table;
374 let table_name = alias.name.real_value();
375
376 if with.recursive {
377 if let CteInner::Query(query) = cte_inner {
378 let (all, corresponding, left, right, with) = Self::validate_rcte(query)?;
379
380 assert!(
382 !corresponding.is_corresponding(),
383 "`CORRESPONDING` is not supported in recursive CTE"
384 );
385
386 let entry = self
387 .context
388 .cte_to_relation
389 .entry(table_name)
390 .insert_entry(Rc::new(RefCell::new(BindingCte {
391 share_id,
392 state: BindingCteState::Init,
393 alias: alias.clone(),
394 })))
395 .get()
396 .clone();
397
398 self.bind_rcte(with, entry, left, right, all)?;
399 } else {
400 return Err(ErrorCode::BindError(
401 "RECURSIVE CTE only support query".to_owned(),
402 )
403 .into());
404 }
405 } else {
406 match cte_inner {
407 CteInner::Query(query) => {
408 let bound_query = self.bind_query(query)?;
409 self.context.cte_to_relation.insert(
410 table_name,
411 Rc::new(RefCell::new(BindingCte {
412 share_id,
413 state: BindingCteState::Bound {
414 query: either::Either::Left(bound_query),
415 },
416 alias: alias.clone(),
417 })),
418 );
419 }
420 CteInner::ChangeLog(from_table_name) => {
421 self.push_context();
422 let from_table_relation =
423 self.bind_relation_by_name(from_table_name, None, None, true)?;
424 self.pop_context()?;
425 self.context.cte_to_relation.insert(
426 table_name,
427 Rc::new(RefCell::new(BindingCte {
428 share_id,
429 state: BindingCteState::ChangeLog {
430 table: from_table_relation,
431 },
432 alias: alias.clone(),
433 })),
434 );
435 }
436 }
437 }
438 }
439 Ok(())
440 }
441
442 fn validate_rcte(
444 query: &Query,
445 ) -> Result<(bool, &Corresponding, &SetExpr, &SetExpr, Option<&With>)> {
446 let Query {
447 with,
448 body,
449 order_by,
450 limit,
451 offset,
452 fetch,
453 } = query;
454
455 fn should_be_empty<T>(v: Option<T>, clause: &str) -> Result<()> {
457 if v.is_some() {
458 return Err(ErrorCode::BindError(format!(
459 "`{clause}` is not supported in recursive CTE"
460 ))
461 .into());
462 }
463 Ok(())
464 }
465
466 should_be_empty(order_by.first(), "ORDER BY")?;
467 should_be_empty(limit.as_ref(), "LIMIT")?;
468 should_be_empty(offset.as_ref(), "OFFSET")?;
469 should_be_empty(fetch.as_ref(), "FETCH")?;
470
471 let SetExpr::SetOperation {
472 op: SetOperator::Union,
473 all,
474 corresponding,
475 left,
476 right,
477 } = body
478 else {
479 return Err(
480 ErrorCode::BindError("`UNION` is required in recursive CTE".to_owned()).into(),
481 );
482 };
483
484 if !all {
485 return Err(ErrorCode::BindError(
486 "only `UNION ALL` is supported in recursive CTE now".to_owned(),
487 )
488 .into());
489 }
490
491 if corresponding.is_corresponding() {
492 return Err(ErrorCode::BindError(
493 "`CORRESPONDING` is not supported in recursive CTE".to_owned(),
494 )
495 .into());
496 }
497
498 Ok((*all, corresponding, left, right, with.as_ref()))
499 }
500
501 fn bind_rcte(
502 &mut self,
503 with: Option<&With>,
504 entry: Rc<RefCell<BindingCte>>,
505 left: &SetExpr,
506 right: &SetExpr,
507 all: bool,
508 ) -> Result<()> {
509 self.push_context();
510 let result = self.bind_rcte_inner(with, entry, left, right, all);
511 self.pop_context()?;
512 result
513 }
514
/// Binds the two sides of a recursive CTE and stores the result in `entry`.
///
/// `left` is bound first and recorded in `entry` as `BaseResolved`; `right` is then bound in
/// a reset context. After the schemas of both sides are aligned, `entry` is set to `Bound`
/// with the resulting [`RecursiveUnion`].
///
/// # Errors
///
/// Returns an error when binding the nested `WITH` clause or either side fails, or when the
/// two sides cannot be aligned to a common schema.
fn bind_rcte_inner(
    &mut self,
    with: Option<&With>,
    entry: Rc<RefCell<BindingCte>>,
    left: &SetExpr,
    right: &SetExpr,
    all: bool,
) -> Result<()> {
    // CTEs declared inside the recursive CTE's own query are bound first.
    if let Some(with) = with {
        self.bind_with(with)?;
    }

    // `left` is bound before `entry` leaves the `Init` state.
    let mut base = self.bind_set_expr(left)?;

    // Publish the base term through the shared entry before `right` is bound.
    // NOTE(review): presumably this is what lets a self-reference inside `right` resolve;
    // confirm where `BindingCteState::BaseResolved` is consumed.
    entry.borrow_mut().state = BindingCteState::BaseResolved { base: base.clone() };

    // Despite its name, `new_context` holds the context that was current so far; `take`
    // leaves a default context in `self.context`.
    let new_context = std::mem::take(&mut self.context);
    // The fresh context for `right` carries over only the CTE map and the security flag.
    self.context
        .cte_to_relation
        .clone_from(&new_context.cte_to_relation);
    self.context.disable_security_invoker = new_context.disable_security_invoker;
    let mut recursive = self.bind_set_expr(right)?;
    // Discard whatever binding `right` put into the context and start again from a default
    // one, again carrying over only the CTE map and the security flag of the saved context.
    self.context = Default::default();
    self.context.cte_to_relation = new_context.cte_to_relation;
    self.context.disable_security_invoker = new_context.disable_security_invoker;

    Self::align_schema(&mut base, &mut recursive, SetOperator::Union)?;
    // The schema is read from the base term after alignment.
    let schema = base.schema().into_owned();

    let recursive_union = RecursiveUnion {
        all,
        base: Box::new(base),
        recursive: Box::new(recursive),
        schema,
    };

    entry.borrow_mut().state = BindingCteState::Bound {
        query: either::Either::Right(recursive_union),
    };

    Ok(())
}
563}
564
565fn parse_non_negative_i64(clause: &str, s: &str) -> Result<i64> {
567 match s.parse::<i64>() {
568 Ok(v) => {
569 if v < 0 {
570 Err(ErrorCode::InvalidInputSyntax(format!("{clause} must not be negative")).into())
571 } else {
572 Ok(v)
573 }
574 }
575 Err(e) => Err(ErrorCode::InvalidInputSyntax(e.to_report_string()).into()),
576 }
577}