risingwave_frontend/optimizer/plan_node/
predicate_pushdown.rs1use std::collections::HashMap;
16
17use paste::paste;
18
19use super::*;
20use crate::optimizer::PlanVisitor;
21use crate::optimizer::plan_visitor::ShareParentCounter;
22use crate::{for_batch_plan_nodes, for_stream_plan_nodes};
23
24pub trait PredicatePushdown {
27 fn predicate_pushdown(
43 &self,
44 predicate: Condition,
45 ctx: &mut PredicatePushdownContext,
46 ) -> PlanRef;
47}
48
49macro_rules! ban_predicate_pushdown {
50 ($( { $convention:ident, $name:ident }),*) => {
51 paste!{
52 $(impl PredicatePushdown for [<$convention $name>] {
53 fn predicate_pushdown(&self, _predicate: Condition, _ctx: &mut PredicatePushdownContext) -> PlanRef {
54 unreachable!("predicate pushdown is only allowed on logical plan")
55 }
56 })*
57 }
58 }
59}
60for_batch_plan_nodes! {ban_predicate_pushdown}
61for_stream_plan_nodes! {ban_predicate_pushdown}
62
63#[inline]
64pub fn gen_filter_and_pushdown<T: PlanTreeNodeUnary + PlanNode>(
65 node: &T,
66 filter_predicate: Condition,
67 pushed_predicate: Condition,
68 ctx: &mut PredicatePushdownContext,
69) -> PlanRef {
70 let new_input = node.input().predicate_pushdown(pushed_predicate, ctx);
71 let new_node = node.clone_with_input(new_input);
72 LogicalFilter::create(new_node.into(), filter_predicate)
73}
74
75#[derive(Debug, Clone)]
76pub struct PredicatePushdownContext {
77 share_predicate_map: HashMap<PlanNodeId, Vec<Condition>>,
78 share_parent_counter: ShareParentCounter,
79}
80
81impl PredicatePushdownContext {
82 pub fn new(root: PlanRef) -> Self {
83 let mut share_parent_counter = ShareParentCounter::default();
84 share_parent_counter.visit(root);
85 Self {
86 share_predicate_map: Default::default(),
87 share_parent_counter,
88 }
89 }
90
91 pub fn get_parent_num(&self, share: &LogicalShare) -> usize {
92 self.share_parent_counter.get_parent_num(share)
93 }
94
95 pub fn add_predicate(&mut self, plan_node_id: PlanNodeId, predicate: Condition) -> usize {
96 self.share_predicate_map
97 .entry(plan_node_id)
98 .and_modify(|e| e.push(predicate.clone()))
99 .or_insert_with(|| vec![predicate])
100 .len()
101 }
102
103 pub fn take_predicate(&mut self, plan_node_id: PlanNodeId) -> Option<Vec<Condition>> {
104 self.share_predicate_map.remove(&plan_node_id)
105 }
106}