risingwave_frontend/optimizer/plan_node/
stream_changelog.rs1use risingwave_pb::stream_plan::ChangeLogNode;
16use risingwave_pb::stream_plan::stream_node::PbNodeBody;
17
18use super::expr_visitable::ExprVisitable;
19use super::stream::StreamPlanNodeMetadata;
20use super::stream::prelude::PhysicalPlanRef;
21use super::utils::impl_distill_by_unit;
22use super::{ExprRewritable, PlanBase, PlanTreeNodeUnary, Stream, StreamNode, generic};
23use crate::optimizer::StreamPlanRef as PlanRef;
24use crate::optimizer::property::{Distribution, MonotonicityMap, StreamKind};
25use crate::stream_fragmenter::BuildFragmentGraphState;
26
27#[derive(Debug, Clone, PartialEq, Eq, Hash)]
28pub struct StreamChangeLog {
29 pub base: PlanBase<Stream>,
30 core: generic::ChangeLog<PlanRef>,
31 distribution_keys: Vec<u32>,
32}
33
34impl StreamChangeLog {
35 pub fn new_with_dist(
36 core: generic::ChangeLog<PlanRef>,
37 dist: Distribution,
38 distribution_keys: Vec<u32>,
39 ) -> Self {
40 let input = core.input.clone();
41 let base = PlanBase::new_stream_with_core(
42 &core,
43 dist,
44 StreamKind::AppendOnly,
46 input.emit_on_window_close(),
47 input.watermark_columns().clone(),
48 MonotonicityMap::new(), );
50 StreamChangeLog {
51 base,
52 core,
53 distribution_keys,
54 }
55 }
56}
57
58impl PlanTreeNodeUnary<Stream> for StreamChangeLog {
59 fn input(&self) -> PlanRef {
60 self.core.input.clone()
61 }
62
63 fn clone_with_input(&self, input: PlanRef) -> Self {
64 let mut core = self.core.clone();
65 core.input = input;
66 Self::new_with_dist(
67 core,
68 self.base.distribution().clone(),
69 self.distribution_keys.clone(),
70 )
71 }
72}
73
74impl_plan_tree_node_for_unary! { Stream, StreamChangeLog }
75impl_distill_by_unit!(StreamChangeLog, core, "StreamChangeLog");
76
77impl StreamNode for StreamChangeLog {
78 fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
79 PbNodeBody::Changelog(Box::new(ChangeLogNode {
80 need_op: self.core.need_op,
81 distribution_keys: self.distribution_keys.clone(),
82 }))
83 }
84}
85
86impl ExprRewritable<Stream> for StreamChangeLog {}
87
88impl ExprVisitable for StreamChangeLog {}