risingwave_frontend/optimizer/plan_node/
stream_sync_log_store.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pretty_xmlish::{Pretty, XmlNode};
16use risingwave_pb::stream_plan::SyncLogStoreNode;
17use risingwave_pb::stream_plan::stream_node::NodeBody;
18
19use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
20use crate::optimizer::plan_node::generic::PhysicalPlanRef;
21use crate::optimizer::plan_node::stream::StreamPlanNodeMetadata;
22use crate::optimizer::plan_node::utils::{
23    Distill, childless_record, infer_synced_kv_log_store_table_catalog_inner,
24};
25use crate::optimizer::plan_node::{
26    ExprRewritable, PlanBase, PlanTreeNodeUnary, Stream, StreamNode, StreamPlanRef as PlanRef,
27};
28use crate::stream_fragmenter::BuildFragmentGraphState;
29
30#[derive(Debug, Clone, PartialEq, Eq, Hash)]
31pub struct StreamSyncLogStore {
32    pub base: PlanBase<Stream>,
33    pub input: PlanRef,
34    pub buffer_size: usize,
35    pub pause_duration_ms: usize,
36}
37
38impl StreamSyncLogStore {
39    pub fn new(input: PlanRef) -> Self {
40        let base = PlanBase::new_stream(
41            input.ctx().clone(),
42            input.schema().clone(),
43            input.stream_key().map(|keys| keys.to_vec()),
44            input.functional_dependency().clone(),
45            input.distribution().clone(),
46            input.stream_kind(),
47            input.emit_on_window_close(),
48            input.watermark_columns().clone(),
49            input.columns_monotonicity().clone(),
50        );
51        let pause_duration_ms = input
52            .ctx()
53            .session_ctx()
54            .config()
55            .streaming_sync_log_store_pause_duration_ms();
56        let buffer_size = input
57            .ctx()
58            .session_ctx()
59            .config()
60            .streaming_sync_log_store_buffer_size();
61        Self {
62            base,
63            input,
64            buffer_size,
65            pause_duration_ms,
66        }
67    }
68}
69
70impl Distill for StreamSyncLogStore {
71    fn distill<'a>(&self) -> XmlNode<'a> {
72        let fields = vec![
73            ("buffer_size", Pretty::display(&self.buffer_size)),
74            (
75                "pause_duration_ms",
76                Pretty::display(&self.pause_duration_ms),
77            ),
78        ];
79        childless_record("StreamSyncLogStore", fields)
80    }
81}
82
83impl PlanTreeNodeUnary<Stream> for StreamSyncLogStore {
84    fn input(&self) -> PlanRef {
85        self.input.clone()
86    }
87
88    fn clone_with_input(&self, input: PlanRef) -> Self {
89        Self::new(input)
90    }
91}
92
93impl_plan_tree_node_for_unary! { Stream, StreamSyncLogStore }
94
95impl StreamNode for StreamSyncLogStore {
96    fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> NodeBody {
97        let columns = self.input.schema().fields();
98        let log_store_table = infer_synced_kv_log_store_table_catalog_inner(&self.input, columns)
99            .with_id(state.gen_table_id_wrapped())
100            .to_internal_table_prost()
101            .into();
102        NodeBody::SyncLogStore(Box::new(SyncLogStoreNode {
103            log_store_table,
104            pause_duration_ms: self.pause_duration_ms as _,
105            buffer_size: self.buffer_size as _,
106            aligned: false,
107        }))
108    }
109}
110
111impl ExprRewritable<Stream> for StreamSyncLogStore {}
112
113impl ExprVisitable for StreamSyncLogStore {}