1use std::collections::hash_map::Entry;
16use std::ops::Deref;
17
18use itertools::{EitherOrBoth, Itertools};
19use risingwave_common::bail;
20use risingwave_common::catalog::{Field, TableId};
21use risingwave_sqlparser::ast::{
22 AsOf, Expr as ParserExpr, FunctionArg, FunctionArgExpr, Ident, ObjectName, TableAlias,
23 TableFactor,
24};
25use thiserror::Error;
26use thiserror_ext::AsReport;
27
28use super::bind_context::ColumnBinding;
29use super::statement::RewriteExprsRecursive;
30use crate::binder::Binder;
31use crate::binder::bind_context::{BindingCte, BindingCteState};
32use crate::error::{ErrorCode, Result, RwError};
33use crate::expr::{ExprImpl, InputRef};
34
35mod gap_fill;
36mod join;
37mod share;
38mod subquery;
39mod table_function;
40mod table_or_source;
41mod watermark;
42mod window_table_function;
43
44pub use gap_fill::BoundGapFill;
45pub use join::BoundJoin;
46pub use share::{BoundShare, BoundShareInput};
47pub use subquery::BoundSubquery;
48pub use table_or_source::{BoundBaseTable, BoundSource, BoundSystemTable};
49pub use watermark::BoundWatermark;
50pub use window_table_function::{BoundWindowTableFunction, WindowTableFunctionKind};
51
52use crate::expr::{CorrelatedId, Depth};
53
54#[derive(Debug, Clone)]
57pub enum Relation {
58 Source(Box<BoundSource>),
59 BaseTable(Box<BoundBaseTable>),
60 SystemTable(Box<BoundSystemTable>),
61 Subquery(Box<BoundSubquery>),
62 Join(Box<BoundJoin>),
63 Apply(Box<BoundJoin>),
64 WindowTableFunction(Box<BoundWindowTableFunction>),
65 TableFunction {
67 expr: ExprImpl,
68 with_ordinality: bool,
69 },
70 Watermark(Box<BoundWatermark>),
71 Share(Box<BoundShare>),
72 GapFill(Box<BoundGapFill>),
73}
74
75impl RewriteExprsRecursive for Relation {
76 fn rewrite_exprs_recursive(&mut self, rewriter: &mut impl crate::expr::ExprRewriter) {
77 match self {
78 Relation::Subquery(inner) => inner.rewrite_exprs_recursive(rewriter),
79 Relation::Join(inner) => inner.rewrite_exprs_recursive(rewriter),
80 Relation::Apply(inner) => inner.rewrite_exprs_recursive(rewriter),
81 Relation::WindowTableFunction(inner) => inner.rewrite_exprs_recursive(rewriter),
82 Relation::Watermark(inner) => inner.rewrite_exprs_recursive(rewriter),
83 Relation::Share(inner) => inner.rewrite_exprs_recursive(rewriter),
84 Relation::TableFunction { expr: inner, .. } => {
85 *inner = rewriter.rewrite_expr(inner.take())
86 }
87 _ => {}
88 }
89 }
90}
91
92impl Relation {
93 pub fn is_correlated_by_depth(&self, depth: Depth) -> bool {
94 match self {
95 Relation::Subquery(subquery) => subquery.query.is_correlated_by_depth(depth),
96 Relation::Join(join) | Relation::Apply(join) => {
97 join.cond.has_correlated_input_ref_by_depth(depth)
98 || join.left.is_correlated_by_depth(depth)
99 || join.right.is_correlated_by_depth(depth)
100 }
101 _ => false,
102 }
103 }
104
105 pub fn is_correlated_by_correlated_id(&self, correlated_id: CorrelatedId) -> bool {
106 match self {
107 Relation::Subquery(subquery) => {
108 subquery.query.is_correlated_by_correlated_id(correlated_id)
109 }
110 Relation::Join(join) | Relation::Apply(join) => {
111 join.cond
112 .has_correlated_input_ref_by_correlated_id(correlated_id)
113 || join.left.is_correlated_by_correlated_id(correlated_id)
114 || join.right.is_correlated_by_correlated_id(correlated_id)
115 }
116 _ => false,
117 }
118 }
119
120 pub fn collect_correlated_indices_by_depth_and_assign_id(
121 &mut self,
122 depth: Depth,
123 correlated_id: CorrelatedId,
124 ) -> Vec<usize> {
125 match self {
126 Relation::Subquery(subquery) => subquery
127 .query
128 .collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id),
129 Relation::Join(join) | Relation::Apply(join) => {
130 let mut correlated_indices = vec![];
131 correlated_indices.extend(
132 join.cond
133 .collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id),
134 );
135 correlated_indices.extend(
136 join.left
137 .collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id),
138 );
139 correlated_indices.extend(
140 join.right
141 .collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id),
142 );
143 correlated_indices
144 }
145 Relation::TableFunction {
146 expr: table_function,
147 with_ordinality: _,
148 } => table_function
149 .collect_correlated_indices_by_depth_and_assign_id(depth + 1, correlated_id),
150 Relation::Share(share) => match &mut share.input {
151 BoundShareInput::Query(query) => {
152 query.collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id)
153 }
154 BoundShareInput::ChangeLog(change_log) => change_log
155 .collect_correlated_indices_by_depth_and_assign_id(depth, correlated_id),
156 },
157 _ => vec![],
158 }
159 }
160}
161
/// The reason a qualified object name could not be resolved.
#[non_exhaustive]
#[derive(Debug)]
pub enum ResolveQualifiedNameErrorKind {
    /// The name has more than three dot-separated parts.
    QualifiedNameTooLong,
    /// The name is qualified with a database other than the current one.
    NotCurrentDatabase,
}
168
169#[derive(Debug, Error)]
170pub struct ResolveQualifiedNameError {
171 qualified: String,
172 kind: ResolveQualifiedNameErrorKind,
173}
174
175impl std::fmt::Display for ResolveQualifiedNameError {
176 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
177 match self.kind {
178 ResolveQualifiedNameErrorKind::QualifiedNameTooLong => write!(
179 f,
180 "improper qualified name (too many dotted names): {}",
181 self.qualified
182 ),
183 ResolveQualifiedNameErrorKind::NotCurrentDatabase => write!(
184 f,
185 "cross-database references are not implemented: \"{}\"",
186 self.qualified
187 ),
188 }
189 }
190}
191
192impl ResolveQualifiedNameError {
193 pub fn new(qualified: String, kind: ResolveQualifiedNameErrorKind) -> Self {
194 Self { qualified, kind }
195 }
196}
197
198impl From<ResolveQualifiedNameError> for RwError {
199 fn from(e: ResolveQualifiedNameError) -> Self {
200 ErrorCode::InvalidInputSyntax(format!("{}", e.as_report())).into()
201 }
202}
203
204impl Binder {
205 pub fn resolve_schema_qualified_name(
207 db_name: &str,
208 name: &ObjectName,
209 ) -> std::result::Result<(Option<String>, String), ResolveQualifiedNameError> {
210 let formatted_name = name.to_string();
211 let mut identifiers = name.0.clone();
212
213 if identifiers.len() > 3 {
214 return Err(ResolveQualifiedNameError::new(
215 formatted_name,
216 ResolveQualifiedNameErrorKind::QualifiedNameTooLong,
217 ));
218 }
219
220 let name = identifiers.pop().unwrap().real_value();
221
222 let schema_name = identifiers.pop().map(|ident| ident.real_value());
223 let database_name = identifiers.pop().map(|ident| ident.real_value());
224
225 if let Some(database_name) = database_name
226 && database_name != db_name
227 {
228 return Err(ResolveQualifiedNameError::new(
229 formatted_name,
230 ResolveQualifiedNameErrorKind::NotCurrentDatabase,
231 ));
232 }
233
234 Ok((schema_name, name))
235 }
236
237 pub fn validate_cross_db_reference(
239 db_name: &str,
240 name: &ObjectName,
241 ) -> std::result::Result<(), ResolveQualifiedNameError> {
242 let formatted_name = name.to_string();
243 let identifiers = &name.0;
244 if identifiers.len() > 3 {
245 return Err(ResolveQualifiedNameError::new(
246 formatted_name,
247 ResolveQualifiedNameErrorKind::QualifiedNameTooLong,
248 ));
249 }
250
251 if identifiers.len() == 3 && identifiers[0].real_value() != db_name {
252 return Err(ResolveQualifiedNameError::new(
253 formatted_name,
254 ResolveQualifiedNameErrorKind::NotCurrentDatabase,
255 ));
256 }
257
258 Ok(())
259 }
260
261 pub fn resolve_db_schema_qualified_name(
263 name: &ObjectName,
264 ) -> std::result::Result<(Option<String>, Option<String>, String), ResolveQualifiedNameError>
265 {
266 let formatted_name = name.to_string();
267 let mut identifiers = name.0.clone();
268
269 if identifiers.len() > 3 {
270 return Err(ResolveQualifiedNameError::new(
271 formatted_name,
272 ResolveQualifiedNameErrorKind::QualifiedNameTooLong,
273 ));
274 }
275
276 let name = identifiers.pop().unwrap().real_value();
277 let schema_name = identifiers.pop().map(|ident| ident.real_value());
278 let database_name = identifiers.pop().map(|ident| ident.real_value());
279
280 Ok((database_name, schema_name, name))
281 }
282
283 fn resolve_single_name(mut identifiers: Vec<Ident>, ident_desc: &str) -> Result<String> {
285 if identifiers.len() > 1 {
286 bail!("{} must contain 1 argument", ident_desc);
287 }
288 let name = identifiers.pop().unwrap().real_value();
289
290 Ok(name)
291 }
292
293 pub fn resolve_database_name(name: ObjectName) -> Result<String> {
295 Self::resolve_single_name(name.0, "database name")
296 }
297
298 pub fn resolve_schema_name(name: ObjectName) -> Result<String> {
300 Self::resolve_single_name(name.0, "schema name")
301 }
302
303 pub fn resolve_index_name(name: ObjectName) -> Result<String> {
305 Self::resolve_single_name(name.0, "index name")
306 }
307
308 pub fn resolve_view_name(name: ObjectName) -> Result<String> {
310 Self::resolve_single_name(name.0, "view name")
311 }
312
313 pub fn resolve_sink_name(name: ObjectName) -> Result<String> {
315 Self::resolve_single_name(name.0, "sink name")
316 }
317
318 pub fn resolve_subscription_name(name: ObjectName) -> Result<String> {
320 Self::resolve_single_name(name.0, "subscription name")
321 }
322
323 pub fn resolve_table_name(name: ObjectName) -> Result<String> {
325 Self::resolve_single_name(name.0, "table name")
326 }
327
328 pub fn resolve_source_name(name: ObjectName) -> Result<String> {
330 Self::resolve_single_name(name.0, "source name")
331 }
332
333 pub fn resolve_user_name(name: ObjectName) -> Result<String> {
335 Self::resolve_single_name(name.0, "user name")
336 }
337
338 pub(super) fn bind_table_to_context(
340 &mut self,
341 columns: impl IntoIterator<Item = (bool, Field)>, table_name: String,
343 alias: Option<&TableAlias>,
344 ) -> Result<()> {
345 const EMPTY: [Ident; 0] = [];
346 let (table_name, column_aliases) = match alias {
347 None => (table_name, &EMPTY[..]),
348 Some(TableAlias { name, columns }) => (name.real_value(), columns.as_slice()),
349 };
350
351 let num_col_aliases = column_aliases.len();
352
353 let begin = self.context.columns.len();
354 let mut alias_iter = column_aliases.iter().fuse();
357 let mut index = 0;
358 columns.into_iter().for_each(|(is_hidden, mut field)| {
359 let name = match is_hidden {
360 true => field.name.clone(),
361 false => alias_iter
362 .next()
363 .map(|t| t.real_value())
364 .unwrap_or_else(|| field.name.clone()),
365 };
366 field.name.clone_from(&name);
367 self.context.columns.push(ColumnBinding::new(
368 table_name.clone(),
369 begin + index,
370 is_hidden,
371 field,
372 ));
373 self.context
374 .indices_of
375 .entry(name)
376 .or_default()
377 .push(self.context.columns.len() - 1);
378 index += 1;
379 });
380
381 let num_cols = index;
382 if num_cols < num_col_aliases {
383 return Err(ErrorCode::BindError(format!(
384 "table \"{table_name}\" has {num_cols} columns available but {num_col_aliases} column aliases specified",
385 ))
386 .into());
387 }
388
389 match self.context.range_of.entry(table_name.clone()) {
390 Entry::Occupied(_) => Err(ErrorCode::InternalError(format!(
391 "Duplicated table name while binding table to context: {}",
392 table_name
393 ))
394 .into()),
395 Entry::Vacant(entry) => {
396 entry.insert((begin, self.context.columns.len()));
397 Ok(())
398 }
399 }
400 }
401
402 pub fn bind_relation_by_name(
407 &mut self,
408 name: &ObjectName,
409 alias: Option<&TableAlias>,
410 as_of: Option<&AsOf>,
411 allow_cross_db: bool,
412 ) -> Result<Relation> {
413 let (db_name, schema_name, table_name) = if allow_cross_db {
414 Self::resolve_db_schema_qualified_name(name)?
415 } else {
416 let (schema_name, table_name) =
417 Self::resolve_schema_qualified_name(&self.db_name, name)?;
418 (None, schema_name, table_name)
419 };
420
421 if schema_name.is_none()
422 && let Some(item) = self.context.cte_to_relation.get(&table_name)
424 {
425 if as_of.is_some() {
428 return Err(ErrorCode::BindError(
429 "Right table of a temporal join should not be a CTE. \
430 It should be a table, index, or materialized view"
431 .to_owned(),
432 )
433 .into());
434 }
435
436 let BindingCte {
437 share_id,
438 state: cte_state,
439 alias: mut original_alias,
440 } = item.deref().borrow().clone();
441
442 debug_assert_eq!(original_alias.name.real_value(), table_name);
444
445 if let Some(from_alias) = alias {
446 original_alias.name = from_alias.name.clone();
447 original_alias.columns = original_alias
448 .columns
449 .into_iter()
450 .zip_longest(from_alias.columns.iter().cloned())
451 .map(EitherOrBoth::into_right)
452 .collect();
453 }
454
455 match cte_state {
456 BindingCteState::Bound { query } => {
457 let input = BoundShareInput::Query(query);
458 self.bind_table_to_context(input.fields()?, table_name, Some(&original_alias))?;
459 Ok(Relation::Share(Box::new(BoundShare { share_id, input })))
462 }
463 BindingCteState::ChangeLog { table } => {
464 let input = BoundShareInput::ChangeLog(table);
465 self.bind_table_to_context(input.fields()?, table_name, Some(&original_alias))?;
466 Ok(Relation::Share(Box::new(BoundShare { share_id, input })))
467 }
468 }
469 } else {
470 self.bind_catalog_relation_by_name(
471 db_name.as_deref(),
472 schema_name.as_deref(),
473 &table_name,
474 alias,
475 as_of,
476 false,
477 )
478 }
479 }
480
481 fn bind_relation_by_function_arg(
483 &mut self,
484 arg: Option<&FunctionArg>,
485 err_msg: &str,
486 ) -> Result<(Relation, ObjectName)> {
487 let Some(FunctionArg::Unnamed(FunctionArgExpr::Expr(expr))) = arg else {
488 return Err(ErrorCode::BindError(err_msg.to_owned()).into());
489 };
490 let table_name = match expr {
491 ParserExpr::Identifier(ident) => Ok::<_, RwError>(ObjectName(vec![ident.clone()])),
492 ParserExpr::CompoundIdentifier(idents) => Ok(ObjectName(idents.clone())),
493 _ => Err(ErrorCode::BindError(err_msg.to_owned()).into()),
494 }?;
495
496 Ok((
497 self.bind_relation_by_name(&table_name, None, None, true)?,
498 table_name,
499 ))
500 }
501
502 fn bind_column_by_function_args(
504 &mut self,
505 arg: Option<&FunctionArg>,
506 err_msg: &str,
507 ) -> Result<Box<InputRef>> {
508 if let Some(time_col_arg) = arg
509 && let Some(ExprImpl::InputRef(time_col)) =
510 self.bind_function_arg(time_col_arg)?.into_iter().next()
511 {
512 Ok(time_col)
513 } else {
514 Err(ErrorCode::BindError(err_msg.to_owned()).into())
515 }
516 }
517
518 fn bind_internal_table(
520 &mut self,
521 args: &[FunctionArg],
522 alias: Option<&TableAlias>,
523 ) -> Result<Relation> {
524 if args.is_empty() || args.len() > 2 {
525 return Err(
526 ErrorCode::BindError("usage: rw_table(table_id[,schema_name])".to_owned()).into(),
527 );
528 }
529
530 let table_id: TableId = args[0]
531 .to_string()
532 .parse::<u32>()
533 .map_err(|err| {
534 RwError::from(ErrorCode::BindError(format!(
535 "invalid table id: {}",
536 err.as_report()
537 )))
538 })?
539 .into();
540
541 let schema = args.get(1).map(|arg| arg.to_string());
542
543 let table_name = self.catalog.get_table_name_by_id(table_id)?;
544 self.bind_catalog_relation_by_name(None, schema.as_deref(), &table_name, alias, None, false)
545 }
546
547 pub(super) fn bind_table_factor(&mut self, table_factor: &TableFactor) -> Result<Relation> {
548 match table_factor {
549 TableFactor::Table { name, alias, as_of } => {
550 self.bind_relation_by_name(name, alias.as_ref(), as_of.as_ref(), true)
551 }
552 TableFactor::TableFunction {
553 name,
554 alias,
555 args,
556 with_ordinality,
557 } => {
558 self.try_mark_lateral_as_visible();
559 let result = self.bind_table_function(name, alias.as_ref(), args, *with_ordinality);
560 self.try_mark_lateral_as_invisible();
561 result
562 }
563 TableFactor::Derived {
564 lateral,
565 subquery,
566 alias,
567 } => {
568 if *lateral {
569 self.try_mark_lateral_as_visible();
571
572 let bound_subquery =
574 self.bind_subquery_relation(subquery, alias.as_ref(), true)?;
575
576 self.try_mark_lateral_as_invisible();
578 Ok(Relation::Subquery(Box::new(bound_subquery)))
579 } else {
580 self.push_lateral_context();
582 let bound_subquery =
583 self.bind_subquery_relation(subquery, alias.as_ref(), false)?;
584 self.pop_and_merge_lateral_context()?;
585 Ok(Relation::Subquery(Box::new(bound_subquery)))
586 }
587 }
588 TableFactor::NestedJoin(table_with_joins) => {
589 self.push_lateral_context();
590 let bound_join = self.bind_table_with_joins(table_with_joins)?;
591 self.pop_and_merge_lateral_context()?;
592 Ok(bound_join)
593 }
594 }
595 }
596}