risingwave_frontend/optimizer/plan_node/
stream_dedup.rs1use itertools::Itertools;
16use risingwave_common::util::sort_util::OrderType;
17use risingwave_pb::stream_plan::DedupNode;
18use risingwave_pb::stream_plan::stream_node::PbNodeBody;
19
20use super::generic::GenericPlanNode;
21use super::stream::prelude::*;
22use super::utils::{TableCatalogBuilder, impl_distill_by_unit};
23use super::{ExprRewritable, PlanBase, PlanTreeNodeUnary, StreamNode, generic};
24use crate::TableCatalog;
25use crate::optimizer::plan_node::StreamPlanRef as PlanRef;
26use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
27use crate::optimizer::property::StreamKind;
28use crate::stream_fragmenter::BuildFragmentGraphState;
29
30#[derive(Debug, Clone, PartialEq, Eq, Hash)]
31pub struct StreamDedup {
32 pub base: PlanBase<Stream>,
33 core: generic::Dedup<PlanRef>,
34}
35
36impl StreamDedup {
37 pub fn new(core: generic::Dedup<PlanRef>) -> Self {
38 let input = core.input.clone();
39 assert!(input.append_only());
41
42 let base = PlanBase::new_stream_with_core(
43 &core,
44 input.distribution().clone(),
45 StreamKind::AppendOnly,
46 input.emit_on_window_close(),
47 input.watermark_columns().clone(),
48 input.columns_monotonicity().clone(),
49 );
50 StreamDedup { base, core }
51 }
52
53 pub fn infer_internal_table_catalog(&self) -> TableCatalog {
54 let schema = self.core.schema();
55 let mut builder = TableCatalogBuilder::default();
56
57 schema.fields().iter().for_each(|field| {
58 builder.add_column(field);
59 });
60
61 self.core.dedup_cols.iter().for_each(|idx| {
62 builder.add_order_column(*idx, OrderType::ascending());
63 });
64
65 let read_prefix_len_hint = builder.get_current_pk_len();
66
67 builder.build(
68 self.base.distribution().dist_column_indices().to_vec(),
69 read_prefix_len_hint,
70 )
71 }
72}
73
74impl_distill_by_unit!(StreamDedup, core, "StreamAppendOnlyDedup");
76
77impl PlanTreeNodeUnary<Stream> for StreamDedup {
78 fn input(&self) -> PlanRef {
79 self.core.input.clone()
80 }
81
82 fn clone_with_input(&self, input: PlanRef) -> Self {
83 let mut core = self.core.clone();
84 core.input = input;
85 Self::new(core)
86 }
87}
88
89impl_plan_tree_node_for_unary! { Stream, StreamDedup }
90
91impl StreamNode for StreamDedup {
92 fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
93 let table_catalog = self
94 .infer_internal_table_catalog()
95 .with_id(state.gen_table_id_wrapped());
96 PbNodeBody::AppendOnlyDedup(Box::new(DedupNode {
97 state_table: Some(table_catalog.to_internal_table_prost()),
98 dedup_column_indices: self
99 .core
100 .dedup_cols
101 .iter()
102 .map(|idx| *idx as _)
103 .collect_vec(),
104 }))
105 }
106}
107
108impl ExprRewritable<Stream> for StreamDedup {}
109
110impl ExprVisitable for StreamDedup {}