risingwave_frontend/optimizer/plan_node/
stream_sync_log_store.rs1use pretty_xmlish::{Pretty, XmlNode};
16use risingwave_pb::stream_plan::SyncLogStoreNode;
17use risingwave_pb::stream_plan::stream_node::NodeBody;
18
19use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
20use crate::optimizer::plan_node::generic::PhysicalPlanRef;
21use crate::optimizer::plan_node::stream::StreamPlanNodeMetadata;
22use crate::optimizer::plan_node::utils::{
23 Distill, childless_record, infer_synced_kv_log_store_table_catalog_inner,
24};
25use crate::optimizer::plan_node::{
26 ExprRewritable, PlanBase, PlanTreeNodeUnary, Stream, StreamNode, StreamPlanRef as PlanRef,
27};
28use crate::stream_fragmenter::BuildFragmentGraphState;
29
30#[derive(Debug, Clone, PartialEq, Eq, Hash)]
31pub struct StreamSyncLogStore {
32 pub base: PlanBase<Stream>,
33 pub input: PlanRef,
34 pub buffer_size: usize,
35 pub pause_duration_ms: usize,
36}
37
38impl StreamSyncLogStore {
39 pub fn new(input: PlanRef) -> Self {
40 let base = PlanBase::new_stream(
41 input.ctx().clone(),
42 input.schema().clone(),
43 input.stream_key().map(|keys| keys.to_vec()),
44 input.functional_dependency().clone(),
45 input.distribution().clone(),
46 input.stream_kind(),
47 input.emit_on_window_close(),
48 input.watermark_columns().clone(),
49 input.columns_monotonicity().clone(),
50 );
51 let pause_duration_ms = input
52 .ctx()
53 .session_ctx()
54 .config()
55 .streaming_sync_log_store_pause_duration_ms();
56 let buffer_size = input
57 .ctx()
58 .session_ctx()
59 .config()
60 .streaming_sync_log_store_buffer_size();
61 Self {
62 base,
63 input,
64 buffer_size,
65 pause_duration_ms,
66 }
67 }
68}
69
70impl Distill for StreamSyncLogStore {
71 fn distill<'a>(&self) -> XmlNode<'a> {
72 let fields = vec![
73 ("buffer_size", Pretty::display(&self.buffer_size)),
74 (
75 "pause_duration_ms",
76 Pretty::display(&self.pause_duration_ms),
77 ),
78 ];
79 childless_record("StreamSyncLogStore", fields)
80 }
81}
82
83impl PlanTreeNodeUnary<Stream> for StreamSyncLogStore {
84 fn input(&self) -> PlanRef {
85 self.input.clone()
86 }
87
88 fn clone_with_input(&self, input: PlanRef) -> Self {
89 Self::new(input)
90 }
91}
92
93impl_plan_tree_node_for_unary! { Stream, StreamSyncLogStore }
94
95impl StreamNode for StreamSyncLogStore {
96 fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> NodeBody {
97 let columns = self.input.schema().fields();
98 let log_store_table = infer_synced_kv_log_store_table_catalog_inner(&self.input, columns)
99 .with_id(state.gen_table_id_wrapped())
100 .to_internal_table_prost()
101 .into();
102 NodeBody::SyncLogStore(Box::new(SyncLogStoreNode {
103 log_store_table,
104 pause_duration_ms: self.pause_duration_ms as _,
105 buffer_size: self.buffer_size as _,
106 aligned: false,
107 }))
108 }
109}
110
111impl ExprRewritable<Stream> for StreamSyncLogStore {}
112
113impl ExprVisitable for StreamSyncLogStore {}