risingwave_frontend/optimizer/plan_node/
stream_dedup.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use itertools::Itertools;
16use risingwave_common::util::sort_util::OrderType;
17use risingwave_pb::stream_plan::DedupNode;
18use risingwave_pb::stream_plan::stream_node::PbNodeBody;
19
20use super::generic::GenericPlanNode;
21use super::stream::prelude::*;
22use super::utils::{TableCatalogBuilder, impl_distill_by_unit};
23use super::{ExprRewritable, PlanBase, PlanTreeNodeUnary, StreamNode, generic};
24use crate::TableCatalog;
25use crate::optimizer::plan_node::StreamPlanRef as PlanRef;
26use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
27use crate::optimizer::property::StreamKind;
28use crate::stream_fragmenter::BuildFragmentGraphState;
29
/// Stream plan node that deduplicates rows of an append-only input.
///
/// The node is a thin wrapper around [`generic::Dedup`]: the logical description lives in
/// `core`, while `base` carries the stream-level properties computed in [`StreamDedup::new`].
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamDedup {
    /// Stream plan properties, built by `PlanBase::new_stream_with_core` in `new`.
    pub base: PlanBase<Stream>,
    /// Generic dedup description; this file reads its `input` and `dedup_cols` fields.
    core: generic::Dedup<PlanRef>,
}
35
36impl StreamDedup {
37    pub fn new(core: generic::Dedup<PlanRef>) -> Self {
38        let input = core.input.clone();
39        // A dedup operator must be append-only.
40        assert!(input.append_only());
41
42        let base = PlanBase::new_stream_with_core(
43            &core,
44            input.distribution().clone(),
45            StreamKind::AppendOnly,
46            input.emit_on_window_close(),
47            input.watermark_columns().clone(),
48            input.columns_monotonicity().clone(),
49        );
50        StreamDedup { base, core }
51    }
52
53    pub fn infer_internal_table_catalog(&self) -> TableCatalog {
54        let schema = self.core.schema();
55        let mut builder = TableCatalogBuilder::default();
56
57        schema.fields().iter().for_each(|field| {
58            builder.add_column(field);
59        });
60
61        self.core.dedup_cols.iter().for_each(|idx| {
62            builder.add_order_column(*idx, OrderType::ascending());
63        });
64
65        let read_prefix_len_hint = builder.get_current_pk_len();
66
67        builder.build(
68            self.base.distribution().dist_column_indices().to_vec(),
69            read_prefix_len_hint,
70        )
71    }
72}
73
// Invariant: `self.base.append_only()` holds for every `StreamDedup`, because
// `StreamDedup::new` asserts an append-only input and builds the base with
// `StreamKind::AppendOnly`; hence the "AppendOnly" in the name passed below.
// NOTE(review): the string is presumably the name shown when the node is distilled
// (e.g. in EXPLAIN output) — confirm against the `impl_distill_by_unit!` definition.
impl_distill_by_unit!(StreamDedup, core, "StreamAppendOnlyDedup");
76
77impl PlanTreeNodeUnary<Stream> for StreamDedup {
78    fn input(&self) -> PlanRef {
79        self.core.input.clone()
80    }
81
82    fn clone_with_input(&self, input: PlanRef) -> Self {
83        let mut core = self.core.clone();
84        core.input = input;
85        Self::new(core)
86    }
87}
88
// NOTE(review): presumably derives the general plan-tree-node implementation for
// `StreamDedup` from its `PlanTreeNodeUnary<Stream>` impl — confirm against the macro
// definition, which is not part of this file.
impl_plan_tree_node_for_unary! { Stream, StreamDedup }
90
91impl StreamNode for StreamDedup {
92    fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
93        let table_catalog = self
94            .infer_internal_table_catalog()
95            .with_id(state.gen_table_id_wrapped());
96        PbNodeBody::AppendOnlyDedup(Box::new(DedupNode {
97            state_table: Some(table_catalog.to_internal_table_prost()),
98            dedup_column_indices: self
99                .core
100                .dedup_cols
101                .iter()
102                .map(|idx| *idx as _)
103                .collect_vec(),
104        }))
105    }
106}
107
// Empty impl: nothing is overridden, so whatever defaults `ExprRewritable` provides apply.
// NOTE(review): presumably sufficient because the node only references columns by index
// (`dedup_cols`) and holds no expressions — confirm against `generic::Dedup`.
impl ExprRewritable<Stream> for StreamDedup {}
109
// Empty impl: nothing is overridden, so whatever defaults `ExprVisitable` provides apply.
// NOTE(review): presumably there are no expressions to visit in this node — confirm
// against `generic::Dedup`.
impl ExprVisitable for StreamDedup {}