1use std::collections::hash_map::Entry;
16use std::ops::Deref;
17
18use either::Either;
19use itertools::{EitherOrBoth, Itertools};
20use risingwave_common::bail;
21use risingwave_common::catalog::{Field, TableId};
22use risingwave_sqlparser::ast::{
23 AsOf, Expr as ParserExpr, FunctionArg, FunctionArgExpr, Ident, ObjectName, TableAlias,
24 TableFactor,
25};
26use thiserror::Error;
27use thiserror_ext::AsReport;
28
29use super::bind_context::ColumnBinding;
30use super::statement::RewriteExprsRecursive;
31use crate::binder::Binder;
32use crate::binder::bind_context::{BindingCte, BindingCteState};
33use crate::error::{ErrorCode, Result, RwError};
34use crate::expr::{ExprImpl, InputRef};
35
36mod cte_ref;
37mod gap_fill;
38mod join;
39mod share;
40mod subquery;
41mod table_function;
42mod table_or_source;
43mod watermark;
44mod window_table_function;
45
46pub use cte_ref::BoundBackCteRef;
47pub use gap_fill::BoundGapFill;
48pub use join::BoundJoin;
49pub use share::{BoundShare, BoundShareInput};
50pub use subquery::BoundSubquery;
51pub use table_or_source::{BoundBaseTable, BoundSource, BoundSystemTable};
52pub use watermark::BoundWatermark;
53pub use window_table_function::{BoundWindowTableFunction, WindowTableFunctionKind};
54
55use crate::expr::{CorrelatedId, Depth};
56
57#[derive(Debug, Clone)]
60pub enum Relation {
61 Source(Box<BoundSource>),
62 BaseTable(Box<BoundBaseTable>),
63 SystemTable(Box<BoundSystemTable>),
64 Subquery(Box<BoundSubquery>),
65 Join(Box<BoundJoin>),
66 Apply(Box<BoundJoin>),
67 WindowTableFunction(Box<BoundWindowTableFunction>),
68 TableFunction {
70 expr: ExprImpl,
71 with_ordinality: bool,
72 },
73 Watermark(Box<BoundWatermark>),
74 Share(Box<BoundShare>),
76 BackCteRef(Box<BoundBackCteRef>),
77 GapFill(Box<BoundGapFill>),
78}
79
80impl RewriteExprsRecursive for Relation {
81 fn rewrite_exprs_recursive(&mut self, rewriter: &mut impl crate::expr::ExprRewriter) {
82 match self {
83 Relation::Subquery(inner) => inner.rewrite_exprs_recursive(rewriter),
84 Relation::Join(inner) => inner.rewrite_exprs_recursive(rewriter),
85 Relation::Apply(inner) => inner.rewrite_exprs_recursive(rewriter),
86 Relation::WindowTableFunction(inner) => inner.rewrite_exprs_recursive(rewriter),
87 Relation::Watermark(inner) => inner.rewrite_exprs_recursive(rewriter),
88 Relation::Share(inner) => inner.rewrite_exprs_recursive(rewriter),
89 Relation::TableFunction { expr: inner, .. } => {
90 *inner = rewriter.rewrite_expr(inner.take())
91 }
92 Relation::BackCteRef(inner) => inner.rewrite_exprs_recursive(rewriter),
93 _ => {}
94 }
95 }
96}
97
98impl Relation {
99 pub fn is_correlated_by_depth(&self, depth: Depth) -> bool {
100 match self {
101 Relation::Subquery(subquery) => subquery.query.is_correlated_by_depth(depth),
102 Relation::Join(join) | Relation::Apply(join) => {
103 join.cond.has_correlated_input_ref_by_depth(depth)
104 || join.left.is_correlated_by_depth(depth)
105 || join.right.is_correlated_by_depth(depth)
106 }
107 _ => false,
108 }
109 }
110
111 pub fn is_correlated_by_correlated_id(&self, correlated_id: CorrelatedId) -> bool {
112 match self {
113 Relation::Subquery(subquery) => {
114 subquery.query.is_correlated_by_correlated_id(correlated_id)
115 }
116 Relation::Join(join) | Relation::Apply(join) => {
117 join.cond
118 .has_correlated_input_ref_by_correlated_id(correlated_id)
119 || join.left.is_correlated_by_correlated_id(correlated_id)
120 || join.right.is_correlated_by_correlated_id(correlated_id)
121 }
122 _ => false,
123 }
124 }
125
126 pub fn collect_correlated_indices_by_depth_and_assign_id(
127 &mut self,
128 depth: Depth,
129 correlated_id: CorrelatedId,
130 ) -> Vec<usize> {
131 match self {
132 Relation::Subquery(subquery) => subquery
133 .query
134 .collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id),
135 Relation::Join(join) | Relation::Apply(join) => {
136 let mut correlated_indices = vec![];
137 correlated_indices.extend(
138 join.cond
139 .collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id),
140 );
141 correlated_indices.extend(
142 join.left
143 .collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id),
144 );
145 correlated_indices.extend(
146 join.right
147 .collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id),
148 );
149 correlated_indices
150 }
151 Relation::TableFunction {
152 expr: table_function,
153 with_ordinality: _,
154 } => table_function
155 .collect_correlated_indices_by_depth_and_assign_id(depth + 1, correlated_id),
156 Relation::Share(share) => match &mut share.input {
157 BoundShareInput::Query(query) => match query {
158 Either::Left(query) => query
159 .collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id),
160 Either::Right(_) => vec![],
161 },
162 BoundShareInput::ChangeLog(change_log) => change_log
163 .collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id),
164 },
165 _ => vec![],
166 }
167 }
168}
169
/// The reason a qualified object name could not be resolved.
#[derive(Debug)]
#[non_exhaustive]
pub enum ResolveQualifiedNameErrorKind {
    /// The name has more than three dot-separated parts.
    QualifiedNameTooLong,
    /// The name is qualified with a database other than the current one.
    NotCurrentDatabase,
}
176
177#[derive(Debug, Error)]
178pub struct ResolveQualifiedNameError {
179 qualified: String,
180 kind: ResolveQualifiedNameErrorKind,
181}
182
183impl std::fmt::Display for ResolveQualifiedNameError {
184 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
185 match self.kind {
186 ResolveQualifiedNameErrorKind::QualifiedNameTooLong => write!(
187 f,
188 "improper qualified name (too many dotted names): {}",
189 self.qualified
190 ),
191 ResolveQualifiedNameErrorKind::NotCurrentDatabase => write!(
192 f,
193 "cross-database references are not implemented: \"{}\"",
194 self.qualified
195 ),
196 }
197 }
198}
199
200impl ResolveQualifiedNameError {
201 pub fn new(qualified: String, kind: ResolveQualifiedNameErrorKind) -> Self {
202 Self { qualified, kind }
203 }
204}
205
206impl From<ResolveQualifiedNameError> for RwError {
207 fn from(e: ResolveQualifiedNameError) -> Self {
208 ErrorCode::InvalidInputSyntax(format!("{}", e.as_report())).into()
209 }
210}
211
212impl Binder {
213 pub fn resolve_schema_qualified_name(
215 db_name: &str,
216 name: &ObjectName,
217 ) -> std::result::Result<(Option<String>, String), ResolveQualifiedNameError> {
218 let formatted_name = name.to_string();
219 let mut identifiers = name.0.clone();
220
221 if identifiers.len() > 3 {
222 return Err(ResolveQualifiedNameError::new(
223 formatted_name,
224 ResolveQualifiedNameErrorKind::QualifiedNameTooLong,
225 ));
226 }
227
228 let name = identifiers.pop().unwrap().real_value();
229
230 let schema_name = identifiers.pop().map(|ident| ident.real_value());
231 let database_name = identifiers.pop().map(|ident| ident.real_value());
232
233 if let Some(database_name) = database_name
234 && database_name != db_name
235 {
236 return Err(ResolveQualifiedNameError::new(
237 formatted_name,
238 ResolveQualifiedNameErrorKind::NotCurrentDatabase,
239 ));
240 }
241
242 Ok((schema_name, name))
243 }
244
245 pub fn validate_cross_db_reference(
247 db_name: &str,
248 name: &ObjectName,
249 ) -> std::result::Result<(), ResolveQualifiedNameError> {
250 let formatted_name = name.to_string();
251 let identifiers = &name.0;
252 if identifiers.len() > 3 {
253 return Err(ResolveQualifiedNameError::new(
254 formatted_name,
255 ResolveQualifiedNameErrorKind::QualifiedNameTooLong,
256 ));
257 }
258
259 if identifiers.len() == 3 && identifiers[0].real_value() != db_name {
260 return Err(ResolveQualifiedNameError::new(
261 formatted_name,
262 ResolveQualifiedNameErrorKind::NotCurrentDatabase,
263 ));
264 }
265
266 Ok(())
267 }
268
269 pub fn resolve_db_schema_qualified_name(
271 name: &ObjectName,
272 ) -> std::result::Result<(Option<String>, Option<String>, String), ResolveQualifiedNameError>
273 {
274 let formatted_name = name.to_string();
275 let mut identifiers = name.0.clone();
276
277 if identifiers.len() > 3 {
278 return Err(ResolveQualifiedNameError::new(
279 formatted_name,
280 ResolveQualifiedNameErrorKind::QualifiedNameTooLong,
281 ));
282 }
283
284 let name = identifiers.pop().unwrap().real_value();
285 let schema_name = identifiers.pop().map(|ident| ident.real_value());
286 let database_name = identifiers.pop().map(|ident| ident.real_value());
287
288 Ok((database_name, schema_name, name))
289 }
290
291 fn resolve_single_name(mut identifiers: Vec<Ident>, ident_desc: &str) -> Result<String> {
293 if identifiers.len() > 1 {
294 bail!("{} must contain 1 argument", ident_desc);
295 }
296 let name = identifiers.pop().unwrap().real_value();
297
298 Ok(name)
299 }
300
301 pub fn resolve_database_name(name: ObjectName) -> Result<String> {
303 Self::resolve_single_name(name.0, "database name")
304 }
305
306 pub fn resolve_schema_name(name: ObjectName) -> Result<String> {
308 Self::resolve_single_name(name.0, "schema name")
309 }
310
311 pub fn resolve_index_name(name: ObjectName) -> Result<String> {
313 Self::resolve_single_name(name.0, "index name")
314 }
315
316 pub fn resolve_view_name(name: ObjectName) -> Result<String> {
318 Self::resolve_single_name(name.0, "view name")
319 }
320
321 pub fn resolve_sink_name(name: ObjectName) -> Result<String> {
323 Self::resolve_single_name(name.0, "sink name")
324 }
325
326 pub fn resolve_subscription_name(name: ObjectName) -> Result<String> {
328 Self::resolve_single_name(name.0, "subscription name")
329 }
330
331 pub fn resolve_table_name(name: ObjectName) -> Result<String> {
333 Self::resolve_single_name(name.0, "table name")
334 }
335
336 pub fn resolve_source_name(name: ObjectName) -> Result<String> {
338 Self::resolve_single_name(name.0, "source name")
339 }
340
341 pub fn resolve_user_name(name: ObjectName) -> Result<String> {
343 Self::resolve_single_name(name.0, "user name")
344 }
345
346 pub(super) fn bind_table_to_context(
348 &mut self,
349 columns: impl IntoIterator<Item = (bool, Field)>, table_name: String,
351 alias: Option<&TableAlias>,
352 ) -> Result<()> {
353 const EMPTY: [Ident; 0] = [];
354 let (table_name, column_aliases) = match alias {
355 None => (table_name, &EMPTY[..]),
356 Some(TableAlias { name, columns }) => (name.real_value(), columns.as_slice()),
357 };
358
359 let num_col_aliases = column_aliases.len();
360
361 let begin = self.context.columns.len();
362 let mut alias_iter = column_aliases.iter().fuse();
365 let mut index = 0;
366 columns.into_iter().for_each(|(is_hidden, mut field)| {
367 let name = match is_hidden {
368 true => field.name.clone(),
369 false => alias_iter
370 .next()
371 .map(|t| t.real_value())
372 .unwrap_or_else(|| field.name.clone()),
373 };
374 field.name.clone_from(&name);
375 self.context.columns.push(ColumnBinding::new(
376 table_name.clone(),
377 begin + index,
378 is_hidden,
379 field,
380 ));
381 self.context
382 .indices_of
383 .entry(name)
384 .or_default()
385 .push(self.context.columns.len() - 1);
386 index += 1;
387 });
388
389 let num_cols = index;
390 if num_cols < num_col_aliases {
391 return Err(ErrorCode::BindError(format!(
392 "table \"{table_name}\" has {num_cols} columns available but {num_col_aliases} column aliases specified",
393 ))
394 .into());
395 }
396
397 match self.context.range_of.entry(table_name.clone()) {
398 Entry::Occupied(_) => Err(ErrorCode::InternalError(format!(
399 "Duplicated table name while binding table to context: {}",
400 table_name
401 ))
402 .into()),
403 Entry::Vacant(entry) => {
404 entry.insert((begin, self.context.columns.len()));
405 Ok(())
406 }
407 }
408 }
409
410 pub fn bind_relation_by_name(
415 &mut self,
416 name: &ObjectName,
417 alias: Option<&TableAlias>,
418 as_of: Option<&AsOf>,
419 allow_cross_db: bool,
420 ) -> Result<Relation> {
421 let (db_name, schema_name, table_name) = if allow_cross_db {
422 Self::resolve_db_schema_qualified_name(name)?
423 } else {
424 let (schema_name, table_name) =
425 Self::resolve_schema_qualified_name(&self.db_name, name)?;
426 (None, schema_name, table_name)
427 };
428
429 if schema_name.is_none()
430 && let Some(item) = self.context.cte_to_relation.get(&table_name)
432 {
433 if as_of.is_some() {
436 return Err(ErrorCode::BindError(
437 "Right table of a temporal join should not be a CTE. \
438 It should be a table, index, or materialized view"
439 .to_owned(),
440 )
441 .into());
442 }
443
444 let BindingCte {
445 share_id,
446 state: cte_state,
447 alias: mut original_alias,
448 } = item.deref().borrow().clone();
449
450 debug_assert_eq!(original_alias.name.real_value(), table_name);
452
453 if let Some(from_alias) = alias {
454 original_alias.name = from_alias.name.clone();
455 original_alias.columns = original_alias
456 .columns
457 .into_iter()
458 .zip_longest(from_alias.columns.iter().cloned())
459 .map(EitherOrBoth::into_right)
460 .collect();
461 }
462
463 match cte_state {
464 BindingCteState::Init => {
465 Err(ErrorCode::BindError("Base term of recursive CTE not found, consider writing it to left side of the `UNION ALL` operator".to_owned()).into())
466 }
467 BindingCteState::BaseResolved { base } => {
468 self.bind_table_to_context(
469 base.schema().fields.iter().map(|f| (false, f.clone())),
470 table_name,
471 Some(&original_alias),
472 )?;
473 Ok(Relation::BackCteRef(Box::new(BoundBackCteRef { share_id, base })))
474 }
475 BindingCteState::Bound { query } => {
476 let input = BoundShareInput::Query(query);
477 self.bind_table_to_context(
478 input.fields()?,
479 table_name,
480 Some(&original_alias),
481 )?;
482 Ok(Relation::Share(Box::new(BoundShare { share_id, input})))
485 }
486 BindingCteState::ChangeLog { table } => {
487 let input = BoundShareInput::ChangeLog(table);
488 self.bind_table_to_context(
489 input.fields()?,
490 table_name,
491 Some(&original_alias),
492 )?;
493 Ok(Relation::Share(Box::new(BoundShare { share_id, input })))
494 },
495 }
496 } else {
497 self.bind_catalog_relation_by_name(
498 db_name.as_deref(),
499 schema_name.as_deref(),
500 &table_name,
501 alias,
502 as_of,
503 false,
504 )
505 }
506 }
507
508 fn bind_relation_by_function_arg(
510 &mut self,
511 arg: Option<&FunctionArg>,
512 err_msg: &str,
513 ) -> Result<(Relation, ObjectName)> {
514 let Some(FunctionArg::Unnamed(FunctionArgExpr::Expr(expr))) = arg else {
515 return Err(ErrorCode::BindError(err_msg.to_owned()).into());
516 };
517 let table_name = match expr {
518 ParserExpr::Identifier(ident) => Ok::<_, RwError>(ObjectName(vec![ident.clone()])),
519 ParserExpr::CompoundIdentifier(idents) => Ok(ObjectName(idents.clone())),
520 _ => Err(ErrorCode::BindError(err_msg.to_owned()).into()),
521 }?;
522
523 Ok((
524 self.bind_relation_by_name(&table_name, None, None, true)?,
525 table_name,
526 ))
527 }
528
529 fn bind_column_by_function_args(
531 &mut self,
532 arg: Option<&FunctionArg>,
533 err_msg: &str,
534 ) -> Result<Box<InputRef>> {
535 if let Some(time_col_arg) = arg
536 && let Some(ExprImpl::InputRef(time_col)) =
537 self.bind_function_arg(time_col_arg)?.into_iter().next()
538 {
539 Ok(time_col)
540 } else {
541 Err(ErrorCode::BindError(err_msg.to_owned()).into())
542 }
543 }
544
545 fn bind_internal_table(
547 &mut self,
548 args: &[FunctionArg],
549 alias: Option<&TableAlias>,
550 ) -> Result<Relation> {
551 if args.is_empty() || args.len() > 2 {
552 return Err(
553 ErrorCode::BindError("usage: rw_table(table_id[,schema_name])".to_owned()).into(),
554 );
555 }
556
557 let table_id: TableId = args[0]
558 .to_string()
559 .parse::<u32>()
560 .map_err(|err| {
561 RwError::from(ErrorCode::BindError(format!(
562 "invalid table id: {}",
563 err.as_report()
564 )))
565 })?
566 .into();
567
568 let schema = args.get(1).map(|arg| arg.to_string());
569
570 let table_name = self.catalog.get_table_name_by_id(table_id)?;
571 self.bind_catalog_relation_by_name(None, schema.as_deref(), &table_name, alias, None, false)
572 }
573
574 pub(super) fn bind_table_factor(&mut self, table_factor: &TableFactor) -> Result<Relation> {
575 match table_factor {
576 TableFactor::Table { name, alias, as_of } => {
577 self.bind_relation_by_name(name, alias.as_ref(), as_of.as_ref(), true)
578 }
579 TableFactor::TableFunction {
580 name,
581 alias,
582 args,
583 with_ordinality,
584 } => {
585 self.try_mark_lateral_as_visible();
586 let result = self.bind_table_function(name, alias.as_ref(), args, *with_ordinality);
587 self.try_mark_lateral_as_invisible();
588 result
589 }
590 TableFactor::Derived {
591 lateral,
592 subquery,
593 alias,
594 } => {
595 if *lateral {
596 self.try_mark_lateral_as_visible();
598
599 let bound_subquery =
601 self.bind_subquery_relation(subquery, alias.as_ref(), true)?;
602
603 self.try_mark_lateral_as_invisible();
605 Ok(Relation::Subquery(Box::new(bound_subquery)))
606 } else {
607 self.push_lateral_context();
609 let bound_subquery =
610 self.bind_subquery_relation(subquery, alias.as_ref(), false)?;
611 self.pop_and_merge_lateral_context()?;
612 Ok(Relation::Subquery(Box::new(bound_subquery)))
613 }
614 }
615 TableFactor::NestedJoin(table_with_joins) => {
616 self.push_lateral_context();
617 let bound_join = self.bind_table_with_joins(table_with_joins)?;
618 self.pop_and_merge_lateral_context()?;
619 Ok(bound_join)
620 }
621 }
622 }
623}