risingwave_frontend/optimizer/plan_node/
stream_row_id_gen.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pretty_xmlish::{Pretty, XmlNode};
16use risingwave_pb::stream_plan::stream_node::PbNodeBody;
17
18use super::stream::prelude::*;
19use super::utils::{Distill, childless_record};
20use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
21use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
22use crate::optimizer::property::Distribution;
23use crate::stream_fragmenter::BuildFragmentGraphState;
24
25#[derive(Debug, Clone, PartialEq, Eq, Hash)]
26pub struct StreamRowIdGen {
27    pub base: PlanBase<Stream>,
28    input: PlanRef,
29    row_id_index: usize,
30}
31
32impl StreamRowIdGen {
33    pub fn new(input: PlanRef, row_id_index: usize) -> Self {
34        let distribution = input.distribution().clone();
35        Self::new_with_dist(input, row_id_index, distribution)
36    }
37
38    /// Create a new `StreamRowIdGen` with a custom distribution.
39    pub fn new_with_dist(
40        input: PlanRef,
41        row_id_index: usize,
42        distribution: Distribution,
43    ) -> StreamRowIdGen {
44        let base = PlanBase::new_stream(
45            input.ctx(),
46            input.schema().clone(),
47            input.stream_key().map(|v| v.to_vec()),
48            input.functional_dependency().clone(),
49            distribution,
50            input.append_only(),
51            input.emit_on_window_close(),
52            input.watermark_columns().clone(),
53            input.columns_monotonicity().clone(),
54        );
55        Self {
56            base,
57            input,
58            row_id_index,
59        }
60    }
61}
62
63impl Distill for StreamRowIdGen {
64    fn distill<'a>(&self) -> XmlNode<'a> {
65        let fields = vec![("row_id_index", Pretty::debug(&self.row_id_index))];
66        childless_record("StreamRowIdGen", fields)
67    }
68}
69
70impl PlanTreeNodeUnary for StreamRowIdGen {
71    fn input(&self) -> PlanRef {
72        self.input.clone()
73    }
74
75    fn clone_with_input(&self, input: PlanRef) -> Self {
76        Self::new_with_dist(input, self.row_id_index, self.distribution().clone())
77    }
78}
79
80impl_plan_tree_node_for_unary! {StreamRowIdGen}
81
82impl StreamNode for StreamRowIdGen {
83    fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
84        use risingwave_pb::stream_plan::*;
85
86        PbNodeBody::RowIdGen(Box::new(RowIdGenNode {
87            row_id_index: self.row_id_index as _,
88        }))
89    }
90}
91
92impl ExprRewritable for StreamRowIdGen {}
93
94impl ExprVisitable for StreamRowIdGen {}