risingwave_stream/task/barrier_manager/
mod.rs1pub mod cdc_progress;
16pub mod progress;
17
18pub use progress::CreateMviewProgressReporter;
19use risingwave_common::id::{SourceId, TableId};
20use risingwave_common::util::epoch::EpochPair;
21use risingwave_pb::id::{FragmentId, PartialGraphId};
22use tokio::sync::mpsc;
23use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
24
25use crate::error::{IntoUnexpectedExit, StreamError};
26use crate::executor::exchange::permit::{self, channel_from_config};
27use crate::executor::{Barrier, BarrierInner};
28use crate::task::barrier_manager::progress::BackfillState;
29use crate::task::cdc_progress::CdcTableBackfillState;
30use crate::task::{ActorId, StreamEnvironment};
31
32pub(super) enum LocalBarrierEvent {
36 ReportActorCollected {
37 actor_id: ActorId,
38 epoch: EpochPair,
39 },
40 ReportCreateProgress {
41 epoch: EpochPair,
42 fragment_id: FragmentId,
43 actor: ActorId,
44 state: BackfillState,
45 },
46 ReportSourceListFinished {
47 epoch: EpochPair,
48 actor_id: ActorId,
49 table_id: TableId,
50 associated_source_id: SourceId,
51 },
52 ReportSourceLoadFinished {
53 epoch: EpochPair,
54 actor_id: ActorId,
55 table_id: TableId,
56 associated_source_id: SourceId,
57 },
58 RefreshFinished {
59 epoch: EpochPair,
60 actor_id: ActorId,
61 table_id: TableId,
62 staging_table_id: TableId,
63 },
64 RegisterBarrierSender {
65 actor_id: ActorId,
66 barrier_sender: mpsc::UnboundedSender<Barrier>,
67 },
68 RegisterLocalUpstreamOutput {
69 actor_id: ActorId,
70 upstream_actor_id: ActorId,
71 upstream_partial_graph_id: PartialGraphId,
72 tx: permit::Sender,
73 },
74 ReportCdcTableBackfillProgress {
75 actor_id: ActorId,
76 epoch: EpochPair,
77 state: CdcTableBackfillState,
78 },
79 ReportCdcSourceOffsetUpdated {
80 epoch: EpochPair,
81 actor_id: ActorId,
82 source_id: SourceId,
83 },
84}
85
86#[derive(Clone)]
90pub struct LocalBarrierManager {
91 barrier_event_sender: UnboundedSender<LocalBarrierEvent>,
92 actor_failure_sender: UnboundedSender<(ActorId, StreamError)>,
93 pub(crate) term_id: String,
94 pub(crate) env: StreamEnvironment,
95}
96
97impl LocalBarrierManager {
98 pub(super) fn new(
99 term_id: String,
100 env: StreamEnvironment,
101 ) -> (
102 Self,
103 UnboundedReceiver<LocalBarrierEvent>,
104 UnboundedReceiver<(ActorId, StreamError)>,
105 ) {
106 let (event_tx, event_rx) = unbounded_channel();
107 let (err_tx, err_rx) = unbounded_channel();
108 (
109 Self {
110 barrier_event_sender: event_tx,
111 actor_failure_sender: err_tx,
112 term_id,
113 env,
114 },
115 event_rx,
116 err_rx,
117 )
118 }
119
120 pub fn for_test() -> Self {
121 Self::new("114514".to_owned(), StreamEnvironment::for_test()).0
122 }
123
124 fn send_event(&self, event: LocalBarrierEvent) {
126 let _ = self.barrier_event_sender.send(event);
128 }
129
130 pub fn collect<M>(&self, actor_id: ActorId, barrier: &BarrierInner<M>) {
133 self.send_event(LocalBarrierEvent::ReportActorCollected {
134 actor_id,
135 epoch: barrier.epoch,
136 })
137 }
138
139 pub fn notify_failure(&self, actor_id: ActorId, err: StreamError) {
142 let _ = self
143 .actor_failure_sender
144 .send((actor_id, err.into_unexpected_exit(actor_id)));
145 }
146
147 pub fn subscribe_barrier(&self, actor_id: ActorId) -> UnboundedReceiver<Barrier> {
148 let (tx, rx) = mpsc::unbounded_channel();
149 self.send_event(LocalBarrierEvent::RegisterBarrierSender {
150 actor_id,
151 barrier_sender: tx,
152 });
153 rx
154 }
155
156 pub fn register_local_upstream_output(
157 &self,
158 actor_id: ActorId,
159 upstream_actor_id: ActorId,
160 upstream_partial_graph_id: PartialGraphId,
161 ) -> permit::Receiver {
162 let (tx, rx) = channel_from_config(self.env.global_config());
163 self.send_event(LocalBarrierEvent::RegisterLocalUpstreamOutput {
164 actor_id,
165 upstream_actor_id,
166 upstream_partial_graph_id,
167 tx,
168 });
169 rx
170 }
171
172 pub fn report_source_list_finished(
173 &self,
174 epoch: EpochPair,
175 actor_id: ActorId,
176 table_id: TableId,
177 associated_source_id: SourceId,
178 ) {
179 self.send_event(LocalBarrierEvent::ReportSourceListFinished {
180 epoch,
181 actor_id,
182 table_id,
183 associated_source_id,
184 });
185 }
186
187 pub fn report_source_load_finished(
188 &self,
189 epoch: EpochPair,
190 actor_id: ActorId,
191 table_id: TableId,
192 associated_source_id: SourceId,
193 ) {
194 self.send_event(LocalBarrierEvent::ReportSourceLoadFinished {
195 epoch,
196 actor_id,
197 table_id,
198 associated_source_id,
199 });
200 }
201
202 pub fn report_refresh_finished(
203 &self,
204 epoch: EpochPair,
205 actor_id: ActorId,
206 table_id: TableId,
207 staging_table_id: TableId,
208 ) {
209 self.send_event(LocalBarrierEvent::RefreshFinished {
210 epoch,
211 actor_id,
212 table_id,
213 staging_table_id,
214 });
215 }
216
217 pub fn report_cdc_source_offset_updated(
218 &self,
219 epoch: EpochPair,
220 actor_id: ActorId,
221 source_id: SourceId,
222 ) {
223 self.send_event(LocalBarrierEvent::ReportCdcSourceOffsetUpdated {
224 epoch,
225 actor_id,
226 source_id,
227 });
228 }
229}
230
231#[cfg(test)]
232impl LocalBarrierManager {
233 pub(super) fn spawn_for_test()
234 -> crate::task::barrier_worker::EventSender<crate::task::barrier_worker::LocalActorOperation>
235 {
236 use std::sync::Arc;
237 use std::sync::atomic::AtomicU64;
238
239 use crate::executor::monitor::StreamingMetrics;
240 use crate::task::barrier_worker::{EventSender, LocalBarrierWorker};
241
242 let (tx, rx) = unbounded_channel();
243 let _join_handle = LocalBarrierWorker::spawn(
244 StreamEnvironment::for_test(),
245 Arc::new(StreamingMetrics::unused()),
246 None,
247 Arc::new(AtomicU64::new(0)),
248 rx,
249 );
250 EventSender(tx)
251 }
252}