risingwave_frontend/optimizer/rule/stream/
stream_project_merge_rule.rs1use super::prelude::*;
16use crate::expr::{ExprImpl, ExprVisitor};
17use crate::optimizer::plan_expr_visitor::InputRefCounter;
18use crate::optimizer::plan_node::{PlanTreeNodeUnary, StreamProject};
19use crate::utils::Substitute;
20
/// Optimizer rule that merges a `StreamProject` with a `StreamProject` directly beneath it.
///
/// The rule carries no state; the merging logic lives in its `Rule<Stream>` implementation.
pub struct StreamProjectMergeRule {
}
23impl Rule<Stream> for StreamProjectMergeRule {
24 fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
25 let outer_project = plan.as_stream_project()?;
26 let input = outer_project.input();
27 let inner_project = input.as_stream_project()?;
28
29 let mut input_ref_counter = InputRefCounter::default();
30 for expr in outer_project.exprs() {
31 input_ref_counter.visit_expr(expr);
32 }
33 for (index, count) in &input_ref_counter.counter {
35 if *count > 1 && matches!(inner_project.exprs()[*index], ExprImpl::FunctionCall(_)) {
36 return None;
37 }
38 }
39
40 let mut core = outer_project.core().clone();
41 core.rewrite_exprs(&mut Substitute {
42 mapping: inner_project.exprs().clone(),
43 });
44 core.input = inner_project.input();
45
46 let noop_update_hint = outer_project.noop_update_hint() || inner_project.noop_update_hint();
48
49 Some(
50 StreamProject::new(core)
51 .with_noop_update_hint(noop_update_hint)
52 .into(),
53 )
54 }
55}
56
57impl StreamProjectMergeRule {
58 pub fn create() -> BoxedRule {
59 Box::new(StreamProjectMergeRule {})
60 }
61}