risingwave_frontend/optimizer/plan_node/
stream_expand.rs1use risingwave_pb::stream_plan::ExpandNode;
16use risingwave_pb::stream_plan::expand_node::Subset;
17use risingwave_pb::stream_plan::stream_node::PbNodeBody;
18
19use super::stream::prelude::*;
20use super::utils::impl_distill_by_unit;
21use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode, generic};
22use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
23use crate::optimizer::property::{Distribution, MonotonicityMap};
24use crate::stream_fragmenter::BuildFragmentGraphState;
25
26#[derive(Debug, Clone, PartialEq, Eq, Hash)]
27pub struct StreamExpand {
28 pub base: PlanBase<Stream>,
29 core: generic::Expand<PlanRef>,
30}
31
32impl StreamExpand {
33 pub fn new(core: generic::Expand<PlanRef>) -> Self {
34 let input = core.input.clone();
35 let input_len = input.schema().len();
36
37 let dist = match input.distribution() {
38 Distribution::Single => Distribution::Single,
39 Distribution::SomeShard
40 | Distribution::HashShard(_)
41 | Distribution::UpstreamHashShard(_, _) => Distribution::SomeShard,
42 Distribution::Broadcast => unreachable!(),
43 };
44
45 let base = PlanBase::new_stream_with_core(
46 &core,
47 dist,
48 input.append_only(),
49 input.emit_on_window_close(),
50 input.watermark_columns().right_shift_clone(input_len),
51 MonotonicityMap::new(),
52 );
53 StreamExpand { base, core }
54 }
55
56 pub fn column_subsets(&self) -> &[Vec<usize>] {
57 &self.core.column_subsets
58 }
59}
60
61impl PlanTreeNodeUnary for StreamExpand {
62 fn input(&self) -> PlanRef {
63 self.core.input.clone()
64 }
65
66 fn clone_with_input(&self, input: PlanRef) -> Self {
67 let mut core = self.core.clone();
68 core.input = input;
69 Self::new(core)
70 }
71}
72
73impl_plan_tree_node_for_unary! { StreamExpand }
74impl_distill_by_unit!(StreamExpand, core, "StreamExpand");
75
76impl StreamNode for StreamExpand {
77 fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
78 PbNodeBody::Expand(Box::new(ExpandNode {
79 column_subsets: self
80 .column_subsets()
81 .iter()
82 .map(|subset| subset_to_protobuf(subset))
83 .collect(),
84 }))
85 }
86}
87
88fn subset_to_protobuf(subset: &[usize]) -> Subset {
89 let column_indices = subset.iter().map(|key| *key as u32).collect();
90 Subset { column_indices }
91}
92
93impl ExprRewritable for StreamExpand {}
94
95impl ExprVisitable for StreamExpand {}