risingwave_frontend/optimizer/plan_node/
stream_expand.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_pb::stream_plan::ExpandNode;
16use risingwave_pb::stream_plan::expand_node::Subset;
17use risingwave_pb::stream_plan::stream_node::PbNodeBody;
18
19use super::stream::prelude::*;
20use super::utils::impl_distill_by_unit;
21use super::{
22    ExprRewritable, PlanBase, PlanTreeNodeUnary, StreamNode, StreamPlanRef as PlanRef, generic,
23};
24use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
25use crate::optimizer::property::{Distribution, MonotonicityMap};
26use crate::stream_fragmenter::BuildFragmentGraphState;
27
28#[derive(Debug, Clone, PartialEq, Eq, Hash)]
29pub struct StreamExpand {
30    pub base: PlanBase<Stream>,
31    core: generic::Expand<PlanRef>,
32}
33
34impl StreamExpand {
35    pub fn new(core: generic::Expand<PlanRef>) -> Self {
36        let input = core.input.clone();
37        let input_len = input.schema().len();
38
39        let dist = match input.distribution() {
40            Distribution::Single => Distribution::Single,
41            Distribution::SomeShard
42            | Distribution::HashShard(_)
43            | Distribution::UpstreamHashShard(_, _) => Distribution::SomeShard,
44            Distribution::Broadcast => unreachable!(),
45        };
46
47        let base = PlanBase::new_stream_with_core(
48            &core,
49            dist,
50            input.stream_kind(),
51            input.emit_on_window_close(),
52            input.watermark_columns().right_shift_clone(input_len),
53            MonotonicityMap::new(),
54        );
55        StreamExpand { base, core }
56    }
57
58    pub fn column_subsets(&self) -> &[Vec<usize>] {
59        &self.core.column_subsets
60    }
61}
62
63impl PlanTreeNodeUnary<Stream> for StreamExpand {
64    fn input(&self) -> PlanRef {
65        self.core.input.clone()
66    }
67
68    fn clone_with_input(&self, input: PlanRef) -> Self {
69        let mut core = self.core.clone();
70        core.input = input;
71        Self::new(core)
72    }
73}
74
75impl_plan_tree_node_for_unary! { Stream, StreamExpand }
76impl_distill_by_unit!(StreamExpand, core, "StreamExpand");
77
78impl StreamNode for StreamExpand {
79    fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
80        PbNodeBody::Expand(Box::new(ExpandNode {
81            column_subsets: self
82                .column_subsets()
83                .iter()
84                .map(|subset| subset_to_protobuf(subset))
85                .collect(),
86        }))
87    }
88}
89
90fn subset_to_protobuf(subset: &[usize]) -> Subset {
91    let column_indices = subset.iter().map(|key| *key as u32).collect();
92    Subset { column_indices }
93}
94
95impl ExprRewritable<Stream> for StreamExpand {}
96
97impl ExprVisitable for StreamExpand {}