risingwave_stream/task/barrier_manager/
mod.rs1pub mod cdc_progress;
16pub mod progress;
17
18pub use progress::CreateMviewProgressReporter;
19use risingwave_common::catalog::DatabaseId;
20use risingwave_common::util::epoch::EpochPair;
21use tokio::sync::mpsc;
22use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
23
24use crate::error::{IntoUnexpectedExit, StreamError};
25use crate::executor::exchange::permit::{self, channel_from_config};
26use crate::executor::{Barrier, BarrierInner};
27use crate::task::barrier_manager::progress::BackfillState;
28use crate::task::cdc_progress::CdcTableBackfillState;
29use crate::task::{ActorId, StreamEnvironment};
30
31pub(super) enum LocalBarrierEvent {
35 ReportActorCollected {
36 actor_id: ActorId,
37 epoch: EpochPair,
38 },
39 ReportCreateProgress {
40 epoch: EpochPair,
41 actor: ActorId,
42 state: BackfillState,
43 },
44 ReportSourceListFinished {
45 epoch: EpochPair,
46 actor_id: ActorId,
47 table_id: u32,
48 associated_source_id: u32,
49 },
50 ReportSourceLoadFinished {
51 epoch: EpochPair,
52 actor_id: ActorId,
53 table_id: u32,
54 associated_source_id: u32,
55 },
56 RefreshFinished {
57 epoch: EpochPair,
58 actor_id: ActorId,
59 table_id: u32,
60 staging_table_id: u32,
61 },
62 RegisterBarrierSender {
63 actor_id: ActorId,
64 barrier_sender: mpsc::UnboundedSender<Barrier>,
65 },
66 RegisterLocalUpstreamOutput {
67 actor_id: ActorId,
68 upstream_actor_id: ActorId,
69 tx: permit::Sender,
70 },
71 ReportCdcTableBackfillProgress {
72 actor_id: ActorId,
73 epoch: EpochPair,
74 state: CdcTableBackfillState,
75 },
76}
77
78#[derive(Clone)]
82pub struct LocalBarrierManager {
83 barrier_event_sender: UnboundedSender<LocalBarrierEvent>,
84 actor_failure_sender: UnboundedSender<(ActorId, StreamError)>,
85 pub(crate) database_id: DatabaseId,
86 pub(crate) term_id: String,
87 pub(crate) env: StreamEnvironment,
88}
89
90impl LocalBarrierManager {
91 pub(super) fn new(
92 database_id: DatabaseId,
93 term_id: String,
94 env: StreamEnvironment,
95 ) -> (
96 Self,
97 UnboundedReceiver<LocalBarrierEvent>,
98 UnboundedReceiver<(ActorId, StreamError)>,
99 ) {
100 let (event_tx, event_rx) = unbounded_channel();
101 let (err_tx, err_rx) = unbounded_channel();
102 (
103 Self {
104 barrier_event_sender: event_tx,
105 actor_failure_sender: err_tx,
106 database_id,
107 term_id,
108 env,
109 },
110 event_rx,
111 err_rx,
112 )
113 }
114
115 pub fn for_test() -> Self {
116 Self::new(
117 DatabaseId {
118 database_id: 114514,
119 },
120 "114514".to_owned(),
121 StreamEnvironment::for_test(),
122 )
123 .0
124 }
125
126 fn send_event(&self, event: LocalBarrierEvent) {
128 let _ = self.barrier_event_sender.send(event);
130 }
131
132 pub fn collect<M>(&self, actor_id: ActorId, barrier: &BarrierInner<M>) {
135 self.send_event(LocalBarrierEvent::ReportActorCollected {
136 actor_id,
137 epoch: barrier.epoch,
138 })
139 }
140
141 pub fn notify_failure(&self, actor_id: ActorId, err: StreamError) {
144 let _ = self
145 .actor_failure_sender
146 .send((actor_id, err.into_unexpected_exit(actor_id)));
147 }
148
149 pub fn subscribe_barrier(&self, actor_id: ActorId) -> UnboundedReceiver<Barrier> {
150 let (tx, rx) = mpsc::unbounded_channel();
151 self.send_event(LocalBarrierEvent::RegisterBarrierSender {
152 actor_id,
153 barrier_sender: tx,
154 });
155 rx
156 }
157
158 pub fn register_local_upstream_output(
159 &self,
160 actor_id: ActorId,
161 upstream_actor_id: ActorId,
162 ) -> permit::Receiver {
163 let (tx, rx) = channel_from_config(self.env.config());
164 self.send_event(LocalBarrierEvent::RegisterLocalUpstreamOutput {
165 actor_id,
166 upstream_actor_id,
167 tx,
168 });
169 rx
170 }
171
172 pub fn report_source_list_finished(
173 &self,
174 epoch: EpochPair,
175 actor_id: ActorId,
176 table_id: u32,
177 associated_source_id: u32,
178 ) {
179 self.send_event(LocalBarrierEvent::ReportSourceListFinished {
180 epoch,
181 actor_id,
182 table_id,
183 associated_source_id,
184 });
185 }
186
187 pub fn report_source_load_finished(
188 &self,
189 epoch: EpochPair,
190 actor_id: ActorId,
191 table_id: u32,
192 associated_source_id: u32,
193 ) {
194 self.send_event(LocalBarrierEvent::ReportSourceLoadFinished {
195 epoch,
196 actor_id,
197 table_id,
198 associated_source_id,
199 });
200 }
201
202 pub fn report_refresh_finished(
203 &self,
204 epoch: EpochPair,
205 actor_id: ActorId,
206 table_id: u32,
207 staging_table_id: u32,
208 ) {
209 self.send_event(LocalBarrierEvent::RefreshFinished {
210 epoch,
211 actor_id,
212 table_id,
213 staging_table_id,
214 });
215 }
216}
217
#[cfg(test)]
impl LocalBarrierManager {
    /// Spawns a `LocalBarrierWorker` on a test environment and returns an
    /// `EventSender` wrapping the sending half of the channel the worker reads from.
    ///
    /// The value returned by `LocalBarrierWorker::spawn` is held only until this
    /// function returns and is then dropped.
    pub(super) fn spawn_for_test()
    -> crate::task::barrier_worker::EventSender<crate::task::barrier_worker::LocalActorOperation>
    {
        use std::sync::Arc;
        use std::sync::atomic::AtomicU64;

        use crate::executor::monitor::StreamingMetrics;
        use crate::task::barrier_worker::{EventSender, LocalBarrierWorker};

        let (actor_op_tx, actor_op_rx) = unbounded_channel();

        let env = StreamEnvironment::for_test();
        let metrics = Arc::new(StreamingMetrics::unused());
        // NOTE(review): meaning of this zero-initialised shared counter is defined by
        // `LocalBarrierWorker::spawn` — confirm there.
        let shared_counter = Arc::new(AtomicU64::new(0));

        let _worker_handle =
            LocalBarrierWorker::spawn(env, metrics, None, shared_counter, actor_op_rx);
        EventSender(actor_op_tx)
    }
}
239}