// risingwave_frontend/planner/relation.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use std::collections::HashMap;
16use std::ops::Deref;
17use std::rc::Rc;
18
19use either::Either;
20use itertools::Itertools;
21use risingwave_common::catalog::{
22    ColumnCatalog, Engine, Field, RISINGWAVE_ICEBERG_ROW_ID, ROW_ID_COLUMN_NAME, Schema,
23};
24use risingwave_common::types::{DataType, Interval, ScalarImpl};
25use risingwave_common::{bail, bail_not_implemented};
26use risingwave_sqlparser::ast::AsOf;
27
28use crate::binder::{
29    BoundBackCteRef, BoundBaseTable, BoundJoin, BoundShare, BoundShareInput, BoundSource,
30    BoundSystemTable, BoundWatermark, BoundWindowTableFunction, Relation, WindowTableFunctionKind,
31};
32use crate::error::{ErrorCode, Result};
33use crate::expr::{CastContext, Expr, ExprImpl, ExprType, FunctionCall, InputRef, Literal};
34use crate::optimizer::plan_node::generic::SourceNodeKind;
35use crate::optimizer::plan_node::{
36    LogicalApply, LogicalCteRef, LogicalHopWindow, LogicalJoin, LogicalProject, LogicalScan,
37    LogicalShare, LogicalSource, LogicalSysScan, LogicalTableFunction, LogicalValues, PlanRef,
38};
39use crate::optimizer::property::Cardinality;
40use crate::planner::{PlanFor, Planner};
41use crate::utils::Condition;
42
/// Bind-error message reported when the window arguments (size / slide / offset) passed to the
/// tumble or hop window table function are not literals of the expected form.
const ERROR_WINDOW_SIZE_ARG: &str =
    "The size arg of window table function should be an interval literal.";
45
46impl Planner {
47    pub fn plan_relation(&mut self, relation: Relation) -> Result<PlanRef> {
48        match relation {
49            Relation::BaseTable(t) => self.plan_base_table(&t),
50            Relation::SystemTable(st) => self.plan_sys_table(*st),
51            // TODO: order is ignored in the subquery
52            Relation::Subquery(q) => Ok(self.plan_query(q.query)?.into_unordered_subplan()),
53            Relation::Join(join) => self.plan_join(*join),
54            Relation::Apply(join) => self.plan_apply(*join),
55            Relation::WindowTableFunction(tf) => self.plan_window_table_function(*tf),
56            Relation::Source(s) => self.plan_source(*s),
57            Relation::TableFunction {
58                expr: tf,
59                with_ordinality,
60            } => self.plan_table_function(tf, with_ordinality),
61            Relation::Watermark(tf) => self.plan_watermark(*tf),
62            // note that rcte (i.e., RecursiveUnion) is included *implicitly* in share.
63            Relation::Share(share) => self.plan_share(*share),
64            Relation::BackCteRef(cte_ref) => self.plan_cte_ref(*cte_ref),
65        }
66    }
67
68    pub(crate) fn plan_sys_table(&mut self, sys_table: BoundSystemTable) -> Result<PlanRef> {
69        Ok(LogicalSysScan::create(
70            sys_table.sys_table_catalog.name().to_owned(),
71            Rc::new(sys_table.sys_table_catalog.table_desc()),
72            self.ctx(),
73            Cardinality::unknown(), // TODO(card): cardinality of system table
74        )
75        .into())
76    }
77
78    pub(super) fn plan_base_table(&mut self, base_table: &BoundBaseTable) -> Result<PlanRef> {
79        let as_of = base_table.as_of.clone();
80        let table_cardinality = base_table.table_catalog.cardinality;
81        let scan = LogicalScan::create(
82            base_table.table_catalog.name().to_owned(),
83            base_table.table_catalog.clone(),
84            base_table
85                .table_indexes
86                .iter()
87                .map(|x| x.as_ref().clone().into())
88                .collect(),
89            self.ctx(),
90            as_of.clone(),
91            table_cardinality,
92        );
93
94        match (base_table.table_catalog.engine, self.plan_for()) {
95            (Engine::Hummock, PlanFor::Stream)
96            | (Engine::Hummock, PlanFor::Batch)
97            | (Engine::Hummock, PlanFor::BatchDql) => {
98                match as_of {
99                    None
100                    | Some(AsOf::ProcessTime)
101                    | Some(AsOf::TimestampNum(_))
102                    | Some(AsOf::TimestampString(_))
103                    | Some(AsOf::ProcessTimeWithInterval(_)) => {}
104                    Some(AsOf::VersionNum(_)) | Some(AsOf::VersionString(_)) => {
105                        bail_not_implemented!("As Of Version is not supported yet.")
106                    }
107                };
108                Ok(scan.into())
109            }
110            (Engine::Iceberg, PlanFor::Stream) | (Engine::Iceberg, PlanFor::Batch) => {
111                match as_of {
112                    None
113                    | Some(AsOf::VersionNum(_))
114                    | Some(AsOf::TimestampString(_))
115                    | Some(AsOf::TimestampNum(_)) => {}
116                    Some(AsOf::ProcessTime) | Some(AsOf::ProcessTimeWithInterval(_)) => {
117                        bail_not_implemented!("As Of ProcessTime() is not supported yet.")
118                    }
119                    Some(AsOf::VersionString(_)) => {
120                        bail_not_implemented!("As Of Version is not supported yet.")
121                    }
122                }
123                Ok(scan.into())
124            }
125            (Engine::Iceberg, PlanFor::BatchDql) => {
126                match as_of {
127                    None
128                    | Some(AsOf::VersionNum(_))
129                    | Some(AsOf::TimestampString(_))
130                    | Some(AsOf::TimestampNum(_)) => {}
131                    Some(AsOf::ProcessTime) | Some(AsOf::ProcessTimeWithInterval(_)) => {
132                        bail_not_implemented!("As Of ProcessTime() is not supported yet.")
133                    }
134                    Some(AsOf::VersionString(_)) => {
135                        bail_not_implemented!("As Of Version is not supported yet.")
136                    }
137                }
138                let opt_ctx = self.ctx();
139                let session = opt_ctx.session_ctx();
140                let db_name = &session.database();
141                let catalog_reader = session.env().catalog_reader().read_guard();
142                let mut source_catalog = None;
143                for schema in catalog_reader.iter_schemas(db_name).unwrap() {
144                    if schema
145                        .get_table_by_id(&base_table.table_catalog.id)
146                        .is_some()
147                    {
148                        source_catalog = schema.get_source_by_name(
149                            &base_table.table_catalog.iceberg_source_name().unwrap(),
150                        );
151                        break;
152                    }
153                }
154                if let Some(source_catalog) = source_catalog {
155                    let column_map: HashMap<String, (usize, ColumnCatalog)> = source_catalog
156                        .columns
157                        .clone()
158                        .into_iter()
159                        .enumerate()
160                        .map(|(i, column)| (column.name().to_owned(), (i, column)))
161                        .collect();
162                    let exprs = scan
163                        .table_catalog()
164                        .column_schema()
165                        .fields()
166                        .iter()
167                        .map(|field| {
168                            let source_filed_name = if field.name == ROW_ID_COLUMN_NAME {
169                                RISINGWAVE_ICEBERG_ROW_ID
170                            } else {
171                                &field.name
172                            };
173                            if let Some((i, source_column)) = column_map.get(source_filed_name) {
174                                if source_column.column_desc.data_type == field.data_type {
175                                    ExprImpl::InputRef(
176                                        InputRef::new(*i, field.data_type.clone()).into(),
177                                    )
178                                } else {
179                                    let mut input_ref = ExprImpl::InputRef(
180                                        InputRef::new(
181                                            *i,
182                                            source_column.column_desc.data_type.clone(),
183                                        )
184                                        .into(),
185                                    );
186                                    FunctionCall::cast_mut(
187                                        &mut input_ref,
188                                        field.data_type().clone(),
189                                        CastContext::Explicit,
190                                    )
191                                    .unwrap();
192                                    input_ref
193                                }
194                            } else {
195                                // fields like `_rw_timestamp`, would not be found in source.
196                                ExprImpl::Literal(
197                                    Literal::new(None, field.data_type.clone()).into(),
198                                )
199                            }
200                        })
201                        .collect_vec();
202                    let logical_source = LogicalSource::with_catalog(
203                        Rc::new(source_catalog.deref().clone()),
204                        SourceNodeKind::CreateMViewOrBatch,
205                        self.ctx(),
206                        as_of,
207                    )?;
208                    Ok(LogicalProject::new(logical_source.into(), exprs).into())
209                } else {
210                    bail!(
211                        "failed to plan a iceberg engine table: {}. Can't find the corresponding iceberg source. Maybe you need to recreate the table",
212                        base_table.table_catalog.name()
213                    );
214                }
215            }
216        }
217    }
218
219    pub(super) fn plan_source(&mut self, source: BoundSource) -> Result<PlanRef> {
220        if source.is_shareable_cdc_connector() {
221            Err(ErrorCode::InternalError(
222                "Should not create MATERIALIZED VIEW or SELECT directly on shared CDC source. HINT: create TABLE from the source instead.".to_owned(),
223            )
224            .into())
225        } else {
226            let as_of = source.as_of.clone();
227            match as_of {
228                None
229                | Some(AsOf::VersionNum(_))
230                | Some(AsOf::TimestampString(_))
231                | Some(AsOf::TimestampNum(_)) => {}
232                Some(AsOf::ProcessTime) | Some(AsOf::ProcessTimeWithInterval(_)) => {
233                    bail_not_implemented!("As Of ProcessTime() is not supported yet.")
234                }
235                Some(AsOf::VersionString(_)) => {
236                    bail_not_implemented!("As Of Version is not supported yet.")
237                }
238            }
239            Ok(LogicalSource::with_catalog(
240                Rc::new(source.catalog),
241                SourceNodeKind::CreateMViewOrBatch,
242                self.ctx(),
243                as_of,
244            )?
245            .into())
246        }
247    }
248
249    pub(super) fn plan_join(&mut self, join: BoundJoin) -> Result<PlanRef> {
250        let left = self.plan_relation(join.left)?;
251        let right = self.plan_relation(join.right)?;
252        let join_type = join.join_type;
253        let on_clause = join.cond;
254        if on_clause.has_subquery() {
255            bail_not_implemented!("Subquery in join on condition");
256        } else {
257            Ok(LogicalJoin::create(left, right, join_type, on_clause))
258        }
259    }
260
261    pub(super) fn plan_apply(&mut self, mut join: BoundJoin) -> Result<PlanRef> {
262        let join_type = join.join_type;
263        let on_clause = join.cond;
264        if on_clause.has_subquery() {
265            bail_not_implemented!("Subquery in join on condition");
266        }
267
268        let correlated_id = self.ctx.next_correlated_id();
269        let correlated_indices = join
270            .right
271            .collect_correlated_indices_by_depth_and_assign_id(0, correlated_id);
272        let left = self.plan_relation(join.left)?;
273        let right = self.plan_relation(join.right)?;
274
275        Ok(LogicalApply::create(
276            left,
277            right,
278            join_type,
279            Condition::with_expr(on_clause),
280            correlated_id,
281            correlated_indices,
282            false,
283        ))
284    }
285
286    pub(super) fn plan_window_table_function(
287        &mut self,
288        table_function: BoundWindowTableFunction,
289    ) -> Result<PlanRef> {
290        use WindowTableFunctionKind::*;
291        match table_function.kind {
292            Tumble => self.plan_tumble_window(
293                table_function.input,
294                table_function.time_col,
295                table_function.args,
296            ),
297            Hop => self.plan_hop_window(
298                table_function.input,
299                table_function.time_col,
300                table_function.args,
301            ),
302        }
303    }
304
305    pub(super) fn plan_table_function(
306        &mut self,
307        table_function: ExprImpl,
308        with_ordinality: bool,
309    ) -> Result<PlanRef> {
310        // TODO: maybe we can unify LogicalTableFunction with LogicalValues
311        match table_function {
312            ExprImpl::TableFunction(tf) => {
313                Ok(LogicalTableFunction::new(*tf, with_ordinality, self.ctx()).into())
314            }
315            expr => {
316                let schema = Schema {
317                    // TODO: should be named
318                    fields: vec![Field::unnamed(expr.return_type())],
319                };
320                let expr_return_type = expr.return_type();
321                let root = LogicalValues::create(vec![vec![expr]], schema, self.ctx());
322                let input_ref = ExprImpl::from(InputRef::new(0, expr_return_type.clone()));
323                let mut exprs = if let DataType::Struct(st) = expr_return_type {
324                    st.iter()
325                        .enumerate()
326                        .map(|(i, (_, ty))| {
327                            let idx = ExprImpl::literal_int(i.try_into().unwrap());
328                            let args = vec![input_ref.clone(), idx];
329                            FunctionCall::new_unchecked(ExprType::Field, args, ty.clone()).into()
330                        })
331                        .collect()
332                } else {
333                    vec![input_ref]
334                };
335                if with_ordinality {
336                    exprs.push(ExprImpl::literal_bigint(1));
337                }
338                Ok(LogicalProject::create(root, exprs))
339            }
340        }
341    }
342
343    pub(super) fn plan_share(&mut self, share: BoundShare) -> Result<PlanRef> {
344        match share.input {
345            BoundShareInput::Query(Either::Left(nonrecursive_query)) => {
346                let id = share.share_id;
347                match self.share_cache.get(&id) {
348                    None => {
349                        let result = self
350                            .plan_query(nonrecursive_query)?
351                            .into_unordered_subplan();
352                        let logical_share = LogicalShare::create(result);
353                        self.share_cache.insert(id, logical_share.clone());
354                        Ok(logical_share)
355                    }
356                    Some(result) => Ok(result.clone()),
357                }
358            }
359            // for the recursive union in rcte
360            BoundShareInput::Query(Either::Right(recursive_union)) => self.plan_recursive_union(
361                *recursive_union.base,
362                *recursive_union.recursive,
363                share.share_id,
364            ),
365            BoundShareInput::ChangeLog(relation) => {
366                let id = share.share_id;
367                let result = self.plan_changelog(relation)?;
368                let logical_share = LogicalShare::create(result);
369                self.share_cache.insert(id, logical_share.clone());
370                Ok(logical_share)
371            }
372        }
373    }
374
375    pub(super) fn plan_watermark(&mut self, _watermark: BoundWatermark) -> Result<PlanRef> {
376        todo!("plan watermark");
377    }
378
379    pub(super) fn plan_cte_ref(&mut self, cte_ref: BoundBackCteRef) -> Result<PlanRef> {
380        // TODO: this is actually duplicated from `plan_recursive_union`, refactor?
381        let base = self.plan_set_expr(cte_ref.base, vec![], &[])?;
382        Ok(LogicalCteRef::create(cte_ref.share_id, base))
383    }
384
385    fn collect_col_data_types_for_tumble_window(relation: &Relation) -> Result<Vec<DataType>> {
386        let col_data_types = match relation {
387            Relation::Source(s) => s
388                .catalog
389                .columns
390                .iter()
391                .map(|col| col.data_type().clone())
392                .collect(),
393            Relation::BaseTable(t) => t
394                .table_catalog
395                .columns
396                .iter()
397                .map(|col| col.data_type().clone())
398                .collect(),
399            Relation::Subquery(q) => q.query.schema().data_types(),
400            Relation::Share(share) => share
401                .input
402                .fields()?
403                .into_iter()
404                .map(|(_, f)| f.data_type)
405                .collect(),
406            r => {
407                return Err(ErrorCode::BindError(format!(
408                    "Invalid input relation to tumble: {r:?}"
409                ))
410                .into());
411            }
412        };
413        Ok(col_data_types)
414    }
415
416    fn plan_tumble_window(
417        &mut self,
418        input: Relation,
419        time_col: InputRef,
420        args: Vec<ExprImpl>,
421    ) -> Result<PlanRef> {
422        let mut args = args.into_iter();
423        let col_data_types: Vec<_> = Self::collect_col_data_types_for_tumble_window(&input)?;
424
425        match (args.next(), args.next(), args.next()) {
426            (Some(window_size @ ExprImpl::Literal(_)), None, None) => {
427                let mut exprs = Vec::with_capacity(col_data_types.len() + 2);
428                for (idx, col_dt) in col_data_types.iter().enumerate() {
429                    exprs.push(InputRef::new(idx, col_dt.clone()).into());
430                }
431                let window_start: ExprImpl = FunctionCall::new(
432                    ExprType::TumbleStart,
433                    vec![ExprImpl::InputRef(Box::new(time_col)), window_size.clone()],
434                )?
435                .into();
436                // TODO: `window_end` may be optimized to avoid double calculation of
437                // `tumble_start`, or we can depends on common expression
438                // optimization.
439                let window_end =
440                    FunctionCall::new(ExprType::Add, vec![window_start.clone(), window_size])?
441                        .into();
442                exprs.push(window_start);
443                exprs.push(window_end);
444                let base = self.plan_relation(input)?;
445                let project = LogicalProject::create(base, exprs);
446                Ok(project)
447            }
448            (
449                Some(window_size @ ExprImpl::Literal(_)),
450                Some(window_offset @ ExprImpl::Literal(_)),
451                None,
452            ) => {
453                let mut exprs = Vec::with_capacity(col_data_types.len() + 2);
454                for (idx, col_dt) in col_data_types.iter().enumerate() {
455                    exprs.push(InputRef::new(idx, col_dt.clone()).into());
456                }
457                let window_start: ExprImpl = FunctionCall::new(
458                    ExprType::TumbleStart,
459                    vec![
460                        ExprImpl::InputRef(Box::new(time_col)),
461                        window_size.clone(),
462                        window_offset,
463                    ],
464                )?
465                .into();
466                // TODO: `window_end` may be optimized to avoid double calculation of
467                // `tumble_start`, or we can depends on common expression
468                // optimization.
469                let window_end =
470                    FunctionCall::new(ExprType::Add, vec![window_start.clone(), window_size])?
471                        .into();
472                exprs.push(window_start);
473                exprs.push(window_end);
474                let base = self.plan_relation(input)?;
475                let project = LogicalProject::create(base, exprs);
476                Ok(project)
477            }
478            _ => Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into()),
479        }
480    }
481
482    fn plan_hop_window(
483        &mut self,
484        input: Relation,
485        time_col: InputRef,
486        args: Vec<ExprImpl>,
487    ) -> Result<PlanRef> {
488        let input = self.plan_relation(input)?;
489        let mut args = args.into_iter();
490        let Some((ExprImpl::Literal(window_slide), ExprImpl::Literal(window_size))) =
491            args.next_tuple()
492        else {
493            return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into());
494        };
495
496        let Some(ScalarImpl::Interval(window_slide)) = *window_slide.get_data() else {
497            return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into());
498        };
499        let Some(ScalarImpl::Interval(window_size)) = *window_size.get_data() else {
500            return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into());
501        };
502
503        let window_offset = match (args.next(), args.next()) {
504            (Some(ExprImpl::Literal(window_offset)), None) => match *window_offset.get_data() {
505                Some(ScalarImpl::Interval(window_offset)) => window_offset,
506                _ => return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into()),
507            },
508            (None, None) => Interval::from_month_day_usec(0, 0, 0),
509            _ => return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into()),
510        };
511
512        if !window_size.is_positive() || !window_slide.is_positive() {
513            return Err(ErrorCode::BindError(format!(
514                "window_size {} and window_slide {} must be positive",
515                window_size, window_slide
516            ))
517            .into());
518        }
519
520        if window_size.exact_div(&window_slide).is_none() {
521            return Err(ErrorCode::BindError(format!("Invalid arguments for HOP window function: window_size {} cannot be divided by window_slide {}",window_size, window_slide)).into());
522        }
523
524        Ok(LogicalHopWindow::create(
525            input,
526            time_col,
527            window_slide,
528            window_size,
529            window_offset,
530        ))
531    }
532}