risingwave_stream/task/barrier_manager/mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod cdc_progress;
16pub mod progress;
17
18pub use progress::CreateMviewProgressReporter;
19use risingwave_common::id::{SourceId, TableId};
20use risingwave_common::util::epoch::EpochPair;
21use risingwave_pb::id::{FragmentId, PartialGraphId};
22use tokio::sync::mpsc;
23use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
24
25use crate::error::{IntoUnexpectedExit, StreamError};
26use crate::executor::exchange::permit::{self, channel_from_config};
27use crate::executor::{Barrier, BarrierInner};
28use crate::task::barrier_manager::progress::BackfillState;
29use crate::task::cdc_progress::CdcTableBackfillState;
30use crate::task::{ActorId, StreamEnvironment};
31
32/// Events sent from actors via [`LocalBarrierManager`] to [`super::barrier_worker::managed_state::PartialGraphState`].
33///
34/// See [`crate::task`] for architecture overview.
35pub(super) enum LocalBarrierEvent {
36    ReportActorCollected {
37        actor_id: ActorId,
38        epoch: EpochPair,
39    },
40    ReportCreateProgress {
41        epoch: EpochPair,
42        fragment_id: FragmentId,
43        actor: ActorId,
44        state: BackfillState,
45    },
46    ReportSourceListFinished {
47        epoch: EpochPair,
48        actor_id: ActorId,
49        table_id: TableId,
50        associated_source_id: SourceId,
51    },
52    ReportSourceLoadFinished {
53        epoch: EpochPair,
54        actor_id: ActorId,
55        table_id: TableId,
56        associated_source_id: SourceId,
57    },
58    RefreshFinished {
59        epoch: EpochPair,
60        actor_id: ActorId,
61        table_id: TableId,
62        staging_table_id: TableId,
63    },
64    RegisterBarrierSender {
65        actor_id: ActorId,
66        barrier_sender: mpsc::UnboundedSender<Barrier>,
67    },
68    RegisterLocalUpstreamOutput {
69        actor_id: ActorId,
70        upstream_actor_id: ActorId,
71        upstream_partial_graph_id: PartialGraphId,
72        tx: permit::Sender,
73    },
74    ReportCdcTableBackfillProgress {
75        actor_id: ActorId,
76        epoch: EpochPair,
77        state: CdcTableBackfillState,
78    },
79    ReportCdcSourceOffsetUpdated {
80        epoch: EpochPair,
81        actor_id: ActorId,
82        source_id: SourceId,
83    },
84}
85
86/// Can send [`LocalBarrierEvent`] to [`super::barrier_worker::managed_state::PartialGraphState::poll_next_event`]
87///
88/// See [`crate::task`] for architecture overview.
89#[derive(Clone)]
90pub struct LocalBarrierManager {
91    barrier_event_sender: UnboundedSender<LocalBarrierEvent>,
92    actor_failure_sender: UnboundedSender<(ActorId, StreamError)>,
93    pub(crate) term_id: String,
94    pub(crate) env: StreamEnvironment,
95}
96
97impl LocalBarrierManager {
98    pub(super) fn new(
99        term_id: String,
100        env: StreamEnvironment,
101    ) -> (
102        Self,
103        UnboundedReceiver<LocalBarrierEvent>,
104        UnboundedReceiver<(ActorId, StreamError)>,
105    ) {
106        let (event_tx, event_rx) = unbounded_channel();
107        let (err_tx, err_rx) = unbounded_channel();
108        (
109            Self {
110                barrier_event_sender: event_tx,
111                actor_failure_sender: err_tx,
112                term_id,
113                env,
114            },
115            event_rx,
116            err_rx,
117        )
118    }
119
120    pub fn for_test() -> Self {
121        Self::new("114514".to_owned(), StreamEnvironment::for_test()).0
122    }
123
124    /// Event is handled by [`super::barrier_worker::managed_state::PartialGraphState::poll_next_event`]
125    fn send_event(&self, event: LocalBarrierEvent) {
126        // ignore error, because the current barrier manager maybe a stale one
127        let _ = self.barrier_event_sender.send(event);
128    }
129
130    /// When a [`crate::executor::StreamConsumer`] (typically [`crate::executor::DispatchExecutor`]) get a barrier, it should report
131    /// and collect this barrier with its own `actor_id` using this function.
132    pub fn collect<M>(&self, actor_id: ActorId, barrier: &BarrierInner<M>) {
133        self.send_event(LocalBarrierEvent::ReportActorCollected {
134            actor_id,
135            epoch: barrier.epoch,
136        })
137    }
138
139    /// When a actor exit unexpectedly, it should report this event using this function, so meta
140    /// will notice actor's exit while collecting.
141    pub fn notify_failure(&self, actor_id: ActorId, err: StreamError) {
142        let _ = self
143            .actor_failure_sender
144            .send((actor_id, err.into_unexpected_exit(actor_id)));
145    }
146
147    pub fn subscribe_barrier(&self, actor_id: ActorId) -> UnboundedReceiver<Barrier> {
148        let (tx, rx) = mpsc::unbounded_channel();
149        self.send_event(LocalBarrierEvent::RegisterBarrierSender {
150            actor_id,
151            barrier_sender: tx,
152        });
153        rx
154    }
155
156    pub fn register_local_upstream_output(
157        &self,
158        actor_id: ActorId,
159        upstream_actor_id: ActorId,
160        upstream_partial_graph_id: PartialGraphId,
161    ) -> permit::Receiver {
162        let (tx, rx) = channel_from_config(self.env.global_config());
163        self.send_event(LocalBarrierEvent::RegisterLocalUpstreamOutput {
164            actor_id,
165            upstream_actor_id,
166            upstream_partial_graph_id,
167            tx,
168        });
169        rx
170    }
171
172    pub fn report_source_list_finished(
173        &self,
174        epoch: EpochPair,
175        actor_id: ActorId,
176        table_id: TableId,
177        associated_source_id: SourceId,
178    ) {
179        self.send_event(LocalBarrierEvent::ReportSourceListFinished {
180            epoch,
181            actor_id,
182            table_id,
183            associated_source_id,
184        });
185    }
186
187    pub fn report_source_load_finished(
188        &self,
189        epoch: EpochPair,
190        actor_id: ActorId,
191        table_id: TableId,
192        associated_source_id: SourceId,
193    ) {
194        self.send_event(LocalBarrierEvent::ReportSourceLoadFinished {
195            epoch,
196            actor_id,
197            table_id,
198            associated_source_id,
199        });
200    }
201
202    pub fn report_refresh_finished(
203        &self,
204        epoch: EpochPair,
205        actor_id: ActorId,
206        table_id: TableId,
207        staging_table_id: TableId,
208    ) {
209        self.send_event(LocalBarrierEvent::RefreshFinished {
210            epoch,
211            actor_id,
212            table_id,
213            staging_table_id,
214        });
215    }
216
217    pub fn report_cdc_source_offset_updated(
218        &self,
219        epoch: EpochPair,
220        actor_id: ActorId,
221        source_id: SourceId,
222    ) {
223        self.send_event(LocalBarrierEvent::ReportCdcSourceOffsetUpdated {
224            epoch,
225            actor_id,
226            source_id,
227        });
228    }
229}
230
#[cfg(test)]
impl LocalBarrierManager {
    /// Spawns a `LocalBarrierWorker` wired to test fixtures. Returns the sender used to feed
    /// it `LocalActorOperation`s.
    ///
    /// The worker's join handle is not kept beyond this function, so the spawned worker is
    /// left running detached.
    pub(super) fn spawn_for_test()
    -> crate::task::barrier_worker::EventSender<crate::task::barrier_worker::LocalActorOperation>
    {
        use std::sync::Arc;
        use std::sync::atomic::AtomicU64;

        use crate::executor::monitor::StreamingMetrics;
        use crate::task::barrier_worker::{EventSender, LocalBarrierWorker};

        let (operation_tx, operation_rx) = unbounded_channel();

        // Build the worker's arguments in the same order they are passed.
        let test_env = StreamEnvironment::for_test();
        let unused_metrics = Arc::new(StreamingMetrics::unused());
        let zeroed_atomic = Arc::new(AtomicU64::new(0));

        let _worker_handle = LocalBarrierWorker::spawn(
            test_env,
            unused_metrics,
            None,
            zeroed_atomic,
            operation_rx,
        );

        EventSender(operation_tx)
    }
}