risingwave_frontend/optimizer/plan_node/
stream_sync_log_store.rs1use pretty_xmlish::XmlNode;
16use risingwave_pb::stream_plan::SyncLogStoreNode;
17use risingwave_pb::stream_plan::stream_node::NodeBody;
18
19use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
20use crate::optimizer::plan_node::generic::PhysicalPlanRef;
21use crate::optimizer::plan_node::stream::StreamPlanNodeMetadata;
22use crate::optimizer::plan_node::utils::{
23 Distill, childless_record, infer_synced_kv_log_store_table_catalog_inner,
24};
25use crate::optimizer::plan_node::{
26 ExprRewritable, PlanBase, PlanTreeNodeUnary, Stream, StreamNode, StreamPlanRef as PlanRef,
27};
28use crate::stream_fragmenter::BuildFragmentGraphState;
29
30#[derive(Debug, Clone, PartialEq, Eq, Hash)]
31pub struct StreamSyncLogStore {
32 pub base: PlanBase<Stream>,
33 pub input: PlanRef,
34}
35
36impl StreamSyncLogStore {
37 pub fn new(input: PlanRef) -> Self {
38 let base = PlanBase::new_stream(
39 input.ctx(),
40 input.schema().clone(),
41 input.stream_key().map(|keys| keys.to_vec()),
42 input.functional_dependency().clone(),
43 input.distribution().clone(),
44 input.stream_kind(),
45 input.emit_on_window_close(),
46 input.watermark_columns().clone(),
47 input.columns_monotonicity().clone(),
48 );
49
50 Self { base, input }
51 }
52}
53
54impl Distill for StreamSyncLogStore {
55 fn distill<'a>(&self) -> XmlNode<'a> {
56 childless_record("StreamSyncLogStore", vec![])
57 }
58}
59
60impl PlanTreeNodeUnary<Stream> for StreamSyncLogStore {
61 fn input(&self) -> PlanRef {
62 self.input.clone()
63 }
64
65 fn clone_with_input(&self, input: PlanRef) -> Self {
66 Self::new(input)
67 }
68}
69
70impl_plan_tree_node_for_unary! { Stream, StreamSyncLogStore }
71
72impl StreamNode for StreamSyncLogStore {
73 fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> NodeBody {
74 let columns = self.input.schema().fields();
75 let log_store_table = infer_synced_kv_log_store_table_catalog_inner(&self.input, columns)
76 .with_id(state.gen_table_id_wrapped())
77 .to_internal_table_prost()
78 .into();
79 NodeBody::SyncLogStore(Box::new(SyncLogStoreNode {
80 log_store_table,
81 aligned: false,
82
83 #[allow(deprecated)]
85 pause_duration_ms: None,
86 #[allow(deprecated)]
87 buffer_size: None,
88 }))
89 }
90}
91
92impl ExprRewritable<Stream> for StreamSyncLogStore {}
93
94impl ExprVisitable for StreamSyncLogStore {}