risingwave_frontend/optimizer/plan_node/
logical_table_function.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pretty_xmlish::{Pretty, XmlNode};
16use risingwave_common::catalog::{Field, Schema};
17use risingwave_common::types::DataType;
18
19use super::utils::{Distill, childless_record};
20use super::{
21    ColPrunable, ExprRewritable, Logical, LogicalFilter, LogicalProject, PlanBase, PlanRef,
22    PredicatePushdown, ToBatch, ToStream,
23};
24use crate::error::Result;
25use crate::expr::{Expr, ExprRewriter, ExprVisitor, TableFunction};
26use crate::optimizer::optimizer_context::OptimizerContextRef;
27use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
28use crate::optimizer::plan_node::{
29    ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, ToStreamContext,
30};
31use crate::optimizer::property::FunctionalDependencySet;
32use crate::utils::{ColIndexMapping, Condition};
33
34/// `LogicalTableFunction` is a scalar/table function used as a relation (in the `FROM` clause).
35///
36/// If the function returns a struct, it will be flattened into multiple columns.
37#[derive(Debug, Clone, PartialEq, Eq, Hash)]
38pub struct LogicalTableFunction {
39    pub base: PlanBase<Logical>,
40    pub table_function: TableFunction,
41    pub with_ordinality: bool,
42}
43
44impl LogicalTableFunction {
45    /// Create a [`LogicalTableFunction`] node. Used internally by optimizer.
46    pub fn new(
47        table_function: TableFunction,
48        with_ordinality: bool,
49        ctx: OptimizerContextRef,
50    ) -> Self {
51        let mut schema = if let DataType::Struct(s) = table_function.return_type() {
52            // If the function returns a struct, it will be flattened into multiple columns.
53            Schema::from(&s)
54        } else {
55            Schema {
56                fields: vec![Field::with_name(
57                    table_function.return_type(),
58                    table_function.name(),
59                )],
60            }
61        };
62        if with_ordinality {
63            schema
64                .fields
65                .push(Field::with_name(DataType::Int64, "ordinality"));
66        }
67        let functional_dependency = FunctionalDependencySet::new(schema.len());
68        let base = PlanBase::new_logical(ctx, schema, None, functional_dependency);
69        Self {
70            base,
71            table_function,
72            with_ordinality,
73        }
74    }
75
76    pub fn table_function(&self) -> &TableFunction {
77        &self.table_function
78    }
79}
80
81impl_plan_tree_node_for_leaf! { LogicalTableFunction }
82
83impl Distill for LogicalTableFunction {
84    fn distill<'a>(&self) -> XmlNode<'a> {
85        let data = Pretty::debug(&self.table_function);
86        childless_record("LogicalTableFunction", vec![("table_function", data)])
87    }
88}
89
90impl ColPrunable for LogicalTableFunction {
91    fn prune_col(&self, required_cols: &[usize], _ctx: &mut ColumnPruningContext) -> PlanRef {
92        // No pruning.
93        LogicalProject::with_out_col_idx(self.clone().into(), required_cols.iter().copied()).into()
94    }
95}
96
97impl ExprRewritable for LogicalTableFunction {
98    fn has_rewritable_expr(&self) -> bool {
99        true
100    }
101
102    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
103        let mut new = self.clone();
104        new.table_function.args = new
105            .table_function
106            .args
107            .into_iter()
108            .map(|e| r.rewrite_expr(e))
109            .collect();
110        new.base = self.base.clone_with_new_plan_id();
111        new.into()
112    }
113}
114
115impl ExprVisitable for LogicalTableFunction {
116    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
117        self.table_function
118            .args
119            .iter()
120            .for_each(|e| v.visit_expr(e));
121    }
122}
123
124impl PredicatePushdown for LogicalTableFunction {
125    fn predicate_pushdown(
126        &self,
127        predicate: Condition,
128        _ctx: &mut PredicatePushdownContext,
129    ) -> PlanRef {
130        LogicalFilter::create(self.clone().into(), predicate)
131    }
132}
133
134impl ToBatch for LogicalTableFunction {
135    fn to_batch(&self) -> Result<PlanRef> {
136        unreachable!("TableFunction should be converted to ProjectSet")
137    }
138}
139
140impl ToStream for LogicalTableFunction {
141    fn to_stream(&self, _ctx: &mut ToStreamContext) -> Result<PlanRef> {
142        unreachable!("TableFunction should be converted to ProjectSet")
143    }
144
145    fn logical_rewrite_for_stream(
146        &self,
147        _ctx: &mut RewriteStreamContext,
148    ) -> Result<(PlanRef, ColIndexMapping)> {
149        unreachable!("TableFunction should be converted to ProjectSet")
150    }
151}