// risingwave_stream/task/barrier_manager/mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15pub mod cdc_progress;
16pub mod progress;
17
18use std::sync::Arc;
19
20pub use progress::CreateMviewProgressReporter;
21use risingwave_common::id::{SourceId, TableId};
22use risingwave_common::util::epoch::EpochPair;
23use risingwave_pb::id::{FragmentId, PartialGraphId};
24use tokio::sync::mpsc;
25use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
26
27use crate::error::{IntoUnexpectedExit, StreamError};
28use crate::executor::exchange::permit;
29use crate::executor::monitor::StreamingMetrics;
30use crate::executor::{Barrier, BarrierInner};
31use crate::task::barrier_manager::progress::BackfillState;
32use crate::task::cdc_progress::CdcTableBackfillState;
33use crate::task::{ActorId, StreamEnvironment};
34
35/// Events sent from actors via [`LocalBarrierManager`] to [`super::barrier_worker::managed_state::PartialGraphState`].
36///
37/// See [`crate::task`] for architecture overview.
38pub(super) enum LocalBarrierEvent {
39    ReportActorCollected {
40        actor_id: ActorId,
41        epoch: EpochPair,
42    },
43    ReportCreateProgress {
44        epoch: EpochPair,
45        fragment_id: FragmentId,
46        actor: ActorId,
47        state: BackfillState,
48    },
49    ReportSourceListFinished {
50        epoch: EpochPair,
51        actor_id: ActorId,
52        table_id: TableId,
53        associated_source_id: SourceId,
54    },
55    ReportSourceLoadFinished {
56        epoch: EpochPair,
57        actor_id: ActorId,
58        table_id: TableId,
59        associated_source_id: SourceId,
60    },
61    RefreshFinished {
62        epoch: EpochPair,
63        actor_id: ActorId,
64        table_id: TableId,
65        staging_table_id: TableId,
66    },
67    RegisterBarrierSender {
68        actor_id: ActorId,
69        barrier_sender: mpsc::UnboundedSender<Barrier>,
70    },
71    RegisterLocalUpstreamOutput {
72        actor_id: ActorId,
73        upstream_actor_id: ActorId,
74        upstream_partial_graph_id: PartialGraphId,
75        tx: permit::Sender,
76    },
77    ReportCdcTableBackfillProgress {
78        actor_id: ActorId,
79        epoch: EpochPair,
80        state: CdcTableBackfillState,
81    },
82    ReportCdcSourceOffsetUpdated {
83        epoch: EpochPair,
84        actor_id: ActorId,
85        source_id: SourceId,
86    },
87}
88
89/// Can send [`LocalBarrierEvent`] to [`super::barrier_worker::managed_state::PartialGraphState::poll_next_event`]
90///
91/// See [`crate::task`] for architecture overview.
92#[derive(Clone)]
93pub struct LocalBarrierManager {
94    barrier_event_sender: UnboundedSender<LocalBarrierEvent>,
95    actor_failure_sender: UnboundedSender<(ActorId, StreamError)>,
96    pub(crate) term_id: String,
97    pub(crate) env: StreamEnvironment,
98}
99
100impl LocalBarrierManager {
101    pub(super) fn new(
102        term_id: String,
103        env: StreamEnvironment,
104    ) -> (
105        Self,
106        UnboundedReceiver<LocalBarrierEvent>,
107        UnboundedReceiver<(ActorId, StreamError)>,
108    ) {
109        let (event_tx, event_rx) = unbounded_channel();
110        let (err_tx, err_rx) = unbounded_channel();
111        (
112            Self {
113                barrier_event_sender: event_tx,
114                actor_failure_sender: err_tx,
115                term_id,
116                env,
117            },
118            event_rx,
119            err_rx,
120        )
121    }
122
123    pub fn for_test() -> Self {
124        Self::new("114514".to_owned(), StreamEnvironment::for_test()).0
125    }
126
127    /// Event is handled by [`super::barrier_worker::managed_state::PartialGraphState::poll_next_event`]
128    fn send_event(&self, event: LocalBarrierEvent) {
129        // ignore error, because the current barrier manager maybe a stale one
130        let _ = self.barrier_event_sender.send(event);
131    }
132
133    /// When a [`crate::executor::StreamConsumer`] (typically [`crate::executor::DispatchExecutor`]) get a barrier, it should report
134    /// and collect this barrier with its own `actor_id` using this function.
135    pub fn collect<M>(&self, actor_id: ActorId, barrier: &BarrierInner<M>) {
136        self.send_event(LocalBarrierEvent::ReportActorCollected {
137            actor_id,
138            epoch: barrier.epoch,
139        })
140    }
141
142    /// When a actor exit unexpectedly, it should report this event using this function, so meta
143    /// will notice actor's exit while collecting.
144    pub fn notify_failure(&self, actor_id: ActorId, err: StreamError) {
145        let _ = self
146            .actor_failure_sender
147            .send((actor_id, err.into_unexpected_exit(actor_id)));
148    }
149
150    pub fn subscribe_barrier(&self, actor_id: ActorId) -> UnboundedReceiver<Barrier> {
151        let (tx, rx) = mpsc::unbounded_channel();
152        self.send_event(LocalBarrierEvent::RegisterBarrierSender {
153            actor_id,
154            barrier_sender: tx,
155        });
156        rx
157    }
158
159    pub fn register_local_upstream_output(
160        &self,
161        actor_id: ActorId,
162        upstream_actor_id: ActorId,
163        upstream_fragment_id: FragmentId,
164        upstream_partial_graph_id: PartialGraphId,
165        metrics: Arc<StreamingMetrics>,
166    ) -> permit::Receiver {
167        let upstream_fragment_id_str = upstream_fragment_id.to_string();
168        let fragment_channel_buffered_bytes = metrics
169            .fragment_channel_buffered_bytes
170            .with_guarded_label_values(&[&upstream_fragment_id_str]);
171        let (tx, rx) = permit::channel_from_config_with_metrics(
172            self.env.global_config(),
173            permit::ChannelMetrics {
174                sender_actor_channel_buffered_bytes: fragment_channel_buffered_bytes.clone(),
175                receiver_actor_channel_buffered_bytes: fragment_channel_buffered_bytes,
176            },
177        );
178        self.send_event(LocalBarrierEvent::RegisterLocalUpstreamOutput {
179            actor_id,
180            upstream_actor_id,
181            upstream_partial_graph_id,
182            tx,
183        });
184        rx
185    }
186
187    pub fn report_source_list_finished(
188        &self,
189        epoch: EpochPair,
190        actor_id: ActorId,
191        table_id: TableId,
192        associated_source_id: SourceId,
193    ) {
194        self.send_event(LocalBarrierEvent::ReportSourceListFinished {
195            epoch,
196            actor_id,
197            table_id,
198            associated_source_id,
199        });
200    }
201
202    pub fn report_source_load_finished(
203        &self,
204        epoch: EpochPair,
205        actor_id: ActorId,
206        table_id: TableId,
207        associated_source_id: SourceId,
208    ) {
209        self.send_event(LocalBarrierEvent::ReportSourceLoadFinished {
210            epoch,
211            actor_id,
212            table_id,
213            associated_source_id,
214        });
215    }
216
217    pub fn report_refresh_finished(
218        &self,
219        epoch: EpochPair,
220        actor_id: ActorId,
221        table_id: TableId,
222        staging_table_id: TableId,
223    ) {
224        self.send_event(LocalBarrierEvent::RefreshFinished {
225            epoch,
226            actor_id,
227            table_id,
228            staging_table_id,
229        });
230    }
231
232    pub fn report_cdc_source_offset_updated(
233        &self,
234        epoch: EpochPair,
235        actor_id: ActorId,
236        source_id: SourceId,
237    ) {
238        self.send_event(LocalBarrierEvent::ReportCdcSourceOffsetUpdated {
239            epoch,
240            actor_id,
241            source_id,
242        });
243    }
244}
245
#[cfg(test)]
impl LocalBarrierManager {
    /// Spawns a `LocalBarrierWorker` on a test environment.
    ///
    /// The worker is given unused streaming metrics, `None` for its third
    /// argument and a zero-initialised `AtomicU64`. Returns the `EventSender`
    /// wrapping the channel the worker reads `LocalActorOperation`s from.
    ///
    /// The value returned by `LocalBarrierWorker::spawn` is only held until this
    /// function returns.
    pub(super) fn spawn_for_test()
    -> crate::task::barrier_worker::EventSender<crate::task::barrier_worker::LocalActorOperation>
    {
        // `Arc` and `StreamingMetrics` are already imported at module scope.
        use std::sync::atomic::AtomicU64;

        use crate::task::barrier_worker::{EventSender, LocalBarrierWorker};

        let (tx, rx) = unbounded_channel();
        let _join_handle = LocalBarrierWorker::spawn(
            StreamEnvironment::for_test(),
            Arc::new(StreamingMetrics::unused()),
            None,
            Arc::new(AtomicU64::new(0)),
            rx,
        );
        EventSender(tx)
    }
}
267}