risingwave_frontend/optimizer/plan_node/
stream_sync_log_store.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pretty_xmlish::XmlNode;
16use risingwave_pb::stream_plan::SyncLogStoreNode;
17use risingwave_pb::stream_plan::stream_node::NodeBody;
18
19use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
20use crate::optimizer::plan_node::generic::PhysicalPlanRef;
21use crate::optimizer::plan_node::stream::StreamPlanNodeMetadata;
22use crate::optimizer::plan_node::utils::{
23    Distill, childless_record, infer_synced_kv_log_store_table_catalog_inner,
24};
25use crate::optimizer::plan_node::{
26    ExprRewritable, PlanBase, PlanTreeNodeUnary, Stream, StreamNode, StreamPlanRef as PlanRef,
27};
28use crate::stream_fragmenter::BuildFragmentGraphState;
29
30#[derive(Debug, Clone, PartialEq, Eq, Hash)]
31pub struct StreamSyncLogStore {
32    pub base: PlanBase<Stream>,
33    pub input: PlanRef,
34}
35
36impl StreamSyncLogStore {
37    pub fn new(input: PlanRef) -> Self {
38        let base = PlanBase::new_stream(
39            input.ctx(),
40            input.schema().clone(),
41            input.stream_key().map(|keys| keys.to_vec()),
42            input.functional_dependency().clone(),
43            input.distribution().clone(),
44            input.stream_kind(),
45            input.emit_on_window_close(),
46            input.watermark_columns().clone(),
47            input.columns_monotonicity().clone(),
48        );
49
50        Self { base, input }
51    }
52}
53
54impl Distill for StreamSyncLogStore {
55    fn distill<'a>(&self) -> XmlNode<'a> {
56        childless_record("StreamSyncLogStore", vec![])
57    }
58}
59
60impl PlanTreeNodeUnary<Stream> for StreamSyncLogStore {
61    fn input(&self) -> PlanRef {
62        self.input.clone()
63    }
64
65    fn clone_with_input(&self, input: PlanRef) -> Self {
66        Self::new(input)
67    }
68}
69
70impl_plan_tree_node_for_unary! { Stream, StreamSyncLogStore }
71
72impl StreamNode for StreamSyncLogStore {
73    fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> NodeBody {
74        let columns = self.input.schema().fields();
75        let log_store_table = infer_synced_kv_log_store_table_catalog_inner(&self.input, columns)
76            .with_id(state.gen_table_id_wrapped())
77            .to_internal_table_prost()
78            .into();
79        NodeBody::SyncLogStore(Box::new(SyncLogStoreNode {
80            log_store_table,
81            aligned: false,
82
83            // The following fields should now be read from per-job config override.
84            #[allow(deprecated)]
85            pause_duration_ms: None,
86            #[allow(deprecated)]
87            buffer_size: None,
88        }))
89    }
90}
91
92impl ExprRewritable<Stream> for StreamSyncLogStore {}
93
94impl ExprVisitable for StreamSyncLogStore {}