risingwave_frontend/binder/
query.rs1use std::cell::RefCell;
16use std::collections::HashMap;
17use std::rc::Rc;
18
19use risingwave_common::catalog::Schema;
20use risingwave_common::types::DataType;
21use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
22use risingwave_sqlparser::ast::{Cte, CteInner, Expr, Fetch, OrderByExpr, Query, Value, With};
23use thiserror_ext::AsReport;
24
25use super::BoundValues;
26use super::bind_context::BindingCteState;
27use super::statement::RewriteExprsRecursive;
28use crate::binder::bind_context::BindingCte;
29use crate::binder::{Binder, BoundSetExpr};
30use crate::error::{ErrorCode, Result, RwError};
31use crate::expr::{CorrelatedId, Depth, ExprImpl, ExprRewriter};
32
33#[derive(Debug, Clone)]
36pub struct BoundQuery {
37 pub body: BoundSetExpr,
38 pub order: Vec<ColumnOrder>,
39 pub limit: Option<u64>,
40 pub offset: Option<u64>,
41 pub with_ties: bool,
42 pub extra_order_exprs: Vec<ExprImpl>,
43}
44
45impl BoundQuery {
46 pub fn schema(&self) -> std::borrow::Cow<'_, Schema> {
48 self.body.schema()
49 }
50
51 pub fn data_types(&self) -> Vec<DataType> {
53 self.schema().data_types()
54 }
55
56 pub fn is_correlated_by_depth(&self, depth: Depth) -> bool {
87 self.body.is_correlated_by_depth(depth + 1)
88 || self
89 .extra_order_exprs
90 .iter()
91 .any(|e| e.has_correlated_input_ref_by_depth(depth + 1))
92 }
93
94 pub fn is_correlated_by_correlated_id(&self, correlated_id: CorrelatedId) -> bool {
95 self.body.is_correlated_by_correlated_id(correlated_id)
96 || self
97 .extra_order_exprs
98 .iter()
99 .any(|e| e.has_correlated_input_ref_by_correlated_id(correlated_id))
100 }
101
102 pub fn collect_correlated_indices_by_depth_and_assign_id(
103 &mut self,
104 depth: Depth,
105 correlated_id: CorrelatedId,
106 ) -> Vec<usize> {
107 let mut correlated_indices = vec![];
108
109 correlated_indices.extend(
110 self.body
111 .collect_correlated_indices_by_depth_and_assign_id(depth + 1, correlated_id),
112 );
113
114 correlated_indices.extend(self.extra_order_exprs.iter_mut().flat_map(|expr| {
115 expr.collect_correlated_indices_by_depth_and_assign_id(depth + 1, correlated_id)
116 }));
117 correlated_indices
118 }
119
120 pub fn with_values(values: BoundValues) -> Self {
122 BoundQuery {
123 body: BoundSetExpr::Values(values.into()),
124 order: vec![],
125 limit: None,
126 offset: None,
127 with_ties: false,
128 extra_order_exprs: vec![],
129 }
130 }
131}
132
133impl RewriteExprsRecursive for BoundQuery {
134 fn rewrite_exprs_recursive(&mut self, rewriter: &mut impl ExprRewriter) {
135 let new_extra_order_exprs = std::mem::take(&mut self.extra_order_exprs)
136 .into_iter()
137 .map(|expr| rewriter.rewrite_expr(expr))
138 .collect::<Vec<_>>();
139 self.extra_order_exprs = new_extra_order_exprs;
140
141 self.body.rewrite_exprs_recursive(rewriter);
142 }
143}
144
145impl Binder {
146 pub fn bind_query(&mut self, query: &Query) -> Result<BoundQuery> {
153 self.push_context();
154 let result = self.bind_query_inner(query);
155 self.pop_context()?;
156 result
157 }
158
159 pub fn bind_query_for_view(&mut self, query: &Query) -> Result<BoundQuery> {
162 self.push_context();
163 self.context.disable_security_invoker = true;
164 let result = self.bind_query_inner(query);
165 self.pop_context()?;
166 result
167 }
168
169 pub(super) fn bind_query_inner(
171 &mut self,
172 Query {
173 with,
174 body,
175 order_by,
176 limit,
177 offset,
178 fetch,
179 }: &Query,
180 ) -> Result<BoundQuery> {
181 let mut with_ties = false;
182 let limit = match (limit, fetch) {
183 (None, None) => None,
184 (
185 None,
186 Some(Fetch {
187 with_ties: fetch_with_ties,
188 quantity,
189 }),
190 ) => {
191 with_ties = *fetch_with_ties;
192 match quantity {
193 Some(v) => Some(Expr::Value(Value::Number(v.clone()))),
194 None => Some(Expr::Value(Value::Number("1".to_owned()))),
195 }
196 }
197 (Some(limit), None) => Some(limit.clone()),
198 (Some(_), Some(_)) => unreachable!(), };
200 let limit_expr = limit.map(|expr| self.bind_expr(&expr)).transpose()?;
201 let limit = if let Some(limit_expr) = limit_expr {
202 let limit_cast_to_bigint = limit_expr.cast_assign(&DataType::Int64).map_err(|_| {
204 RwError::from(ErrorCode::ExprError(
205 "expects an integer or expression that can be evaluated to an integer after LIMIT"
206 .into(),
207 ))
208 })?;
209 let limit = match limit_cast_to_bigint.try_fold_const() {
210 Some(Ok(Some(datum))) => {
211 let value = datum.as_int64();
212 if *value < 0 {
213 return Err(ErrorCode::ExprError(
214 format!("LIMIT must not be negative, but found: {}", *value).into(),
215 )
216 .into());
217 }
218 *value as u64
219 }
220 Some(Ok(None)) => {
222 u64::MAX
223 }
224 None => return Err(ErrorCode::ExprError(
226 "expects an integer or expression that can be evaluated to an integer after LIMIT, but found non-const expression"
227 .into(),
228 ).into()),
229 Some(Err(e)) => {
231 return Err(ErrorCode::ExprError(
232 format!("expects an integer or expression that can be evaluated to an integer after LIMIT,\nbut the evaluation of the expression returns error:{}", e.as_report()
233 ).into(),
234 ).into())
235 }
236 };
237 Some(limit)
238 } else {
239 None
240 };
241
242 let offset = offset
243 .as_ref()
244 .map(|s| parse_non_negative_i64("OFFSET", s))
245 .transpose()?
246 .map(|v| v as u64);
247
248 if let Some(with) = with {
249 self.bind_with(with)?;
250 }
251 let body = self.bind_set_expr(body)?;
252 let name_to_index =
253 Self::build_name_to_index(body.schema().fields().iter().map(|f| f.name.clone()));
254 let mut extra_order_exprs = vec![];
255 let visible_output_num = body.schema().len();
256 let order = order_by
257 .iter()
258 .map(|order_by_expr| {
259 self.bind_order_by_expr_in_query(
260 order_by_expr,
261 &body,
262 &name_to_index,
263 &mut extra_order_exprs,
264 visible_output_num,
265 )
266 })
267 .collect::<Result<_>>()?;
268 Ok(BoundQuery {
269 body,
270 order,
271 limit,
272 offset,
273 with_ties,
274 extra_order_exprs,
275 })
276 }
277
278 pub fn build_name_to_index(names: impl Iterator<Item = String>) -> HashMap<String, usize> {
279 let mut m = HashMap::new();
280 names.enumerate().for_each(|(index, name)| {
281 m.entry(name)
282 .and_modify(|v| *v = usize::MAX)
285 .or_insert(index);
286 });
287 m
288 }
289
290 fn bind_order_by_expr_in_query(
303 &mut self,
304 OrderByExpr {
305 expr,
306 asc,
307 nulls_first,
308 }: &OrderByExpr,
309 body: &BoundSetExpr,
310 name_to_index: &HashMap<String, usize>,
311 extra_order_exprs: &mut Vec<ExprImpl>,
312 visible_output_num: usize,
313 ) -> Result<ColumnOrder> {
314 let order_type = OrderType::from_bools(*asc, *nulls_first);
315
316 let select_items_for_match = match body {
322 BoundSetExpr::Select(s) => Some(&s.select_items[..]),
323 _ => None,
324 };
325
326 let column_index = match expr {
327 Expr::Identifier(name) if let Some(index) = name_to_index.get(&name.real_value()) => {
328 match *index != usize::MAX {
329 true => *index,
330 false => {
331 return Err(ErrorCode::BindError(format!(
332 "ORDER BY \"{}\" is ambiguous",
333 name.real_value()
334 ))
335 .into());
336 }
337 }
338 }
339 Expr::Value(Value::Number(number)) => match number.parse::<usize>() {
340 Ok(index) if 1 <= index && index <= visible_output_num => index - 1,
341 _ => {
342 return Err(ErrorCode::InvalidInputSyntax(format!(
343 "Invalid ordinal number in ORDER BY: {}",
344 number
345 ))
346 .into());
347 }
348 },
349 expr => {
350 let bound_expr = self.bind_expr(expr)?;
351
352 if bound_expr.is_pure()
353 && let Some(select_items) = select_items_for_match
354 && let Some(existing_idx) = select_items.iter().position(|e| e == &bound_expr)
355 {
356 existing_idx
357 } else {
358 extra_order_exprs.push(bound_expr);
359 visible_output_num + extra_order_exprs.len() - 1
360 }
361 }
362 };
363 Ok(ColumnOrder::new(column_index, order_type))
364 }
365
366 fn bind_with(&mut self, with: &With) -> Result<()> {
367 if with.recursive {
368 return Err(ErrorCode::BindError("RECURSIVE CTE is not supported".to_owned()).into());
369 }
370
371 for cte_table in &with.cte_tables {
372 let share_id = self.next_share_id();
373 let Cte { alias, cte_inner } = cte_table;
374 let table_name = alias.name.real_value();
375
376 match cte_inner {
377 CteInner::Query(query) => {
378 let bound_query = self.bind_query(query)?;
379 self.context.cte_to_relation.insert(
380 table_name,
381 Rc::new(RefCell::new(BindingCte {
382 share_id,
383 state: BindingCteState::Bound { query: bound_query },
384 alias: alias.clone(),
385 })),
386 );
387 }
388 CteInner::ChangeLog(from_table_name) => {
389 self.push_context();
390 let from_table_relation =
391 self.bind_relation_by_name(from_table_name, None, None, true)?;
392 self.pop_context()?;
393 self.context.cte_to_relation.insert(
394 table_name,
395 Rc::new(RefCell::new(BindingCte {
396 share_id,
397 state: BindingCteState::ChangeLog {
398 table: from_table_relation,
399 },
400 alias: alias.clone(),
401 })),
402 );
403 }
404 }
405 }
406 Ok(())
407 }
408}
409
410fn parse_non_negative_i64(clause: &str, s: &str) -> Result<i64> {
412 match s.parse::<i64>() {
413 Ok(v) => {
414 if v < 0 {
415 Err(ErrorCode::InvalidInputSyntax(format!("{clause} must not be negative")).into())
416 } else {
417 Ok(v)
418 }
419 }
420 Err(e) => Err(ErrorCode::InvalidInputSyntax(e.to_report_string()).into()),
421 }
422}