risingwave_frontend/optimizer/rule/
table_function_to_project_set_rule.rs1use itertools::Itertools;
16use risingwave_common::catalog::Schema;
17use risingwave_common::types::DataType;
18
19use super::{BoxedRule, Rule};
20use crate::expr::{Expr, ExprImpl, ExprType, FunctionCall, InputRef};
21use crate::optimizer::PlanRef;
22use crate::optimizer::plan_node::generic::GenericPlanRef;
23use crate::optimizer::plan_node::{
24 LogicalProject, LogicalProjectSet, LogicalTableFunction, LogicalValues, PlanTreeNodeUnary,
25};
26
/// Optimizer rule that rewrites a `LogicalTableFunction` plan node into a
/// `LogicalProject` on top of a `LogicalProjectSet` on top of a single-row,
/// zero-column `LogicalValues`.
///
/// The struct carries no state; the rewrite itself lives in its `Rule::apply`
/// implementation.
pub struct TableFunctionToProjectSetRule {}
46impl Rule for TableFunctionToProjectSetRule {
47 fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
48 let logical_table_function: &LogicalTableFunction = plan.as_logical_table_function()?;
49 let table_function =
50 ExprImpl::TableFunction(logical_table_function.table_function().clone().into());
51 let table_function_return_type = table_function.return_type();
52 let logical_values = LogicalValues::create(
53 vec![vec![]],
54 Schema::new(vec![]),
55 logical_table_function.base.ctx().clone(),
56 );
57 let logical_project_set = LogicalProjectSet::create(logical_values, vec![table_function]);
58 let table_function_col_idx = 1;
62 let logical_project = if let DataType::Struct(st) = table_function_return_type.clone() {
63 let exprs = st
64 .types()
65 .enumerate()
66 .map(|(i, data_type)| {
67 let field_access = FunctionCall::new_unchecked(
68 ExprType::Field,
69 vec![
70 InputRef::new(
71 table_function_col_idx,
72 table_function_return_type.clone(),
73 )
74 .into(),
75 ExprImpl::literal_int(i as i32),
76 ],
77 data_type.clone(),
78 );
79 ExprImpl::FunctionCall(field_access.into())
80 })
81 .collect_vec();
82 LogicalProject::new(logical_project_set, exprs)
83 } else {
84 LogicalProject::with_out_col_idx(
85 logical_project_set,
86 std::iter::once(table_function_col_idx),
87 )
88 };
89
90 if logical_table_function.with_ordinality {
91 let projected_row_id = InputRef::new(0, DataType::Int64).into();
92 let ordinality = FunctionCall::new(
93 ExprType::Add,
94 vec![projected_row_id, ExprImpl::literal_bigint(1)],
95 )
96 .unwrap() .into();
98 let mut exprs = logical_project.exprs().clone();
99 exprs.push(ordinality);
100 let logical_project = LogicalProject::new(logical_project.input(), exprs);
101 Some(logical_project.into())
102 } else {
103 Some(logical_project.into())
104 }
105 }
106}
107
108impl TableFunctionToProjectSetRule {
109 pub fn create() -> BoxedRule {
110 Box::new(TableFunctionToProjectSetRule {})
111 }
112}