risingwave_frontend/optimizer/plan_node/
logical_dedup.rs1use fixedbitset::FixedBitSet;
16use itertools::Itertools;
17use risingwave_common::util::column_index_mapping::ColIndexMapping;
18
19use super::generic::{GenericPlanRef, TopNLimit};
20use super::utils::impl_distill_by_unit;
21use super::{
22 BatchGroupTopN, BatchPlanRef, ColPrunable, ColumnPruningContext, ExprRewritable, Logical,
23 LogicalPlanRef as PlanRef, LogicalProject, PlanBase, PlanTreeNodeUnary, PredicatePushdown,
24 PredicatePushdownContext, RewriteStreamContext, StreamDedup, StreamGroupTopN, ToBatch,
25 ToStream, ToStreamContext, gen_filter_and_pushdown, generic, try_enforce_locality_requirement,
26};
27use crate::error::Result;
28use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
29use crate::optimizer::property::{Order, RequiredDist};
30use crate::utils::Condition;
31
32#[derive(Debug, Clone, PartialEq, Eq, Hash)]
35pub struct LogicalDedup {
36 pub base: PlanBase<Logical>,
37 core: generic::Dedup<PlanRef>,
38}
39
40impl LogicalDedup {
41 pub fn new(input: PlanRef, dedup_cols: Vec<usize>) -> Self {
42 let core = generic::Dedup::new(input, dedup_cols);
43 let base = PlanBase::new_logical_with_core(&core);
44 LogicalDedup { base, core }
45 }
46
47 pub fn dedup_cols(&self) -> &[usize] {
48 &self.core.dedup_cols
49 }
50}
51
52impl PlanTreeNodeUnary<Logical> for LogicalDedup {
53 fn input(&self) -> PlanRef {
54 self.core.input.clone()
55 }
56
57 fn clone_with_input(&self, input: PlanRef) -> Self {
58 Self::new(input, self.dedup_cols().to_vec())
59 }
60
61 fn rewrite_with_input(
62 &self,
63 input: PlanRef,
64 input_col_change: ColIndexMapping,
65 ) -> (Self, ColIndexMapping) {
66 (
67 Self::new(
68 input,
69 self.dedup_cols()
70 .iter()
71 .map(|idx| input_col_change.map(*idx))
72 .collect_vec(),
73 ),
74 input_col_change,
75 )
76 }
77}
78
// NOTE(review): presumably expands to the general plan-tree-node impl for
// `LogicalDedup`, built on its `PlanTreeNodeUnary` impl; the macro definition
// is not visible in this file — confirm there.
impl_plan_tree_node_for_unary! { Logical, LogicalDedup}
80
81impl PredicatePushdown for LogicalDedup {
82 fn predicate_pushdown(
83 &self,
84 predicate: Condition,
85 ctx: &mut PredicatePushdownContext,
86 ) -> PlanRef {
87 gen_filter_and_pushdown(self, predicate, Condition::true_cond(), ctx)
88 }
89}
90
91impl ToStream for LogicalDedup {
92 fn logical_rewrite_for_stream(
93 &self,
94 ctx: &mut RewriteStreamContext,
95 ) -> Result<(PlanRef, ColIndexMapping)> {
96 let (input, input_col_change) = self.input().logical_rewrite_for_stream(ctx)?;
97 let (logical, out_col_change) = self.rewrite_with_input(input, input_col_change);
98 Ok((logical.into(), out_col_change))
99 }
100
101 fn to_stream(
102 &self,
103 ctx: &mut ToStreamContext,
104 ) -> Result<crate::optimizer::plan_node::StreamPlanRef> {
105 use super::stream::prelude::*;
106
107 let logical_input = try_enforce_locality_requirement(self.input(), self.dedup_cols());
108 let input = logical_input.to_stream(ctx)?;
109 let input = RequiredDist::hash_shard(self.dedup_cols())
110 .streaming_enforce_if_not_satisfies(input)?;
111 if input.append_only() {
112 let core = self.core.clone_with_input(input);
114 Ok(StreamDedup::new(core).into())
115 } else {
116 let logical_top_n = generic::TopN::with_group(
118 input,
119 TopNLimit::new(1, false),
120 0,
121 Order::default(),
122 self.dedup_cols().to_vec(),
123 );
124 Ok(StreamGroupTopN::new(logical_top_n, None)?.into())
125 }
126 }
127}
128
129impl ToBatch for LogicalDedup {
130 fn to_batch(&self) -> Result<BatchPlanRef> {
131 let input = self.input().to_batch()?;
132 let logical_top_n = generic::TopN::with_group(
133 input,
134 TopNLimit::new(1, false),
135 0,
136 Order::default(),
137 self.dedup_cols().to_vec(),
138 );
139 Ok(BatchGroupTopN::new(logical_top_n).into())
140 }
141}
142
// Empty impl: `LogicalDedup` overrides nothing and relies on the trait's
// default methods.
// NOTE(review): presumably fine because the node carries only column indices
// and no expressions — confirm against `generic::Dedup`.
impl ExprRewritable<Logical> for LogicalDedup {}

// Empty impl: `LogicalDedup` relies on the trait's default methods here too.
impl ExprVisitable for LogicalDedup {}
146
147impl ColPrunable for LogicalDedup {
148 fn prune_col(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef {
149 let input_required_bitset = FixedBitSet::from_iter(required_cols.iter().copied());
150 let dedup_required_bitset = {
151 let mut dedup_required_bitset = FixedBitSet::with_capacity(self.input().schema().len());
152 self.dedup_cols()
153 .iter()
154 .for_each(|idx| dedup_required_bitset.insert(*idx));
155 dedup_required_bitset
156 };
157 let input_required_cols = {
158 let mut tmp = input_required_bitset;
159 tmp.union_with(&dedup_required_bitset);
160 tmp.ones().collect_vec()
161 };
162 let mapping = ColIndexMapping::with_remaining_columns(
163 &input_required_cols,
164 self.input().schema().len(),
165 );
166
167 let new_input = self.input().prune_col(&input_required_cols, ctx);
168 let new_dedup_cols = self
169 .dedup_cols()
170 .iter()
171 .map(|&idx| mapping.map(idx))
172 .collect_vec();
173 let logical_dedup = Self::new(new_input, new_dedup_cols).into();
174
175 if input_required_cols == required_cols {
176 logical_dedup
177 } else {
178 let output_required_cols = required_cols
179 .iter()
180 .map(|&idx| mapping.map(idx))
181 .collect_vec();
182 let src_size = logical_dedup.schema().len();
183 LogicalProject::with_mapping(
184 logical_dedup,
185 ColIndexMapping::with_remaining_columns(&output_required_cols, src_size),
186 )
187 .into()
188 }
189 }
190}
191
// NOTE(review): presumably generates the plan-explain ("distill") impl for
// `LogicalDedup` by delegating to its `core` field under the display name
// "LogicalDedup"; the macro definition is not visible in this file — confirm there.
impl_distill_by_unit!(LogicalDedup, core, "LogicalDedup");