risingwave_frontend/optimizer/plan_node/
stream_sync_log_store.rs1use pretty_xmlish::XmlNode;
16use risingwave_pb::stream_plan::SyncLogStoreNode;
17use risingwave_pb::stream_plan::stream_node::NodeBody;
18
19use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
20use crate::optimizer::plan_node::generic::PhysicalPlanRef;
21use crate::optimizer::plan_node::stream::StreamPlanNodeMetadata;
22use crate::optimizer::plan_node::utils::{
23 Distill, childless_record, infer_synced_kv_log_store_table_catalog_inner,
24};
25use crate::optimizer::plan_node::{
26 ExprRewritable, PlanBase, PlanTreeNodeUnary, Stream, StreamExchange, StreamNode,
27 StreamPlanRef as PlanRef,
28};
29use crate::optimizer::plan_rewriter::PlanRewriter;
30use crate::stream_fragmenter::BuildFragmentGraphState;
31
32#[derive(Debug, Clone, PartialEq, Eq, Hash)]
33pub struct StreamSyncLogStore {
34 pub base: PlanBase<Stream>,
35 pub input: PlanRef,
36}
37
38impl StreamSyncLogStore {
39 pub fn new(input: PlanRef) -> Self {
40 let base = PlanBase::new_stream(
41 input.ctx(),
42 input.schema().clone(),
43 input.stream_key().map(|keys| keys.to_vec()),
44 input.functional_dependency().clone(),
45 input.distribution().clone(),
46 input.stream_kind(),
47 input.emit_on_window_close(),
48 input.watermark_columns().clone(),
49 input.columns_monotonicity().clone(),
50 );
51
52 Self { base, input }
53 }
54}
55
56impl Distill for StreamSyncLogStore {
57 fn distill<'a>(&self) -> XmlNode<'a> {
58 childless_record("StreamSyncLogStore", vec![])
59 }
60}
61
62impl PlanTreeNodeUnary<Stream> for StreamSyncLogStore {
63 fn input(&self) -> PlanRef {
64 self.input.clone()
65 }
66
67 fn clone_with_input(&self, input: PlanRef) -> Self {
68 Self::new(input)
69 }
70}
71
72impl_plan_tree_node_for_unary! { Stream, StreamSyncLogStore }
73
74impl StreamNode for StreamSyncLogStore {
75 fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> NodeBody {
76 let columns = self.input.schema().fields();
77 let log_store_table = infer_synced_kv_log_store_table_catalog_inner(&self.input, columns)
78 .with_id(state.gen_table_id_wrapped())
79 .to_internal_table_prost()
80 .into();
81 NodeBody::SyncLogStore(Box::new(SyncLogStoreNode {
82 log_store_table,
83 aligned: false,
84
85 #[expect(deprecated)]
87 pause_duration_ms: None,
88 #[expect(deprecated)]
89 buffer_size: None,
90 }))
91 }
92}
93
94impl ExprRewritable<Stream> for StreamSyncLogStore {}
95
96impl ExprVisitable for StreamSyncLogStore {}
97
98pub(crate) fn ensure_sync_log_store_fragment_root(plan: PlanRef) -> PlanRef {
101 plan.rewrite_with(&mut EnsureSyncLogStoreFragmentRootRewriter)
102}
103
/// Stateless plan rewriter used by `ensure_sync_log_store_fragment_root`.
struct EnsureSyncLogStoreFragmentRootRewriter;
106
107impl PlanRewriter<Stream> for EnsureSyncLogStoreFragmentRootRewriter {
108 fn rewrite_with_inputs(&mut self, plan: &PlanRef, mut inputs: Vec<PlanRef>) -> PlanRef {
109 if plan.as_stream_exchange().is_none() {
110 for input in &mut inputs {
111 if input.as_stream_sync_log_store().is_some() {
112 *input = StreamExchange::new_no_shuffle(input.clone()).into();
113 }
114 }
115 }
116
117 plan.clone_root_with_inputs(&inputs)
118 }
119}