// risingwave_frontend/planner/relation.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::ops::Deref;
17use std::rc::Rc;
18
19use either::Either;
20use itertools::Itertools;
21use risingwave_common::catalog::{
22    ColumnCatalog, Engine, Field, RISINGWAVE_ICEBERG_ROW_ID, ROW_ID_COLUMN_NAME, Schema,
23};
24use risingwave_common::types::{DataType, Interval, ScalarImpl};
25use risingwave_common::{bail, bail_not_implemented};
26use risingwave_sqlparser::ast::AsOf;
27
28use crate::binder::{
29    BoundBackCteRef, BoundBaseTable, BoundJoin, BoundShare, BoundShareInput, BoundSource,
30    BoundSystemTable, BoundWatermark, BoundWindowTableFunction, Relation, WindowTableFunctionKind,
31};
32use crate::error::{ErrorCode, Result};
33use crate::expr::{CastContext, Expr, ExprImpl, ExprType, FunctionCall, InputRef, Literal};
34use crate::optimizer::plan_node::generic::SourceNodeKind;
35use crate::optimizer::plan_node::{
36    LogicalApply, LogicalCteRef, LogicalHopWindow, LogicalJoin, LogicalProject, LogicalScan,
37    LogicalShare, LogicalSource, LogicalSysScan, LogicalTableFunction, LogicalValues, PlanRef,
38};
39use crate::optimizer::property::Cardinality;
40use crate::planner::{PlanFor, Planner};
41use crate::utils::Condition;
42
/// Error message reported when the `size` / `slide` / `offset` arguments of a
/// window table function (`TUMBLE` / `HOP`) are not interval literals.
const ERROR_WINDOW_SIZE_ARG: &str =
    "The size arg of window table function should be an interval literal.";
45
46impl Planner {
47    pub fn plan_relation(&mut self, relation: Relation) -> Result<PlanRef> {
48        match relation {
49            Relation::BaseTable(t) => self.plan_base_table(&t),
50            Relation::SystemTable(st) => self.plan_sys_table(*st),
51            // TODO: order is ignored in the subquery
52            Relation::Subquery(q) => Ok(self.plan_query(q.query)?.into_unordered_subplan()),
53            Relation::Join(join) => self.plan_join(*join),
54            Relation::Apply(join) => self.plan_apply(*join),
55            Relation::WindowTableFunction(tf) => self.plan_window_table_function(*tf),
56            Relation::Source(s) => self.plan_source(*s),
57            Relation::TableFunction {
58                expr: tf,
59                with_ordinality,
60            } => self.plan_table_function(tf, with_ordinality),
61            Relation::Watermark(tf) => self.plan_watermark(*tf),
62            // note that rcte (i.e., RecursiveUnion) is included *implicitly* in share.
63            Relation::Share(share) => self.plan_share(*share),
64            Relation::BackCteRef(cte_ref) => self.plan_cte_ref(*cte_ref),
65        }
66    }
67
68    pub(crate) fn plan_sys_table(&mut self, sys_table: BoundSystemTable) -> Result<PlanRef> {
69        Ok(LogicalSysScan::create(
70            sys_table.sys_table_catalog.name().to_owned(),
71            Rc::new(sys_table.sys_table_catalog.table_desc()),
72            self.ctx(),
73            Cardinality::unknown(), // TODO(card): cardinality of system table
74        )
75        .into())
76    }
77
78    pub(super) fn plan_base_table(&mut self, base_table: &BoundBaseTable) -> Result<PlanRef> {
79        let as_of = base_table.as_of.clone();
80        let table_cardinality = base_table.table_catalog.cardinality;
81        let scan = LogicalScan::create(
82            base_table.table_catalog.name().to_owned(),
83            base_table.table_catalog.clone(),
84            base_table
85                .table_indexes
86                .iter()
87                .map(|x| x.as_ref().clone().into())
88                .collect(),
89            self.ctx(),
90            as_of.clone(),
91            table_cardinality,
92        );
93
94        match base_table.table_catalog.engine {
95            Engine::Hummock => {
96                match as_of {
97                    None
98                    | Some(AsOf::ProcessTime)
99                    | Some(AsOf::TimestampNum(_))
100                    | Some(AsOf::TimestampString(_))
101                    | Some(AsOf::ProcessTimeWithInterval(_)) => {}
102                    Some(AsOf::VersionNum(_)) | Some(AsOf::VersionString(_)) => {
103                        bail_not_implemented!("As Of Version is not supported yet.")
104                    }
105                };
106                Ok(scan.into())
107            }
108            Engine::Iceberg => {
109                let is_append_only = base_table.table_catalog.append_only;
110                let use_iceberg_source = match (self.plan_for(), is_append_only) {
111                    (PlanFor::StreamIcebergEngineInternal, _) => false,
112                    (PlanFor::BatchDql, _) => true,
113                    (PlanFor::Stream | PlanFor::Batch, is_append_only) => is_append_only,
114                };
115
116                if !use_iceberg_source {
117                    match as_of {
118                        None
119                        | Some(AsOf::VersionNum(_))
120                        | Some(AsOf::TimestampString(_))
121                        | Some(AsOf::TimestampNum(_)) => {}
122                        Some(AsOf::ProcessTime) | Some(AsOf::ProcessTimeWithInterval(_)) => {
123                            bail_not_implemented!("As Of ProcessTime() is not supported yet.")
124                        }
125                        Some(AsOf::VersionString(_)) => {
126                            bail_not_implemented!("As Of Version is not supported yet.")
127                        }
128                    }
129                    Ok(scan.into())
130                } else {
131                    match as_of {
132                        None
133                        | Some(AsOf::VersionNum(_))
134                        | Some(AsOf::TimestampString(_))
135                        | Some(AsOf::TimestampNum(_)) => {}
136                        Some(AsOf::ProcessTime) | Some(AsOf::ProcessTimeWithInterval(_)) => {
137                            bail_not_implemented!("As Of ProcessTime() is not supported yet.")
138                        }
139                        Some(AsOf::VersionString(_)) => {
140                            bail_not_implemented!("As Of Version is not supported yet.")
141                        }
142                    }
143                    let opt_ctx = self.ctx();
144                    let session = opt_ctx.session_ctx();
145                    let db_name = &session.database();
146                    let catalog_reader = session.env().catalog_reader().read_guard();
147                    let mut source_catalog = None;
148                    for schema in catalog_reader.iter_schemas(db_name).unwrap() {
149                        if schema
150                            .get_table_by_id(&base_table.table_catalog.id)
151                            .is_some()
152                        {
153                            source_catalog = schema.get_source_by_name(
154                                &base_table.table_catalog.iceberg_source_name().unwrap(),
155                            );
156                            break;
157                        }
158                    }
159                    if let Some(source_catalog) = source_catalog {
160                        let column_map: HashMap<String, (usize, ColumnCatalog)> = source_catalog
161                            .columns
162                            .clone()
163                            .into_iter()
164                            .enumerate()
165                            .map(|(i, column)| (column.name().to_owned(), (i, column)))
166                            .collect();
167                        let exprs = scan
168                            .table_catalog()
169                            .column_schema()
170                            .fields()
171                            .iter()
172                            .map(|field| {
173                                let source_filed_name = if field.name == ROW_ID_COLUMN_NAME {
174                                    RISINGWAVE_ICEBERG_ROW_ID
175                                } else {
176                                    &field.name
177                                };
178                                if let Some((i, source_column)) = column_map.get(source_filed_name)
179                                {
180                                    if source_column.column_desc.data_type == field.data_type {
181                                        ExprImpl::InputRef(
182                                            InputRef::new(*i, field.data_type.clone()).into(),
183                                        )
184                                    } else {
185                                        let mut input_ref = ExprImpl::InputRef(
186                                            InputRef::new(
187                                                *i,
188                                                source_column.column_desc.data_type.clone(),
189                                            )
190                                            .into(),
191                                        );
192                                        FunctionCall::cast_mut(
193                                            &mut input_ref,
194                                            field.data_type().clone(),
195                                            CastContext::Explicit,
196                                        )
197                                        .unwrap();
198                                        input_ref
199                                    }
200                                } else {
201                                    // fields like `_rw_timestamp`, would not be found in source.
202                                    ExprImpl::Literal(
203                                        Literal::new(None, field.data_type.clone()).into(),
204                                    )
205                                }
206                            })
207                            .collect_vec();
208                        let logical_source = LogicalSource::with_catalog(
209                            Rc::new(source_catalog.deref().clone()),
210                            SourceNodeKind::CreateMViewOrBatch,
211                            self.ctx(),
212                            as_of,
213                        )?;
214                        Ok(LogicalProject::new(logical_source.into(), exprs).into())
215                    } else {
216                        bail!(
217                            "failed to plan a iceberg engine table: {}. Can't find the corresponding iceberg source. Maybe you need to recreate the table",
218                            base_table.table_catalog.name()
219                        );
220                    }
221                }
222            }
223        }
224    }
225
226    pub(super) fn plan_source(&mut self, source: BoundSource) -> Result<PlanRef> {
227        if source.is_shareable_cdc_connector() {
228            Err(ErrorCode::InternalError(
229                "Should not create MATERIALIZED VIEW or SELECT directly on shared CDC source. HINT: create TABLE from the source instead.".to_owned(),
230            )
231            .into())
232        } else {
233            let as_of = source.as_of.clone();
234            match as_of {
235                None
236                | Some(AsOf::VersionNum(_))
237                | Some(AsOf::TimestampString(_))
238                | Some(AsOf::TimestampNum(_)) => {}
239                Some(AsOf::ProcessTime) | Some(AsOf::ProcessTimeWithInterval(_)) => {
240                    bail_not_implemented!("As Of ProcessTime() is not supported yet.")
241                }
242                Some(AsOf::VersionString(_)) => {
243                    bail_not_implemented!("As Of Version is not supported yet.")
244                }
245            }
246
247            // validate the source has pk. We raise an error here to avoid panic in expect_stream_key later
248            // for a nicer error message.
249            if matches!(self.plan_for(), PlanFor::Stream) {
250                let has_pk =
251                    source.catalog.row_id_index.is_some() || !source.catalog.pk_col_ids.is_empty();
252                if !has_pk {
253                    // in older version, iceberg source doesn't have row_id, thus may hit this
254                    let is_iceberg = source.catalog.is_iceberg_connector();
255                    // only iceberg should hit this.
256                    debug_assert!(is_iceberg);
257                    if is_iceberg {
258                        return Err(ErrorCode::BindError(format!(
259                        "Cannot create a stream job from an iceberg source without a primary key.\nThe iceberg source might be created in an older version of RisingWave. Please try recreating the source.\nSource: {:?}",
260                        source.catalog
261                    ))
262                    .into());
263                    } else {
264                        return Err(ErrorCode::BindError(format!(
265                            "Cannot create a stream job from a source without a primary key.
266This is a bug. We would appreciate a bug report at:
267https://github.com/risingwavelabs/risingwave/issues/new?labels=type%2Fbug&template=bug_report.yml
268
269source: {:?}",
270                            source.catalog
271                        ))
272                        .into());
273                    }
274                }
275            }
276            Ok(LogicalSource::with_catalog(
277                Rc::new(source.catalog),
278                SourceNodeKind::CreateMViewOrBatch,
279                self.ctx(),
280                as_of,
281            )?
282            .into())
283        }
284    }
285
286    pub(super) fn plan_join(&mut self, join: BoundJoin) -> Result<PlanRef> {
287        let left = self.plan_relation(join.left)?;
288        let right = self.plan_relation(join.right)?;
289        let join_type = join.join_type;
290        let on_clause = join.cond;
291        if on_clause.has_subquery() {
292            bail_not_implemented!("Subquery in join on condition");
293        } else {
294            Ok(LogicalJoin::create(left, right, join_type, on_clause))
295        }
296    }
297
298    pub(super) fn plan_apply(&mut self, mut join: BoundJoin) -> Result<PlanRef> {
299        let join_type = join.join_type;
300        let on_clause = join.cond;
301        if on_clause.has_subquery() {
302            bail_not_implemented!("Subquery in join on condition");
303        }
304
305        let correlated_id = self.ctx.next_correlated_id();
306        let correlated_indices = join
307            .right
308            .collect_correlated_indices_by_depth_and_assign_id(0, correlated_id);
309        let left = self.plan_relation(join.left)?;
310        let right = self.plan_relation(join.right)?;
311
312        Ok(LogicalApply::create(
313            left,
314            right,
315            join_type,
316            Condition::with_expr(on_clause),
317            correlated_id,
318            correlated_indices,
319            false,
320        ))
321    }
322
323    pub(super) fn plan_window_table_function(
324        &mut self,
325        table_function: BoundWindowTableFunction,
326    ) -> Result<PlanRef> {
327        use WindowTableFunctionKind::*;
328        match table_function.kind {
329            Tumble => self.plan_tumble_window(
330                table_function.input,
331                table_function.time_col,
332                table_function.args,
333            ),
334            Hop => self.plan_hop_window(
335                table_function.input,
336                table_function.time_col,
337                table_function.args,
338            ),
339        }
340    }
341
342    pub(super) fn plan_table_function(
343        &mut self,
344        table_function: ExprImpl,
345        with_ordinality: bool,
346    ) -> Result<PlanRef> {
347        // TODO: maybe we can unify LogicalTableFunction with LogicalValues
348        match table_function {
349            ExprImpl::TableFunction(tf) => {
350                Ok(LogicalTableFunction::new(*tf, with_ordinality, self.ctx()).into())
351            }
352            expr => {
353                let schema = Schema {
354                    // TODO: should be named
355                    fields: vec![Field::unnamed(expr.return_type())],
356                };
357                let expr_return_type = expr.return_type();
358                let root = LogicalValues::create(vec![vec![expr]], schema, self.ctx());
359                let input_ref = ExprImpl::from(InputRef::new(0, expr_return_type.clone()));
360                let mut exprs = if let DataType::Struct(st) = expr_return_type {
361                    st.iter()
362                        .enumerate()
363                        .map(|(i, (_, ty))| {
364                            let idx = ExprImpl::literal_int(i.try_into().unwrap());
365                            let args = vec![input_ref.clone(), idx];
366                            FunctionCall::new_unchecked(ExprType::Field, args, ty.clone()).into()
367                        })
368                        .collect()
369                } else {
370                    vec![input_ref]
371                };
372                if with_ordinality {
373                    exprs.push(ExprImpl::literal_bigint(1));
374                }
375                Ok(LogicalProject::create(root, exprs))
376            }
377        }
378    }
379
380    pub(super) fn plan_share(&mut self, share: BoundShare) -> Result<PlanRef> {
381        match share.input {
382            BoundShareInput::Query(Either::Left(nonrecursive_query)) => {
383                let id = share.share_id;
384                match self.share_cache.get(&id) {
385                    None => {
386                        let result = self
387                            .plan_query(nonrecursive_query)?
388                            .into_unordered_subplan();
389                        let logical_share = LogicalShare::create(result);
390                        self.share_cache.insert(id, logical_share.clone());
391                        Ok(logical_share)
392                    }
393                    Some(result) => Ok(result.clone()),
394                }
395            }
396            // for the recursive union in rcte
397            BoundShareInput::Query(Either::Right(recursive_union)) => self.plan_recursive_union(
398                *recursive_union.base,
399                *recursive_union.recursive,
400                share.share_id,
401            ),
402            BoundShareInput::ChangeLog(relation) => {
403                let id = share.share_id;
404                let result = self.plan_changelog(relation)?;
405                let logical_share = LogicalShare::create(result);
406                self.share_cache.insert(id, logical_share.clone());
407                Ok(logical_share)
408            }
409        }
410    }
411
412    pub(super) fn plan_watermark(&mut self, _watermark: BoundWatermark) -> Result<PlanRef> {
413        todo!("plan watermark");
414    }
415
416    pub(super) fn plan_cte_ref(&mut self, cte_ref: BoundBackCteRef) -> Result<PlanRef> {
417        // TODO: this is actually duplicated from `plan_recursive_union`, refactor?
418        let base = self.plan_set_expr(cte_ref.base, vec![], &[])?;
419        Ok(LogicalCteRef::create(cte_ref.share_id, base))
420    }
421
422    fn collect_col_data_types_for_tumble_window(relation: &Relation) -> Result<Vec<DataType>> {
423        let col_data_types = match relation {
424            Relation::Source(s) => s
425                .catalog
426                .columns
427                .iter()
428                .map(|col| col.data_type().clone())
429                .collect(),
430            Relation::BaseTable(t) => t
431                .table_catalog
432                .columns
433                .iter()
434                .map(|col| col.data_type().clone())
435                .collect(),
436            Relation::Subquery(q) => q.query.schema().data_types(),
437            Relation::Share(share) => share
438                .input
439                .fields()?
440                .into_iter()
441                .map(|(_, f)| f.data_type)
442                .collect(),
443            r => {
444                return Err(ErrorCode::BindError(format!(
445                    "Invalid input relation to tumble: {r:?}"
446                ))
447                .into());
448            }
449        };
450        Ok(col_data_types)
451    }
452
453    fn plan_tumble_window(
454        &mut self,
455        input: Relation,
456        time_col: InputRef,
457        args: Vec<ExprImpl>,
458    ) -> Result<PlanRef> {
459        let mut args = args.into_iter();
460        let col_data_types: Vec<_> = Self::collect_col_data_types_for_tumble_window(&input)?;
461
462        match (args.next(), args.next(), args.next()) {
463            (Some(window_size @ ExprImpl::Literal(_)), None, None) => {
464                let mut exprs = Vec::with_capacity(col_data_types.len() + 2);
465                for (idx, col_dt) in col_data_types.iter().enumerate() {
466                    exprs.push(InputRef::new(idx, col_dt.clone()).into());
467                }
468                let window_start: ExprImpl = FunctionCall::new(
469                    ExprType::TumbleStart,
470                    vec![ExprImpl::InputRef(Box::new(time_col)), window_size.clone()],
471                )?
472                .into();
473                // TODO: `window_end` may be optimized to avoid double calculation of
474                // `tumble_start`, or we can depends on common expression
475                // optimization.
476                let window_end =
477                    FunctionCall::new(ExprType::Add, vec![window_start.clone(), window_size])?
478                        .into();
479                exprs.push(window_start);
480                exprs.push(window_end);
481                let base = self.plan_relation(input)?;
482                let project = LogicalProject::create(base, exprs);
483                Ok(project)
484            }
485            (
486                Some(window_size @ ExprImpl::Literal(_)),
487                Some(window_offset @ ExprImpl::Literal(_)),
488                None,
489            ) => {
490                let mut exprs = Vec::with_capacity(col_data_types.len() + 2);
491                for (idx, col_dt) in col_data_types.iter().enumerate() {
492                    exprs.push(InputRef::new(idx, col_dt.clone()).into());
493                }
494                let window_start: ExprImpl = FunctionCall::new(
495                    ExprType::TumbleStart,
496                    vec![
497                        ExprImpl::InputRef(Box::new(time_col)),
498                        window_size.clone(),
499                        window_offset,
500                    ],
501                )?
502                .into();
503                // TODO: `window_end` may be optimized to avoid double calculation of
504                // `tumble_start`, or we can depends on common expression
505                // optimization.
506                let window_end =
507                    FunctionCall::new(ExprType::Add, vec![window_start.clone(), window_size])?
508                        .into();
509                exprs.push(window_start);
510                exprs.push(window_end);
511                let base = self.plan_relation(input)?;
512                let project = LogicalProject::create(base, exprs);
513                Ok(project)
514            }
515            _ => Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into()),
516        }
517    }
518
519    fn plan_hop_window(
520        &mut self,
521        input: Relation,
522        time_col: InputRef,
523        args: Vec<ExprImpl>,
524    ) -> Result<PlanRef> {
525        let input = self.plan_relation(input)?;
526        let mut args = args.into_iter();
527        let Some((ExprImpl::Literal(window_slide), ExprImpl::Literal(window_size))) =
528            args.next_tuple()
529        else {
530            return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into());
531        };
532
533        let Some(ScalarImpl::Interval(window_slide)) = *window_slide.get_data() else {
534            return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into());
535        };
536        let Some(ScalarImpl::Interval(window_size)) = *window_size.get_data() else {
537            return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into());
538        };
539
540        let window_offset = match (args.next(), args.next()) {
541            (Some(ExprImpl::Literal(window_offset)), None) => match *window_offset.get_data() {
542                Some(ScalarImpl::Interval(window_offset)) => window_offset,
543                _ => return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into()),
544            },
545            (None, None) => Interval::from_month_day_usec(0, 0, 0),
546            _ => return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into()),
547        };
548
549        if !window_size.is_positive() || !window_slide.is_positive() {
550            return Err(ErrorCode::BindError(format!(
551                "window_size {} and window_slide {} must be positive",
552                window_size, window_slide
553            ))
554            .into());
555        }
556
557        if window_size.exact_div(&window_slide).is_none() {
558            return Err(ErrorCode::BindError(format!("Invalid arguments for HOP window function: window_size {} cannot be divided by window_slide {}",window_size, window_slide)).into());
559        }
560
561        Ok(LogicalHopWindow::create(
562            input,
563            time_col,
564            window_slide,
565            window_size,
566            window_offset,
567        ))
568    }
569}