risingwave_frontend/optimizer/rule/stream/
stream_project_merge_rule.rs1use super::prelude::*;
16use crate::expr::{ExprImpl, ExprRewriter, ExprVisitor};
17use crate::optimizer::plan_expr_visitor::InputRefCounter;
18use crate::optimizer::plan_node::{PlanTreeNodeUnary, StreamProject, generic};
19use crate::utils::Substitute;
20
/// Optimizer rule that fuses a stream project sitting directly on top of
/// another stream project into a single stream project.
///
/// The rule carries no state; all of the work happens in its `Rule<Stream>`
/// implementation.
pub struct StreamProjectMergeRule {}
23impl Rule<Stream> for StreamProjectMergeRule {
24 fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
25 let outer_project = plan.as_stream_project()?;
26 let input = outer_project.input();
27 let inner_project = input.as_stream_project()?;
28
29 let mut input_ref_counter = InputRefCounter::default();
30 for expr in outer_project.exprs() {
31 input_ref_counter.visit_expr(expr);
32 }
33 for (index, count) in &input_ref_counter.counter {
35 if *count > 1 && matches!(inner_project.exprs()[*index], ExprImpl::FunctionCall(_)) {
36 return None;
37 }
38 }
39
40 let mut subst = Substitute {
41 mapping: inner_project.exprs().clone(),
42 };
43 let exprs = outer_project
44 .exprs()
45 .iter()
46 .cloned()
47 .map(|expr| subst.rewrite_expr(expr))
48 .collect();
49 let logical_project = generic::Project::new(exprs, inner_project.input());
50
51 let noop_update_hint = outer_project.noop_update_hint() || inner_project.noop_update_hint();
53
54 Some(
55 StreamProject::new(logical_project)
56 .with_noop_update_hint(noop_update_hint)
57 .into(),
58 )
59 }
60}
61
62impl StreamProjectMergeRule {
63 pub fn create() -> BoxedRule {
64 Box::new(StreamProjectMergeRule {})
65 }
66}