1use std::collections::hash_map::Entry;
16use std::ops::Deref;
17
18use either::Either;
19use itertools::{EitherOrBoth, Itertools};
20use risingwave_common::bail;
21use risingwave_common::catalog::{Field, TableId};
22use risingwave_sqlparser::ast::{
23 AsOf, Expr as ParserExpr, FunctionArg, FunctionArgExpr, Ident, ObjectName, TableAlias,
24 TableFactor,
25};
26use thiserror::Error;
27use thiserror_ext::AsReport;
28
29use super::bind_context::ColumnBinding;
30use super::statement::RewriteExprsRecursive;
31use crate::binder::Binder;
32use crate::binder::bind_context::{BindingCte, BindingCteState};
33use crate::error::{ErrorCode, Result, RwError};
34use crate::expr::{ExprImpl, InputRef};
35
36mod cte_ref;
37mod join;
38mod share;
39mod subquery;
40mod table_function;
41mod table_or_source;
42mod watermark;
43mod window_table_function;
44
45pub use cte_ref::BoundBackCteRef;
46pub use join::BoundJoin;
47pub use share::{BoundShare, BoundShareInput};
48pub use subquery::BoundSubquery;
49pub use table_or_source::{BoundBaseTable, BoundSource, BoundSystemTable};
50pub use watermark::BoundWatermark;
51pub use window_table_function::{BoundWindowTableFunction, WindowTableFunctionKind};
52
53use crate::expr::{CorrelatedId, Depth};
54
/// A relation after binding.
///
/// Values of this type are what `Binder::bind_table_factor` and
/// `Binder::bind_relation_by_name` in this file return. Every payload except the
/// table-function fields is boxed.
#[derive(Debug, Clone)]
pub enum Relation {
    /// A bound source.
    Source(Box<BoundSource>),
    /// A bound base table.
    BaseTable(Box<BoundBaseTable>),
    /// A bound system table.
    SystemTable(Box<BoundSystemTable>),
    /// A bound subquery; produced for `TableFactor::Derived` in `bind_table_factor`.
    Subquery(Box<BoundSubquery>),
    /// A bound join.
    Join(Box<BoundJoin>),
    /// Carries the same payload as `Join`.
    /// NOTE(review): presumably a join whose right side may reference the left side
    /// (apply / lateral) -- confirm against the code that constructs it.
    Apply(Box<BoundJoin>),
    /// A bound window table function.
    WindowTableFunction(Box<BoundWindowTableFunction>),
    /// A table function, kept as an expression rather than a dedicated bound struct.
    TableFunction {
        /// The table function expression; rewritten in place by `rewrite_exprs_recursive`.
        expr: ExprImpl,
        /// NOTE(review): presumably whether `WITH ORDINALITY` was specified -- confirm
        /// against `bind_table_function`.
        with_ordinality: bool,
    },
    /// A bound watermark relation.
    Watermark(Box<BoundWatermark>),
    /// A shared input. In this file it is produced when a CTE in the `Bound` or
    /// `ChangeLog` state is referenced by name.
    Share(Box<BoundShare>),
    /// A reference back to a CTE. In this file it is produced when a CTE in the
    /// `BaseResolved` state is referenced by name.
    BackCteRef(Box<BoundBackCteRef>),
}
76
77impl RewriteExprsRecursive for Relation {
78 fn rewrite_exprs_recursive(&mut self, rewriter: &mut impl crate::expr::ExprRewriter) {
79 match self {
80 Relation::Subquery(inner) => inner.rewrite_exprs_recursive(rewriter),
81 Relation::Join(inner) => inner.rewrite_exprs_recursive(rewriter),
82 Relation::Apply(inner) => inner.rewrite_exprs_recursive(rewriter),
83 Relation::WindowTableFunction(inner) => inner.rewrite_exprs_recursive(rewriter),
84 Relation::Watermark(inner) => inner.rewrite_exprs_recursive(rewriter),
85 Relation::Share(inner) => inner.rewrite_exprs_recursive(rewriter),
86 Relation::TableFunction { expr: inner, .. } => {
87 *inner = rewriter.rewrite_expr(inner.take())
88 }
89 Relation::BackCteRef(inner) => inner.rewrite_exprs_recursive(rewriter),
90 _ => {}
91 }
92 }
93}
94
95impl Relation {
96 pub fn is_correlated(&self, depth: Depth) -> bool {
97 match self {
98 Relation::Subquery(subquery) => subquery.query.is_correlated(depth),
99 Relation::Join(join) | Relation::Apply(join) => {
100 join.cond.has_correlated_input_ref_by_depth(depth)
101 || join.left.is_correlated(depth)
102 || join.right.is_correlated(depth)
103 }
104 _ => false,
105 }
106 }
107
108 pub fn collect_correlated_indices_by_depth_and_assign_id(
109 &mut self,
110 depth: Depth,
111 correlated_id: CorrelatedId,
112 ) -> Vec<usize> {
113 match self {
114 Relation::Subquery(subquery) => subquery
115 .query
116 .collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id),
117 Relation::Join(join) | Relation::Apply(join) => {
118 let mut correlated_indices = vec![];
119 correlated_indices.extend(
120 join.cond
121 .collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id),
122 );
123 correlated_indices.extend(
124 join.left
125 .collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id),
126 );
127 correlated_indices.extend(
128 join.right
129 .collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id),
130 );
131 correlated_indices
132 }
133 Relation::TableFunction {
134 expr: table_function,
135 with_ordinality: _,
136 } => table_function
137 .collect_correlated_indices_by_depth_and_assign_id(depth + 1, correlated_id),
138 Relation::Share(share) => match &mut share.input {
139 BoundShareInput::Query(query) => match query {
140 Either::Left(query) => query
141 .collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id),
142 Either::Right(_) => vec![],
143 },
144 BoundShareInput::ChangeLog(change_log) => change_log
145 .collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id),
146 },
147 _ => vec![],
148 }
149 }
150}
151
/// The reason a qualified name could not be resolved.
#[derive(Debug)]
#[non_exhaustive]
pub enum ResolveQualifiedNameErrorKind {
    /// The name has too many dotted parts (the resolvers in this file accept at most three).
    QualifiedNameTooLong,
    /// The name is qualified with a database other than the expected one.
    NotCurrentDatabase,
}
158
/// Error returned when a qualified object name cannot be resolved.
///
/// `Display` is implemented by hand for this type, so no `#[error(...)]` attribute
/// appears here.
#[derive(Debug, Error)]
pub struct ResolveQualifiedNameError {
    /// The offending name; the resolvers in this file pass the formatted `ObjectName`.
    qualified: String,
    /// Why resolution failed.
    kind: ResolveQualifiedNameErrorKind,
}
164
165impl std::fmt::Display for ResolveQualifiedNameError {
166 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
167 match self.kind {
168 ResolveQualifiedNameErrorKind::QualifiedNameTooLong => write!(
169 f,
170 "improper qualified name (too many dotted names): {}",
171 self.qualified
172 ),
173 ResolveQualifiedNameErrorKind::NotCurrentDatabase => write!(
174 f,
175 "cross-database references are not implemented: \"{}\"",
176 self.qualified
177 ),
178 }
179 }
180}
181
182impl ResolveQualifiedNameError {
183 pub fn new(qualified: String, kind: ResolveQualifiedNameErrorKind) -> Self {
184 Self { qualified, kind }
185 }
186}
187
188impl From<ResolveQualifiedNameError> for RwError {
189 fn from(e: ResolveQualifiedNameError) -> Self {
190 ErrorCode::InvalidInputSyntax(format!("{}", e.as_report())).into()
191 }
192}
193
194impl Binder {
195 pub fn resolve_schema_qualified_name(
197 db_name: &str,
198 name: ObjectName,
199 ) -> std::result::Result<(Option<String>, String), ResolveQualifiedNameError> {
200 let formatted_name = name.to_string();
201 let mut identifiers = name.0;
202
203 if identifiers.len() > 3 {
204 return Err(ResolveQualifiedNameError::new(
205 formatted_name,
206 ResolveQualifiedNameErrorKind::QualifiedNameTooLong,
207 ));
208 }
209
210 let name = identifiers.pop().unwrap().real_value();
211
212 let schema_name = identifiers.pop().map(|ident| ident.real_value());
213 let database_name = identifiers.pop().map(|ident| ident.real_value());
214
215 if let Some(database_name) = database_name
216 && database_name != db_name
217 {
218 return Err(ResolveQualifiedNameError::new(
219 formatted_name,
220 ResolveQualifiedNameErrorKind::NotCurrentDatabase,
221 ));
222 }
223
224 Ok((schema_name, name))
225 }
226
227 pub fn validate_cross_db_reference(
229 db_name: &str,
230 name: &ObjectName,
231 ) -> std::result::Result<(), ResolveQualifiedNameError> {
232 let formatted_name = name.to_string();
233 let identifiers = &name.0;
234 if identifiers.len() > 3 {
235 return Err(ResolveQualifiedNameError::new(
236 formatted_name,
237 ResolveQualifiedNameErrorKind::QualifiedNameTooLong,
238 ));
239 }
240
241 if identifiers.len() == 3 && identifiers[0].real_value() != db_name {
242 return Err(ResolveQualifiedNameError::new(
243 formatted_name,
244 ResolveQualifiedNameErrorKind::NotCurrentDatabase,
245 ));
246 }
247
248 Ok(())
249 }
250
251 pub fn resolve_db_schema_qualified_name(
253 name: ObjectName,
254 ) -> std::result::Result<(Option<String>, Option<String>, String), ResolveQualifiedNameError>
255 {
256 let formatted_name = name.to_string();
257 let mut identifiers = name.0;
258
259 if identifiers.len() > 3 {
260 return Err(ResolveQualifiedNameError::new(
261 formatted_name,
262 ResolveQualifiedNameErrorKind::QualifiedNameTooLong,
263 ));
264 }
265
266 let name = identifiers.pop().unwrap().real_value();
267 let schema_name = identifiers.pop().map(|ident| ident.real_value());
268 let database_name = identifiers.pop().map(|ident| ident.real_value());
269
270 Ok((database_name, schema_name, name))
271 }
272
273 fn resolve_single_name(mut identifiers: Vec<Ident>, ident_desc: &str) -> Result<String> {
275 if identifiers.len() > 1 {
276 bail!("{} must contain 1 argument", ident_desc);
277 }
278 let name = identifiers.pop().unwrap().real_value();
279
280 Ok(name)
281 }
282
283 pub fn resolve_database_name(name: ObjectName) -> Result<String> {
285 Self::resolve_single_name(name.0, "database name")
286 }
287
288 pub fn resolve_schema_name(name: ObjectName) -> Result<String> {
290 Self::resolve_single_name(name.0, "schema name")
291 }
292
293 pub fn resolve_index_name(name: ObjectName) -> Result<String> {
295 Self::resolve_single_name(name.0, "index name")
296 }
297
298 pub fn resolve_view_name(name: ObjectName) -> Result<String> {
300 Self::resolve_single_name(name.0, "view name")
301 }
302
303 pub fn resolve_sink_name(name: ObjectName) -> Result<String> {
305 Self::resolve_single_name(name.0, "sink name")
306 }
307
308 pub fn resolve_subscription_name(name: ObjectName) -> Result<String> {
310 Self::resolve_single_name(name.0, "subscription name")
311 }
312
313 pub fn resolve_table_name(name: ObjectName) -> Result<String> {
315 Self::resolve_single_name(name.0, "table name")
316 }
317
318 pub fn resolve_source_name(name: ObjectName) -> Result<String> {
320 Self::resolve_single_name(name.0, "source name")
321 }
322
323 pub fn resolve_user_name(name: ObjectName) -> Result<String> {
325 Self::resolve_single_name(name.0, "user name")
326 }
327
328 pub(super) fn bind_table_to_context(
330 &mut self,
331 columns: impl IntoIterator<Item = (bool, Field)>, table_name: String,
333 alias: Option<TableAlias>,
334 ) -> Result<()> {
335 let (table_name, column_aliases) = match alias {
336 None => (table_name, vec![]),
337 Some(TableAlias { name, columns }) => (name.real_value(), columns),
338 };
339
340 let num_col_aliases = column_aliases.len();
341
342 let begin = self.context.columns.len();
343 let mut alias_iter = column_aliases.into_iter().fuse();
346 let mut index = 0;
347 columns.into_iter().for_each(|(is_hidden, mut field)| {
348 let name = match is_hidden {
349 true => field.name.clone(),
350 false => alias_iter
351 .next()
352 .map(|t| t.real_value())
353 .unwrap_or_else(|| field.name.clone()),
354 };
355 field.name.clone_from(&name);
356 self.context.columns.push(ColumnBinding::new(
357 table_name.clone(),
358 begin + index,
359 is_hidden,
360 field,
361 ));
362 self.context
363 .indices_of
364 .entry(name)
365 .or_default()
366 .push(self.context.columns.len() - 1);
367 index += 1;
368 });
369
370 let num_cols = index;
371 if num_cols < num_col_aliases {
372 return Err(ErrorCode::BindError(format!(
373 "table \"{table_name}\" has {num_cols} columns available but {num_col_aliases} column aliases specified",
374 ))
375 .into());
376 }
377
378 match self.context.range_of.entry(table_name.clone()) {
379 Entry::Occupied(_) => Err(ErrorCode::InternalError(format!(
380 "Duplicated table name while binding table to context: {}",
381 table_name
382 ))
383 .into()),
384 Entry::Vacant(entry) => {
385 entry.insert((begin, self.context.columns.len()));
386 Ok(())
387 }
388 }
389 }
390
/// Binds a relation referenced by `name`.
///
/// The name is first split into database / schema / table parts. With
/// `allow_cross_db` the database part is kept and passed on; otherwise it must
/// match `self.db_name` and is dropped.
///
/// If the name has no schema part and the table part matches an entry in
/// `context.cte_to_relation`, the CTE is bound according to its state. Otherwise
/// binding is delegated to `bind_relation_by_name_inner`.
///
/// # Errors
///
/// - Name resolution errors from the `resolve_*_qualified_name` helpers.
/// - `BindError` if `as_of` is given for a CTE reference.
/// - `BindError` if the referenced CTE is still in the `Init` state.
/// - Any error from `bind_table_to_context`, `BoundShareInput::fields` or
///   `bind_relation_by_name_inner`.
pub fn bind_relation_by_name(
    &mut self,
    name: ObjectName,
    alias: Option<TableAlias>,
    as_of: Option<AsOf>,
    allow_cross_db: bool,
) -> Result<Relation> {
    let (db_name, schema_name, table_name) = if allow_cross_db {
        Self::resolve_db_schema_qualified_name(name)?
    } else {
        let (schema_name, table_name) =
            Self::resolve_schema_qualified_name(&self.db_name, name)?;
        (None, schema_name, table_name)
    };

    // Only an unqualified name can refer to a CTE.
    if schema_name.is_none()
        && let Some(item) = self.context.cte_to_relation.get(&table_name)
    {
        if as_of.is_some() {
            return Err(ErrorCode::BindError(
                "Right table of a temporal join should not be a CTE. \
                 It should be a table, index, or materialized view"
                    .to_owned(),
            )
            .into());
        }

        // Clone the CTE entry out of its cell so that the borrow of `self.context`
        // ends before the `&mut self` calls below.
        let BindingCte {
            share_id,
            state: cte_state,
            alias: mut original_alias,
        } = item.deref().borrow().clone();

        // The alias stored with the CTE is expected to be named after the CTE itself.
        debug_assert_eq!(original_alias.name.real_value(), table_name);

        if let Some(from_alias) = alias {
            // The alias given at the reference site takes over the name. Column
            // aliases are merged position by position: `into_right` picks the
            // reference-site column alias where one exists and keeps the CTE's own
            // column alias otherwise.
            original_alias.name = from_alias.name;
            original_alias.columns = original_alias
                .columns
                .into_iter()
                .zip_longest(from_alias.columns)
                .map(EitherOrBoth::into_right)
                .collect();
        }

        match cte_state {
            BindingCteState::Init => {
                Err(ErrorCode::BindError("Base term of recursive CTE not found, consider writing it to left side of the `UNION ALL` operator".to_owned()).into())
            }
            BindingCteState::BaseResolved { base } => {
                // All columns of the base term are registered as visible.
                self.bind_table_to_context(
                    base.schema().fields.iter().map(|f| (false, f.clone())),
                    table_name.clone(),
                    Some(original_alias),
                )?;
                Ok(Relation::BackCteRef(Box::new(BoundBackCteRef { share_id, base })))
            }
            BindingCteState::Bound { query } => {
                let input = BoundShareInput::Query(query);
                self.bind_table_to_context(
                    input.fields()?,
                    table_name.clone(),
                    Some(original_alias),
                )?;
                Ok(Relation::Share(Box::new(BoundShare { share_id, input})))
            }
            BindingCteState::ChangeLog { table } => {
                let input = BoundShareInput::ChangeLog(table);
                self.bind_table_to_context(
                    input.fields()?,
                    table_name.clone(),
                    Some(original_alias),
                )?;
                Ok(Relation::Share(Box::new(BoundShare { share_id, input })))
            },
        }
    } else {
        // Not a CTE reference: resolve the name through the regular lookup path.
        self.bind_relation_by_name_inner(
            db_name.as_deref(),
            schema_name.as_deref(),
            &table_name,
            alias,
            as_of,
        )
    }
}
487
488 fn bind_relation_by_function_arg(
490 &mut self,
491 arg: Option<FunctionArg>,
492 err_msg: &str,
493 ) -> Result<(Relation, ObjectName)> {
494 let Some(FunctionArg::Unnamed(FunctionArgExpr::Expr(expr))) = arg else {
495 return Err(ErrorCode::BindError(err_msg.to_owned()).into());
496 };
497 let table_name = match expr {
498 ParserExpr::Identifier(ident) => Ok::<_, RwError>(ObjectName(vec![ident])),
499 ParserExpr::CompoundIdentifier(idents) => Ok(ObjectName(idents)),
500 _ => Err(ErrorCode::BindError(err_msg.to_owned()).into()),
501 }?;
502
503 Ok((
504 self.bind_relation_by_name(table_name.clone(), None, None, true)?,
505 table_name,
506 ))
507 }
508
509 fn bind_column_by_function_args(
511 &mut self,
512 arg: Option<FunctionArg>,
513 err_msg: &str,
514 ) -> Result<Box<InputRef>> {
515 if let Some(time_col_arg) = arg
516 && let Some(ExprImpl::InputRef(time_col)) =
517 self.bind_function_arg(time_col_arg)?.into_iter().next()
518 {
519 Ok(time_col)
520 } else {
521 Err(ErrorCode::BindError(err_msg.to_owned()).into())
522 }
523 }
524
525 fn bind_internal_table(
527 &mut self,
528 args: Vec<FunctionArg>,
529 alias: Option<TableAlias>,
530 ) -> Result<Relation> {
531 if args.is_empty() || args.len() > 2 {
532 return Err(
533 ErrorCode::BindError("usage: rw_table(table_id[,schema_name])".to_owned()).into(),
534 );
535 }
536
537 let table_id: TableId = args[0]
538 .to_string()
539 .parse::<u32>()
540 .map_err(|err| {
541 RwError::from(ErrorCode::BindError(format!(
542 "invalid table id: {}",
543 err.as_report()
544 )))
545 })?
546 .into();
547
548 let schema = args.get(1).map(|arg| arg.to_string());
549
550 let table_name = self.catalog.get_table_name_by_id(table_id)?;
551 self.bind_relation_by_name_inner(None, schema.as_deref(), &table_name, alias, None)
552 }
553
554 pub(super) fn bind_table_factor(&mut self, table_factor: TableFactor) -> Result<Relation> {
555 match table_factor {
556 TableFactor::Table { name, alias, as_of } => {
557 self.bind_relation_by_name(name, alias, as_of, true)
558 }
559 TableFactor::TableFunction {
560 name,
561 alias,
562 args,
563 with_ordinality,
564 } => {
565 self.try_mark_lateral_as_visible();
566 let result = self.bind_table_function(name, alias, args, with_ordinality);
567 self.try_mark_lateral_as_invisible();
568 result
569 }
570 TableFactor::Derived {
571 lateral,
572 subquery,
573 alias,
574 } => {
575 if lateral {
576 self.try_mark_lateral_as_visible();
578
579 let bound_subquery = self.bind_subquery_relation(*subquery, alias, true)?;
581
582 self.try_mark_lateral_as_invisible();
584 Ok(Relation::Subquery(Box::new(bound_subquery)))
585 } else {
586 self.push_lateral_context();
588 let bound_subquery = self.bind_subquery_relation(*subquery, alias, false)?;
589 self.pop_and_merge_lateral_context()?;
590 Ok(Relation::Subquery(Box::new(bound_subquery)))
591 }
592 }
593 TableFactor::NestedJoin(table_with_joins) => {
594 self.push_lateral_context();
595 let bound_join = self.bind_table_with_joins(*table_with_joins)?;
596 self.pop_and_merge_lateral_context()?;
597 Ok(bound_join)
598 }
599 }
600 }
601}