1use std::cell::RefCell;
16use std::collections::HashMap;
17use std::rc::Rc;
18
19use risingwave_common::catalog::Schema;
20use risingwave_common::types::DataType;
21use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
22use risingwave_sqlparser::ast::{
23 Cte, CteInner, Expr, Fetch, OrderByExpr, Query, SetExpr, SetOperator, Value, With,
24};
25use thiserror_ext::AsReport;
26
27use super::BoundValues;
28use super::bind_context::BindingCteState;
29use super::statement::RewriteExprsRecursive;
30use crate::binder::bind_context::{BindingCte, RecursiveUnion};
31use crate::binder::{Binder, BoundSetExpr};
32use crate::error::{ErrorCode, Result, RwError};
33use crate::expr::{CorrelatedId, Depth, ExprImpl, ExprRewriter};
34
35#[derive(Debug, Clone)]
38pub struct BoundQuery {
39 pub body: BoundSetExpr,
40 pub order: Vec<ColumnOrder>,
41 pub limit: Option<u64>,
42 pub offset: Option<u64>,
43 pub with_ties: bool,
44 pub extra_order_exprs: Vec<ExprImpl>,
45}
46
47impl BoundQuery {
48 pub fn schema(&self) -> std::borrow::Cow<'_, Schema> {
50 self.body.schema()
51 }
52
53 pub fn data_types(&self) -> Vec<DataType> {
55 self.schema().data_types()
56 }
57
58 pub fn is_correlated(&self, depth: Depth) -> bool {
89 self.body.is_correlated(depth + 1)
90 || self
91 .extra_order_exprs
92 .iter()
93 .any(|e| e.has_correlated_input_ref_by_depth(depth + 1))
94 }
95
96 pub fn collect_correlated_indices_by_depth_and_assign_id(
97 &mut self,
98 depth: Depth,
99 correlated_id: CorrelatedId,
100 ) -> Vec<usize> {
101 let mut correlated_indices = vec![];
102
103 correlated_indices.extend(
104 self.body
105 .collect_correlated_indices_by_depth_and_assign_id(depth + 1, correlated_id),
106 );
107
108 correlated_indices.extend(self.extra_order_exprs.iter_mut().flat_map(|expr| {
109 expr.collect_correlated_indices_by_depth_and_assign_id(depth + 1, correlated_id)
110 }));
111 correlated_indices
112 }
113
114 pub fn with_values(values: BoundValues) -> Self {
116 BoundQuery {
117 body: BoundSetExpr::Values(values.into()),
118 order: vec![],
119 limit: None,
120 offset: None,
121 with_ties: false,
122 extra_order_exprs: vec![],
123 }
124 }
125}
126
127impl RewriteExprsRecursive for BoundQuery {
128 fn rewrite_exprs_recursive(&mut self, rewriter: &mut impl ExprRewriter) {
129 let new_extra_order_exprs = std::mem::take(&mut self.extra_order_exprs)
130 .into_iter()
131 .map(|expr| rewriter.rewrite_expr(expr))
132 .collect::<Vec<_>>();
133 self.extra_order_exprs = new_extra_order_exprs;
134
135 self.body.rewrite_exprs_recursive(rewriter);
136 }
137}
138
139impl Binder {
140 pub fn bind_query(&mut self, query: Query) -> Result<BoundQuery> {
147 self.push_context();
148 let result = self.bind_query_inner(query);
149 self.pop_context()?;
150 result
151 }
152
153 pub fn bind_query_for_view(&mut self, query: Query) -> Result<BoundQuery> {
156 self.push_context();
157 self.context.disable_security_invoker = true;
158 let result = self.bind_query_inner(query);
159 self.pop_context()?;
160 result
161 }
162
163 pub(super) fn bind_query_inner(
165 &mut self,
166 Query {
167 with,
168 body,
169 order_by,
170 limit,
171 offset,
172 fetch,
173 }: Query,
174 ) -> Result<BoundQuery> {
175 let mut with_ties = false;
176 let limit = match (limit, fetch) {
177 (None, None) => None,
178 (
179 None,
180 Some(Fetch {
181 with_ties: fetch_with_ties,
182 quantity,
183 }),
184 ) => {
185 with_ties = fetch_with_ties;
186 match quantity {
187 Some(v) => Some(Expr::Value(Value::Number(v))),
188 None => Some(Expr::Value(Value::Number("1".to_owned()))),
189 }
190 }
191 (Some(limit), None) => Some(limit),
192 (Some(_), Some(_)) => unreachable!(), };
194 let limit_expr = limit.map(|expr| self.bind_expr(expr)).transpose()?;
195 let limit = if let Some(limit_expr) = limit_expr {
196 let limit_cast_to_bigint = limit_expr.cast_assign(DataType::Int64).map_err(|_| {
198 RwError::from(ErrorCode::ExprError(
199 "expects an integer or expression that can be evaluated to an integer after LIMIT"
200 .into(),
201 ))
202 })?;
203 let limit = match limit_cast_to_bigint.try_fold_const() {
204 Some(Ok(Some(datum))) => {
205 let value = datum.as_int64();
206 if *value < 0 {
207 return Err(ErrorCode::ExprError(
208 format!("LIMIT must not be negative, but found: {}", *value).into(),
209 )
210 .into());
211 }
212 *value as u64
213 }
214 Some(Ok(None)) => {
216 u64::MAX
217 }
218 None => return Err(ErrorCode::ExprError(
220 "expects an integer or expression that can be evaluated to an integer after LIMIT, but found non-const expression"
221 .into(),
222 ).into()),
223 Some(Err(e)) => {
225 return Err(ErrorCode::ExprError(
226 format!("expects an integer or expression that can be evaluated to an integer after LIMIT,\nbut the evaluation of the expression returns error:{}", e.as_report()
227 ).into(),
228 ).into())
229 }
230 };
231 Some(limit)
232 } else {
233 None
234 };
235
236 let offset = offset
237 .map(|s| parse_non_negative_i64("OFFSET", &s))
238 .transpose()?
239 .map(|v| v as u64);
240
241 if let Some(with) = with {
242 self.bind_with(with)?;
243 }
244 let body = self.bind_set_expr(body)?;
245 let name_to_index =
246 Self::build_name_to_index(body.schema().fields().iter().map(|f| f.name.clone()));
247 let mut extra_order_exprs = vec![];
248 let visible_output_num = body.schema().len();
249 let order = order_by
250 .into_iter()
251 .map(|order_by_expr| {
252 self.bind_order_by_expr_in_query(
253 order_by_expr,
254 &name_to_index,
255 &mut extra_order_exprs,
256 visible_output_num,
257 )
258 })
259 .collect::<Result<_>>()?;
260 Ok(BoundQuery {
261 body,
262 order,
263 limit,
264 offset,
265 with_ties,
266 extra_order_exprs,
267 })
268 }
269
270 pub fn build_name_to_index(names: impl Iterator<Item = String>) -> HashMap<String, usize> {
271 let mut m = HashMap::new();
272 names.enumerate().for_each(|(index, name)| {
273 m.entry(name)
274 .and_modify(|v| *v = usize::MAX)
277 .or_insert(index);
278 });
279 m
280 }
281
282 fn bind_order_by_expr_in_query(
295 &mut self,
296 OrderByExpr {
297 expr,
298 asc,
299 nulls_first,
300 }: OrderByExpr,
301 name_to_index: &HashMap<String, usize>,
302 extra_order_exprs: &mut Vec<ExprImpl>,
303 visible_output_num: usize,
304 ) -> Result<ColumnOrder> {
305 let order_type = OrderType::from_bools(asc, nulls_first);
306 let column_index = match expr {
307 Expr::Identifier(name) if let Some(index) = name_to_index.get(&name.real_value()) => {
308 match *index != usize::MAX {
309 true => *index,
310 false => {
311 return Err(ErrorCode::BindError(format!(
312 "ORDER BY \"{}\" is ambiguous",
313 name.real_value()
314 ))
315 .into());
316 }
317 }
318 }
319 Expr::Value(Value::Number(number)) => match number.parse::<usize>() {
320 Ok(index) if 1 <= index && index <= visible_output_num => index - 1,
321 _ => {
322 return Err(ErrorCode::InvalidInputSyntax(format!(
323 "Invalid ordinal number in ORDER BY: {}",
324 number
325 ))
326 .into());
327 }
328 },
329 expr => {
330 extra_order_exprs.push(self.bind_expr(expr)?);
331 visible_output_num + extra_order_exprs.len() - 1
332 }
333 };
334 Ok(ColumnOrder::new(column_index, order_type))
335 }
336
337 fn bind_with(&mut self, with: With) -> Result<()> {
338 for cte_table in with.cte_tables {
339 let share_id = self.next_share_id();
341 let Cte { alias, cte_inner } = cte_table;
342 let table_name = alias.name.real_value();
343
344 if with.recursive {
345 if let CteInner::Query(query) = cte_inner {
346 let (
347 SetExpr::SetOperation {
348 op: SetOperator::Union,
349 all,
350 corresponding,
351 left,
352 right,
353 },
354 with,
355 ) = Self::validate_rcte(*query)?
356 else {
357 return Err(ErrorCode::BindError(
358 "expect `SetOperation` as the return type of validation".into(),
359 )
360 .into());
361 };
362
363 assert!(
365 !corresponding.is_corresponding(),
366 "`CORRESPONDING` is not supported in recursive CTE"
367 );
368
369 let entry = self
370 .context
371 .cte_to_relation
372 .entry(table_name)
373 .insert_entry(Rc::new(RefCell::new(BindingCte {
374 share_id,
375 state: BindingCteState::Init,
376 alias,
377 })))
378 .get()
379 .clone();
380
381 self.bind_rcte(with, entry, *left, *right, all)?;
382 } else {
383 return Err(ErrorCode::BindError(
384 "RECURSIVE CTE only support query".to_owned(),
385 )
386 .into());
387 }
388 } else {
389 match cte_inner {
390 CteInner::Query(query) => {
391 let bound_query = self.bind_query(*query)?;
392 self.context.cte_to_relation.insert(
393 table_name,
394 Rc::new(RefCell::new(BindingCte {
395 share_id,
396 state: BindingCteState::Bound {
397 query: either::Either::Left(bound_query),
398 },
399 alias,
400 })),
401 );
402 }
403 CteInner::ChangeLog(from_table_name) => {
404 self.push_context();
405 let from_table_relation =
406 self.bind_relation_by_name(from_table_name.clone(), None, None, true)?;
407 self.pop_context()?;
408 self.context.cte_to_relation.insert(
409 table_name,
410 Rc::new(RefCell::new(BindingCte {
411 share_id,
412 state: BindingCteState::ChangeLog {
413 table: from_table_relation,
414 },
415 alias,
416 })),
417 );
418 }
419 }
420 }
421 }
422 Ok(())
423 }
424
425 fn validate_rcte(query: Query) -> Result<(SetExpr, Option<With>)> {
427 let Query {
428 with,
429 body,
430 order_by,
431 limit,
432 offset,
433 fetch,
434 } = query;
435
436 fn should_be_empty<T>(v: Option<T>, clause: &str) -> Result<()> {
438 if v.is_some() {
439 return Err(ErrorCode::BindError(format!(
440 "`{clause}` is not supported in recursive CTE"
441 ))
442 .into());
443 }
444 Ok(())
445 }
446
447 should_be_empty(order_by.first(), "ORDER BY")?;
448 should_be_empty(limit, "LIMIT")?;
449 should_be_empty(offset, "OFFSET")?;
450 should_be_empty(fetch, "FETCH")?;
451
452 let SetExpr::SetOperation {
453 op: SetOperator::Union,
454 all,
455 corresponding,
456 left,
457 right,
458 } = body
459 else {
460 return Err(
461 ErrorCode::BindError("`UNION` is required in recursive CTE".to_owned()).into(),
462 );
463 };
464
465 if !all {
466 return Err(ErrorCode::BindError(
467 "only `UNION ALL` is supported in recursive CTE now".to_owned(),
468 )
469 .into());
470 }
471
472 if corresponding.is_corresponding() {
473 return Err(ErrorCode::BindError(
474 "`CORRESPONDING` is not supported in recursive CTE".to_owned(),
475 )
476 .into());
477 }
478
479 Ok((
480 SetExpr::SetOperation {
481 op: SetOperator::Union,
482 all,
483 corresponding,
484 left,
485 right,
486 },
487 with,
488 ))
489 }
490
491 fn bind_rcte(
492 &mut self,
493 with: Option<With>,
494 entry: Rc<RefCell<BindingCte>>,
495 left: SetExpr,
496 right: SetExpr,
497 all: bool,
498 ) -> Result<()> {
499 self.push_context();
500 let result = self.bind_rcte_inner(with, entry, left, right, all);
501 self.pop_context()?;
502 result
503 }
504
505 fn bind_rcte_inner(
506 &mut self,
507 with: Option<With>,
508 entry: Rc<RefCell<BindingCte>>,
509 left: SetExpr,
510 right: SetExpr,
511 all: bool,
512 ) -> Result<()> {
513 if let Some(with) = with {
514 self.bind_with(with)?;
515 }
516
517 let mut base = self.bind_set_expr(left)?;
521
522 entry.borrow_mut().state = BindingCteState::BaseResolved { base: base.clone() };
523
524 let new_context = std::mem::take(&mut self.context);
526 self.context
527 .cte_to_relation
528 .clone_from(&new_context.cte_to_relation);
529 self.context.disable_security_invoker = new_context.disable_security_invoker;
530 let mut recursive = self.bind_set_expr(right)?;
532 self.context = Default::default();
534 self.context.cte_to_relation = new_context.cte_to_relation;
535 self.context.disable_security_invoker = new_context.disable_security_invoker;
536
537 Self::align_schema(&mut base, &mut recursive, SetOperator::Union)?;
538 let schema = base.schema().into_owned();
539
540 let recursive_union = RecursiveUnion {
541 all,
542 base: Box::new(base),
543 recursive: Box::new(recursive),
544 schema,
545 };
546
547 entry.borrow_mut().state = BindingCteState::Bound {
548 query: either::Either::Right(recursive_union),
549 };
550
551 Ok(())
552 }
553}
554
555fn parse_non_negative_i64(clause: &str, s: &str) -> Result<i64> {
557 match s.parse::<i64>() {
558 Ok(v) => {
559 if v < 0 {
560 Err(ErrorCode::InvalidInputSyntax(format!("{clause} must not be negative")).into())
561 } else {
562 Ok(v)
563 }
564 }
565 Err(e) => Err(ErrorCode::InvalidInputSyntax(e.to_report_string()).into()),
566 }
567}