risingwave_frontend/optimizer/plan_node/
stream_sync_log_store.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pretty_xmlish::XmlNode;
16use risingwave_pb::stream_plan::SyncLogStoreNode;
17use risingwave_pb::stream_plan::stream_node::NodeBody;
18
19use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
20use crate::optimizer::plan_node::generic::PhysicalPlanRef;
21use crate::optimizer::plan_node::stream::StreamPlanNodeMetadata;
22use crate::optimizer::plan_node::utils::{
23    Distill, childless_record, infer_synced_kv_log_store_table_catalog_inner,
24};
25use crate::optimizer::plan_node::{
26    ExprRewritable, PlanBase, PlanTreeNodeUnary, Stream, StreamExchange, StreamNode,
27    StreamPlanRef as PlanRef,
28};
29use crate::optimizer::plan_rewriter::PlanRewriter;
30use crate::stream_fragmenter::BuildFragmentGraphState;
31
32#[derive(Debug, Clone, PartialEq, Eq, Hash)]
33pub struct StreamSyncLogStore {
34    pub base: PlanBase<Stream>,
35    pub input: PlanRef,
36}
37
38impl StreamSyncLogStore {
39    pub fn new(input: PlanRef) -> Self {
40        let base = PlanBase::new_stream(
41            input.ctx(),
42            input.schema().clone(),
43            input.stream_key().map(|keys| keys.to_vec()),
44            input.functional_dependency().clone(),
45            input.distribution().clone(),
46            input.stream_kind(),
47            input.emit_on_window_close(),
48            input.watermark_columns().clone(),
49            input.columns_monotonicity().clone(),
50        );
51
52        Self { base, input }
53    }
54}
55
56impl Distill for StreamSyncLogStore {
57    fn distill<'a>(&self) -> XmlNode<'a> {
58        childless_record("StreamSyncLogStore", vec![])
59    }
60}
61
62impl PlanTreeNodeUnary<Stream> for StreamSyncLogStore {
63    fn input(&self) -> PlanRef {
64        self.input.clone()
65    }
66
67    fn clone_with_input(&self, input: PlanRef) -> Self {
68        Self::new(input)
69    }
70}
71
72impl_plan_tree_node_for_unary! { Stream, StreamSyncLogStore }
73
74impl StreamNode for StreamSyncLogStore {
75    fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> NodeBody {
76        let columns = self.input.schema().fields();
77        let log_store_table = infer_synced_kv_log_store_table_catalog_inner(&self.input, columns)
78            .with_id(state.gen_table_id_wrapped())
79            .to_internal_table_prost()
80            .into();
81        NodeBody::SyncLogStore(Box::new(SyncLogStoreNode {
82            log_store_table,
83            aligned: false,
84
85            // The following fields should now be read from per-job config override.
86            #[expect(deprecated)]
87            pause_duration_ms: None,
88            #[expect(deprecated)]
89            buffer_size: None,
90        }))
91    }
92}
93
94impl ExprRewritable<Stream> for StreamSyncLogStore {}
95
96impl ExprVisitable for StreamSyncLogStore {}
97
98/// Run after the final stream job root is created, so physical explain and graph building see the
99/// same sync log store fragment boundary.
100pub(crate) fn ensure_sync_log_store_fragment_root(plan: PlanRef) -> PlanRef {
101    plan.rewrite_with(&mut EnsureSyncLogStoreFragmentRootRewriter)
102}
103
104/// Insert no-shuffle exchanges above sync log stores that are not already separated by an exchange.
105struct EnsureSyncLogStoreFragmentRootRewriter;
106
107impl PlanRewriter<Stream> for EnsureSyncLogStoreFragmentRootRewriter {
108    fn rewrite_with_inputs(&mut self, plan: &PlanRef, mut inputs: Vec<PlanRef>) -> PlanRef {
109        if plan.as_stream_exchange().is_none() {
110            for input in &mut inputs {
111                if input.as_stream_sync_log_store().is_some() {
112                    *input = StreamExchange::new_no_shuffle(input.clone()).into();
113                }
114            }
115        }
116
117        plan.clone_root_with_inputs(&inputs)
118    }
119}