risingwave_frontend/planner/
relation.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::ops::Deref;
use std::rc::Rc;

use either::Either;
use itertools::Itertools;
use risingwave_common::catalog::{
    ColumnCatalog, Engine, Field, RISINGWAVE_ICEBERG_ROW_ID, ROW_ID_COLUMN_NAME, Schema,
};
use risingwave_common::types::{DataType, Interval, ScalarImpl};
use risingwave_common::{bail, bail_not_implemented};
use risingwave_sqlparser::ast::AsOf;

use crate::binder::{
    BoundBackCteRef, BoundBaseTable, BoundJoin, BoundShare, BoundShareInput, BoundSource,
    BoundSystemTable, BoundWatermark, BoundWindowTableFunction, Relation, WindowTableFunctionKind,
};
use crate::error::{ErrorCode, Result};
use crate::expr::{CastContext, Expr, ExprImpl, ExprType, FunctionCall, InputRef, Literal};
use crate::optimizer::plan_node::generic::SourceNodeKind;
use crate::optimizer::plan_node::{
    LogicalApply, LogicalCteRef, LogicalHopWindow, LogicalJoin, LogicalPlanRef as PlanRef,
    LogicalProject, LogicalScan, LogicalShare, LogicalSource, LogicalSysScan, LogicalTableFunction,
    LogicalValues,
};
use crate::optimizer::property::Cardinality;
use crate::planner::{PlanFor, Planner};
use crate::utils::Condition;

const ERROR_WINDOW_SIZE_ARG: &str =
    "The size arg of window table function should be an interval literal.";

impl Planner {
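    /// Plans a bound [`Relation`] (an item of the `FROM` clause) into a logical plan,
    /// dispatching on the concrete relation kind.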
    pub fn plan_relation(&mut self, relation: Relation) -> Result<PlanRef> {
        match relation {
            Relation::BaseTable(t) => self.plan_base_table(&t),
            Relation::SystemTable(st) => self.plan_sys_table(*st),
            // TODO: order is ignored in the subquery
            Relation::Subquery(q) => Ok(self.plan_query(q.query)?.into_unordered_subplan()),
            Relation::Join(join) => self.plan_join(*join),
            Relation::Apply(join) => self.plan_apply(*join),
            Relation::WindowTableFunction(tf) => self.plan_window_table_function(*tf),
            Relation::Source(s) => self.plan_source(*s),
            Relation::TableFunction {
                expr: tf,
                with_ordinality,
            } => self.plan_table_function(tf, with_ordinality),
            Relation::Watermark(tf) => self.plan_watermark(*tf),
            // note that rcte (i.e., RecursiveUnion) is included *implicitly* in share.
            Relation::Share(share) => self.plan_share(*share),
            Relation::BackCteRef(cte_ref) => self.plan_cte_ref(*cte_ref),
        }
    }

    pub(crate) fn plan_sys_table(&mut self, sys_table: BoundSystemTable) -> Result<PlanRef> {
        Ok(LogicalSysScan::create(
            sys_table.sys_table_catalog,
            self.ctx(),
            Cardinality::unknown(), // TODO(card): cardinality of system table
        )
        .into())
    }

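    /// Plans a scan of a user table. A table backed by the `Hummock` engine becomes a plain
    /// `LogicalScan`; a table backed by the `Iceberg` engine may instead be read through its
    /// corresponding iceberg source, depending on [`PlanFor`] and whether the table is
    /// append-only.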
    pub(super) fn plan_base_table(&mut self, base_table: &BoundBaseTable) -> Result<PlanRef> {
        let as_of = base_table.as_of.clone();
        let scan = LogicalScan::from_base_table(base_table, self.ctx(), as_of.clone());

        match base_table.table_catalog.engine {
            Engine::Hummock => {
                match as_of {
                    None
                    | Some(AsOf::ProcessTime)
                    | Some(AsOf::TimestampNum(_))
                    | Some(AsOf::TimestampString(_))
                    | Some(AsOf::ProcessTimeWithInterval(_)) => {}
                    Some(AsOf::VersionNum(_)) | Some(AsOf::VersionString(_)) => {
                        bail_not_implemented!("As Of Version is not supported yet.")
                    }
                };
                Ok(scan.into())
            }
            Engine::Iceberg => {
                let is_append_only = base_table.table_catalog.append_only;
                let use_iceberg_source = match (self.plan_for(), is_append_only) {
                    (PlanFor::StreamIcebergEngineInternal, _) => false,
                    (PlanFor::BatchDql, _) => true,
                    (PlanFor::Stream | PlanFor::Batch, is_append_only) => is_append_only,
                };

                if !use_iceberg_source {
                    match as_of {
                        None
                        | Some(AsOf::VersionNum(_))
                        | Some(AsOf::TimestampString(_))
                        | Some(AsOf::TimestampNum(_)) => {}
                        Some(AsOf::ProcessTime) | Some(AsOf::ProcessTimeWithInterval(_)) => {
                            bail_not_implemented!("As Of ProcessTime() is not supported yet.")
                        }
                        Some(AsOf::VersionString(_)) => {
                            bail_not_implemented!("As Of Version is not supported yet.")
                        }
                    }
                    Ok(scan.into())
                } else {
                    match as_of {
                        None
                        | Some(AsOf::VersionNum(_))
                        | Some(AsOf::TimestampString(_))
                        | Some(AsOf::TimestampNum(_)) => {}
                        Some(AsOf::ProcessTime) | Some(AsOf::ProcessTimeWithInterval(_)) => {
                            bail_not_implemented!("As Of ProcessTime() is not supported yet.")
                        }
                        Some(AsOf::VersionString(_)) => {
                            bail_not_implemented!("As Of Version is not supported yet.")
                        }
                    }
                    let opt_ctx = self.ctx();
                    let session = opt_ctx.session_ctx();
                    let db_name = &session.database();
                    let catalog_reader = session.env().catalog_reader().read_guard();
                    let mut source_catalog = None;
                    for schema in catalog_reader.iter_schemas(db_name).unwrap() {
                        if schema
                            .get_table_by_id(&base_table.table_catalog.id)
                            .is_some()
                        {
                            source_catalog = schema.get_source_by_name(
                                &base_table.table_catalog.iceberg_source_name().unwrap(),
                            );
                            break;
                        }
                    }
                    if let Some(source_catalog) = source_catalog {
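                        // The iceberg source may expose the table's data with its own column
                        // order and names (e.g. the row id column is looked up as
                        // `RISINGWAVE_ICEBERG_ROW_ID`), so build a projection that maps each
                        // field of the table schema to the matching source column, casting
                        // when the data types differ and padding with NULL literals for
                        // fields that the source does not provide.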
                        let column_map: HashMap<String, (usize, ColumnCatalog)> = source_catalog
                            .columns
                            .clone()
                            .into_iter()
                            .enumerate()
                            .map(|(i, column)| (column.name().to_owned(), (i, column)))
                            .collect();
                        let exprs = scan
                            .table()
                            .column_schema()
                            .fields()
                            .iter()
                            .map(|field| {
                                let source_field_name = if field.name == ROW_ID_COLUMN_NAME {
                                    RISINGWAVE_ICEBERG_ROW_ID
                                } else {
                                    &field.name
                                };
                                if let Some((i, source_column)) = column_map.get(source_field_name)
                                {
                                    if source_column.column_desc.data_type == field.data_type {
                                        ExprImpl::InputRef(
                                            InputRef::new(*i, field.data_type.clone()).into(),
                                        )
                                    } else {
                                        let mut input_ref = ExprImpl::InputRef(
                                            InputRef::new(
                                                *i,
                                                source_column.column_desc.data_type.clone(),
                                            )
                                            .into(),
                                        );
                                        FunctionCall::cast_mut(
                                            &mut input_ref,
                                            &field.data_type(),
                                            CastContext::Explicit,
                                        )
                                        .unwrap();
                                        input_ref
                                    }
                                } else {
                                    // Fields like `_rw_timestamp` would not be found in the source.
                                    ExprImpl::Literal(
                                        Literal::new(None, field.data_type.clone()).into(),
                                    )
                                }
                            })
                            .collect_vec();
                        let logical_source = LogicalSource::with_catalog(
                            Rc::new(source_catalog.deref().clone()),
                            SourceNodeKind::CreateMViewOrBatch,
                            self.ctx(),
                            as_of,
                        )?;
                        Ok(LogicalProject::new(logical_source.into(), exprs).into())
                    } else {
                        bail!(
                            "failed to plan an iceberg engine table: {}. Cannot find the corresponding iceberg source. You may need to recreate the table.",
                            base_table.table_catalog.name()
                        );
                    }
                }
            }
        }
    }

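    /// Plans a scan of a source. Shareable CDC sources cannot be queried directly. For
    /// stream plans, the source must expose a primary key (either a row id column or
    /// user-defined pk columns); otherwise an error is raised here rather than panicking
    /// later.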
    pub(super) fn plan_source(&mut self, source: BoundSource) -> Result<PlanRef> {
        if source.is_shareable_cdc_connector() {
            Err(ErrorCode::InternalError(
                "Should not create MATERIALIZED VIEW or SELECT directly on shared CDC source. HINT: create TABLE from the source instead.".to_owned(),
            )
            .into())
        } else {
            let as_of = source.as_of.clone();
            match as_of {
                None
                | Some(AsOf::VersionNum(_))
                | Some(AsOf::TimestampString(_))
                | Some(AsOf::TimestampNum(_)) => {}
                Some(AsOf::ProcessTime) | Some(AsOf::ProcessTimeWithInterval(_)) => {
                    bail_not_implemented!("As Of ProcessTime() is not supported yet.")
                }
                Some(AsOf::VersionString(_)) => {
                    bail_not_implemented!("As Of Version is not supported yet.")
                }
            }

            // Validate that the source has a primary key. We raise an error here (instead of
            // panicking in `expect_stream_key` later) for a nicer error message.
            if matches!(self.plan_for(), PlanFor::Stream) {
                let has_pk =
                    source.catalog.row_id_index.is_some() || !source.catalog.pk_col_ids.is_empty();
                if !has_pk {
                    // In older versions, an iceberg source may not have a row_id, and thus may hit this.
                    let is_iceberg = source.catalog.is_iceberg_connector();
                    // Only iceberg should hit this.
                    debug_assert!(is_iceberg);
                    if is_iceberg {
                        return Err(ErrorCode::BindError(format!(
                            "Cannot create a stream job from an iceberg source without a primary key.\nThe iceberg source might be created in an older version of RisingWave. Please try recreating the source.\nSource: {:?}",
                            source.catalog
                        ))
                        .into());
                    } else {
                        return Err(ErrorCode::BindError(format!(
                            "Cannot create a stream job from a source without a primary key.
This is a bug. We would appreciate a bug report at:
https://github.com/risingwavelabs/risingwave/issues/new?labels=type%2Fbug&template=bug_report.yml

source: {:?}",
                            source.catalog
                        ))
                        .into());
                    }
                }
            }
            Ok(LogicalSource::with_catalog(
                Rc::new(source.catalog),
                SourceNodeKind::CreateMViewOrBatch,
                self.ctx(),
                as_of,
            )?
            .into())
        }
    }

    pub(super) fn plan_join(&mut self, join: BoundJoin) -> Result<PlanRef> {
        let left = self.plan_relation(join.left)?;
        let right = self.plan_relation(join.right)?;
        let join_type = join.join_type;
        let on_clause = join.cond;
        if on_clause.has_subquery() {
            bail_not_implemented!("Subquery in join on condition");
        } else {
            Ok(LogicalJoin::create(left, right, join_type, on_clause))
        }
    }

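    /// Plans a correlated (lateral) join as a `LogicalApply`. A fresh correlated id is
    /// assigned to the right-hand side and its correlated indices are collected before
    /// either side is planned, so that correlated references can be resolved during
    /// subquery unnesting.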
    pub(super) fn plan_apply(&mut self, mut join: BoundJoin) -> Result<PlanRef> {
        let join_type = join.join_type;
        let on_clause = join.cond;
        if on_clause.has_subquery() {
            bail_not_implemented!("Subquery in join on condition");
        }

        let correlated_id = self.ctx.next_correlated_id();
        let correlated_indices = join
            .right
            .collect_correlated_indices_by_depth_and_assign_id(0, correlated_id);
        let left = self.plan_relation(join.left)?;
        let right = self.plan_relation(join.right)?;

        Ok(LogicalApply::create(
            left,
            right,
            join_type,
            Condition::with_expr(on_clause),
            correlated_id,
            correlated_indices,
            false,
        ))
    }

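    /// Plans a window table function (`TUMBLE` or `HOP`) by dispatching to
    /// [`Self::plan_tumble_window`] or [`Self::plan_hop_window`].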
    pub(super) fn plan_window_table_function(
        &mut self,
        table_function: BoundWindowTableFunction,
    ) -> Result<PlanRef> {
        use WindowTableFunctionKind::*;
        match table_function.kind {
            Tumble => self.plan_tumble_window(
                table_function.input,
                table_function.time_col,
                table_function.args,
            ),
            Hop => self.plan_hop_window(
                table_function.input,
                table_function.time_col,
                table_function.args,
            ),
        }
    }

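    /// Plans a table function appearing in the `FROM` clause. A genuine table function
    /// becomes a `LogicalTableFunction`; any other expression is evaluated in a single-row
    /// `LogicalValues`, with a struct result expanded into one output column per field via
    /// `Field` calls. With `WITH ORDINALITY`, a constant ordinal column is appended.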
    pub(super) fn plan_table_function(
        &mut self,
        table_function: ExprImpl,
        with_ordinality: bool,
    ) -> Result<PlanRef> {
        // TODO: maybe we can unify LogicalTableFunction with LogicalValues
        match table_function {
            ExprImpl::TableFunction(tf) => {
                Ok(LogicalTableFunction::new(*tf, with_ordinality, self.ctx()).into())
            }
            expr => {
                let schema = Schema {
                    // TODO: should be named
                    fields: vec![Field::unnamed(expr.return_type())],
                };
                let expr_return_type = expr.return_type();
                let root = LogicalValues::create(vec![vec![expr]], schema, self.ctx());
                let input_ref = ExprImpl::from(InputRef::new(0, expr_return_type.clone()));
                let mut exprs = if let DataType::Struct(st) = expr_return_type {
                    st.iter()
                        .enumerate()
                        .map(|(i, (_, ty))| {
                            let idx = ExprImpl::literal_int(i.try_into().unwrap());
                            let args = vec![input_ref.clone(), idx];
                            FunctionCall::new_unchecked(ExprType::Field, args, ty.clone()).into()
                        })
                        .collect()
                } else {
                    vec![input_ref]
                };
                if with_ordinality {
                    exprs.push(ExprImpl::literal_bigint(1));
                }
                Ok(LogicalProject::create(root, exprs))
            }
        }
    }

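    /// Plans a shared relation. A non-recursive query is planned once and cached in
    /// `share_cache` by its share id so that later references reuse the same `LogicalShare`;
    /// a recursive CTE goes through `plan_recursive_union`, and a changelog input through
    /// `plan_changelog`.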
    pub(super) fn plan_share(&mut self, share: BoundShare) -> Result<PlanRef> {
        match share.input {
            BoundShareInput::Query(Either::Left(nonrecursive_query)) => {
                let id = share.share_id;
                match self.share_cache.get(&id) {
                    None => {
                        let result = self
                            .plan_query(nonrecursive_query)?
                            .into_unordered_subplan();
                        let logical_share = LogicalShare::create(result);
                        self.share_cache.insert(id, logical_share.clone());
                        Ok(logical_share)
                    }
                    Some(result) => Ok(result.clone()),
                }
            }
            // for the recursive union in rcte
            BoundShareInput::Query(Either::Right(recursive_union)) => self.plan_recursive_union(
                *recursive_union.base,
                *recursive_union.recursive,
                share.share_id,
            ),
            BoundShareInput::ChangeLog(relation) => {
                let id = share.share_id;
                let result = self.plan_changelog(relation)?;
                let logical_share = LogicalShare::create(result);
                self.share_cache.insert(id, logical_share.clone());
                Ok(logical_share)
            }
        }
    }

    pub(super) fn plan_watermark(&mut self, _watermark: BoundWatermark) -> Result<PlanRef> {
        todo!("plan watermark");
    }

    pub(super) fn plan_cte_ref(&mut self, cte_ref: BoundBackCteRef) -> Result<PlanRef> {
        // TODO: this is actually duplicated from `plan_recursive_union`, refactor?
        let base = self.plan_set_expr(cte_ref.base, vec![], &[])?;
        Ok(LogicalCteRef::create(cte_ref.share_id, base))
    }

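    /// Collects the column data types of the relation fed into `TUMBLE`, which are needed to
    /// forward all input columns through the projection that appends the window columns.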
    fn collect_col_data_types_for_tumble_window(relation: &Relation) -> Result<Vec<DataType>> {
        let col_data_types = match relation {
            Relation::Source(s) => s
                .catalog
                .columns
                .iter()
                .map(|col| col.data_type().clone())
                .collect(),
            Relation::BaseTable(t) => t
                .table_catalog
                .columns
                .iter()
                .map(|col| col.data_type().clone())
                .collect(),
            Relation::Subquery(q) => q.query.schema().data_types(),
            Relation::Share(share) => share
                .input
                .fields()?
                .into_iter()
                .map(|(_, f)| f.data_type)
                .collect(),
            r => {
                return Err(ErrorCode::BindError(format!(
                    "Invalid input relation to tumble: {r:?}"
                ))
                .into());
            }
        };
        Ok(col_data_types)
    }

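    /// Plans a `TUMBLE` window table function: all input columns are forwarded, and a window
    /// start column (`tumble_start(time_col, size [, offset])`) plus a window end column
    /// (`window_start + size`) are appended. The size and optional offset arguments must be
    /// interval literals.
    ///
    /// As an illustrative sketch (the exact SQL surface syntax may differ), a query of the
    /// shape `SELECT * FROM TUMBLE(t, ts, INTERVAL '1' MINUTE)` would be planned here.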
    fn plan_tumble_window(
        &mut self,
        input: Relation,
        time_col: InputRef,
        args: Vec<ExprImpl>,
    ) -> Result<PlanRef> {
        let mut args = args.into_iter();
        let col_data_types: Vec<_> = Self::collect_col_data_types_for_tumble_window(&input)?;

        match (args.next(), args.next(), args.next()) {
            (Some(window_size @ ExprImpl::Literal(_)), None, None) => {
                let mut exprs = Vec::with_capacity(col_data_types.len() + 2);
                for (idx, col_dt) in col_data_types.iter().enumerate() {
                    exprs.push(InputRef::new(idx, col_dt.clone()).into());
                }
                let window_start: ExprImpl = FunctionCall::new(
                    ExprType::TumbleStart,
                    vec![ExprImpl::InputRef(Box::new(time_col)), window_size.clone()],
                )?
                .into();
                // TODO: `window_end` may be optimized to avoid double calculation of
                // `tumble_start`, or we can depend on common expression
                // optimization.
                let window_end =
                    FunctionCall::new(ExprType::Add, vec![window_start.clone(), window_size])?
                        .into();
                exprs.push(window_start);
                exprs.push(window_end);
                let base = self.plan_relation(input)?;
                let project = LogicalProject::create(base, exprs);
                Ok(project)
            }
            (
                Some(window_size @ ExprImpl::Literal(_)),
                Some(window_offset @ ExprImpl::Literal(_)),
                None,
            ) => {
                let mut exprs = Vec::with_capacity(col_data_types.len() + 2);
                for (idx, col_dt) in col_data_types.iter().enumerate() {
                    exprs.push(InputRef::new(idx, col_dt.clone()).into());
                }
                let window_start: ExprImpl = FunctionCall::new(
                    ExprType::TumbleStart,
                    vec![
                        ExprImpl::InputRef(Box::new(time_col)),
                        window_size.clone(),
                        window_offset,
                    ],
                )?
                .into();
                // TODO: `window_end` may be optimized to avoid double calculation of
                // `tumble_start`, or we can depend on common expression
                // optimization.
                let window_end =
                    FunctionCall::new(ExprType::Add, vec![window_start.clone(), window_size])?
                        .into();
                exprs.push(window_start);
                exprs.push(window_end);
                let base = self.plan_relation(input)?;
                let project = LogicalProject::create(base, exprs);
                Ok(project)
            }
            _ => Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into()),
        }
    }

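    /// Plans a `HOP` (sliding) window table function into a `LogicalHopWindow`. The slide,
    /// size, and optional offset arguments must be interval literals, slide and size must be
    /// positive, and the window size must be an exact multiple of the slide.
    ///
    /// As an illustrative sketch (the exact SQL surface syntax may differ), a query of the
    /// shape `SELECT * FROM HOP(t, ts, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE)` (slide
    /// first, then size) would be planned here.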
    fn plan_hop_window(
        &mut self,
        input: Relation,
        time_col: InputRef,
        args: Vec<ExprImpl>,
    ) -> Result<PlanRef> {
        let input = self.plan_relation(input)?;
        let mut args = args.into_iter();
        let Some((ExprImpl::Literal(window_slide), ExprImpl::Literal(window_size))) =
            args.next_tuple()
        else {
            return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into());
        };

        let Some(ScalarImpl::Interval(window_slide)) = *window_slide.get_data() else {
            return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into());
        };
        let Some(ScalarImpl::Interval(window_size)) = *window_size.get_data() else {
            return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into());
        };

        let window_offset = match (args.next(), args.next()) {
            (Some(ExprImpl::Literal(window_offset)), None) => match *window_offset.get_data() {
                Some(ScalarImpl::Interval(window_offset)) => window_offset,
                _ => return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into()),
            },
            (None, None) => Interval::from_month_day_usec(0, 0, 0),
            _ => return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into()),
        };

        if !window_size.is_positive() || !window_slide.is_positive() {
            return Err(ErrorCode::BindError(format!(
                "window_size {} and window_slide {} must be positive",
                window_size, window_slide
            ))
            .into());
        }

        if window_size.exact_div(&window_slide).is_none() {
            return Err(ErrorCode::BindError(format!(
                "Invalid arguments for HOP window function: window_size {} cannot be divided by window_slide {}",
                window_size, window_slide
            ))
            .into());
        }

        Ok(LogicalHopWindow::create(
            input,
            time_col,
            window_slide,
            window_size,
            window_offset,
        ))
    }
}