risingwave_frontend/optimizer/plan_node/
logical_recursive_union.rs1use itertools::Itertools;
16use pretty_xmlish::{Pretty, XmlNode};
17use risingwave_common::bail_not_implemented;
18use risingwave_common::util::column_index_mapping::ColIndexMapping;
19use smallvec::{SmallVec, smallvec};
20
21use super::expr_visitable::ExprVisitable;
22use super::generic::GenericPlanRef;
23use super::utils::{Distill, childless_record};
24use super::{
25 ColPrunable, ColumnPruningContext, ExprRewritable, Logical, PlanBase, PlanTreeNode,
26 PredicatePushdown, PredicatePushdownContext, RewriteStreamContext, ToBatch, ToStream,
27 ToStreamContext, generic,
28};
29use crate::PlanRef;
30use crate::binder::ShareId;
31use crate::error::Result;
32use crate::utils::Condition;
33
34#[derive(Debug, Clone, PartialEq, Eq, Hash)]
37pub struct LogicalRecursiveUnion {
38 pub base: PlanBase<Logical>,
39 core: generic::RecursiveUnion<PlanRef>,
40}
41
42impl LogicalRecursiveUnion {
43 pub fn new(base_plan: PlanRef, recursive: PlanRef, id: ShareId) -> Self {
44 let core = generic::RecursiveUnion {
45 base: base_plan,
46 recursive,
47 id,
48 };
49 let base = PlanBase::new_logical_with_core(&core);
50 LogicalRecursiveUnion { base, core }
51 }
52
53 pub fn create(base_plan: PlanRef, recursive: PlanRef, id: ShareId) -> PlanRef {
54 Self::new(base_plan, recursive, id).into()
55 }
56
57 pub(super) fn pretty_fields(base: impl GenericPlanRef, name: &str) -> XmlNode<'_> {
58 childless_record(name, vec![("id", Pretty::debug(&base.id().0))])
59 }
60}
61
62impl PlanTreeNode for LogicalRecursiveUnion {
63 fn inputs(&self) -> SmallVec<[PlanRef; 2]> {
64 smallvec![self.core.base.clone(), self.core.recursive.clone()]
65 }
66
67 fn clone_with_inputs(&self, inputs: &[PlanRef]) -> PlanRef {
68 let mut inputs = inputs.iter().cloned();
69 Self::create(inputs.next().unwrap(), inputs.next().unwrap(), self.core.id)
70 }
71}
72
73impl Distill for LogicalRecursiveUnion {
74 fn distill<'a>(&self) -> XmlNode<'a> {
75 Self::pretty_fields(&self.base, "LogicalRecursiveUnion")
76 }
77}
78
79impl ColPrunable for LogicalRecursiveUnion {
80 fn prune_col(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef {
81 let new_inputs = self
82 .inputs()
83 .iter()
84 .map(|input| input.prune_col(required_cols, ctx))
85 .collect_vec();
86 let new_plan = self.clone_with_inputs(&new_inputs);
87 self.ctx()
88 .insert_rcte_cache_plan(self.core.id, new_plan.clone());
89 new_plan
90 }
91}
92
93impl ExprRewritable for LogicalRecursiveUnion {}
94
95impl ExprVisitable for LogicalRecursiveUnion {}
96
97impl PredicatePushdown for LogicalRecursiveUnion {
98 fn predicate_pushdown(
99 &self,
100 predicate: Condition,
101 ctx: &mut PredicatePushdownContext,
102 ) -> PlanRef {
103 let new_inputs = self
104 .inputs()
105 .iter()
106 .map(|input| input.predicate_pushdown(predicate.clone(), ctx))
107 .collect_vec();
108 let new_plan = self.clone_with_inputs(&new_inputs);
109 self.ctx()
110 .insert_rcte_cache_plan(self.core.id, new_plan.clone());
111 new_plan
112 }
113}
114
115impl ToBatch for LogicalRecursiveUnion {
116 fn to_batch(&self) -> Result<PlanRef> {
117 bail_not_implemented!(
118 issue = 15135,
119 "recursive CTE not supported for to_batch of LogicalRecursiveUnion"
120 )
121 }
122}
123
124impl ToStream for LogicalRecursiveUnion {
125 fn to_stream(&self, _ctx: &mut ToStreamContext) -> Result<PlanRef> {
126 bail_not_implemented!(
127 issue = 15135,
128 "recursive CTE not supported for to_stream of LogicalRecursiveUnion"
129 )
130 }
131
132 fn logical_rewrite_for_stream(
133 &self,
134 _ctx: &mut RewriteStreamContext,
135 ) -> Result<(PlanRef, ColIndexMapping)> {
136 bail_not_implemented!(
137 issue = 15135,
138 "recursive CTE not supported for logical_rewrite_for_stream of LogicalRecursiveUnion"
139 )
140 }
141}