1use std::collections::hash_map::Entry;
16use std::ops::Deref;
17
18use itertools::{EitherOrBoth, Itertools};
19use risingwave_common::bail;
20use risingwave_common::catalog::{Field, TableId};
21use risingwave_sqlparser::ast::{
22 AsOf, Expr as ParserExpr, FunctionArg, FunctionArgExpr, Ident, ObjectName, TableAlias,
23 TableFactor,
24};
25use thiserror::Error;
26use thiserror_ext::AsReport;
27
28use super::bind_context::ColumnBinding;
29use super::statement::RewriteExprsRecursive;
30use crate::binder::Binder;
31use crate::binder::bind_context::{BindingCte, BindingCteState};
32use crate::error::{ErrorCode, Result, RwError};
33use crate::expr::{ExprImpl, InputRef};
34
35mod gap_fill;
36mod join;
37mod share;
38mod subquery;
39mod table_function;
40mod table_or_source;
41mod watermark;
42mod window_table_function;
43
44pub use gap_fill::BoundGapFill;
45pub use join::BoundJoin;
46pub use share::{BoundShare, BoundShareInput};
47pub use subquery::BoundSubquery;
48pub use table_or_source::{BoundBaseTable, BoundSource, BoundSystemTable};
49pub use watermark::BoundWatermark;
50pub use window_table_function::{BoundWindowTableFunction, WindowTableFunctionKind};
51
52use crate::expr::{CorrelatedId, Depth};
53
/// A bound `FROM`-clause item.
///
/// Produced by `Binder::bind_table_factor` and `Binder::bind_relation_by_name`.
/// Most variants box their payload.
#[derive(Debug, Clone)]
pub enum Relation {
    /// A bound source (see `BoundSource`).
    Source(Box<BoundSource>),
    /// A bound base table (see `BoundBaseTable`).
    BaseTable(Box<BoundBaseTable>),
    /// A bound system table (see `BoundSystemTable`).
    SystemTable(Box<BoundSystemTable>),
    /// A subquery in `FROM`; `bind_table_factor` produces this for `TableFactor::Derived`.
    Subquery(Box<BoundSubquery>),
    /// A join of two relations. The correlation helpers inspect its `cond`, `left` and `right`.
    Join(Box<BoundJoin>),
    /// Shares the `BoundJoin` payload with `Join`, and the correlation helpers treat both
    /// alike. NOTE(review): presumably a lateral/correlated join (apply) — confirm.
    Apply(Box<BoundJoin>),
    /// A bound window table function (see `WindowTableFunctionKind`).
    WindowTableFunction(Box<BoundWindowTableFunction>),
    /// A table function used as a relation.
    TableFunction {
        /// The bound table-function call. It is rewritten in place by
        /// `rewrite_exprs_recursive`, and its correlated indices are collected at `depth + 1`.
        expr: ExprImpl,
        /// Presumably mirrors the SQL `WITH ORDINALITY` clause — TODO confirm.
        with_ordinality: bool,
    },
    /// A bound watermark relation (see `BoundWatermark`).
    Watermark(Box<BoundWatermark>),
    /// A shared relation. CTE references are bound to this variant by `bind_relation_by_name`.
    Share(Box<BoundShare>),
    /// A bound gap-fill relation. The expression rewriter and the correlation helpers in this
    /// module do not descend into it.
    GapFill(Box<BoundGapFill>),
}
74
75impl RewriteExprsRecursive for Relation {
76 fn rewrite_exprs_recursive(&mut self, rewriter: &mut impl crate::expr::ExprRewriter) {
77 match self {
78 Relation::Subquery(inner) => inner.rewrite_exprs_recursive(rewriter),
79 Relation::Join(inner) => inner.rewrite_exprs_recursive(rewriter),
80 Relation::Apply(inner) => inner.rewrite_exprs_recursive(rewriter),
81 Relation::WindowTableFunction(inner) => inner.rewrite_exprs_recursive(rewriter),
82 Relation::Watermark(inner) => inner.rewrite_exprs_recursive(rewriter),
83 Relation::Share(inner) => inner.rewrite_exprs_recursive(rewriter),
84 Relation::TableFunction { expr: inner, .. } => {
85 *inner = rewriter.rewrite_expr(inner.take())
86 }
87 _ => {}
88 }
89 }
90}
91
92impl Relation {
93 pub fn is_correlated_by_depth(&self, depth: Depth) -> bool {
94 match self {
95 Relation::Subquery(subquery) => subquery.query.is_correlated_by_depth(depth),
96 Relation::Join(join) | Relation::Apply(join) => {
97 join.cond.has_correlated_input_ref_by_depth(depth)
98 || join.left.is_correlated_by_depth(depth)
99 || join.right.is_correlated_by_depth(depth)
100 }
101 _ => false,
102 }
103 }
104
105 pub fn is_correlated_by_correlated_id(&self, correlated_id: CorrelatedId) -> bool {
106 match self {
107 Relation::Subquery(subquery) => {
108 subquery.query.is_correlated_by_correlated_id(correlated_id)
109 }
110 Relation::Join(join) | Relation::Apply(join) => {
111 join.cond
112 .has_correlated_input_ref_by_correlated_id(correlated_id)
113 || join.left.is_correlated_by_correlated_id(correlated_id)
114 || join.right.is_correlated_by_correlated_id(correlated_id)
115 }
116 _ => false,
117 }
118 }
119
120 pub fn collect_correlated_indices_by_depth_and_assign_id(
121 &mut self,
122 depth: Depth,
123 correlated_id: CorrelatedId,
124 ) -> Vec<usize> {
125 match self {
126 Relation::Subquery(subquery) => subquery
127 .query
128 .collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id),
129 Relation::Join(join) | Relation::Apply(join) => {
130 let mut correlated_indices = vec![];
131 correlated_indices.extend(
132 join.cond
133 .collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id),
134 );
135 correlated_indices.extend(
136 join.left
137 .collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id),
138 );
139 correlated_indices.extend(
140 join.right
141 .collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id),
142 );
143 correlated_indices
144 }
145 Relation::TableFunction {
146 expr: table_function,
147 with_ordinality: _,
148 } => table_function
149 .collect_correlated_indices_by_depth_and_assign_id(depth + 1, correlated_id),
150 Relation::Share(share) => match &mut share.input {
151 BoundShareInput::Query(query) => {
152 query.collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id)
153 }
154 BoundShareInput::ChangeLog(change_log) => change_log
155 .collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id),
156 },
157 _ => vec![],
158 }
159 }
160}
161
/// The reason a dotted object name could not be resolved.
#[derive(Debug)]
#[non_exhaustive]
pub enum ResolveQualifiedNameErrorKind {
    /// The name has more than three dotted parts (`db.schema.object` is the longest accepted).
    QualifiedNameTooLong,
    /// The name's database part differs from the current database.
    NotCurrentDatabase,
}
168
/// Error raised while splitting a dotted object name into database/schema/object parts.
///
/// Its `Display` output is hand-written rather than derived. Converting it into an
/// `RwError` yields an `InvalidInputSyntax` error.
#[derive(Debug, Error)]
pub struct ResolveQualifiedNameError {
    /// The offending name as text. Callers in this module pass `ObjectName::to_string()`.
    qualified: String,
    /// Which check failed.
    kind: ResolveQualifiedNameErrorKind,
}
174
175impl std::fmt::Display for ResolveQualifiedNameError {
176 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
177 match self.kind {
178 ResolveQualifiedNameErrorKind::QualifiedNameTooLong => write!(
179 f,
180 "improper qualified name (too many dotted names): {}",
181 self.qualified
182 ),
183 ResolveQualifiedNameErrorKind::NotCurrentDatabase => write!(
184 f,
185 "cross-database references are not implemented: \"{}\"",
186 self.qualified
187 ),
188 }
189 }
190}
191
192impl ResolveQualifiedNameError {
193 pub fn new(qualified: String, kind: ResolveQualifiedNameErrorKind) -> Self {
194 Self { qualified, kind }
195 }
196}
197
198impl From<ResolveQualifiedNameError> for RwError {
199 fn from(e: ResolveQualifiedNameError) -> Self {
200 ErrorCode::InvalidInputSyntax(format!("{}", e.as_report())).into()
201 }
202}
203
204impl Binder {
205 pub fn resolve_schema_qualified_name(
207 db_name: &str,
208 name: &ObjectName,
209 ) -> std::result::Result<(Option<String>, String), ResolveQualifiedNameError> {
210 let formatted_name = name.to_string();
211 let mut identifiers = name.0.clone();
212
213 if identifiers.len() > 3 {
214 return Err(ResolveQualifiedNameError::new(
215 formatted_name,
216 ResolveQualifiedNameErrorKind::QualifiedNameTooLong,
217 ));
218 }
219
220 let name = identifiers.pop().unwrap().real_value();
221
222 let schema_name = identifiers.pop().map(|ident| ident.real_value());
223 let database_name = identifiers.pop().map(|ident| ident.real_value());
224
225 if let Some(database_name) = database_name
226 && database_name != db_name
227 {
228 return Err(ResolveQualifiedNameError::new(
229 formatted_name,
230 ResolveQualifiedNameErrorKind::NotCurrentDatabase,
231 ));
232 }
233
234 Ok((schema_name, name))
235 }
236
237 pub fn validate_cross_db_reference(
239 db_name: &str,
240 name: &ObjectName,
241 ) -> std::result::Result<(), ResolveQualifiedNameError> {
242 let formatted_name = name.to_string();
243 let identifiers = &name.0;
244 if identifiers.len() > 3 {
245 return Err(ResolveQualifiedNameError::new(
246 formatted_name,
247 ResolveQualifiedNameErrorKind::QualifiedNameTooLong,
248 ));
249 }
250
251 if identifiers.len() == 3 && identifiers[0].real_value() != db_name {
252 return Err(ResolveQualifiedNameError::new(
253 formatted_name,
254 ResolveQualifiedNameErrorKind::NotCurrentDatabase,
255 ));
256 }
257
258 Ok(())
259 }
260
261 pub fn resolve_db_schema_qualified_name(
263 name: &ObjectName,
264 ) -> std::result::Result<(Option<String>, Option<String>, String), ResolveQualifiedNameError>
265 {
266 let formatted_name = name.to_string();
267 let mut identifiers = name.0.clone();
268
269 if identifiers.len() > 3 {
270 return Err(ResolveQualifiedNameError::new(
271 formatted_name,
272 ResolveQualifiedNameErrorKind::QualifiedNameTooLong,
273 ));
274 }
275
276 let name = identifiers.pop().unwrap().real_value();
277 let schema_name = identifiers.pop().map(|ident| ident.real_value());
278 let database_name = identifiers.pop().map(|ident| ident.real_value());
279
280 Ok((database_name, schema_name, name))
281 }
282
283 fn resolve_single_name(mut identifiers: Vec<Ident>, ident_desc: &str) -> Result<String> {
285 if identifiers.len() > 1 {
286 bail!("{} must contain 1 argument", ident_desc);
287 }
288 let name = identifiers.pop().unwrap().real_value();
289
290 Ok(name)
291 }
292
293 pub fn resolve_database_name(name: ObjectName) -> Result<String> {
295 Self::resolve_single_name(name.0, "database name")
296 }
297
298 pub fn resolve_schema_name(name: ObjectName) -> Result<String> {
300 Self::resolve_single_name(name.0, "schema name")
301 }
302
303 pub fn resolve_index_name(name: ObjectName) -> Result<String> {
305 Self::resolve_single_name(name.0, "index name")
306 }
307
308 pub fn resolve_view_name(name: ObjectName) -> Result<String> {
310 Self::resolve_single_name(name.0, "view name")
311 }
312
313 pub fn resolve_sink_name(name: ObjectName) -> Result<String> {
315 Self::resolve_single_name(name.0, "sink name")
316 }
317
318 pub fn resolve_subscription_name(name: ObjectName) -> Result<String> {
320 Self::resolve_single_name(name.0, "subscription name")
321 }
322
323 pub fn resolve_table_name(name: ObjectName) -> Result<String> {
325 Self::resolve_single_name(name.0, "table name")
326 }
327
328 pub fn resolve_source_name(name: ObjectName) -> Result<String> {
330 Self::resolve_single_name(name.0, "source name")
331 }
332
333 pub fn resolve_user_name(name: ObjectName) -> Result<String> {
335 Self::resolve_single_name(name.0, "user name")
336 }
337
338 pub(super) fn bind_table_to_context(
340 &mut self,
341 columns: impl IntoIterator<Item = (bool, Field)>, table_name: String,
343 schema_name: Option<String>,
344 alias: Option<&TableAlias>,
345 ) -> Result<()> {
346 const EMPTY: [Ident; 0] = [];
347 let (resolved_schema_name, table_name, column_aliases, table_alias) = match alias {
348 None => (schema_name.clone(), table_name, &EMPTY[..], None),
349 Some(TableAlias { name, columns }) => (
350 None,
351 name.real_value(),
352 columns.as_slice(),
353 Some(table_name),
354 ),
355 };
356
357 let num_col_aliases = column_aliases.len();
358
359 let begin = self.context.columns.len();
360 let mut alias_iter = column_aliases.iter().fuse();
363 let mut index = 0;
364 columns.into_iter().for_each(|(is_hidden, mut field)| {
365 let name = match is_hidden {
366 true => field.name.clone(),
367 false => alias_iter
368 .next()
369 .map(|t| t.real_value())
370 .unwrap_or_else(|| field.name.clone()),
371 };
372 field.name.clone_from(&name);
373 self.context.columns.push(ColumnBinding::new(
374 table_name.clone(),
375 schema_name.clone(),
376 table_alias.clone(),
377 begin + index,
378 is_hidden,
379 field,
380 ));
381 self.context
382 .indices_of
383 .entry(name)
384 .or_default()
385 .push(self.context.columns.len() - 1);
386 index += 1;
387 });
388
389 let num_cols = index;
390 if num_cols < num_col_aliases {
391 return Err(ErrorCode::BindError(format!(
392 "table \"{table_name}\" has {num_cols} columns available but {num_col_aliases} column aliases specified",
393 ))
394 .into());
395 }
396
397 match self
398 .context
399 .range_of
400 .entry((resolved_schema_name, table_name.clone()))
401 {
402 Entry::Occupied(_) => Err(ErrorCode::InternalError(format!(
403 "Duplicated table name while binding table to context: {}",
404 table_name
405 ))
406 .into()),
407 Entry::Vacant(entry) => {
408 entry.insert((begin, self.context.columns.len()));
409 Ok(())
410 }
411 }
412 }
413
414 pub fn bind_relation_by_name(
419 &mut self,
420 name: &ObjectName,
421 alias: Option<&TableAlias>,
422 as_of: Option<&AsOf>,
423 allow_cross_db: bool,
424 ) -> Result<Relation> {
425 let (db_name, schema_name, table_name) = if allow_cross_db {
426 Self::resolve_db_schema_qualified_name(name)?
427 } else {
428 let (schema_name, table_name) =
429 Self::resolve_schema_qualified_name(&self.db_name, name)?;
430 (None, schema_name, table_name)
431 };
432
433 if schema_name.is_none()
434 && let Some(item) = self.context.cte_to_relation.get(&table_name)
436 {
437 if as_of.is_some() {
440 return Err(ErrorCode::BindError(
441 "Right table of a temporal join should not be a CTE. \
442 It should be a table, index, or materialized view"
443 .to_owned(),
444 )
445 .into());
446 }
447
448 let BindingCte {
449 share_id,
450 state: cte_state,
451 alias: mut original_alias,
452 } = item.deref().borrow().clone();
453
454 debug_assert_eq!(original_alias.name.real_value(), table_name);
456
457 if let Some(from_alias) = alias {
458 original_alias.name = from_alias.name.clone();
459 original_alias.columns = original_alias
460 .columns
461 .into_iter()
462 .zip_longest(from_alias.columns.iter().cloned())
463 .map(EitherOrBoth::into_right)
464 .collect();
465 }
466
467 match cte_state {
468 BindingCteState::Bound { query } => {
469 let input = BoundShareInput::Query(query);
470 self.bind_table_to_context(
471 input.fields()?,
472 table_name,
473 None,
474 Some(&original_alias),
475 )?;
476 Ok(Relation::Share(Box::new(BoundShare { share_id, input })))
479 }
480 BindingCteState::ChangeLog { table } => {
481 let input = BoundShareInput::ChangeLog(table);
482 self.bind_table_to_context(
483 input.fields()?,
484 table_name,
485 None,
486 Some(&original_alias),
487 )?;
488 Ok(Relation::Share(Box::new(BoundShare { share_id, input })))
489 }
490 }
491 } else {
492 self.bind_catalog_relation_by_name(
493 db_name.as_deref(),
494 schema_name.as_deref(),
495 &table_name,
496 alias,
497 as_of,
498 false,
499 )
500 }
501 }
502
503 fn bind_relation_by_function_arg(
505 &mut self,
506 arg: Option<&FunctionArg>,
507 err_msg: &str,
508 ) -> Result<(Relation, ObjectName)> {
509 let Some(FunctionArg::Unnamed(FunctionArgExpr::Expr(expr))) = arg else {
510 return Err(ErrorCode::BindError(err_msg.to_owned()).into());
511 };
512 let table_name = match expr {
513 ParserExpr::Identifier(ident) => Ok::<_, RwError>(ObjectName(vec![ident.clone()])),
514 ParserExpr::CompoundIdentifier(idents) => Ok(ObjectName(idents.clone())),
515 _ => Err(ErrorCode::BindError(err_msg.to_owned()).into()),
516 }?;
517
518 Ok((
519 self.bind_relation_by_name(&table_name, None, None, true)?,
520 table_name,
521 ))
522 }
523
524 fn bind_column_by_function_args(
526 &mut self,
527 arg: Option<&FunctionArg>,
528 err_msg: &str,
529 ) -> Result<Box<InputRef>> {
530 if let Some(time_col_arg) = arg
531 && let Some(ExprImpl::InputRef(time_col)) =
532 self.bind_function_arg(time_col_arg)?.into_iter().next()
533 {
534 Ok(time_col)
535 } else {
536 Err(ErrorCode::BindError(err_msg.to_owned()).into())
537 }
538 }
539
540 fn bind_internal_table(
542 &mut self,
543 args: &[FunctionArg],
544 alias: Option<&TableAlias>,
545 ) -> Result<Relation> {
546 if args.is_empty() || args.len() > 2 {
547 return Err(
548 ErrorCode::BindError("usage: rw_table(table_id[,schema_name])".to_owned()).into(),
549 );
550 }
551
552 let table_id: TableId = args[0]
553 .to_string()
554 .parse::<u32>()
555 .map_err(|err| {
556 RwError::from(ErrorCode::BindError(format!(
557 "invalid table id: {}",
558 err.as_report()
559 )))
560 })?
561 .into();
562
563 let schema = args.get(1).map(|arg| arg.to_string());
564
565 let table_name = self.catalog.get_table_name_by_id(table_id)?;
566 self.bind_catalog_relation_by_name(None, schema.as_deref(), &table_name, alias, None, false)
567 }
568
569 pub(super) fn bind_table_factor(&mut self, table_factor: &TableFactor) -> Result<Relation> {
570 match table_factor {
571 TableFactor::Table { name, alias, as_of } => {
572 self.bind_relation_by_name(name, alias.as_ref(), as_of.as_ref(), true)
573 }
574 TableFactor::TableFunction {
575 name,
576 alias,
577 args,
578 with_ordinality,
579 } => {
580 self.try_mark_lateral_as_visible();
581 let result = self.bind_table_function(name, alias.as_ref(), args, *with_ordinality);
582 self.try_mark_lateral_as_invisible();
583 result
584 }
585 TableFactor::Derived {
586 lateral,
587 subquery,
588 alias,
589 } => {
590 if *lateral {
591 self.try_mark_lateral_as_visible();
593
594 let bound_subquery =
596 self.bind_subquery_relation(subquery, alias.as_ref(), true)?;
597
598 self.try_mark_lateral_as_invisible();
600 Ok(Relation::Subquery(Box::new(bound_subquery)))
601 } else {
602 self.push_lateral_context();
604 let bound_subquery =
605 self.bind_subquery_relation(subquery, alias.as_ref(), false)?;
606 self.pop_and_merge_lateral_context()?;
607 Ok(Relation::Subquery(Box::new(bound_subquery)))
608 }
609 }
610 TableFactor::NestedJoin(table_with_joins) => {
611 self.push_lateral_context();
612 let bound_join = self.bind_table_with_joins(table_with_joins)?;
613 self.pop_and_merge_lateral_context()?;
614 Ok(bound_join)
615 }
616 }
617 }
618}