// risingwave_frontend/optimizer/plan_node/stream_sync_log_store.rs
use pretty_xmlish::{Pretty, XmlNode};
16use risingwave_pb::stream_plan::SyncLogStoreNode;
17use risingwave_pb::stream_plan::stream_node::NodeBody;
18
19use crate::PlanRef;
20use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
21use crate::optimizer::plan_node::generic::PhysicalPlanRef;
22use crate::optimizer::plan_node::stream::StreamPlanRef;
23use crate::optimizer::plan_node::utils::{
24 Distill, childless_record, infer_synced_kv_log_store_table_catalog_inner,
25};
26use crate::optimizer::plan_node::{
27 ExprRewritable, PlanBase, PlanTreeNodeUnary, Stream, StreamNode,
28};
29use crate::stream_fragmenter::BuildFragmentGraphState;
30
/// Stream plan node that is serialized as a `SyncLogStoreNode` in the stream plan.
///
/// The node wraps a single `input` plan and carries two tunables that are
/// captured from the session configuration when the node is constructed.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamSyncLogStore {
    /// Plan base for this node; built in `new` from the input's properties.
    pub base: PlanBase<Stream>,
    /// The single upstream plan this node reads from.
    pub input: PlanRef,
    /// Value of the session config `streaming_sync_log_store_buffer_size`,
    /// forwarded to `SyncLogStoreNode::buffer_size`.
    pub buffer_size: usize,
    /// Value of the session config `streaming_sync_log_store_pause_duration_ms`,
    /// forwarded to `SyncLogStoreNode::pause_duration_ms`.
    // NOTE(review): presumably milliseconds, judging by the name — confirm.
    pub pause_duration_ms: usize,
}
38
39impl StreamSyncLogStore {
40 pub fn new(input: PlanRef) -> Self {
41 let base = PlanBase::new_stream(
42 input.ctx().clone(),
43 input.schema().clone(),
44 input.stream_key().map(|keys| keys.to_vec()),
45 input.functional_dependency().clone(),
46 input.distribution().clone(),
47 input.append_only(),
48 input.emit_on_window_close(),
49 input.watermark_columns().clone(),
50 input.columns_monotonicity().clone(),
51 );
52 let pause_duration_ms = input
53 .ctx()
54 .session_ctx()
55 .config()
56 .streaming_sync_log_store_pause_duration_ms();
57 let buffer_size = input
58 .ctx()
59 .session_ctx()
60 .config()
61 .streaming_sync_log_store_buffer_size();
62 Self {
63 base,
64 input,
65 buffer_size,
66 pause_duration_ms,
67 }
68 }
69}
70
71impl Distill for StreamSyncLogStore {
72 fn distill<'a>(&self) -> XmlNode<'a> {
73 let fields = vec![
74 ("buffer_size", Pretty::display(&self.buffer_size)),
75 (
76 "pause_duration_ms",
77 Pretty::display(&self.pause_duration_ms),
78 ),
79 ];
80 childless_record("StreamSyncLogStore", fields)
81 }
82}
83
84impl PlanTreeNodeUnary for StreamSyncLogStore {
85 fn input(&self) -> PlanRef {
86 self.input.clone()
87 }
88
89 fn clone_with_input(&self, input: PlanRef) -> Self {
90 Self::new(input)
91 }
92}
93
// Invokes the project macro for unary plan nodes on `StreamSyncLogStore`.
// NOTE(review): presumably this generates the generic plan-tree-node impl from
// the `PlanTreeNodeUnary` impl — confirm against the macro definition.
impl_plan_tree_node_for_unary! { StreamSyncLogStore }
95
96impl StreamNode for StreamSyncLogStore {
97 fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> NodeBody {
98 let columns = self.input.schema().fields();
99 let log_store_table = infer_synced_kv_log_store_table_catalog_inner(&self.input, columns)
100 .with_id(state.gen_table_id_wrapped())
101 .to_internal_table_prost()
102 .into();
103 NodeBody::SyncLogStore(Box::new(SyncLogStoreNode {
104 log_store_table,
105 pause_duration_ms: self.pause_duration_ms as _,
106 buffer_size: self.buffer_size as _,
107 }))
108 }
109}
110
// Empty impl: no trait items are overridden, so whatever defaults
// `ExprRewritable` provides apply to this node.
impl ExprRewritable for StreamSyncLogStore {}
112
// Empty impl: no trait items are overridden, so whatever defaults
// `ExprVisitable` provides apply to this node.
impl ExprVisitable for StreamSyncLogStore {}