risingwave_frontend/optimizer/plan_node/
logical_table_function.rs1use pretty_xmlish::{Pretty, XmlNode};
16use risingwave_common::catalog::{Field, Schema};
17use risingwave_common::types::DataType;
18
19use super::utils::{Distill, childless_record};
20use super::{
21 ColPrunable, ExprRewritable, Logical, LogicalFilter, LogicalProject, PlanBase, PlanRef,
22 PredicatePushdown, ToBatch, ToStream,
23};
24use crate::error::Result;
25use crate::expr::{Expr, ExprRewriter, ExprVisitor, TableFunction};
26use crate::optimizer::optimizer_context::OptimizerContextRef;
27use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
28use crate::optimizer::plan_node::{
29 ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, ToStreamContext,
30};
31use crate::optimizer::property::FunctionalDependencySet;
32use crate::utils::{ColIndexMapping, Condition};
33
/// Logical plan node wrapping a [`TableFunction`] call.
///
/// The output schema is computed in [`LogicalTableFunction::new`]: a struct
/// return type is turned into a schema via `Schema::from`, any other return
/// type yields a single column named after the function.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct LogicalTableFunction {
    /// Logical plan base holding the context and schema built in `new`.
    pub base: PlanBase<Logical>,
    /// The table function call represented by this node.
    pub table_function: TableFunction,
    /// When `true`, `new` appends a trailing `Int64` column named `ordinality`
    /// to the schema.
    pub with_ordinality: bool,
}
43
44impl LogicalTableFunction {
45 pub fn new(
47 table_function: TableFunction,
48 with_ordinality: bool,
49 ctx: OptimizerContextRef,
50 ) -> Self {
51 let mut schema = if let DataType::Struct(s) = table_function.return_type() {
52 Schema::from(&s)
54 } else {
55 Schema {
56 fields: vec![Field::with_name(
57 table_function.return_type(),
58 table_function.name(),
59 )],
60 }
61 };
62 if with_ordinality {
63 schema
64 .fields
65 .push(Field::with_name(DataType::Int64, "ordinality"));
66 }
67 let functional_dependency = FunctionalDependencySet::new(schema.len());
68 let base = PlanBase::new_logical(ctx, schema, None, functional_dependency);
69 Self {
70 base,
71 table_function,
72 with_ordinality,
73 }
74 }
75
76 pub fn table_function(&self) -> &TableFunction {
77 &self.table_function
78 }
79}
80
// NOTE(review): the macro is defined outside this file; judging by its name it
// generates the plan-tree-node impls for a node without inputs — confirm
// against the macro definition.
impl_plan_tree_node_for_leaf! { LogicalTableFunction }
82
83impl Distill for LogicalTableFunction {
84 fn distill<'a>(&self) -> XmlNode<'a> {
85 let data = Pretty::debug(&self.table_function);
86 childless_record("LogicalTableFunction", vec![("table_function", data)])
87 }
88}
89
90impl ColPrunable for LogicalTableFunction {
91 fn prune_col(&self, required_cols: &[usize], _ctx: &mut ColumnPruningContext) -> PlanRef {
92 LogicalProject::with_out_col_idx(self.clone().into(), required_cols.iter().copied()).into()
94 }
95}
96
97impl ExprRewritable for LogicalTableFunction {
98 fn has_rewritable_expr(&self) -> bool {
99 true
100 }
101
102 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
103 let mut new = self.clone();
104 new.table_function.args = new
105 .table_function
106 .args
107 .into_iter()
108 .map(|e| r.rewrite_expr(e))
109 .collect();
110 new.base = self.base.clone_with_new_plan_id();
111 new.into()
112 }
113}
114
115impl ExprVisitable for LogicalTableFunction {
116 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
117 self.table_function
118 .args
119 .iter()
120 .for_each(|e| v.visit_expr(e));
121 }
122}
123
124impl PredicatePushdown for LogicalTableFunction {
125 fn predicate_pushdown(
126 &self,
127 predicate: Condition,
128 _ctx: &mut PredicatePushdownContext,
129 ) -> PlanRef {
130 LogicalFilter::create(self.clone().into(), predicate)
131 }
132}
133
impl ToBatch for LogicalTableFunction {
    /// Never produces a batch plan for this node.
    ///
    /// # Panics
    ///
    /// Always panics via `unreachable!`: per the panic message, a table
    /// function is expected to have been converted to a ProjectSet before
    /// this method is called.
    fn to_batch(&self) -> Result<PlanRef> {
        unreachable!("TableFunction should be converted to ProjectSet")
    }
}
139
impl ToStream for LogicalTableFunction {
    /// Never produces a stream plan for this node.
    ///
    /// # Panics
    ///
    /// Always panics via `unreachable!`: per the panic message, a table
    /// function is expected to have been converted to a ProjectSet before
    /// this method is called.
    fn to_stream(&self, _ctx: &mut ToStreamContext) -> Result<PlanRef> {
        unreachable!("TableFunction should be converted to ProjectSet")
    }

    /// Never performs a stream rewrite for this node.
    ///
    /// # Panics
    ///
    /// Always panics via `unreachable!`, for the same reason as `to_stream`.
    fn logical_rewrite_for_stream(
        &self,
        _ctx: &mut RewriteStreamContext,
    ) -> Result<(PlanRef, ColIndexMapping)> {
        unreachable!("TableFunction should be converted to ProjectSet")
    }
}