risingwave_frontend/planner/relation.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::ops::Deref;
use std::rc::Rc;

use either::Either;
use itertools::Itertools;
use risingwave_common::catalog::{
    ColumnCatalog, Engine, Field, RISINGWAVE_ICEBERG_ROW_ID, ROW_ID_COLUMN_NAME, Schema,
};
use risingwave_common::types::{DataType, Interval, ScalarImpl};
use risingwave_common::{bail, bail_not_implemented};
use risingwave_sqlparser::ast::AsOf;

use crate::binder::{
    BoundBackCteRef, BoundBaseTable, BoundGapFill, BoundJoin, BoundShare, BoundShareInput,
    BoundSource, BoundSystemTable, BoundWatermark, BoundWindowTableFunction, Relation,
    WindowTableFunctionKind,
};
use crate::error::{ErrorCode, Result};
use crate::expr::{CastContext, Expr, ExprImpl, ExprType, FunctionCall, InputRef, Literal};
use crate::optimizer::plan_node::generic::SourceNodeKind;
use crate::optimizer::plan_node::{
    LogicalApply, LogicalCteRef, LogicalGapFill, LogicalHopWindow, LogicalJoin,
    LogicalPlanRef as PlanRef, LogicalProject, LogicalScan, LogicalShare, LogicalSource,
    LogicalSysScan, LogicalTableFunction, LogicalValues,
};
use crate::optimizer::property::Cardinality;
use crate::planner::{PlanFor, Planner};
use crate::utils::Condition;

const ERROR_WINDOW_SIZE_ARG: &str =
    "The size arg of window table function should be an interval literal.";

impl Planner {
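    /// Plans a bound [`Relation`] by dispatching to the planner method for each variant.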
    pub fn plan_relation(&mut self, relation: Relation) -> Result<PlanRef> {
        match relation {
            Relation::BaseTable(t) => self.plan_base_table(&t),
            Relation::SystemTable(st) => self.plan_sys_table(*st),
            // TODO: order is ignored in the subquery
            Relation::Subquery(q) => Ok(self.plan_query(q.query)?.into_unordered_subplan()),
            Relation::Join(join) => self.plan_join(*join),
            Relation::Apply(join) => self.plan_apply(*join),
            Relation::WindowTableFunction(tf) => self.plan_window_table_function(*tf),
            Relation::Source(s) => self.plan_source(*s),
            Relation::TableFunction {
                expr: tf,
                with_ordinality,
            } => self.plan_table_function(tf, with_ordinality),
            Relation::Watermark(tf) => self.plan_watermark(*tf),
            // note that rcte (i.e., RecursiveUnion) is included *implicitly* in share.
            Relation::Share(share) => self.plan_share(*share),
            Relation::BackCteRef(cte_ref) => self.plan_cte_ref(*cte_ref),
            Relation::GapFill(bound_gap_fill) => self.plan_gap_fill(*bound_gap_fill),
        }
    }

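    /// Plans a scan over a system table via [`LogicalSysScan`].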
    pub(crate) fn plan_sys_table(&mut self, sys_table: BoundSystemTable) -> Result<PlanRef> {
        Ok(LogicalSysScan::create(
            sys_table.sys_table_catalog,
            self.ctx(),
            Cardinality::unknown(), // TODO(card): cardinality of system table
        )
        .into())
    }

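    /// Plans a scan over a user table.
    ///
    /// For Hummock tables this is a plain [`LogicalScan`]. For Iceberg-engine tables, the scan
    /// may instead be planned as a [`LogicalSource`] on the corresponding iceberg source,
    /// wrapped in a [`LogicalProject`] that aligns column order and types, depending on
    /// [`PlanFor`] and whether the table is append-only.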
    pub(super) fn plan_base_table(&mut self, base_table: &BoundBaseTable) -> Result<PlanRef> {
        let as_of = base_table.as_of.clone();
        let scan = LogicalScan::from_base_table(base_table, self.ctx(), as_of.clone());

        match base_table.table_catalog.engine {
            Engine::Hummock => {
                match as_of {
                    None
                    | Some(AsOf::ProcessTime)
                    | Some(AsOf::TimestampNum(_))
                    | Some(AsOf::TimestampString(_))
                    | Some(AsOf::ProcessTimeWithInterval(_)) => {}
                    Some(AsOf::VersionNum(_)) | Some(AsOf::VersionString(_)) => {
                        bail_not_implemented!("As Of Version is not supported yet.")
                    }
                };
                Ok(scan.into())
            }
            Engine::Iceberg => {
                let is_append_only = base_table.table_catalog.append_only;
                let use_iceberg_source = match (self.plan_for(), is_append_only) {
                    (PlanFor::StreamIcebergEngineInternal, _) => false,
                    (PlanFor::BatchDql, _) => true,
                    (PlanFor::Stream | PlanFor::Batch, is_append_only) => is_append_only,
                };

                if !use_iceberg_source {
                    match as_of {
                        None
                        | Some(AsOf::VersionNum(_))
                        | Some(AsOf::TimestampString(_))
                        | Some(AsOf::TimestampNum(_)) => {}
                        Some(AsOf::ProcessTime) | Some(AsOf::ProcessTimeWithInterval(_)) => {
                            bail_not_implemented!("As Of ProcessTime() is not supported yet.")
                        }
                        Some(AsOf::VersionString(_)) => {
                            bail_not_implemented!("As Of Version is not supported yet.")
                        }
                    }
                    Ok(scan.into())
                } else {
                    match as_of {
                        None
                        | Some(AsOf::VersionNum(_))
                        | Some(AsOf::TimestampString(_))
                        | Some(AsOf::TimestampNum(_)) => {}
                        Some(AsOf::ProcessTime) | Some(AsOf::ProcessTimeWithInterval(_)) => {
                            bail_not_implemented!("As Of ProcessTime() is not supported yet.")
                        }
                        Some(AsOf::VersionString(_)) => {
                            bail_not_implemented!("As Of Version is not supported yet.")
                        }
                    }
                    let opt_ctx = self.ctx();
                    let session = opt_ctx.session_ctx();
                    let db_name = &session.database();
                    let catalog_reader = session.env().catalog_reader().read_guard();
                    let mut source_catalog = None;
                    for schema in catalog_reader.iter_schemas(db_name).unwrap() {
                        if schema
                            .get_table_by_id(&base_table.table_catalog.id)
                            .is_some()
                        {
                            source_catalog = schema.get_source_by_name(
                                &base_table.table_catalog.iceberg_source_name().unwrap(),
                            );
                            break;
                        }
                    }
                    if let Some(source_catalog) = source_catalog {
                        let column_map: HashMap<String, (usize, ColumnCatalog)> = source_catalog
                            .columns
                            .clone()
                            .into_iter()
                            .enumerate()
                            .map(|(i, column)| (column.name().to_owned(), (i, column)))
                            .collect();
                        let exprs = scan
                            .table()
                            .column_schema()
                            .fields()
                            .iter()
                            .map(|field| {
                                let source_field_name = if field.name == ROW_ID_COLUMN_NAME {
                                    RISINGWAVE_ICEBERG_ROW_ID
                                } else {
                                    &field.name
                                };
                                if let Some((i, source_column)) = column_map.get(source_field_name)
                                {
                                    if source_column.column_desc.data_type == field.data_type {
                                        ExprImpl::InputRef(
                                            InputRef::new(*i, field.data_type.clone()).into(),
                                        )
                                    } else {
                                        let mut input_ref = ExprImpl::InputRef(
                                            InputRef::new(
                                                *i,
                                                source_column.column_desc.data_type.clone(),
                                            )
                                            .into(),
                                        );
                                        FunctionCall::cast_mut(
                                            &mut input_ref,
                                            &field.data_type(),
                                            CastContext::Explicit,
                                        )
                                        .unwrap();
                                        input_ref
                                    }
                                } else {
                                    // Fields like `_rw_timestamp` would not be found in the source.
                                    ExprImpl::Literal(
                                        Literal::new(None, field.data_type.clone()).into(),
                                    )
                                }
                            })
                            .collect_vec();
                        let logical_source = LogicalSource::with_catalog(
                            Rc::new(source_catalog.deref().clone()),
                            SourceNodeKind::CreateMViewOrBatch,
                            self.ctx(),
                            as_of,
                        )?;
                        Ok(LogicalProject::new(logical_source.into(), exprs).into())
                    } else {
                        bail!(
                            "failed to plan an iceberg engine table: {}. Can't find the corresponding iceberg source. Maybe you need to recreate the table",
                            base_table.table_catalog.name()
                        );
                    }
                }
            }
        }
    }

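    /// Plans a scan over a source via [`LogicalSource`].
    ///
    /// Rejects direct queries on shareable CDC sources, unsupported `AS OF` variants, and
    /// (for streaming plans) sources without a primary key or row id.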
    pub(super) fn plan_source(&mut self, source: BoundSource) -> Result<PlanRef> {
        if source.is_shareable_cdc_connector() {
            Err(ErrorCode::InternalError(
                "Should not create MATERIALIZED VIEW or SELECT directly on shared CDC source. HINT: create TABLE from the source instead.".to_owned(),
            )
            .into())
        } else {
            let as_of = source.as_of.clone();
            match as_of {
                None
                | Some(AsOf::VersionNum(_))
                | Some(AsOf::TimestampString(_))
                | Some(AsOf::TimestampNum(_)) => {}
                Some(AsOf::ProcessTime) | Some(AsOf::ProcessTimeWithInterval(_)) => {
                    bail_not_implemented!("As Of ProcessTime() is not supported yet.")
                }
                Some(AsOf::VersionString(_)) => {
                    bail_not_implemented!("As Of Version is not supported yet.")
                }
            }

            // Validate that the source has a primary key. We raise an error here, rather than
            // panicking in `expect_stream_key` later, to give a nicer error message.
            if matches!(self.plan_for(), PlanFor::Stream) {
                let has_pk =
                    source.catalog.row_id_index.is_some() || !source.catalog.pk_col_ids.is_empty();
                if !has_pk {
                    // In older versions, the iceberg source doesn't have a row_id column, so it may hit this.
                    let is_iceberg = source.catalog.is_iceberg_connector();
                    // Only iceberg sources should hit this.
                    debug_assert!(is_iceberg);
                    if is_iceberg {
                        return Err(ErrorCode::BindError(format!(
                            "Cannot create a stream job from an iceberg source without a primary key.\nThe iceberg source might be created in an older version of RisingWave. Please try recreating the source.\nSource: {:?}",
                            source.catalog
                        ))
                        .into());
                    } else {
                        return Err(ErrorCode::BindError(format!(
                            "Cannot create a stream job from a source without a primary key.
This is a bug. We would appreciate a bug report at:
https://github.com/risingwavelabs/risingwave/issues/new?labels=type%2Fbug&template=bug_report.yml

source: {:?}",
                            source.catalog
                        ))
                        .into());
                    }
                }
            }
            Ok(LogicalSource::with_catalog(
                Rc::new(source.catalog),
                SourceNodeKind::CreateMViewOrBatch,
                self.ctx(),
                as_of,
            )?
            .into())
        }
    }

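    /// Plans an uncorrelated join as a [`LogicalJoin`]. Subqueries in the `ON` clause are not
    /// supported yet.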
    pub(super) fn plan_join(&mut self, join: BoundJoin) -> Result<PlanRef> {
        let left = self.plan_relation(join.left)?;
        let right = self.plan_relation(join.right)?;
        let join_type = join.join_type;
        let on_clause = join.cond;
        if on_clause.has_subquery() {
            bail_not_implemented!("Subquery in join on condition");
        } else {
            Ok(LogicalJoin::create(left, right, join_type, on_clause))
        }
    }

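    /// Plans a correlated (lateral) join as a [`LogicalApply`], assigning a fresh correlated id
    /// and collecting the correlated column indices from the right-hand side.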
    pub(super) fn plan_apply(&mut self, mut join: BoundJoin) -> Result<PlanRef> {
        let join_type = join.join_type;
        let on_clause = join.cond;
        if on_clause.has_subquery() {
            bail_not_implemented!("Subquery in join on condition");
        }

        let correlated_id = self.ctx.next_correlated_id();
        let correlated_indices = join
            .right
            .collect_correlated_indices_by_depth_and_assign_id(0, correlated_id);
        let left = self.plan_relation(join.left)?;
        let right = self.plan_relation(join.right)?;

        Ok(LogicalApply::create(
            left,
            right,
            join_type,
            Condition::with_expr(on_clause),
            correlated_id,
            correlated_indices,
            false,
        ))
    }

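    /// Plans a window table function (`TUMBLE` or `HOP`) over its input relation.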
    pub(super) fn plan_window_table_function(
        &mut self,
        table_function: BoundWindowTableFunction,
    ) -> Result<PlanRef> {
        use WindowTableFunctionKind::*;
        match table_function.kind {
            Tumble => self.plan_tumble_window(
                table_function.input,
                table_function.time_col,
                table_function.args,
            ),
            Hop => self.plan_hop_window(
                table_function.input,
                table_function.time_col,
                table_function.args,
            ),
        }
    }

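    /// Plans a table function in the `FROM` clause.
    ///
    /// A real table function becomes a [`LogicalTableFunction`]; any other expression is planned
    /// as a single-row [`LogicalValues`], with struct results flattened into one column per field.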
    pub(super) fn plan_table_function(
        &mut self,
        table_function: ExprImpl,
        with_ordinality: bool,
    ) -> Result<PlanRef> {
        // TODO: maybe we can unify LogicalTableFunction with LogicalValues
        match table_function {
            ExprImpl::TableFunction(tf) => {
                Ok(LogicalTableFunction::new(*tf, with_ordinality, self.ctx()).into())
            }
            expr => {
                let schema = Schema {
                    // TODO: should be named
                    fields: vec![Field::unnamed(expr.return_type())],
                };
                let expr_return_type = expr.return_type();
                let root = LogicalValues::create(vec![vec![expr]], schema, self.ctx());
                let input_ref = ExprImpl::from(InputRef::new(0, expr_return_type.clone()));
                let mut exprs = if let DataType::Struct(st) = expr_return_type {
                    st.iter()
                        .enumerate()
                        .map(|(i, (_, ty))| {
                            let idx = ExprImpl::literal_int(i.try_into().unwrap());
                            let args = vec![input_ref.clone(), idx];
                            FunctionCall::new_unchecked(ExprType::Field, args, ty.clone()).into()
                        })
                        .collect()
                } else {
                    vec![input_ref]
                };
                if with_ordinality {
                    exprs.push(ExprImpl::literal_bigint(1));
                }
                Ok(LogicalProject::create(root, exprs))
            }
        }
    }

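    /// Plans a shared relation (e.g. a CTE or view referenced multiple times).
    ///
    /// Non-recursive queries are planned once, wrapped in a [`LogicalShare`], and cached by share
    /// id; recursive CTEs are delegated to `plan_recursive_union`; changelog inputs are planned
    /// via `plan_changelog` and likewise wrapped in a [`LogicalShare`].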
    pub(super) fn plan_share(&mut self, share: BoundShare) -> Result<PlanRef> {
        match share.input {
            BoundShareInput::Query(Either::Left(nonrecursive_query)) => {
                let id = share.share_id;
                match self.share_cache.get(&id) {
                    None => {
                        let result = self
                            .plan_query(nonrecursive_query)?
                            .into_unordered_subplan();
                        let logical_share = LogicalShare::create(result);
                        self.share_cache.insert(id, logical_share.clone());
                        Ok(logical_share)
                    }
                    Some(result) => Ok(result.clone()),
                }
            }
            // for the recursive union in rcte
            BoundShareInput::Query(Either::Right(recursive_union)) => self.plan_recursive_union(
                *recursive_union.base,
                *recursive_union.recursive,
                share.share_id,
            ),
            BoundShareInput::ChangeLog(relation) => {
                let id = share.share_id;
                let result = self.plan_changelog(relation)?;
                let logical_share = LogicalShare::create(result);
                self.share_cache.insert(id, logical_share.clone());
                Ok(logical_share)
            }
        }
    }

    pub(super) fn plan_watermark(&mut self, _watermark: BoundWatermark) -> Result<PlanRef> {
        todo!("plan watermark");
    }

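    /// Plans a back-reference to a recursive CTE as a [`LogicalCteRef`] over its base plan.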
    pub(super) fn plan_cte_ref(&mut self, cte_ref: BoundBackCteRef) -> Result<PlanRef> {
        // TODO: this is actually duplicated from `plan_recursive_union`, refactor?
        let base = self.plan_set_expr(cte_ref.base, vec![], &[])?;
        Ok(LogicalCteRef::create(cte_ref.share_id, base))
    }

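    /// Plans a gap-fill relation as a [`LogicalGapFill`] over its input.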
    pub(super) fn plan_gap_fill(&mut self, gap_fill: BoundGapFill) -> Result<PlanRef> {
        let input = self.plan_relation(gap_fill.input)?;
        Ok(LogicalGapFill::new(
            input,
            gap_fill.time_col,
            gap_fill.interval,
            gap_fill.fill_strategies,
        )
        .into())
    }

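    /// Collects the column data types of the input relation to `TUMBLE`, which are needed to
    /// forward the input columns through the projection that appends the window bounds.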
    fn collect_col_data_types_for_tumble_window(relation: &Relation) -> Result<Vec<DataType>> {
        let col_data_types = match relation {
            Relation::Source(s) => s
                .catalog
                .columns
                .iter()
                .map(|col| col.data_type().clone())
                .collect(),
            Relation::BaseTable(t) => t
                .table_catalog
                .columns
                .iter()
                .map(|col| col.data_type().clone())
                .collect(),
            Relation::Subquery(q) => q.query.schema().data_types(),
            Relation::Share(share) => share
                .input
                .fields()?
                .into_iter()
                .map(|(_, f)| f.data_type)
                .collect(),
            r => {
                return Err(ErrorCode::BindError(format!(
                    "Invalid input relation to tumble: {r:?}"
                ))
                .into());
            }
        };
        Ok(col_data_types)
    }

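    /// Plans `TUMBLE(input, time_col, size [, offset])` as a [`LogicalProject`] that keeps all
    /// input columns and appends `window_start` and `window_end` computed from the time column.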
    fn plan_tumble_window(
        &mut self,
        input: Relation,
        time_col: InputRef,
        args: Vec<ExprImpl>,
    ) -> Result<PlanRef> {
        let mut args = args.into_iter();
        let col_data_types: Vec<_> = Self::collect_col_data_types_for_tumble_window(&input)?;

        match (args.next(), args.next(), args.next()) {
            (Some(window_size @ ExprImpl::Literal(_)), None, None) => {
                let mut exprs = Vec::with_capacity(col_data_types.len() + 2);
                for (idx, col_dt) in col_data_types.iter().enumerate() {
                    exprs.push(InputRef::new(idx, col_dt.clone()).into());
                }
                let window_start: ExprImpl = FunctionCall::new(
                    ExprType::TumbleStart,
                    vec![ExprImpl::InputRef(Box::new(time_col)), window_size.clone()],
                )?
                .into();
                // TODO: `window_end` may be optimized to avoid double calculation of
                // `tumble_start`, or we can depend on common expression
                // optimization.
                let window_end =
                    FunctionCall::new(ExprType::Add, vec![window_start.clone(), window_size])?
                        .into();
                exprs.push(window_start);
                exprs.push(window_end);
                let base = self.plan_relation(input)?;
                let project = LogicalProject::create(base, exprs);
                Ok(project)
            }
            (
                Some(window_size @ ExprImpl::Literal(_)),
                Some(window_offset @ ExprImpl::Literal(_)),
                None,
            ) => {
                let mut exprs = Vec::with_capacity(col_data_types.len() + 2);
                for (idx, col_dt) in col_data_types.iter().enumerate() {
                    exprs.push(InputRef::new(idx, col_dt.clone()).into());
                }
                let window_start: ExprImpl = FunctionCall::new(
                    ExprType::TumbleStart,
                    vec![
                        ExprImpl::InputRef(Box::new(time_col)),
                        window_size.clone(),
                        window_offset,
                    ],
                )?
                .into();
                // TODO: `window_end` may be optimized to avoid double calculation of
                // `tumble_start`, or we can depend on common expression
                // optimization.
                let window_end =
                    FunctionCall::new(ExprType::Add, vec![window_start.clone(), window_size])?
                        .into();
                exprs.push(window_start);
                exprs.push(window_end);
                let base = self.plan_relation(input)?;
                let project = LogicalProject::create(base, exprs);
                Ok(project)
            }
            _ => Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into()),
        }
    }

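    /// Plans `HOP(input, time_col, slide, size [, offset])` as a [`LogicalHopWindow`], after
    /// checking that the interval arguments are literal, positive, and that the window size is
    /// an exact multiple of the slide.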
    fn plan_hop_window(
        &mut self,
        input: Relation,
        time_col: InputRef,
        args: Vec<ExprImpl>,
    ) -> Result<PlanRef> {
        let input = self.plan_relation(input)?;
        let mut args = args.into_iter();
        let Some((ExprImpl::Literal(window_slide), ExprImpl::Literal(window_size))) =
            args.next_tuple()
        else {
            return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into());
        };

        let Some(ScalarImpl::Interval(window_slide)) = *window_slide.get_data() else {
            return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into());
        };
        let Some(ScalarImpl::Interval(window_size)) = *window_size.get_data() else {
            return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into());
        };

        let window_offset = match (args.next(), args.next()) {
            (Some(ExprImpl::Literal(window_offset)), None) => match *window_offset.get_data() {
                Some(ScalarImpl::Interval(window_offset)) => window_offset,
                _ => return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into()),
            },
            (None, None) => Interval::from_month_day_usec(0, 0, 0),
            _ => return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into()),
        };

        if !window_size.is_positive() || !window_slide.is_positive() {
            return Err(ErrorCode::BindError(format!(
                "window_size {} and window_slide {} must be positive",
                window_size, window_slide
            ))
            .into());
        }

        if window_size.exact_div(&window_slide).is_none() {
            return Err(ErrorCode::BindError(format!(
                "Invalid arguments for HOP window function: window_size {} cannot be divided by window_slide {}",
                window_size, window_slide
            ))
            .into());
        }

        Ok(LogicalHopWindow::create(
            input,
            time_col,
            window_slide,
            window_size,
            window_offset,
        ))
    }
}