risingwave_stream/task/barrier_manager/
mod.rs1pub mod cdc_progress;
16pub mod progress;
17
18pub use progress::CreateMviewProgressReporter;
19use risingwave_common::catalog::DatabaseId;
20use risingwave_common::id::{SourceId, TableId};
21use risingwave_common::util::epoch::EpochPair;
22use risingwave_pb::id::FragmentId;
23use tokio::sync::mpsc;
24use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
25
26use crate::error::{IntoUnexpectedExit, StreamError};
27use crate::executor::exchange::permit::{self, channel_from_config};
28use crate::executor::{Barrier, BarrierInner};
29use crate::task::barrier_manager::progress::BackfillState;
30use crate::task::cdc_progress::CdcTableBackfillState;
31use crate::task::{ActorId, StreamEnvironment};
32
33pub(super) enum LocalBarrierEvent {
37 ReportActorCollected {
38 actor_id: ActorId,
39 epoch: EpochPair,
40 },
41 ReportCreateProgress {
42 epoch: EpochPair,
43 fragment_id: FragmentId,
44 actor: ActorId,
45 state: BackfillState,
46 },
47 ReportSourceListFinished {
48 epoch: EpochPair,
49 actor_id: ActorId,
50 table_id: TableId,
51 associated_source_id: SourceId,
52 },
53 ReportSourceLoadFinished {
54 epoch: EpochPair,
55 actor_id: ActorId,
56 table_id: TableId,
57 associated_source_id: SourceId,
58 },
59 RefreshFinished {
60 epoch: EpochPair,
61 actor_id: ActorId,
62 table_id: TableId,
63 staging_table_id: TableId,
64 },
65 RegisterBarrierSender {
66 actor_id: ActorId,
67 barrier_sender: mpsc::UnboundedSender<Barrier>,
68 },
69 RegisterLocalUpstreamOutput {
70 actor_id: ActorId,
71 upstream_actor_id: ActorId,
72 tx: permit::Sender,
73 },
74 ReportCdcTableBackfillProgress {
75 actor_id: ActorId,
76 epoch: EpochPair,
77 state: CdcTableBackfillState,
78 },
79}
80
81#[derive(Clone)]
85pub struct LocalBarrierManager {
86 barrier_event_sender: UnboundedSender<LocalBarrierEvent>,
87 actor_failure_sender: UnboundedSender<(ActorId, StreamError)>,
88 pub(crate) database_id: DatabaseId,
89 pub(crate) term_id: String,
90 pub(crate) env: StreamEnvironment,
91}
92
93impl LocalBarrierManager {
94 pub(super) fn new(
95 database_id: DatabaseId,
96 term_id: String,
97 env: StreamEnvironment,
98 ) -> (
99 Self,
100 UnboundedReceiver<LocalBarrierEvent>,
101 UnboundedReceiver<(ActorId, StreamError)>,
102 ) {
103 let (event_tx, event_rx) = unbounded_channel();
104 let (err_tx, err_rx) = unbounded_channel();
105 (
106 Self {
107 barrier_event_sender: event_tx,
108 actor_failure_sender: err_tx,
109 database_id,
110 term_id,
111 env,
112 },
113 event_rx,
114 err_rx,
115 )
116 }
117
118 pub fn for_test() -> Self {
119 Self::new(
120 114514.into(),
121 "114514".to_owned(),
122 StreamEnvironment::for_test(),
123 )
124 .0
125 }
126
127 fn send_event(&self, event: LocalBarrierEvent) {
129 let _ = self.barrier_event_sender.send(event);
131 }
132
133 pub fn collect<M>(&self, actor_id: ActorId, barrier: &BarrierInner<M>) {
136 self.send_event(LocalBarrierEvent::ReportActorCollected {
137 actor_id,
138 epoch: barrier.epoch,
139 })
140 }
141
142 pub fn notify_failure(&self, actor_id: ActorId, err: StreamError) {
145 let _ = self
146 .actor_failure_sender
147 .send((actor_id, err.into_unexpected_exit(actor_id)));
148 }
149
150 pub fn subscribe_barrier(&self, actor_id: ActorId) -> UnboundedReceiver<Barrier> {
151 let (tx, rx) = mpsc::unbounded_channel();
152 self.send_event(LocalBarrierEvent::RegisterBarrierSender {
153 actor_id,
154 barrier_sender: tx,
155 });
156 rx
157 }
158
159 pub fn register_local_upstream_output(
160 &self,
161 actor_id: ActorId,
162 upstream_actor_id: ActorId,
163 ) -> permit::Receiver {
164 let (tx, rx) = channel_from_config(self.env.global_config());
165 self.send_event(LocalBarrierEvent::RegisterLocalUpstreamOutput {
166 actor_id,
167 upstream_actor_id,
168 tx,
169 });
170 rx
171 }
172
173 pub fn report_source_list_finished(
174 &self,
175 epoch: EpochPair,
176 actor_id: ActorId,
177 table_id: TableId,
178 associated_source_id: SourceId,
179 ) {
180 self.send_event(LocalBarrierEvent::ReportSourceListFinished {
181 epoch,
182 actor_id,
183 table_id,
184 associated_source_id,
185 });
186 }
187
188 pub fn report_source_load_finished(
189 &self,
190 epoch: EpochPair,
191 actor_id: ActorId,
192 table_id: TableId,
193 associated_source_id: SourceId,
194 ) {
195 self.send_event(LocalBarrierEvent::ReportSourceLoadFinished {
196 epoch,
197 actor_id,
198 table_id,
199 associated_source_id,
200 });
201 }
202
203 pub fn report_refresh_finished(
204 &self,
205 epoch: EpochPair,
206 actor_id: ActorId,
207 table_id: TableId,
208 staging_table_id: TableId,
209 ) {
210 self.send_event(LocalBarrierEvent::RefreshFinished {
211 epoch,
212 actor_id,
213 table_id,
214 staging_table_id,
215 });
216 }
217}
218
219#[cfg(test)]
220impl LocalBarrierManager {
221 pub(super) fn spawn_for_test()
222 -> crate::task::barrier_worker::EventSender<crate::task::barrier_worker::LocalActorOperation>
223 {
224 use std::sync::Arc;
225 use std::sync::atomic::AtomicU64;
226
227 use crate::executor::monitor::StreamingMetrics;
228 use crate::task::barrier_worker::{EventSender, LocalBarrierWorker};
229
230 let (tx, rx) = unbounded_channel();
231 let _join_handle = LocalBarrierWorker::spawn(
232 StreamEnvironment::for_test(),
233 Arc::new(StreamingMetrics::unused()),
234 None,
235 Arc::new(AtomicU64::new(0)),
236 rx,
237 );
238 EventSender(tx)
239 }
240}