risingwave_frontend/optimizer/plan_node/
stream_expand.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_pb::stream_plan::ExpandNode;
16use risingwave_pb::stream_plan::expand_node::Subset;
17use risingwave_pb::stream_plan::stream_node::PbNodeBody;
18
19use super::stream::prelude::*;
20use super::utils::impl_distill_by_unit;
21use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode, generic};
22use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
23use crate::optimizer::property::{Distribution, MonotonicityMap};
24use crate::stream_fragmenter::BuildFragmentGraphState;
25
26#[derive(Debug, Clone, PartialEq, Eq, Hash)]
27pub struct StreamExpand {
28    pub base: PlanBase<Stream>,
29    core: generic::Expand<PlanRef>,
30}
31
32impl StreamExpand {
33    pub fn new(core: generic::Expand<PlanRef>) -> Self {
34        let input = core.input.clone();
35        let input_len = input.schema().len();
36
37        let dist = match input.distribution() {
38            Distribution::Single => Distribution::Single,
39            Distribution::SomeShard
40            | Distribution::HashShard(_)
41            | Distribution::UpstreamHashShard(_, _) => Distribution::SomeShard,
42            Distribution::Broadcast => unreachable!(),
43        };
44
45        let base = PlanBase::new_stream_with_core(
46            &core,
47            dist,
48            input.append_only(),
49            input.emit_on_window_close(),
50            input.watermark_columns().right_shift_clone(input_len),
51            MonotonicityMap::new(),
52        );
53        StreamExpand { base, core }
54    }
55
56    pub fn column_subsets(&self) -> &[Vec<usize>] {
57        &self.core.column_subsets
58    }
59}
60
61impl PlanTreeNodeUnary for StreamExpand {
62    fn input(&self) -> PlanRef {
63        self.core.input.clone()
64    }
65
66    fn clone_with_input(&self, input: PlanRef) -> Self {
67        let mut core = self.core.clone();
68        core.input = input;
69        Self::new(core)
70    }
71}
72
73impl_plan_tree_node_for_unary! { StreamExpand }
74impl_distill_by_unit!(StreamExpand, core, "StreamExpand");
75
76impl StreamNode for StreamExpand {
77    fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
78        PbNodeBody::Expand(Box::new(ExpandNode {
79            column_subsets: self
80                .column_subsets()
81                .iter()
82                .map(|subset| subset_to_protobuf(subset))
83                .collect(),
84        }))
85    }
86}
87
88fn subset_to_protobuf(subset: &[usize]) -> Subset {
89    let column_indices = subset.iter().map(|key| *key as u32).collect();
90    Subset { column_indices }
91}
92
93impl ExprRewritable for StreamExpand {}
94
95impl ExprVisitable for StreamExpand {}