risingwave_frontend/optimizer/plan_node/
stream_sync_log_store.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pretty_xmlish::{Pretty, XmlNode};
16use risingwave_pb::stream_plan::SyncLogStoreNode;
17use risingwave_pb::stream_plan::stream_node::NodeBody;
18
19use crate::PlanRef;
20use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
21use crate::optimizer::plan_node::generic::PhysicalPlanRef;
22use crate::optimizer::plan_node::stream::StreamPlanRef;
23use crate::optimizer::plan_node::utils::{
24    Distill, childless_record, infer_synced_kv_log_store_table_catalog_inner,
25};
26use crate::optimizer::plan_node::{
27    ExprRewritable, PlanBase, PlanTreeNodeUnary, Stream, StreamNode,
28};
29use crate::stream_fragmenter::BuildFragmentGraphState;
30
31#[derive(Debug, Clone, PartialEq, Eq, Hash)]
32pub struct StreamSyncLogStore {
33    pub base: PlanBase<Stream>,
34    pub input: PlanRef,
35    pub buffer_size: usize,
36    pub pause_duration_ms: usize,
37}
38
39impl StreamSyncLogStore {
40    pub fn new(input: PlanRef) -> Self {
41        let base = PlanBase::new_stream(
42            input.ctx().clone(),
43            input.schema().clone(),
44            input.stream_key().map(|keys| keys.to_vec()),
45            input.functional_dependency().clone(),
46            input.distribution().clone(),
47            input.append_only(),
48            input.emit_on_window_close(),
49            input.watermark_columns().clone(),
50            input.columns_monotonicity().clone(),
51        );
52        let pause_duration_ms = input
53            .ctx()
54            .session_ctx()
55            .config()
56            .streaming_sync_log_store_pause_duration_ms();
57        let buffer_size = input
58            .ctx()
59            .session_ctx()
60            .config()
61            .streaming_sync_log_store_buffer_size();
62        Self {
63            base,
64            input,
65            buffer_size,
66            pause_duration_ms,
67        }
68    }
69}
70
71impl Distill for StreamSyncLogStore {
72    fn distill<'a>(&self) -> XmlNode<'a> {
73        let fields = vec![
74            ("buffer_size", Pretty::display(&self.buffer_size)),
75            (
76                "pause_duration_ms",
77                Pretty::display(&self.pause_duration_ms),
78            ),
79        ];
80        childless_record("StreamSyncLogStore", fields)
81    }
82}
83
84impl PlanTreeNodeUnary for StreamSyncLogStore {
85    fn input(&self) -> PlanRef {
86        self.input.clone()
87    }
88
89    fn clone_with_input(&self, input: PlanRef) -> Self {
90        Self::new(input)
91    }
92}
93
94impl_plan_tree_node_for_unary! { StreamSyncLogStore }
95
96impl StreamNode for StreamSyncLogStore {
97    fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> NodeBody {
98        let columns = self.input.schema().fields();
99        let log_store_table = infer_synced_kv_log_store_table_catalog_inner(&self.input, columns)
100            .with_id(state.gen_table_id_wrapped())
101            .to_internal_table_prost()
102            .into();
103        NodeBody::SyncLogStore(Box::new(SyncLogStoreNode {
104            log_store_table,
105            pause_duration_ms: self.pause_duration_ms as _,
106            buffer_size: self.buffer_size as _,
107        }))
108    }
109}
110
111impl ExprRewritable for StreamSyncLogStore {}
112
113impl ExprVisitable for StreamSyncLogStore {}