1use std::collections::hash_map::Entry;
16use std::ops::Deref;
17
18use either::Either;
19use itertools::{EitherOrBoth, Itertools};
20use risingwave_common::bail;
21use risingwave_common::catalog::{Field, TableId};
22use risingwave_sqlparser::ast::{
23 AsOf, Expr as ParserExpr, FunctionArg, FunctionArgExpr, Ident, ObjectName, TableAlias,
24 TableFactor,
25};
26use thiserror::Error;
27use thiserror_ext::AsReport;
28
29use super::bind_context::ColumnBinding;
30use super::statement::RewriteExprsRecursive;
31use crate::binder::Binder;
32use crate::binder::bind_context::{BindingCte, BindingCteState};
33use crate::error::{ErrorCode, Result, RwError};
34use crate::expr::{ExprImpl, InputRef};
35
36mod cte_ref;
37mod join;
38mod share;
39mod subquery;
40mod table_function;
41mod table_or_source;
42mod watermark;
43mod window_table_function;
44
45pub use cte_ref::BoundBackCteRef;
46pub use join::BoundJoin;
47pub use share::{BoundShare, BoundShareInput};
48pub use subquery::BoundSubquery;
49pub use table_or_source::{BoundBaseTable, BoundSource, BoundSystemTable};
50pub use watermark::BoundWatermark;
51pub use window_table_function::{BoundWindowTableFunction, WindowTableFunctionKind};
52
53use crate::expr::{CorrelatedId, Depth};
54
/// A bound item of a `FROM` clause: a table-like entity produced by the binder.
///
/// Payloads are boxed. NOTE(review): the roles of some variants (e.g. `Apply`) are described
/// only as far as this file's own handling shows them — confirm against their producers.
#[derive(Debug, Clone)]
pub enum Relation {
    /// A bound source.
    Source(Box<BoundSource>),
    /// A bound base table.
    BaseTable(Box<BoundBaseTable>),
    /// A bound system table.
    SystemTable(Box<BoundSystemTable>),
    /// A subquery used as a relation (built for both lateral and non-lateral subqueries).
    Subquery(Box<BoundSubquery>),
    /// A join with a condition (`cond`) over a `left` and a `right` relation.
    Join(Box<BoundJoin>),
    /// Shares the `BoundJoin` payload with `Join`; the correlation helpers treat both alike.
    /// NOTE(review): presumably the correlated/lateral form of a join — confirm.
    Apply(Box<BoundJoin>),
    /// A bound window table function.
    WindowTableFunction(Box<BoundWindowTableFunction>),
    /// A table function call used as a relation.
    TableFunction {
        /// The bound function-call expression.
        expr: ExprImpl,
        /// Presumably whether `WITH ORDINALITY` was requested — confirm in `bind_table_function`.
        with_ordinality: bool,
    },
    /// A bound watermark relation.
    Watermark(Box<BoundWatermark>),
    /// A shared input: either a bound CTE query or a changelog (see `BoundShareInput`).
    Share(Box<BoundShare>),
    /// A back-reference to a recursive CTE whose base term has already been resolved.
    BackCteRef(Box<BoundBackCteRef>),
}
76
77impl RewriteExprsRecursive for Relation {
78 fn rewrite_exprs_recursive(&mut self, rewriter: &mut impl crate::expr::ExprRewriter) {
79 match self {
80 Relation::Subquery(inner) => inner.rewrite_exprs_recursive(rewriter),
81 Relation::Join(inner) => inner.rewrite_exprs_recursive(rewriter),
82 Relation::Apply(inner) => inner.rewrite_exprs_recursive(rewriter),
83 Relation::WindowTableFunction(inner) => inner.rewrite_exprs_recursive(rewriter),
84 Relation::Watermark(inner) => inner.rewrite_exprs_recursive(rewriter),
85 Relation::Share(inner) => inner.rewrite_exprs_recursive(rewriter),
86 Relation::TableFunction { expr: inner, .. } => {
87 *inner = rewriter.rewrite_expr(inner.take())
88 }
89 Relation::BackCteRef(inner) => inner.rewrite_exprs_recursive(rewriter),
90 _ => {}
91 }
92 }
93}
94
95impl Relation {
96 pub fn is_correlated_by_depth(&self, depth: Depth) -> bool {
97 match self {
98 Relation::Subquery(subquery) => subquery.query.is_correlated_by_depth(depth),
99 Relation::Join(join) | Relation::Apply(join) => {
100 join.cond.has_correlated_input_ref_by_depth(depth)
101 || join.left.is_correlated_by_depth(depth)
102 || join.right.is_correlated_by_depth(depth)
103 }
104 _ => false,
105 }
106 }
107
108 pub fn is_correlated_by_correlated_id(&self, correlated_id: CorrelatedId) -> bool {
109 match self {
110 Relation::Subquery(subquery) => {
111 subquery.query.is_correlated_by_correlated_id(correlated_id)
112 }
113 Relation::Join(join) | Relation::Apply(join) => {
114 join.cond
115 .has_correlated_input_ref_by_correlated_id(correlated_id)
116 || join.left.is_correlated_by_correlated_id(correlated_id)
117 || join.right.is_correlated_by_correlated_id(correlated_id)
118 }
119 _ => false,
120 }
121 }
122
123 pub fn collect_correlated_indices_by_depth_and_assign_id(
124 &mut self,
125 depth: Depth,
126 correlated_id: CorrelatedId,
127 ) -> Vec<usize> {
128 match self {
129 Relation::Subquery(subquery) => subquery
130 .query
131 .collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id),
132 Relation::Join(join) | Relation::Apply(join) => {
133 let mut correlated_indices = vec![];
134 correlated_indices.extend(
135 join.cond
136 .collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id),
137 );
138 correlated_indices.extend(
139 join.left
140 .collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id),
141 );
142 correlated_indices.extend(
143 join.right
144 .collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id),
145 );
146 correlated_indices
147 }
148 Relation::TableFunction {
149 expr: table_function,
150 with_ordinality: _,
151 } => table_function
152 .collect_correlated_indices_by_depth_and_assign_id(depth + 1, correlated_id),
153 Relation::Share(share) => match &mut share.input {
154 BoundShareInput::Query(query) => match query {
155 Either::Left(query) => query
156 .collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id),
157 Either::Right(_) => vec![],
158 },
159 BoundShareInput::ChangeLog(change_log) => change_log
160 .collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id),
161 },
162 _ => vec![],
163 }
164 }
165}
166
/// The reason a qualified object name could not be resolved.
#[derive(Debug)]
#[non_exhaustive]
pub enum ResolveQualifiedNameErrorKind {
    /// The name has more than three dot-separated parts (at most `database.schema.object`).
    QualifiedNameTooLong,
    /// The name's database qualifier is not the current database.
    NotCurrentDatabase,
}
173
/// Error returned when a qualified object name (e.g. `db.schema.table`) cannot be resolved.
///
/// `Display` is implemented by hand below rather than through a `#[error]` attribute.
#[derive(Debug, Error)]
pub struct ResolveQualifiedNameError {
    /// The offending name, formatted from the parsed object name.
    qualified: String,
    /// Why resolution failed.
    kind: ResolveQualifiedNameErrorKind,
}
179
180impl std::fmt::Display for ResolveQualifiedNameError {
181 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
182 match self.kind {
183 ResolveQualifiedNameErrorKind::QualifiedNameTooLong => write!(
184 f,
185 "improper qualified name (too many dotted names): {}",
186 self.qualified
187 ),
188 ResolveQualifiedNameErrorKind::NotCurrentDatabase => write!(
189 f,
190 "cross-database references are not implemented: \"{}\"",
191 self.qualified
192 ),
193 }
194 }
195}
196
197impl ResolveQualifiedNameError {
198 pub fn new(qualified: String, kind: ResolveQualifiedNameErrorKind) -> Self {
199 Self { qualified, kind }
200 }
201}
202
203impl From<ResolveQualifiedNameError> for RwError {
204 fn from(e: ResolveQualifiedNameError) -> Self {
205 ErrorCode::InvalidInputSyntax(format!("{}", e.as_report())).into()
206 }
207}
208
209impl Binder {
210 pub fn resolve_schema_qualified_name(
212 db_name: &str,
213 name: &ObjectName,
214 ) -> std::result::Result<(Option<String>, String), ResolveQualifiedNameError> {
215 let formatted_name = name.to_string();
216 let mut identifiers = name.0.clone();
217
218 if identifiers.len() > 3 {
219 return Err(ResolveQualifiedNameError::new(
220 formatted_name,
221 ResolveQualifiedNameErrorKind::QualifiedNameTooLong,
222 ));
223 }
224
225 let name = identifiers.pop().unwrap().real_value();
226
227 let schema_name = identifiers.pop().map(|ident| ident.real_value());
228 let database_name = identifiers.pop().map(|ident| ident.real_value());
229
230 if let Some(database_name) = database_name
231 && database_name != db_name
232 {
233 return Err(ResolveQualifiedNameError::new(
234 formatted_name,
235 ResolveQualifiedNameErrorKind::NotCurrentDatabase,
236 ));
237 }
238
239 Ok((schema_name, name))
240 }
241
242 pub fn validate_cross_db_reference(
244 db_name: &str,
245 name: &ObjectName,
246 ) -> std::result::Result<(), ResolveQualifiedNameError> {
247 let formatted_name = name.to_string();
248 let identifiers = &name.0;
249 if identifiers.len() > 3 {
250 return Err(ResolveQualifiedNameError::new(
251 formatted_name,
252 ResolveQualifiedNameErrorKind::QualifiedNameTooLong,
253 ));
254 }
255
256 if identifiers.len() == 3 && identifiers[0].real_value() != db_name {
257 return Err(ResolveQualifiedNameError::new(
258 formatted_name,
259 ResolveQualifiedNameErrorKind::NotCurrentDatabase,
260 ));
261 }
262
263 Ok(())
264 }
265
266 pub fn resolve_db_schema_qualified_name(
268 name: &ObjectName,
269 ) -> std::result::Result<(Option<String>, Option<String>, String), ResolveQualifiedNameError>
270 {
271 let formatted_name = name.to_string();
272 let mut identifiers = name.0.clone();
273
274 if identifiers.len() > 3 {
275 return Err(ResolveQualifiedNameError::new(
276 formatted_name,
277 ResolveQualifiedNameErrorKind::QualifiedNameTooLong,
278 ));
279 }
280
281 let name = identifiers.pop().unwrap().real_value();
282 let schema_name = identifiers.pop().map(|ident| ident.real_value());
283 let database_name = identifiers.pop().map(|ident| ident.real_value());
284
285 Ok((database_name, schema_name, name))
286 }
287
288 fn resolve_single_name(mut identifiers: Vec<Ident>, ident_desc: &str) -> Result<String> {
290 if identifiers.len() > 1 {
291 bail!("{} must contain 1 argument", ident_desc);
292 }
293 let name = identifiers.pop().unwrap().real_value();
294
295 Ok(name)
296 }
297
298 pub fn resolve_database_name(name: ObjectName) -> Result<String> {
300 Self::resolve_single_name(name.0, "database name")
301 }
302
303 pub fn resolve_schema_name(name: ObjectName) -> Result<String> {
305 Self::resolve_single_name(name.0, "schema name")
306 }
307
308 pub fn resolve_index_name(name: ObjectName) -> Result<String> {
310 Self::resolve_single_name(name.0, "index name")
311 }
312
313 pub fn resolve_view_name(name: ObjectName) -> Result<String> {
315 Self::resolve_single_name(name.0, "view name")
316 }
317
318 pub fn resolve_sink_name(name: ObjectName) -> Result<String> {
320 Self::resolve_single_name(name.0, "sink name")
321 }
322
323 pub fn resolve_subscription_name(name: ObjectName) -> Result<String> {
325 Self::resolve_single_name(name.0, "subscription name")
326 }
327
328 pub fn resolve_table_name(name: ObjectName) -> Result<String> {
330 Self::resolve_single_name(name.0, "table name")
331 }
332
333 pub fn resolve_source_name(name: ObjectName) -> Result<String> {
335 Self::resolve_single_name(name.0, "source name")
336 }
337
338 pub fn resolve_user_name(name: ObjectName) -> Result<String> {
340 Self::resolve_single_name(name.0, "user name")
341 }
342
343 pub(super) fn bind_table_to_context(
345 &mut self,
346 columns: impl IntoIterator<Item = (bool, Field)>, table_name: String,
348 alias: Option<&TableAlias>,
349 ) -> Result<()> {
350 const EMPTY: [Ident; 0] = [];
351 let (table_name, column_aliases) = match alias {
352 None => (table_name, &EMPTY[..]),
353 Some(TableAlias { name, columns }) => (name.real_value(), columns.as_slice()),
354 };
355
356 let num_col_aliases = column_aliases.len();
357
358 let begin = self.context.columns.len();
359 let mut alias_iter = column_aliases.iter().fuse();
362 let mut index = 0;
363 columns.into_iter().for_each(|(is_hidden, mut field)| {
364 let name = match is_hidden {
365 true => field.name.clone(),
366 false => alias_iter
367 .next()
368 .map(|t| t.real_value())
369 .unwrap_or_else(|| field.name.clone()),
370 };
371 field.name.clone_from(&name);
372 self.context.columns.push(ColumnBinding::new(
373 table_name.clone(),
374 begin + index,
375 is_hidden,
376 field,
377 ));
378 self.context
379 .indices_of
380 .entry(name)
381 .or_default()
382 .push(self.context.columns.len() - 1);
383 index += 1;
384 });
385
386 let num_cols = index;
387 if num_cols < num_col_aliases {
388 return Err(ErrorCode::BindError(format!(
389 "table \"{table_name}\" has {num_cols} columns available but {num_col_aliases} column aliases specified",
390 ))
391 .into());
392 }
393
394 match self.context.range_of.entry(table_name.clone()) {
395 Entry::Occupied(_) => Err(ErrorCode::InternalError(format!(
396 "Duplicated table name while binding table to context: {}",
397 table_name
398 ))
399 .into()),
400 Entry::Vacant(entry) => {
401 entry.insert((begin, self.context.columns.len()));
402 Ok(())
403 }
404 }
405 }
406
    /// Binds a relation referenced by a (possibly qualified) name.
    ///
    /// With `allow_cross_db`, a database qualifier is split off and passed to the catalog
    /// lookup. Otherwise, any database qualifier must name the current database
    /// (`self.db_name`).
    ///
    /// An unqualified name found in `self.context.cte_to_relation` binds to that CTE. The
    /// result depends on the CTE's binding state: a back-reference once its base term is
    /// resolved, or a shared input once it is fully bound or is a changelog. Every other name
    /// is resolved through `bind_catalog_relation_by_name`.
    ///
    /// # Errors
    ///
    /// - Name-resolution errors (too many dotted parts; foreign database when cross-db is off).
    /// - `BindError` if `as_of` is given for a CTE (a CTE cannot be the right side of a
    ///   temporal join).
    /// - `BindError` if the CTE is still in its `Init` state (recursive CTE without a base term).
    /// - Any error from registering columns in the context or from the catalog lookup.
    pub fn bind_relation_by_name(
        &mut self,
        name: &ObjectName,
        alias: Option<&TableAlias>,
        as_of: Option<&AsOf>,
        allow_cross_db: bool,
    ) -> Result<Relation> {
        let (db_name, schema_name, table_name) = if allow_cross_db {
            Self::resolve_db_schema_qualified_name(name)?
        } else {
            let (schema_name, table_name) =
                Self::resolve_schema_qualified_name(&self.db_name, name)?;
            (None, schema_name, table_name)
        };

        // Only an unqualified name can refer to a CTE in scope.
        if schema_name.is_none()
            && let Some(item) = self.context.cte_to_relation.get(&table_name)
        {
            if as_of.is_some() {
                return Err(ErrorCode::BindError(
                    "Right table of a temporal join should not be a CTE. \
                     It should be a table, index, or materialized view"
                        .to_owned(),
                )
                .into());
            }

            // Clone the CTE binding out of `item` so the borrow of `self.context` ends before
            // `self` is borrowed mutably below.
            let BindingCte {
                share_id,
                state: cte_state,
                alias: mut original_alias,
            } = item.deref().borrow().clone();

            // A CTE's own alias is expected to carry the CTE name.
            debug_assert_eq!(original_alias.name.real_value(), table_name);

            // A reference-site alias renames the CTE. Its column aliases override the CTE's
            // column aliases position by position; extra entries from either list are kept.
            if let Some(from_alias) = alias {
                original_alias.name = from_alias.name.clone();
                original_alias.columns = original_alias
                    .columns
                    .into_iter()
                    .zip_longest(from_alias.columns.iter().cloned())
                    .map(EitherOrBoth::into_right)
                    .collect();
            }

            match cte_state {
                // Referenced before any base term was bound (e.g. from a recursive term placed
                // on the left of `UNION ALL`).
                BindingCteState::Init => {
                    Err(ErrorCode::BindError("Base term of recursive CTE not found, consider writing it to left side of the `UNION ALL` operator".to_owned()).into())
                }
                // Base term resolved: expose its schema (all columns visible) and refer back
                // to the CTE.
                BindingCteState::BaseResolved { base } => {
                    self.bind_table_to_context(
                        base.schema().fields.iter().map(|f| (false, f.clone())),
                        table_name,
                        Some(&original_alias),
                    )?;
                    Ok(Relation::BackCteRef(Box::new(BoundBackCteRef { share_id, base })))
                }
                // Fully bound CTE query: bind it as a shared input.
                BindingCteState::Bound { query } => {
                    let input = BoundShareInput::Query(query);
                    self.bind_table_to_context(
                        input.fields()?,
                        table_name,
                        Some(&original_alias),
                    )?;
                    Ok(Relation::Share(Box::new(BoundShare { share_id, input})))
                }
                // Changelog CTE: bind the changelog as a shared input.
                BindingCteState::ChangeLog { table } => {
                    let input = BoundShareInput::ChangeLog(table);
                    self.bind_table_to_context(
                        input.fields()?,
                        table_name,
                        Some(&original_alias),
                    )?;
                    Ok(Relation::Share(Box::new(BoundShare { share_id, input })))
                },
            }
        } else {
            self.bind_catalog_relation_by_name(
                db_name.as_deref(),
                schema_name.as_deref(),
                &table_name,
                alias,
                as_of,
                false,
            )
        }
    }
504
505 fn bind_relation_by_function_arg(
507 &mut self,
508 arg: Option<&FunctionArg>,
509 err_msg: &str,
510 ) -> Result<(Relation, ObjectName)> {
511 let Some(FunctionArg::Unnamed(FunctionArgExpr::Expr(expr))) = arg else {
512 return Err(ErrorCode::BindError(err_msg.to_owned()).into());
513 };
514 let table_name = match expr {
515 ParserExpr::Identifier(ident) => Ok::<_, RwError>(ObjectName(vec![ident.clone()])),
516 ParserExpr::CompoundIdentifier(idents) => Ok(ObjectName(idents.clone())),
517 _ => Err(ErrorCode::BindError(err_msg.to_owned()).into()),
518 }?;
519
520 Ok((
521 self.bind_relation_by_name(&table_name, None, None, true)?,
522 table_name,
523 ))
524 }
525
526 fn bind_column_by_function_args(
528 &mut self,
529 arg: Option<&FunctionArg>,
530 err_msg: &str,
531 ) -> Result<Box<InputRef>> {
532 if let Some(time_col_arg) = arg
533 && let Some(ExprImpl::InputRef(time_col)) =
534 self.bind_function_arg(time_col_arg)?.into_iter().next()
535 {
536 Ok(time_col)
537 } else {
538 Err(ErrorCode::BindError(err_msg.to_owned()).into())
539 }
540 }
541
542 fn bind_internal_table(
544 &mut self,
545 args: &[FunctionArg],
546 alias: Option<&TableAlias>,
547 ) -> Result<Relation> {
548 if args.is_empty() || args.len() > 2 {
549 return Err(
550 ErrorCode::BindError("usage: rw_table(table_id[,schema_name])".to_owned()).into(),
551 );
552 }
553
554 let table_id: TableId = args[0]
555 .to_string()
556 .parse::<u32>()
557 .map_err(|err| {
558 RwError::from(ErrorCode::BindError(format!(
559 "invalid table id: {}",
560 err.as_report()
561 )))
562 })?
563 .into();
564
565 let schema = args.get(1).map(|arg| arg.to_string());
566
567 let table_name = self.catalog.get_table_name_by_id(table_id)?;
568 self.bind_catalog_relation_by_name(None, schema.as_deref(), &table_name, alias, None, false)
569 }
570
571 pub(super) fn bind_table_factor(&mut self, table_factor: &TableFactor) -> Result<Relation> {
572 match table_factor {
573 TableFactor::Table { name, alias, as_of } => {
574 self.bind_relation_by_name(name, alias.as_ref(), as_of.as_ref(), true)
575 }
576 TableFactor::TableFunction {
577 name,
578 alias,
579 args,
580 with_ordinality,
581 } => {
582 self.try_mark_lateral_as_visible();
583 let result = self.bind_table_function(name, alias.as_ref(), args, *with_ordinality);
584 self.try_mark_lateral_as_invisible();
585 result
586 }
587 TableFactor::Derived {
588 lateral,
589 subquery,
590 alias,
591 } => {
592 if *lateral {
593 self.try_mark_lateral_as_visible();
595
596 let bound_subquery =
598 self.bind_subquery_relation(subquery, alias.as_ref(), true)?;
599
600 self.try_mark_lateral_as_invisible();
602 Ok(Relation::Subquery(Box::new(bound_subquery)))
603 } else {
604 self.push_lateral_context();
606 let bound_subquery =
607 self.bind_subquery_relation(subquery, alias.as_ref(), false)?;
608 self.pop_and_merge_lateral_context()?;
609 Ok(Relation::Subquery(Box::new(bound_subquery)))
610 }
611 }
612 TableFactor::NestedJoin(table_with_joins) => {
613 self.push_lateral_context();
614 let bound_join = self.bind_table_with_joins(table_with_joins)?;
615 self.pop_and_merge_lateral_context()?;
616 Ok(bound_join)
617 }
618 }
619 }
620}