risingwave_frontend/optimizer/plan_node/
stream_expand.rs1use risingwave_pb::stream_plan::ExpandNode;
16use risingwave_pb::stream_plan::expand_node::Subset;
17use risingwave_pb::stream_plan::stream_node::PbNodeBody;
18
19use super::stream::prelude::*;
20use super::utils::impl_distill_by_unit;
21use super::{
22 ExprRewritable, PlanBase, PlanTreeNodeUnary, StreamNode, StreamPlanRef as PlanRef, generic,
23};
24use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
25use crate::optimizer::property::{Distribution, MonotonicityMap};
26use crate::stream_fragmenter::BuildFragmentGraphState;
27
28#[derive(Debug, Clone, PartialEq, Eq, Hash)]
29pub struct StreamExpand {
30 pub base: PlanBase<Stream>,
31 core: generic::Expand<PlanRef>,
32}
33
34impl StreamExpand {
35 pub fn new(core: generic::Expand<PlanRef>) -> Self {
36 let input = core.input.clone();
37 let input_len = input.schema().len();
38
39 let dist = match input.distribution() {
40 Distribution::Single => Distribution::Single,
41 Distribution::SomeShard
42 | Distribution::HashShard(_)
43 | Distribution::UpstreamHashShard(_, _) => Distribution::SomeShard,
44 Distribution::Broadcast => unreachable!(),
45 };
46
47 let base = PlanBase::new_stream_with_core(
48 &core,
49 dist,
50 input.stream_kind(),
51 input.emit_on_window_close(),
52 input.watermark_columns().right_shift_clone(input_len),
53 MonotonicityMap::new(),
54 );
55 StreamExpand { base, core }
56 }
57
58 pub fn column_subsets(&self) -> &[Vec<usize>] {
59 &self.core.column_subsets
60 }
61}
62
63impl PlanTreeNodeUnary<Stream> for StreamExpand {
64 fn input(&self) -> PlanRef {
65 self.core.input.clone()
66 }
67
68 fn clone_with_input(&self, input: PlanRef) -> Self {
69 let mut core = self.core.clone();
70 core.input = input;
71 Self::new(core)
72 }
73}
74
75impl_plan_tree_node_for_unary! { Stream, StreamExpand }
76impl_distill_by_unit!(StreamExpand, core, "StreamExpand");
77
78impl StreamNode for StreamExpand {
79 fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
80 PbNodeBody::Expand(Box::new(ExpandNode {
81 column_subsets: self
82 .column_subsets()
83 .iter()
84 .map(|subset| subset_to_protobuf(subset))
85 .collect(),
86 }))
87 }
88}
89
90fn subset_to_protobuf(subset: &[usize]) -> Subset {
91 let column_indices = subset.iter().map(|key| *key as u32).collect();
92 Subset { column_indices }
93}
94
95impl ExprRewritable<Stream> for StreamExpand {}
96
97impl ExprVisitable for StreamExpand {}