risingwave_frontend/optimizer/plan_node/
stream_dedup.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use itertools::Itertools;
16use risingwave_common::util::sort_util::OrderType;
17use risingwave_pb::stream_plan::DedupNode;
18use risingwave_pb::stream_plan::stream_node::PbNodeBody;
19
20use super::generic::GenericPlanNode;
21use super::stream::prelude::*;
22use super::utils::{TableCatalogBuilder, impl_distill_by_unit};
23use super::{ExprRewritable, PlanBase, PlanTreeNodeUnary, StreamNode, generic};
24use crate::TableCatalog;
25use crate::optimizer::plan_node::PlanRef;
26use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
27use crate::stream_fragmenter::BuildFragmentGraphState;
28
29#[derive(Debug, Clone, PartialEq, Eq, Hash)]
30pub struct StreamDedup {
31    pub base: PlanBase<Stream>,
32    core: generic::Dedup<PlanRef>,
33}
34
35impl StreamDedup {
36    pub fn new(core: generic::Dedup<PlanRef>) -> Self {
37        let input = core.input.clone();
38        // A dedup operator must be append-only.
39        assert!(input.append_only());
40
41        let base = PlanBase::new_stream_with_core(
42            &core,
43            input.distribution().clone(),
44            true,
45            input.emit_on_window_close(),
46            input.watermark_columns().clone(),
47            input.columns_monotonicity().clone(),
48        );
49        StreamDedup { base, core }
50    }
51
52    pub fn infer_internal_table_catalog(&self) -> TableCatalog {
53        let schema = self.core.schema();
54        let mut builder = TableCatalogBuilder::default();
55
56        schema.fields().iter().for_each(|field| {
57            builder.add_column(field);
58        });
59
60        self.core.dedup_cols.iter().for_each(|idx| {
61            builder.add_order_column(*idx, OrderType::ascending());
62        });
63
64        let read_prefix_len_hint = builder.get_current_pk_len();
65
66        builder.build(
67            self.base.distribution().dist_column_indices().to_vec(),
68            read_prefix_len_hint,
69        )
70    }
71}
72
73// assert!(self.base.append_only());
74impl_distill_by_unit!(StreamDedup, core, "StreamAppendOnlyDedup");
75
76impl PlanTreeNodeUnary for StreamDedup {
77    fn input(&self) -> PlanRef {
78        self.core.input.clone()
79    }
80
81    fn clone_with_input(&self, input: PlanRef) -> Self {
82        let mut core = self.core.clone();
83        core.input = input;
84        Self::new(core)
85    }
86}
87
88impl_plan_tree_node_for_unary! { StreamDedup }
89
90impl StreamNode for StreamDedup {
91    fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
92        let table_catalog = self
93            .infer_internal_table_catalog()
94            .with_id(state.gen_table_id_wrapped());
95        PbNodeBody::AppendOnlyDedup(Box::new(DedupNode {
96            state_table: Some(table_catalog.to_internal_table_prost()),
97            dedup_column_indices: self
98                .core
99                .dedup_cols
100                .iter()
101                .map(|idx| *idx as _)
102                .collect_vec(),
103        }))
104    }
105}
106
107impl ExprRewritable for StreamDedup {}
108
109impl ExprVisitable for StreamDedup {}