risingwave_stream/task/barrier_manager/
mod.rs1pub mod cdc_progress;
16pub mod progress;
17
18pub use progress::CreateMviewProgressReporter;
19use risingwave_common::catalog::DatabaseId;
20use risingwave_common::util::epoch::EpochPair;
21use tokio::sync::mpsc;
22use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
23
24use crate::error::{IntoUnexpectedExit, StreamError};
25use crate::executor::exchange::permit::{self, channel_from_config};
26use crate::executor::{Barrier, BarrierInner};
27use crate::task::barrier_manager::progress::BackfillState;
28use crate::task::cdc_progress::CdcTableBackfillState;
29use crate::task::{ActorId, StreamEnvironment};
30
31pub(super) enum LocalBarrierEvent {
35 ReportActorCollected {
36 actor_id: ActorId,
37 epoch: EpochPair,
38 },
39 ReportCreateProgress {
40 epoch: EpochPair,
41 actor: ActorId,
42 state: BackfillState,
43 },
44 ReportSourceLoadFinished {
45 epoch: EpochPair,
46 actor_id: ActorId,
47 table_id: u32,
48 associated_source_id: u32,
49 },
50 RefreshFinished {
51 epoch: EpochPair,
52 actor_id: ActorId,
53 table_id: u32,
54 staging_table_id: u32,
55 },
56 RegisterBarrierSender {
57 actor_id: ActorId,
58 barrier_sender: mpsc::UnboundedSender<Barrier>,
59 },
60 RegisterLocalUpstreamOutput {
61 actor_id: ActorId,
62 upstream_actor_id: ActorId,
63 tx: permit::Sender,
64 },
65 ReportCdcTableBackfillProgress {
66 actor_id: ActorId,
67 epoch: EpochPair,
68 state: CdcTableBackfillState,
69 },
70}
71
72#[derive(Clone)]
76pub struct LocalBarrierManager {
77 barrier_event_sender: UnboundedSender<LocalBarrierEvent>,
78 actor_failure_sender: UnboundedSender<(ActorId, StreamError)>,
79 pub(crate) database_id: DatabaseId,
80 pub(crate) term_id: String,
81 pub(crate) env: StreamEnvironment,
82}
83
84impl LocalBarrierManager {
85 pub(super) fn new(
86 database_id: DatabaseId,
87 term_id: String,
88 env: StreamEnvironment,
89 ) -> (
90 Self,
91 UnboundedReceiver<LocalBarrierEvent>,
92 UnboundedReceiver<(ActorId, StreamError)>,
93 ) {
94 let (event_tx, event_rx) = unbounded_channel();
95 let (err_tx, err_rx) = unbounded_channel();
96 (
97 Self {
98 barrier_event_sender: event_tx,
99 actor_failure_sender: err_tx,
100 database_id,
101 term_id,
102 env,
103 },
104 event_rx,
105 err_rx,
106 )
107 }
108
109 pub fn for_test() -> Self {
110 Self::new(
111 DatabaseId {
112 database_id: 114514,
113 },
114 "114514".to_owned(),
115 StreamEnvironment::for_test(),
116 )
117 .0
118 }
119
120 fn send_event(&self, event: LocalBarrierEvent) {
122 let _ = self.barrier_event_sender.send(event);
124 }
125
126 pub fn collect<M>(&self, actor_id: ActorId, barrier: &BarrierInner<M>) {
129 self.send_event(LocalBarrierEvent::ReportActorCollected {
130 actor_id,
131 epoch: barrier.epoch,
132 })
133 }
134
135 pub fn notify_failure(&self, actor_id: ActorId, err: StreamError) {
138 let _ = self
139 .actor_failure_sender
140 .send((actor_id, err.into_unexpected_exit(actor_id)));
141 }
142
143 pub fn subscribe_barrier(&self, actor_id: ActorId) -> UnboundedReceiver<Barrier> {
144 let (tx, rx) = mpsc::unbounded_channel();
145 self.send_event(LocalBarrierEvent::RegisterBarrierSender {
146 actor_id,
147 barrier_sender: tx,
148 });
149 rx
150 }
151
152 pub fn register_local_upstream_output(
153 &self,
154 actor_id: ActorId,
155 upstream_actor_id: ActorId,
156 ) -> permit::Receiver {
157 let (tx, rx) = channel_from_config(self.env.config());
158 self.send_event(LocalBarrierEvent::RegisterLocalUpstreamOutput {
159 actor_id,
160 upstream_actor_id,
161 tx,
162 });
163 rx
164 }
165
166 pub fn report_source_load_finished(
167 &self,
168 epoch: EpochPair,
169 actor_id: ActorId,
170 table_id: u32,
171 associated_source_id: u32,
172 ) {
173 self.send_event(LocalBarrierEvent::ReportSourceLoadFinished {
174 epoch,
175 actor_id,
176 table_id,
177 associated_source_id,
178 });
179 }
180
181 pub fn report_refresh_finished(
182 &self,
183 epoch: EpochPair,
184 actor_id: ActorId,
185 table_id: u32,
186 staging_table_id: u32,
187 ) {
188 self.send_event(LocalBarrierEvent::RefreshFinished {
189 epoch,
190 actor_id,
191 table_id,
192 staging_table_id,
193 });
194 }
195}
196
#[cfg(test)]
impl LocalBarrierManager {
    /// Spawns a `LocalBarrierWorker` for tests and returns an `EventSender`
    /// wrapping the sending half of the channel the worker reads from.
    ///
    /// The worker is given a test `StreamEnvironment`, unused streaming
    /// metrics, `None` for its third argument and a zero-initialised shared
    /// `AtomicU64`. The handle returned by `spawn` is not handed back to the
    /// caller.
    pub(super) fn spawn_for_test()
    -> crate::task::barrier_worker::EventSender<crate::task::barrier_worker::LocalActorOperation>
    {
        use std::sync::Arc;
        use std::sync::atomic::AtomicU64;

        use crate::executor::monitor::StreamingMetrics;
        use crate::task::barrier_worker::{EventSender, LocalBarrierWorker};

        let (actor_op_tx, actor_op_rx) = unbounded_channel();

        // Arguments are built in the same order in which they are passed.
        let env = StreamEnvironment::for_test();
        let metrics = Arc::new(StreamingMetrics::unused());
        let counter = Arc::new(AtomicU64::new(0));

        let _worker_handle = LocalBarrierWorker::spawn(env, metrics, None, counter, actor_op_rx);
        EventSender(actor_op_tx)
    }
}