risingwave_frontend/optimizer/plan_node/logical_dedup.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use fixedbitset::FixedBitSet;
16use itertools::Itertools;
17use risingwave_common::util::column_index_mapping::ColIndexMapping;
18
19use super::generic::{GenericPlanRef, TopNLimit};
20use super::utils::impl_distill_by_unit;
21use super::{
22    BatchGroupTopN, BatchPlanRef, ColPrunable, ColumnPruningContext, ExprRewritable, Logical,
23    LogicalPlanRef as PlanRef, LogicalProject, PlanBase, PlanTreeNodeUnary, PredicatePushdown,
24    PredicatePushdownContext, RewriteStreamContext, StreamDedup, StreamGroupTopN, ToBatch,
25    ToStream, ToStreamContext, gen_filter_and_pushdown, generic,
26};
27use crate::error::Result;
28use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
29use crate::optimizer::property::{Order, RequiredDist};
30use crate::utils::Condition;
31
32/// [`LogicalDedup`] deduplicates data on specific columns. It is now used in `DISTINCT ON` without
33/// an `ORDER BY`.
34#[derive(Debug, Clone, PartialEq, Eq, Hash)]
35pub struct LogicalDedup {
36    pub base: PlanBase<Logical>,
37    core: generic::Dedup<PlanRef>,
38}
39
40impl LogicalDedup {
41    pub fn new(input: PlanRef, dedup_cols: Vec<usize>) -> Self {
42        let core = generic::Dedup::new(input, dedup_cols);
43        let base = PlanBase::new_logical_with_core(&core);
44        LogicalDedup { base, core }
45    }
46
47    pub fn dedup_cols(&self) -> &[usize] {
48        &self.core.dedup_cols
49    }
50}
51
52impl PlanTreeNodeUnary<Logical> for LogicalDedup {
53    fn input(&self) -> PlanRef {
54        self.core.input.clone()
55    }
56
57    fn clone_with_input(&self, input: PlanRef) -> Self {
58        Self::new(input, self.dedup_cols().to_vec())
59    }
60
61    fn rewrite_with_input(
62        &self,
63        input: PlanRef,
64        input_col_change: ColIndexMapping,
65    ) -> (Self, ColIndexMapping) {
66        (
67            Self::new(
68                input,
69                self.dedup_cols()
70                    .iter()
71                    .map(|idx| input_col_change.map(*idx))
72                    .collect_vec(),
73            ),
74            input_col_change,
75        )
76    }
77}
78
79impl_plan_tree_node_for_unary! { Logical, LogicalDedup}
80
81impl PredicatePushdown for LogicalDedup {
82    fn predicate_pushdown(
83        &self,
84        predicate: Condition,
85        ctx: &mut PredicatePushdownContext,
86    ) -> PlanRef {
87        gen_filter_and_pushdown(self, predicate, Condition::true_cond(), ctx)
88    }
89}
90
91impl ToStream for LogicalDedup {
92    fn logical_rewrite_for_stream(
93        &self,
94        ctx: &mut RewriteStreamContext,
95    ) -> Result<(PlanRef, ColIndexMapping)> {
96        let (input, input_col_change) = self.input().logical_rewrite_for_stream(ctx)?;
97        let (logical, out_col_change) = self.rewrite_with_input(input, input_col_change);
98        Ok((logical.into(), out_col_change))
99    }
100
101    fn to_stream(
102        &self,
103        ctx: &mut ToStreamContext,
104    ) -> Result<crate::optimizer::plan_node::StreamPlanRef> {
105        use super::stream::prelude::*;
106
107        let input = self.input().to_stream(ctx)?;
108        let input = RequiredDist::hash_shard(self.dedup_cols())
109            .streaming_enforce_if_not_satisfies(input)?;
110        if input.append_only() {
111            // `LogicalDedup` is transformed to `StreamDedup` only when the input is append-only.
112            let core = self.core.clone_with_input(input);
113            Ok(StreamDedup::new(core).into())
114        } else {
115            // If the input is not append-only, we use a `StreamGroupTopN` with the limit being 1.
116            let logical_top_n = generic::TopN::with_group(
117                input,
118                TopNLimit::new(1, false),
119                0,
120                Order::default(),
121                self.dedup_cols().to_vec(),
122            );
123            Ok(StreamGroupTopN::new(logical_top_n, None)?.into())
124        }
125    }
126}
127
128impl ToBatch for LogicalDedup {
129    fn to_batch(&self) -> Result<BatchPlanRef> {
130        let input = self.input().to_batch()?;
131        let logical_top_n = generic::TopN::with_group(
132            input,
133            TopNLimit::new(1, false),
134            0,
135            Order::default(),
136            self.dedup_cols().to_vec(),
137        );
138        Ok(BatchGroupTopN::new(logical_top_n).into())
139    }
140}
141
142impl ExprRewritable<Logical> for LogicalDedup {}
143
144impl ExprVisitable for LogicalDedup {}
145
146impl ColPrunable for LogicalDedup {
147    fn prune_col(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef {
148        let input_required_bitset = FixedBitSet::from_iter(required_cols.iter().copied());
149        let dedup_required_bitset = {
150            let mut dedup_required_bitset = FixedBitSet::with_capacity(self.input().schema().len());
151            self.dedup_cols()
152                .iter()
153                .for_each(|idx| dedup_required_bitset.insert(*idx));
154            dedup_required_bitset
155        };
156        let input_required_cols = {
157            let mut tmp = input_required_bitset;
158            tmp.union_with(&dedup_required_bitset);
159            tmp.ones().collect_vec()
160        };
161        let mapping = ColIndexMapping::with_remaining_columns(
162            &input_required_cols,
163            self.input().schema().len(),
164        );
165
166        let new_input = self.input().prune_col(&input_required_cols, ctx);
167        let new_dedup_cols = self
168            .dedup_cols()
169            .iter()
170            .map(|&idx| mapping.map(idx))
171            .collect_vec();
172        let logical_dedup = Self::new(new_input, new_dedup_cols).into();
173
174        if input_required_cols == required_cols {
175            logical_dedup
176        } else {
177            let output_required_cols = required_cols
178                .iter()
179                .map(|&idx| mapping.map(idx))
180                .collect_vec();
181            let src_size = logical_dedup.schema().len();
182            LogicalProject::with_mapping(
183                logical_dedup,
184                ColIndexMapping::with_remaining_columns(&output_required_cols, src_size),
185            )
186            .into()
187        }
188    }
189}
190
191impl_distill_by_unit!(LogicalDedup, core, "LogicalDedup");