risingwave_frontend/optimizer/plan_node/
stream_row_id_gen.rs1use pretty_xmlish::{Pretty, XmlNode};
16use risingwave_pb::stream_plan::stream_node::PbNodeBody;
17
18use super::stream::prelude::*;
19use super::utils::{Distill, childless_record};
20use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
21use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
22use crate::optimizer::property::Distribution;
23use crate::stream_fragmenter::BuildFragmentGraphState;
24
25#[derive(Debug, Clone, PartialEq, Eq, Hash)]
26pub struct StreamRowIdGen {
27 pub base: PlanBase<Stream>,
28 input: PlanRef,
29 row_id_index: usize,
30}
31
32impl StreamRowIdGen {
33 pub fn new(input: PlanRef, row_id_index: usize) -> Self {
34 let distribution = input.distribution().clone();
35 Self::new_with_dist(input, row_id_index, distribution)
36 }
37
38 pub fn new_with_dist(
40 input: PlanRef,
41 row_id_index: usize,
42 distribution: Distribution,
43 ) -> StreamRowIdGen {
44 let base = PlanBase::new_stream(
45 input.ctx(),
46 input.schema().clone(),
47 input.stream_key().map(|v| v.to_vec()),
48 input.functional_dependency().clone(),
49 distribution,
50 input.append_only(),
51 input.emit_on_window_close(),
52 input.watermark_columns().clone(),
53 input.columns_monotonicity().clone(),
54 );
55 Self {
56 base,
57 input,
58 row_id_index,
59 }
60 }
61}
62
63impl Distill for StreamRowIdGen {
64 fn distill<'a>(&self) -> XmlNode<'a> {
65 let fields = vec![("row_id_index", Pretty::debug(&self.row_id_index))];
66 childless_record("StreamRowIdGen", fields)
67 }
68}
69
70impl PlanTreeNodeUnary for StreamRowIdGen {
71 fn input(&self) -> PlanRef {
72 self.input.clone()
73 }
74
75 fn clone_with_input(&self, input: PlanRef) -> Self {
76 Self::new_with_dist(input, self.row_id_index, self.distribution().clone())
77 }
78}
79
80impl_plan_tree_node_for_unary! {StreamRowIdGen}
81
82impl StreamNode for StreamRowIdGen {
83 fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
84 use risingwave_pb::stream_plan::*;
85
86 PbNodeBody::RowIdGen(Box::new(RowIdGenNode {
87 row_id_index: self.row_id_index as _,
88 }))
89 }
90}
91
92impl ExprRewritable for StreamRowIdGen {}
93
94impl ExprVisitable for StreamRowIdGen {}