risingwave_frontend/optimizer/rule/
table_function_to_project_set_rule.rs1use itertools::Itertools;
16use risingwave_common::catalog::Schema;
17use risingwave_common::types::DataType;
18
19use super::prelude::{PlanRef, *};
20use crate::expr::{Expr, ExprImpl, ExprType, FunctionCall, InputRef};
21use crate::optimizer::plan_node::generic::GenericPlanRef;
22use crate::optimizer::plan_node::{
23 LogicalProject, LogicalProjectSet, LogicalTableFunction, LogicalValues, PlanTreeNodeUnary,
24};
25
/// Optimizer rule that replaces a `LogicalTableFunction` plan node with an
/// equivalent stack of plan nodes built around a `LogicalProjectSet`.
///
/// Shape produced by [`Rule::apply`] for a matching node:
///
/// ```text
///     LogicalProject      (selects / unpacks the function output column)
///           |
///     LogicalProjectSet   (evaluates the table function)
///           |
///     LogicalValues       (one row, no columns)
/// ```
///
/// The rule carries no state; obtain a boxed instance through `create`.
pub struct TableFunctionToProjectSetRule {}
45impl Rule<Logical> for TableFunctionToProjectSetRule {
46 fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
47 let logical_table_function: &LogicalTableFunction = plan.as_logical_table_function()?;
48 let table_function =
49 ExprImpl::TableFunction(logical_table_function.table_function().clone().into());
50 let table_function_return_type = table_function.return_type();
51 let logical_values = LogicalValues::create(
52 vec![vec![]],
53 Schema::new(vec![]),
54 logical_table_function.base.ctx().clone(),
55 );
56 let logical_project_set = LogicalProjectSet::create(logical_values, vec![table_function]);
57 let table_function_col_idx = 1;
61 let logical_project = if let DataType::Struct(st) = table_function_return_type.clone() {
62 let exprs = st
63 .types()
64 .enumerate()
65 .map(|(i, data_type)| {
66 let field_access = FunctionCall::new_unchecked(
67 ExprType::Field,
68 vec![
69 InputRef::new(
70 table_function_col_idx,
71 table_function_return_type.clone(),
72 )
73 .into(),
74 ExprImpl::literal_int(i as i32),
75 ],
76 data_type.clone(),
77 );
78 ExprImpl::FunctionCall(field_access.into())
79 })
80 .collect_vec();
81 LogicalProject::new(logical_project_set, exprs)
82 } else {
83 LogicalProject::with_out_col_idx(
84 logical_project_set,
85 std::iter::once(table_function_col_idx),
86 )
87 };
88
89 if logical_table_function.with_ordinality {
90 let projected_row_id = InputRef::new(0, DataType::Int64).into();
91 let ordinality = FunctionCall::new(
92 ExprType::Add,
93 vec![projected_row_id, ExprImpl::literal_bigint(1)],
94 )
95 .unwrap() .into();
97 let mut exprs = logical_project.exprs().clone();
98 exprs.push(ordinality);
99 let logical_project = LogicalProject::new(logical_project.input(), exprs);
100 Some(logical_project.into())
101 } else {
102 Some(logical_project.into())
103 }
104 }
105}
106
107impl TableFunctionToProjectSetRule {
108 pub fn create() -> BoxedRule {
109 Box::new(TableFunctionToProjectSetRule {})
110 }
111}