risingwave_frontend/optimizer/plan_node/
stream_dedup.rs1use itertools::Itertools;
16use risingwave_common::util::sort_util::OrderType;
17use risingwave_pb::stream_plan::DedupNode;
18use risingwave_pb::stream_plan::stream_node::PbNodeBody;
19
20use super::generic::GenericPlanNode;
21use super::stream::prelude::*;
22use super::utils::{TableCatalogBuilder, impl_distill_by_unit};
23use super::{ExprRewritable, PlanBase, PlanTreeNodeUnary, StreamNode, generic};
24use crate::TableCatalog;
25use crate::optimizer::plan_node::PlanRef;
26use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
27use crate::stream_fragmenter::BuildFragmentGraphState;
28
/// Stream plan node that wraps a generic [`generic::Dedup`] core.
///
/// Construction via `StreamDedup::new` asserts that the input is append-only,
/// and the node is serialized as `PbNodeBody::AppendOnlyDedup`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamDedup {
    /// Stream-level plan properties built from `core` and its input.
    pub base: PlanBase<Stream>,
    /// Generic dedup description: holds the input plan and the dedup column indices.
    core: generic::Dedup<PlanRef>,
}
34
35impl StreamDedup {
36 pub fn new(core: generic::Dedup<PlanRef>) -> Self {
37 let input = core.input.clone();
38 assert!(input.append_only());
40
41 let base = PlanBase::new_stream_with_core(
42 &core,
43 input.distribution().clone(),
44 true,
45 input.emit_on_window_close(),
46 input.watermark_columns().clone(),
47 input.columns_monotonicity().clone(),
48 );
49 StreamDedup { base, core }
50 }
51
52 pub fn infer_internal_table_catalog(&self) -> TableCatalog {
53 let schema = self.core.schema();
54 let mut builder = TableCatalogBuilder::default();
55
56 schema.fields().iter().for_each(|field| {
57 builder.add_column(field);
58 });
59
60 self.core.dedup_cols.iter().for_each(|idx| {
61 builder.add_order_column(*idx, OrderType::ascending());
62 });
63
64 let read_prefix_len_hint = builder.get_current_pk_len();
65
66 builder.build(
67 self.base.distribution().dist_column_indices().to_vec(),
68 read_prefix_len_hint,
69 )
70 }
71}
72
// Invokes the shared distill macro for `StreamDedup`, passing the `core` field
// and the display name "StreamAppendOnlyDedup".
// NOTE(review): presumably this generates the explain/pretty-print output by
// delegating to `core` — confirm against the macro definition.
impl_distill_by_unit!(StreamDedup, core, "StreamAppendOnlyDedup");
75
76impl PlanTreeNodeUnary for StreamDedup {
77 fn input(&self) -> PlanRef {
78 self.core.input.clone()
79 }
80
81 fn clone_with_input(&self, input: PlanRef) -> Self {
82 let mut core = self.core.clone();
83 core.input = input;
84 Self::new(core)
85 }
86}
87
// NOTE(review): presumably expands to the general plan-tree-node implementation
// built on the `PlanTreeNodeUnary` impl for `StreamDedup` — confirm against the
// macro definition.
impl_plan_tree_node_for_unary! { StreamDedup }
89
90impl StreamNode for StreamDedup {
91 fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
92 let table_catalog = self
93 .infer_internal_table_catalog()
94 .with_id(state.gen_table_id_wrapped());
95 PbNodeBody::AppendOnlyDedup(Box::new(DedupNode {
96 state_table: Some(table_catalog.to_internal_table_prost()),
97 dedup_column_indices: self
98 .core
99 .dedup_cols
100 .iter()
101 .map(|idx| *idx as _)
102 .collect_vec(),
103 }))
104 }
105}
106
// Empty impl: nothing is overridden here, so any provided trait methods are used as-is.
// NOTE(review): presumably because this node carries only column indices and
// no expressions to rewrite — confirm.
impl ExprRewritable for StreamDedup {}
108
// Empty impl: nothing is overridden here, so any provided trait methods are used as-is.
// NOTE(review): presumably because this node carries only column indices and
// no expressions to visit — confirm.
impl ExprVisitable for StreamDedup {}