risingwave_stream/task/barrier_manager/
mod.rs1pub mod cdc_progress;
16pub mod progress;
17
18use std::sync::Arc;
19
20pub use progress::CreateMviewProgressReporter;
21use risingwave_common::id::{SourceId, TableId};
22use risingwave_common::util::epoch::EpochPair;
23use risingwave_pb::id::{FragmentId, PartialGraphId};
24use tokio::sync::mpsc;
25use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
26
27use crate::error::{IntoUnexpectedExit, StreamError};
28use crate::executor::exchange::permit;
29use crate::executor::monitor::StreamingMetrics;
30use crate::executor::{Barrier, BarrierInner};
31use crate::task::barrier_manager::progress::BackfillState;
32use crate::task::cdc_progress::CdcTableBackfillState;
33use crate::task::{ActorId, StreamEnvironment};
34
35pub(super) enum LocalBarrierEvent {
39 ReportActorCollected {
40 actor_id: ActorId,
41 epoch: EpochPair,
42 },
43 ReportCreateProgress {
44 epoch: EpochPair,
45 fragment_id: FragmentId,
46 actor: ActorId,
47 state: BackfillState,
48 },
49 ReportSourceListFinished {
50 epoch: EpochPair,
51 actor_id: ActorId,
52 table_id: TableId,
53 associated_source_id: SourceId,
54 },
55 ReportSourceLoadFinished {
56 epoch: EpochPair,
57 actor_id: ActorId,
58 table_id: TableId,
59 associated_source_id: SourceId,
60 },
61 RefreshFinished {
62 epoch: EpochPair,
63 actor_id: ActorId,
64 table_id: TableId,
65 staging_table_id: TableId,
66 },
67 RegisterBarrierSender {
68 actor_id: ActorId,
69 barrier_sender: mpsc::UnboundedSender<Barrier>,
70 },
71 RegisterLocalUpstreamOutput {
72 actor_id: ActorId,
73 upstream_actor_id: ActorId,
74 upstream_partial_graph_id: PartialGraphId,
75 tx: permit::Sender,
76 },
77 ReportCdcTableBackfillProgress {
78 actor_id: ActorId,
79 epoch: EpochPair,
80 state: CdcTableBackfillState,
81 },
82 ReportCdcSourceOffsetUpdated {
83 epoch: EpochPair,
84 actor_id: ActorId,
85 source_id: SourceId,
86 },
87}
88
89#[derive(Clone)]
93pub struct LocalBarrierManager {
94 barrier_event_sender: UnboundedSender<LocalBarrierEvent>,
95 actor_failure_sender: UnboundedSender<(ActorId, StreamError)>,
96 pub(crate) term_id: String,
97 pub(crate) env: StreamEnvironment,
98}
99
100impl LocalBarrierManager {
101 pub(super) fn new(
102 term_id: String,
103 env: StreamEnvironment,
104 ) -> (
105 Self,
106 UnboundedReceiver<LocalBarrierEvent>,
107 UnboundedReceiver<(ActorId, StreamError)>,
108 ) {
109 let (event_tx, event_rx) = unbounded_channel();
110 let (err_tx, err_rx) = unbounded_channel();
111 (
112 Self {
113 barrier_event_sender: event_tx,
114 actor_failure_sender: err_tx,
115 term_id,
116 env,
117 },
118 event_rx,
119 err_rx,
120 )
121 }
122
123 pub fn for_test() -> Self {
124 Self::new("114514".to_owned(), StreamEnvironment::for_test()).0
125 }
126
127 fn send_event(&self, event: LocalBarrierEvent) {
129 let _ = self.barrier_event_sender.send(event);
131 }
132
133 pub fn collect<M>(&self, actor_id: ActorId, barrier: &BarrierInner<M>) {
136 self.send_event(LocalBarrierEvent::ReportActorCollected {
137 actor_id,
138 epoch: barrier.epoch,
139 })
140 }
141
142 pub fn notify_failure(&self, actor_id: ActorId, err: StreamError) {
145 let _ = self
146 .actor_failure_sender
147 .send((actor_id, err.into_unexpected_exit(actor_id)));
148 }
149
150 pub fn subscribe_barrier(&self, actor_id: ActorId) -> UnboundedReceiver<Barrier> {
151 let (tx, rx) = mpsc::unbounded_channel();
152 self.send_event(LocalBarrierEvent::RegisterBarrierSender {
153 actor_id,
154 barrier_sender: tx,
155 });
156 rx
157 }
158
159 pub fn register_local_upstream_output(
160 &self,
161 actor_id: ActorId,
162 upstream_actor_id: ActorId,
163 upstream_fragment_id: FragmentId,
164 upstream_partial_graph_id: PartialGraphId,
165 metrics: Arc<StreamingMetrics>,
166 ) -> permit::Receiver {
167 let upstream_fragment_id_str = upstream_fragment_id.to_string();
168 let fragment_channel_buffered_bytes = metrics
169 .fragment_channel_buffered_bytes
170 .with_guarded_label_values(&[&upstream_fragment_id_str]);
171 let (tx, rx) = permit::channel_from_config_with_metrics(
172 self.env.global_config(),
173 permit::ChannelMetrics {
174 sender_actor_channel_buffered_bytes: fragment_channel_buffered_bytes.clone(),
175 receiver_actor_channel_buffered_bytes: fragment_channel_buffered_bytes,
176 },
177 );
178 self.send_event(LocalBarrierEvent::RegisterLocalUpstreamOutput {
179 actor_id,
180 upstream_actor_id,
181 upstream_partial_graph_id,
182 tx,
183 });
184 rx
185 }
186
187 pub fn report_source_list_finished(
188 &self,
189 epoch: EpochPair,
190 actor_id: ActorId,
191 table_id: TableId,
192 associated_source_id: SourceId,
193 ) {
194 self.send_event(LocalBarrierEvent::ReportSourceListFinished {
195 epoch,
196 actor_id,
197 table_id,
198 associated_source_id,
199 });
200 }
201
202 pub fn report_source_load_finished(
203 &self,
204 epoch: EpochPair,
205 actor_id: ActorId,
206 table_id: TableId,
207 associated_source_id: SourceId,
208 ) {
209 self.send_event(LocalBarrierEvent::ReportSourceLoadFinished {
210 epoch,
211 actor_id,
212 table_id,
213 associated_source_id,
214 });
215 }
216
217 pub fn report_refresh_finished(
218 &self,
219 epoch: EpochPair,
220 actor_id: ActorId,
221 table_id: TableId,
222 staging_table_id: TableId,
223 ) {
224 self.send_event(LocalBarrierEvent::RefreshFinished {
225 epoch,
226 actor_id,
227 table_id,
228 staging_table_id,
229 });
230 }
231
232 pub fn report_cdc_source_offset_updated(
233 &self,
234 epoch: EpochPair,
235 actor_id: ActorId,
236 source_id: SourceId,
237 ) {
238 self.send_event(LocalBarrierEvent::ReportCdcSourceOffsetUpdated {
239 epoch,
240 actor_id,
241 source_id,
242 });
243 }
244}
245
246#[cfg(test)]
247impl LocalBarrierManager {
248 pub(super) fn spawn_for_test()
249 -> crate::task::barrier_worker::EventSender<crate::task::barrier_worker::LocalActorOperation>
250 {
251 use std::sync::Arc;
252 use std::sync::atomic::AtomicU64;
253
254 use crate::executor::monitor::StreamingMetrics;
255 use crate::task::barrier_worker::{EventSender, LocalBarrierWorker};
256
257 let (tx, rx) = unbounded_channel();
258 let _join_handle = LocalBarrierWorker::spawn(
259 StreamEnvironment::for_test(),
260 Arc::new(StreamingMetrics::unused()),
261 None,
262 Arc::new(AtomicU64::new(0)),
263 rx,
264 );
265 EventSender(tx)
266 }
267}