risingwave_frontend/planner/relation.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::ops::Deref;
17use std::rc::Rc;
18
19use itertools::Itertools;
20use risingwave_common::catalog::{
21    ColumnCatalog, Engine, Field, RISINGWAVE_ICEBERG_ROW_ID, ROW_ID_COLUMN_NAME, Schema,
22};
23use risingwave_common::types::{DataType, Interval, ScalarImpl};
24use risingwave_common::{bail, bail_not_implemented};
25use risingwave_sqlparser::ast::AsOf;
26
27use crate::binder::{
28    BoundBaseTable, BoundGapFill, BoundJoin, BoundShare, BoundShareInput, BoundSource,
29    BoundSystemTable, BoundWatermark, BoundWindowTableFunction, Relation, WindowTableFunctionKind,
30};
31use crate::error::{ErrorCode, Result};
32use crate::expr::{CastContext, Expr, ExprImpl, ExprType, FunctionCall, InputRef, Literal};
33use crate::optimizer::plan_node::generic::SourceNodeKind;
34use crate::optimizer::plan_node::{
35    LogicalApply, LogicalGapFill, LogicalHopWindow, LogicalJoin, LogicalPlanRef as PlanRef,
36    LogicalProject, LogicalScan, LogicalShare, LogicalSource, LogicalSysScan, LogicalTableFunction,
37    LogicalValues,
38};
39use crate::optimizer::property::Cardinality;
40use crate::planner::{PlanFor, Planner};
41use crate::utils::Condition;
42
/// Error message reported when a window table function (`tumble`/`hop`)
/// receives a size/slide/offset argument that is not an interval literal.
const ERROR_WINDOW_SIZE_ARG: &str =
    "The size arg of window table function should be an interval literal.";
45
46impl Planner {
47    pub fn plan_relation(&mut self, relation: Relation) -> Result<PlanRef> {
48        match relation {
49            Relation::BaseTable(t) => self.plan_base_table(&t),
50            Relation::SystemTable(st) => self.plan_sys_table(*st),
51            // TODO: order is ignored in the subquery
52            Relation::Subquery(q) => Ok(self.plan_query(q.query)?.into_unordered_subplan()),
53            Relation::Join(join) => self.plan_join(*join),
54            Relation::Apply(join) => self.plan_apply(*join),
55            Relation::WindowTableFunction(tf) => self.plan_window_table_function(*tf),
56            Relation::Source(s) => self.plan_source(*s),
57            Relation::TableFunction {
58                expr: tf,
59                with_ordinality,
60            } => self.plan_table_function(tf, with_ordinality),
61            Relation::Watermark(tf) => self.plan_watermark(*tf),
62            Relation::Share(share) => self.plan_share(*share),
63            Relation::GapFill(bound_gap_fill) => self.plan_gap_fill(*bound_gap_fill),
64        }
65    }
66
67    pub(crate) fn plan_sys_table(&mut self, sys_table: BoundSystemTable) -> Result<PlanRef> {
68        Ok(LogicalSysScan::create(
69            sys_table.sys_table_catalog,
70            self.ctx(),
71            Cardinality::unknown(), // TODO(card): cardinality of system table
72        )
73        .into())
74    }
75
76    pub(super) fn plan_base_table(&mut self, base_table: &BoundBaseTable) -> Result<PlanRef> {
77        let as_of = base_table.as_of.clone();
78        let scan = LogicalScan::from_base_table(base_table, self.ctx(), as_of.clone());
79
80        match base_table.table_catalog.engine {
81            Engine::Hummock => {
82                match as_of {
83                    None
84                    | Some(AsOf::ProcessTime)
85                    | Some(AsOf::TimestampNum(_))
86                    | Some(AsOf::TimestampString(_))
87                    | Some(AsOf::ProcessTimeWithInterval(_)) => {}
88                    Some(AsOf::VersionNum(_)) | Some(AsOf::VersionString(_)) => {
89                        bail_not_implemented!("As Of Version is not supported yet.")
90                    }
91                };
92                Ok(scan.into())
93            }
94            Engine::Iceberg => {
95                let is_append_only = base_table.table_catalog.append_only;
96                let use_iceberg_source = match (self.plan_for(), is_append_only) {
97                    (PlanFor::StreamIcebergEngineInternal, _) => false,
98                    (PlanFor::BatchDql, _) => true,
99                    (PlanFor::Stream | PlanFor::Batch, is_append_only) => is_append_only,
100                };
101
102                if !use_iceberg_source {
103                    match as_of {
104                        None
105                        | Some(AsOf::VersionNum(_))
106                        | Some(AsOf::TimestampString(_))
107                        | Some(AsOf::TimestampNum(_)) => {}
108                        Some(AsOf::ProcessTime) | Some(AsOf::ProcessTimeWithInterval(_)) => {
109                            bail_not_implemented!("As Of ProcessTime() is not supported yet.")
110                        }
111                        Some(AsOf::VersionString(_)) => {
112                            bail_not_implemented!("As Of Version is not supported yet.")
113                        }
114                    }
115                    Ok(scan.into())
116                } else {
117                    match as_of {
118                        None
119                        | Some(AsOf::VersionNum(_))
120                        | Some(AsOf::TimestampString(_))
121                        | Some(AsOf::TimestampNum(_)) => {}
122                        Some(AsOf::ProcessTime) | Some(AsOf::ProcessTimeWithInterval(_)) => {
123                            bail_not_implemented!("As Of ProcessTime() is not supported yet.")
124                        }
125                        Some(AsOf::VersionString(_)) => {
126                            bail_not_implemented!("As Of Version is not supported yet.")
127                        }
128                    }
129                    let opt_ctx = self.ctx();
130                    let session = opt_ctx.session_ctx();
131                    let db_name = &session.database();
132                    let catalog_reader = session.env().catalog_reader().read_guard();
133                    let mut source_catalog = None;
134                    for schema in catalog_reader.iter_schemas(db_name).unwrap() {
135                        if schema
136                            .get_table_by_id(base_table.table_catalog.id)
137                            .is_some()
138                        {
139                            source_catalog = schema.get_source_by_name(
140                                &base_table.table_catalog.iceberg_source_name().unwrap(),
141                            );
142                            break;
143                        }
144                    }
145                    if let Some(source_catalog) = source_catalog {
146                        let column_map: HashMap<String, (usize, ColumnCatalog)> = source_catalog
147                            .columns
148                            .clone()
149                            .into_iter()
150                            .enumerate()
151                            .map(|(i, column)| (column.name().to_owned(), (i, column)))
152                            .collect();
153                        let exprs = scan
154                            .table()
155                            .column_schema()
156                            .fields()
157                            .iter()
158                            .map(|field| {
159                                let source_filed_name = if field.name == ROW_ID_COLUMN_NAME {
160                                    RISINGWAVE_ICEBERG_ROW_ID
161                                } else {
162                                    &field.name
163                                };
164                                if let Some((i, source_column)) = column_map.get(source_filed_name)
165                                {
166                                    if source_column.column_desc.data_type == field.data_type {
167                                        ExprImpl::InputRef(
168                                            InputRef::new(*i, field.data_type.clone()).into(),
169                                        )
170                                    } else {
171                                        let mut input_ref = ExprImpl::InputRef(
172                                            InputRef::new(
173                                                *i,
174                                                source_column.column_desc.data_type.clone(),
175                                            )
176                                            .into(),
177                                        );
178                                        FunctionCall::cast_mut(
179                                            &mut input_ref,
180                                            &field.data_type(),
181                                            CastContext::Explicit,
182                                        )
183                                        .unwrap();
184                                        input_ref
185                                    }
186                                } else {
187                                    // fields like `_rw_timestamp`, would not be found in source.
188                                    ExprImpl::Literal(
189                                        Literal::new(None, field.data_type.clone()).into(),
190                                    )
191                                }
192                            })
193                            .collect_vec();
194                        let logical_source = LogicalSource::with_catalog(
195                            Rc::new(source_catalog.deref().clone()),
196                            SourceNodeKind::CreateMViewOrBatch,
197                            self.ctx(),
198                            as_of,
199                        )?;
200                        Ok(LogicalProject::new(logical_source.into(), exprs).into())
201                    } else {
202                        bail!(
203                            "failed to plan a iceberg engine table: {}. Can't find the corresponding iceberg source. Maybe you need to recreate the table",
204                            base_table.table_catalog.name()
205                        );
206                    }
207                }
208            }
209        }
210    }
211
212    pub(super) fn plan_source(&mut self, source: BoundSource) -> Result<PlanRef> {
213        if source.is_shareable_cdc_connector() {
214            Err(ErrorCode::InternalError(
215                "Should not create MATERIALIZED VIEW or SELECT directly on shared CDC source. HINT: create TABLE from the source instead.".to_owned(),
216            )
217            .into())
218        } else {
219            let as_of = source.as_of.clone();
220            match as_of {
221                None
222                | Some(AsOf::VersionNum(_))
223                | Some(AsOf::TimestampString(_))
224                | Some(AsOf::TimestampNum(_)) => {}
225                Some(AsOf::ProcessTime) | Some(AsOf::ProcessTimeWithInterval(_)) => {
226                    bail_not_implemented!("As Of ProcessTime() is not supported yet.")
227                }
228                Some(AsOf::VersionString(_)) => {
229                    bail_not_implemented!("As Of Version is not supported yet.")
230                }
231            }
232
233            // validate the source has pk. We raise an error here to avoid panic in expect_stream_key later
234            // for a nicer error message.
235            if matches!(self.plan_for(), PlanFor::Stream) {
236                let has_pk =
237                    source.catalog.row_id_index.is_some() || !source.catalog.pk_col_ids.is_empty();
238                if !has_pk {
239                    // in older version, iceberg source doesn't have row_id, thus may hit this
240                    let is_iceberg = source.catalog.is_iceberg_connector();
241                    // only iceberg should hit this.
242                    debug_assert!(is_iceberg);
243                    if is_iceberg {
244                        return Err(ErrorCode::BindError(format!(
245                        "Cannot create a stream job from an iceberg source without a primary key.\nThe iceberg source might be created in an older version of RisingWave. Please try recreating the source.\nSource: {:?}",
246                        source.catalog
247                    ))
248                    .into());
249                    } else {
250                        return Err(ErrorCode::BindError(format!(
251                            "Cannot create a stream job from a source without a primary key.
252This is a bug. We would appreciate a bug report at:
253https://github.com/risingwavelabs/risingwave/issues/new?labels=type%2Fbug&template=bug_report.yml
254
255source: {:?}",
256                            source.catalog
257                        ))
258                        .into());
259                    }
260                }
261            }
262            Ok(LogicalSource::with_catalog(
263                Rc::new(source.catalog),
264                SourceNodeKind::CreateMViewOrBatch,
265                self.ctx(),
266                as_of,
267            )?
268            .into())
269        }
270    }
271
272    pub(super) fn plan_join(&mut self, join: BoundJoin) -> Result<PlanRef> {
273        let left = self.plan_relation(join.left)?;
274        let right = self.plan_relation(join.right)?;
275        let join_type = join.join_type;
276        let on_clause = join.cond;
277        if on_clause.has_subquery() {
278            bail_not_implemented!("Subquery in join on condition");
279        } else {
280            Ok(LogicalJoin::create(left, right, join_type, on_clause))
281        }
282    }
283
284    pub(super) fn plan_apply(&mut self, mut join: BoundJoin) -> Result<PlanRef> {
285        let join_type = join.join_type;
286        let on_clause = join.cond;
287        if on_clause.has_subquery() {
288            bail_not_implemented!("Subquery in join on condition");
289        }
290
291        let correlated_id = self.ctx.next_correlated_id();
292        let correlated_indices = join
293            .right
294            .collect_correlated_indices_by_depth_and_assign_id(0, correlated_id);
295        let left = self.plan_relation(join.left)?;
296        let right = self.plan_relation(join.right)?;
297
298        Ok(LogicalApply::create(
299            left,
300            right,
301            join_type,
302            Condition::with_expr(on_clause),
303            correlated_id,
304            correlated_indices,
305            false,
306        ))
307    }
308
309    pub(super) fn plan_window_table_function(
310        &mut self,
311        table_function: BoundWindowTableFunction,
312    ) -> Result<PlanRef> {
313        use WindowTableFunctionKind::*;
314        match table_function.kind {
315            Tumble => self.plan_tumble_window(
316                table_function.input,
317                table_function.time_col,
318                table_function.args,
319            ),
320            Hop => self.plan_hop_window(
321                table_function.input,
322                table_function.time_col,
323                table_function.args,
324            ),
325        }
326    }
327
328    pub(super) fn plan_table_function(
329        &mut self,
330        table_function: ExprImpl,
331        with_ordinality: bool,
332    ) -> Result<PlanRef> {
333        // TODO: maybe we can unify LogicalTableFunction with LogicalValues
334        match table_function {
335            ExprImpl::TableFunction(tf) => {
336                Ok(LogicalTableFunction::new(*tf, with_ordinality, self.ctx()).into())
337            }
338            expr => {
339                let schema = Schema {
340                    // TODO: should be named
341                    fields: vec![Field::unnamed(expr.return_type())],
342                };
343                let expr_return_type = expr.return_type();
344                let root = LogicalValues::create(vec![vec![expr]], schema, self.ctx());
345                let input_ref = ExprImpl::from(InputRef::new(0, expr_return_type.clone()));
346                let mut exprs = if let DataType::Struct(st) = expr_return_type {
347                    st.iter()
348                        .enumerate()
349                        .map(|(i, (_, ty))| {
350                            let idx = ExprImpl::literal_int(i.try_into().unwrap());
351                            let args = vec![input_ref.clone(), idx];
352                            FunctionCall::new_unchecked(ExprType::Field, args, ty.clone()).into()
353                        })
354                        .collect()
355                } else {
356                    vec![input_ref]
357                };
358                if with_ordinality {
359                    exprs.push(ExprImpl::literal_bigint(1));
360                }
361                Ok(LogicalProject::create(root, exprs))
362            }
363        }
364    }
365
366    pub(super) fn plan_share(&mut self, share: BoundShare) -> Result<PlanRef> {
367        match share.input {
368            BoundShareInput::Query(query) => {
369                let id = share.share_id;
370                match self.share_cache.get(&id) {
371                    None => {
372                        let result = self.plan_query(query)?.into_unordered_subplan();
373                        let logical_share = LogicalShare::create(result);
374                        self.share_cache.insert(id, logical_share.clone());
375                        Ok(logical_share)
376                    }
377                    Some(result) => Ok(result.clone()),
378                }
379            }
380            BoundShareInput::ChangeLog(relation) => {
381                let id = share.share_id;
382                let result = self.plan_changelog(relation)?;
383                let logical_share = LogicalShare::create(result);
384                self.share_cache.insert(id, logical_share.clone());
385                Ok(logical_share)
386            }
387        }
388    }
389
    /// Plans a `WATERMARK` table function. Not implemented yet; always panics.
    pub(super) fn plan_watermark(&mut self, _watermark: BoundWatermark) -> Result<PlanRef> {
        todo!("plan watermark");
    }
393
394    pub(super) fn plan_gap_fill(&mut self, gap_fill: BoundGapFill) -> Result<PlanRef> {
395        let input = self.plan_relation(gap_fill.input)?;
396        Ok(LogicalGapFill::new(
397            input,
398            gap_fill.time_col,
399            gap_fill.interval,
400            gap_fill.fill_strategies,
401        )
402        .into())
403    }
404
405    fn collect_col_data_types_for_tumble_window(relation: &Relation) -> Result<Vec<DataType>> {
406        let col_data_types = match relation {
407            Relation::Source(s) => s
408                .catalog
409                .columns
410                .iter()
411                .map(|col| col.data_type().clone())
412                .collect(),
413            Relation::BaseTable(t) => t
414                .table_catalog
415                .columns
416                .iter()
417                .map(|col| col.data_type().clone())
418                .collect(),
419            Relation::Subquery(q) => q.query.schema().data_types(),
420            Relation::Share(share) => share
421                .input
422                .fields()?
423                .into_iter()
424                .map(|(_, f)| f.data_type)
425                .collect(),
426            r => {
427                return Err(ErrorCode::BindError(format!(
428                    "Invalid input relation to tumble: {r:?}"
429                ))
430                .into());
431            }
432        };
433        Ok(col_data_types)
434    }
435
436    fn plan_tumble_window(
437        &mut self,
438        input: Relation,
439        time_col: InputRef,
440        args: Vec<ExprImpl>,
441    ) -> Result<PlanRef> {
442        let mut args = args.into_iter();
443        let col_data_types: Vec<_> = Self::collect_col_data_types_for_tumble_window(&input)?;
444
445        match (args.next(), args.next(), args.next()) {
446            (Some(window_size @ ExprImpl::Literal(_)), None, None) => {
447                let mut exprs = Vec::with_capacity(col_data_types.len() + 2);
448                for (idx, col_dt) in col_data_types.iter().enumerate() {
449                    exprs.push(InputRef::new(idx, col_dt.clone()).into());
450                }
451                let window_start: ExprImpl = FunctionCall::new(
452                    ExprType::TumbleStart,
453                    vec![ExprImpl::InputRef(Box::new(time_col)), window_size.clone()],
454                )?
455                .into();
456                // TODO: `window_end` may be optimized to avoid double calculation of
457                // `tumble_start`, or we can depends on common expression
458                // optimization.
459                let window_end =
460                    FunctionCall::new(ExprType::Add, vec![window_start.clone(), window_size])?
461                        .into();
462                exprs.push(window_start);
463                exprs.push(window_end);
464                let base = self.plan_relation(input)?;
465                let project = LogicalProject::create(base, exprs);
466                Ok(project)
467            }
468            (
469                Some(window_size @ ExprImpl::Literal(_)),
470                Some(window_offset @ ExprImpl::Literal(_)),
471                None,
472            ) => {
473                let mut exprs = Vec::with_capacity(col_data_types.len() + 2);
474                for (idx, col_dt) in col_data_types.iter().enumerate() {
475                    exprs.push(InputRef::new(idx, col_dt.clone()).into());
476                }
477                let window_start: ExprImpl = FunctionCall::new(
478                    ExprType::TumbleStart,
479                    vec![
480                        ExprImpl::InputRef(Box::new(time_col)),
481                        window_size.clone(),
482                        window_offset,
483                    ],
484                )?
485                .into();
486                // TODO: `window_end` may be optimized to avoid double calculation of
487                // `tumble_start`, or we can depends on common expression
488                // optimization.
489                let window_end =
490                    FunctionCall::new(ExprType::Add, vec![window_start.clone(), window_size])?
491                        .into();
492                exprs.push(window_start);
493                exprs.push(window_end);
494                let base = self.plan_relation(input)?;
495                let project = LogicalProject::create(base, exprs);
496                Ok(project)
497            }
498            _ => Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into()),
499        }
500    }
501
502    fn plan_hop_window(
503        &mut self,
504        input: Relation,
505        time_col: InputRef,
506        args: Vec<ExprImpl>,
507    ) -> Result<PlanRef> {
508        let input = self.plan_relation(input)?;
509        let mut args = args.into_iter();
510        let Some((ExprImpl::Literal(window_slide), ExprImpl::Literal(window_size))) =
511            args.next_tuple()
512        else {
513            return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into());
514        };
515
516        let Some(ScalarImpl::Interval(window_slide)) = *window_slide.get_data() else {
517            return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into());
518        };
519        let Some(ScalarImpl::Interval(window_size)) = *window_size.get_data() else {
520            return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into());
521        };
522
523        let window_offset = match (args.next(), args.next()) {
524            (Some(ExprImpl::Literal(window_offset)), None) => match *window_offset.get_data() {
525                Some(ScalarImpl::Interval(window_offset)) => window_offset,
526                _ => return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into()),
527            },
528            (None, None) => Interval::from_month_day_usec(0, 0, 0),
529            _ => return Err(ErrorCode::BindError(ERROR_WINDOW_SIZE_ARG.to_owned()).into()),
530        };
531
532        if !window_size.is_positive() || !window_slide.is_positive() {
533            return Err(ErrorCode::BindError(format!(
534                "window_size {} and window_slide {} must be positive",
535                window_size, window_slide
536            ))
537            .into());
538        }
539
540        if window_size.exact_div(&window_slide).is_none() {
541            return Err(ErrorCode::BindError(format!("Invalid arguments for HOP window function: window_size {} cannot be divided by window_slide {}",window_size, window_slide)).into());
542        }
543
544        Ok(LogicalHopWindow::create(
545            input,
546            time_col,
547            window_slide,
548            window_size,
549            window_offset,
550        ))
551    }
552}