risingwave_stream/task/barrier_manager/
mod.rs1pub mod cdc_progress;
16pub mod progress;
17
18pub use progress::CreateMviewProgressReporter;
19use risingwave_common::id::{SourceId, TableId};
20use risingwave_common::util::epoch::EpochPair;
21use risingwave_pb::id::{FragmentId, PartialGraphId};
22use tokio::sync::mpsc;
23use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
24
25use crate::error::{IntoUnexpectedExit, StreamError};
26use crate::executor::exchange::permit::{self, channel_from_config};
27use crate::executor::{Barrier, BarrierInner};
28use crate::task::barrier_manager::progress::BackfillState;
29use crate::task::cdc_progress::CdcTableBackfillState;
30use crate::task::{ActorId, StreamEnvironment};
31
32pub(super) enum LocalBarrierEvent {
36 ReportActorCollected {
37 actor_id: ActorId,
38 epoch: EpochPair,
39 },
40 ReportCreateProgress {
41 epoch: EpochPair,
42 fragment_id: FragmentId,
43 actor: ActorId,
44 state: BackfillState,
45 },
46 ReportSourceListFinished {
47 epoch: EpochPair,
48 actor_id: ActorId,
49 table_id: TableId,
50 associated_source_id: SourceId,
51 },
52 ReportSourceLoadFinished {
53 epoch: EpochPair,
54 actor_id: ActorId,
55 table_id: TableId,
56 associated_source_id: SourceId,
57 },
58 RefreshFinished {
59 epoch: EpochPair,
60 actor_id: ActorId,
61 table_id: TableId,
62 staging_table_id: TableId,
63 },
64 RegisterBarrierSender {
65 actor_id: ActorId,
66 barrier_sender: mpsc::UnboundedSender<Barrier>,
67 },
68 RegisterLocalUpstreamOutput {
69 actor_id: ActorId,
70 upstream_actor_id: ActorId,
71 upstream_partial_graph_id: PartialGraphId,
72 tx: permit::Sender,
73 },
74 ReportCdcTableBackfillProgress {
75 actor_id: ActorId,
76 epoch: EpochPair,
77 state: CdcTableBackfillState,
78 },
79}
80
81#[derive(Clone)]
85pub struct LocalBarrierManager {
86 barrier_event_sender: UnboundedSender<LocalBarrierEvent>,
87 actor_failure_sender: UnboundedSender<(ActorId, StreamError)>,
88 pub(crate) term_id: String,
89 pub(crate) env: StreamEnvironment,
90}
91
92impl LocalBarrierManager {
93 pub(super) fn new(
94 term_id: String,
95 env: StreamEnvironment,
96 ) -> (
97 Self,
98 UnboundedReceiver<LocalBarrierEvent>,
99 UnboundedReceiver<(ActorId, StreamError)>,
100 ) {
101 let (event_tx, event_rx) = unbounded_channel();
102 let (err_tx, err_rx) = unbounded_channel();
103 (
104 Self {
105 barrier_event_sender: event_tx,
106 actor_failure_sender: err_tx,
107 term_id,
108 env,
109 },
110 event_rx,
111 err_rx,
112 )
113 }
114
115 pub fn for_test() -> Self {
116 Self::new("114514".to_owned(), StreamEnvironment::for_test()).0
117 }
118
119 fn send_event(&self, event: LocalBarrierEvent) {
121 let _ = self.barrier_event_sender.send(event);
123 }
124
125 pub fn collect<M>(&self, actor_id: ActorId, barrier: &BarrierInner<M>) {
128 self.send_event(LocalBarrierEvent::ReportActorCollected {
129 actor_id,
130 epoch: barrier.epoch,
131 })
132 }
133
134 pub fn notify_failure(&self, actor_id: ActorId, err: StreamError) {
137 let _ = self
138 .actor_failure_sender
139 .send((actor_id, err.into_unexpected_exit(actor_id)));
140 }
141
142 pub fn subscribe_barrier(&self, actor_id: ActorId) -> UnboundedReceiver<Barrier> {
143 let (tx, rx) = mpsc::unbounded_channel();
144 self.send_event(LocalBarrierEvent::RegisterBarrierSender {
145 actor_id,
146 barrier_sender: tx,
147 });
148 rx
149 }
150
151 pub fn register_local_upstream_output(
152 &self,
153 actor_id: ActorId,
154 upstream_actor_id: ActorId,
155 upstream_partial_graph_id: PartialGraphId,
156 ) -> permit::Receiver {
157 let (tx, rx) = channel_from_config(self.env.global_config());
158 self.send_event(LocalBarrierEvent::RegisterLocalUpstreamOutput {
159 actor_id,
160 upstream_actor_id,
161 upstream_partial_graph_id,
162 tx,
163 });
164 rx
165 }
166
167 pub fn report_source_list_finished(
168 &self,
169 epoch: EpochPair,
170 actor_id: ActorId,
171 table_id: TableId,
172 associated_source_id: SourceId,
173 ) {
174 self.send_event(LocalBarrierEvent::ReportSourceListFinished {
175 epoch,
176 actor_id,
177 table_id,
178 associated_source_id,
179 });
180 }
181
182 pub fn report_source_load_finished(
183 &self,
184 epoch: EpochPair,
185 actor_id: ActorId,
186 table_id: TableId,
187 associated_source_id: SourceId,
188 ) {
189 self.send_event(LocalBarrierEvent::ReportSourceLoadFinished {
190 epoch,
191 actor_id,
192 table_id,
193 associated_source_id,
194 });
195 }
196
197 pub fn report_refresh_finished(
198 &self,
199 epoch: EpochPair,
200 actor_id: ActorId,
201 table_id: TableId,
202 staging_table_id: TableId,
203 ) {
204 self.send_event(LocalBarrierEvent::RefreshFinished {
205 epoch,
206 actor_id,
207 table_id,
208 staging_table_id,
209 });
210 }
211}
212
#[cfg(test)]
impl LocalBarrierManager {
    /// Spawns a `LocalBarrierWorker` on a test environment and returns an `EventSender`
    /// wrapping the sending half of the channel the worker was given.
    ///
    /// The value returned by `LocalBarrierWorker::spawn` is bound to a local and dropped when
    /// this function returns.
    pub(super) fn spawn_for_test()
    -> crate::task::barrier_worker::EventSender<crate::task::barrier_worker::LocalActorOperation>
    {
        use std::sync::Arc;
        use std::sync::atomic::AtomicU64;

        use crate::executor::monitor::StreamingMetrics;
        use crate::task::barrier_worker::{EventSender, LocalBarrierWorker};

        let (actor_op_tx, actor_op_rx) = unbounded_channel();
        let test_env = StreamEnvironment::for_test();
        let unused_metrics = Arc::new(StreamingMetrics::unused());
        let _join_handle = LocalBarrierWorker::spawn(
            test_env,
            unused_metrics,
            None,
            Arc::new(AtomicU64::new(0)),
            actor_op_rx,
        );
        EventSender(actor_op_tx)
    }
}
234}