risingwave_frontend/optimizer/plan_node/
stream_changelog.rs1use risingwave_pb::stream_plan::ChangeLogNode;
16use risingwave_pb::stream_plan::stream_node::PbNodeBody;
17
18use super::expr_visitable::ExprVisitable;
19use super::stream::StreamPlanRef;
20use super::stream::prelude::PhysicalPlanRef;
21use super::utils::impl_distill_by_unit;
22use super::{ExprRewritable, PlanBase, PlanTreeNodeUnary, Stream, StreamNode, generic};
23use crate::PlanRef;
24use crate::optimizer::property::MonotonicityMap;
25use crate::stream_fragmenter::BuildFragmentGraphState;
26
27#[derive(Debug, Clone, PartialEq, Eq, Hash)]
28pub struct StreamChangeLog {
29 pub base: PlanBase<Stream>,
30 core: generic::ChangeLog<PlanRef>,
31}
32
33impl StreamChangeLog {
34 pub fn new(core: generic::ChangeLog<PlanRef>) -> Self {
35 let input = core.input.clone();
36 let dist = input.distribution().clone();
37 let base = PlanBase::new_stream_with_core(
39 &core,
40 dist,
41 true,
43 input.emit_on_window_close(),
44 input.watermark_columns().clone(),
45 MonotonicityMap::new(), );
47 StreamChangeLog { base, core }
48 }
49}
50
51impl PlanTreeNodeUnary for StreamChangeLog {
52 fn input(&self) -> PlanRef {
53 self.core.input.clone()
54 }
55
56 fn clone_with_input(&self, input: PlanRef) -> Self {
57 let mut core = self.core.clone();
58 core.input = input;
59 Self::new(core)
60 }
61}
62
63impl_plan_tree_node_for_unary! { StreamChangeLog }
64impl_distill_by_unit!(StreamChangeLog, core, "StreamChangeLog");
65
66impl StreamNode for StreamChangeLog {
67 fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
68 PbNodeBody::Changelog(Box::new(ChangeLogNode {
69 need_op: self.core.need_op,
70 }))
71 }
72}
73
74impl ExprRewritable for StreamChangeLog {}
75
76impl ExprVisitable for StreamChangeLog {}