risingwave_stream/task/barrier_manager/
mod.rs1pub mod cdc_progress;
16pub mod progress;
17
18pub use progress::CreateMviewProgressReporter;
19use risingwave_common::catalog::DatabaseId;
20use risingwave_common::util::epoch::EpochPair;
21use tokio::sync::mpsc;
22use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
23
24use crate::error::{IntoUnexpectedExit, StreamError};
25use crate::executor::exchange::permit::{self, channel_from_config};
26use crate::executor::{Barrier, BarrierInner};
27use crate::task::barrier_manager::progress::BackfillState;
28use crate::task::cdc_progress::CdcTableBackfillState;
29use crate::task::{ActorId, StreamEnvironment};
30
31pub(super) enum LocalBarrierEvent {
35 ReportActorCollected {
36 actor_id: ActorId,
37 epoch: EpochPair,
38 },
39 ReportCreateProgress {
40 epoch: EpochPair,
41 actor: ActorId,
42 state: BackfillState,
43 },
44 ReportSourceLoadFinished {
45 epoch: EpochPair,
46 actor_id: ActorId,
47 table_id: u32,
48 associated_source_id: u32,
49 },
50 RegisterBarrierSender {
51 actor_id: ActorId,
52 barrier_sender: mpsc::UnboundedSender<Barrier>,
53 },
54 RegisterLocalUpstreamOutput {
55 actor_id: ActorId,
56 upstream_actor_id: ActorId,
57 tx: permit::Sender,
58 },
59 ReportCdcTableBackfillProgress {
60 actor_id: ActorId,
61 epoch: EpochPair,
62 state: CdcTableBackfillState,
63 },
64}
65
66#[derive(Clone)]
70pub struct LocalBarrierManager {
71 barrier_event_sender: UnboundedSender<LocalBarrierEvent>,
72 actor_failure_sender: UnboundedSender<(ActorId, StreamError)>,
73 pub(crate) database_id: DatabaseId,
74 pub(crate) term_id: String,
75 pub(crate) env: StreamEnvironment,
76}
77
78impl LocalBarrierManager {
79 pub(super) fn new(
80 database_id: DatabaseId,
81 term_id: String,
82 env: StreamEnvironment,
83 ) -> (
84 Self,
85 UnboundedReceiver<LocalBarrierEvent>,
86 UnboundedReceiver<(ActorId, StreamError)>,
87 ) {
88 let (event_tx, event_rx) = unbounded_channel();
89 let (err_tx, err_rx) = unbounded_channel();
90 (
91 Self {
92 barrier_event_sender: event_tx,
93 actor_failure_sender: err_tx,
94 database_id,
95 term_id,
96 env,
97 },
98 event_rx,
99 err_rx,
100 )
101 }
102
103 #[cfg(test)]
104 pub fn for_test() -> Self {
105 Self::new(
106 DatabaseId {
107 database_id: 114514,
108 },
109 "114514".to_owned(),
110 StreamEnvironment::for_test(),
111 )
112 .0
113 }
114
115 fn send_event(&self, event: LocalBarrierEvent) {
117 let _ = self.barrier_event_sender.send(event);
119 }
120
121 pub fn collect<M>(&self, actor_id: ActorId, barrier: &BarrierInner<M>) {
124 self.send_event(LocalBarrierEvent::ReportActorCollected {
125 actor_id,
126 epoch: barrier.epoch,
127 })
128 }
129
130 pub fn notify_failure(&self, actor_id: ActorId, err: StreamError) {
133 let _ = self
134 .actor_failure_sender
135 .send((actor_id, err.into_unexpected_exit(actor_id)));
136 }
137
138 pub fn subscribe_barrier(&self, actor_id: ActorId) -> UnboundedReceiver<Barrier> {
139 let (tx, rx) = mpsc::unbounded_channel();
140 self.send_event(LocalBarrierEvent::RegisterBarrierSender {
141 actor_id,
142 barrier_sender: tx,
143 });
144 rx
145 }
146
147 pub fn register_local_upstream_output(
148 &self,
149 actor_id: ActorId,
150 upstream_actor_id: ActorId,
151 ) -> permit::Receiver {
152 let (tx, rx) = channel_from_config(self.env.config());
153 self.send_event(LocalBarrierEvent::RegisterLocalUpstreamOutput {
154 actor_id,
155 upstream_actor_id,
156 tx,
157 });
158 rx
159 }
160
161 pub fn report_source_load_finished(
162 &self,
163 epoch: EpochPair,
164 actor_id: ActorId,
165 table_id: u32,
166 associated_source_id: u32,
167 ) {
168 self.send_event(LocalBarrierEvent::ReportSourceLoadFinished {
169 epoch,
170 actor_id,
171 table_id,
172 associated_source_id,
173 });
174 }
175}
176
177#[cfg(test)]
178impl LocalBarrierManager {
179 pub(super) fn spawn_for_test()
180 -> crate::task::barrier_worker::EventSender<crate::task::barrier_worker::LocalActorOperation>
181 {
182 use std::sync::Arc;
183 use std::sync::atomic::AtomicU64;
184
185 use crate::executor::monitor::StreamingMetrics;
186 use crate::task::barrier_worker::{EventSender, LocalBarrierWorker};
187
188 let (tx, rx) = unbounded_channel();
189 let _join_handle = LocalBarrierWorker::spawn(
190 StreamEnvironment::for_test(),
191 Arc::new(StreamingMetrics::unused()),
192 None,
193 Arc::new(AtomicU64::new(0)),
194 rx,
195 );
196 EventSender(tx)
197 }
198}