risingwave_stream/task/barrier_manager/
mod.rs1pub mod progress;
16pub use progress::CreateMviewProgressReporter;
17use risingwave_common::catalog::DatabaseId;
18use risingwave_common::util::epoch::EpochPair;
19use tokio::sync::mpsc;
20use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
21
22use crate::error::{IntoUnexpectedExit, StreamError};
23use crate::executor::exchange::permit::{self, channel_from_config};
24use crate::executor::{Barrier, BarrierInner};
25use crate::task::barrier_manager::progress::BackfillState;
26use crate::task::{ActorId, StreamEnvironment};
27
28pub(super) enum LocalBarrierEvent {
32 ReportActorCollected {
33 actor_id: ActorId,
34 epoch: EpochPair,
35 },
36 ReportCreateProgress {
37 epoch: EpochPair,
38 actor: ActorId,
39 state: BackfillState,
40 },
41 ReportSourceLoadFinished {
42 epoch: EpochPair,
43 actor_id: ActorId,
44 table_id: u32,
45 associated_source_id: u32,
46 },
47 RegisterBarrierSender {
48 actor_id: ActorId,
49 barrier_sender: mpsc::UnboundedSender<Barrier>,
50 },
51 RegisterLocalUpstreamOutput {
52 actor_id: ActorId,
53 upstream_actor_id: ActorId,
54 tx: permit::Sender,
55 },
56}
57
58#[derive(Clone)]
62pub struct LocalBarrierManager {
63 barrier_event_sender: UnboundedSender<LocalBarrierEvent>,
64 actor_failure_sender: UnboundedSender<(ActorId, StreamError)>,
65 pub(crate) database_id: DatabaseId,
66 pub(crate) term_id: String,
67 pub(crate) env: StreamEnvironment,
68}
69
70impl LocalBarrierManager {
71 pub(super) fn new(
72 database_id: DatabaseId,
73 term_id: String,
74 env: StreamEnvironment,
75 ) -> (
76 Self,
77 UnboundedReceiver<LocalBarrierEvent>,
78 UnboundedReceiver<(ActorId, StreamError)>,
79 ) {
80 let (event_tx, event_rx) = unbounded_channel();
81 let (err_tx, err_rx) = unbounded_channel();
82 (
83 Self {
84 barrier_event_sender: event_tx,
85 actor_failure_sender: err_tx,
86 database_id,
87 term_id,
88 env,
89 },
90 event_rx,
91 err_rx,
92 )
93 }
94
95 #[cfg(test)]
96 pub fn for_test() -> Self {
97 Self::new(
98 DatabaseId {
99 database_id: 114514,
100 },
101 "114514".to_owned(),
102 StreamEnvironment::for_test(),
103 )
104 .0
105 }
106
107 fn send_event(&self, event: LocalBarrierEvent) {
109 let _ = self.barrier_event_sender.send(event);
111 }
112
113 pub fn collect<M>(&self, actor_id: ActorId, barrier: &BarrierInner<M>) {
116 self.send_event(LocalBarrierEvent::ReportActorCollected {
117 actor_id,
118 epoch: barrier.epoch,
119 })
120 }
121
122 pub fn notify_failure(&self, actor_id: ActorId, err: StreamError) {
125 let _ = self
126 .actor_failure_sender
127 .send((actor_id, err.into_unexpected_exit(actor_id)));
128 }
129
130 pub fn subscribe_barrier(&self, actor_id: ActorId) -> UnboundedReceiver<Barrier> {
131 let (tx, rx) = mpsc::unbounded_channel();
132 self.send_event(LocalBarrierEvent::RegisterBarrierSender {
133 actor_id,
134 barrier_sender: tx,
135 });
136 rx
137 }
138
139 pub fn register_local_upstream_output(
140 &self,
141 actor_id: ActorId,
142 upstream_actor_id: ActorId,
143 ) -> permit::Receiver {
144 let (tx, rx) = channel_from_config(self.env.config());
145 self.send_event(LocalBarrierEvent::RegisterLocalUpstreamOutput {
146 actor_id,
147 upstream_actor_id,
148 tx,
149 });
150 rx
151 }
152
153 pub fn report_source_load_finished(
154 &self,
155 epoch: EpochPair,
156 actor_id: ActorId,
157 table_id: u32,
158 associated_source_id: u32,
159 ) {
160 self.send_event(LocalBarrierEvent::ReportSourceLoadFinished {
161 epoch,
162 actor_id,
163 table_id,
164 associated_source_id,
165 });
166 }
167}
168
169#[cfg(test)]
170impl LocalBarrierManager {
171 pub(super) fn spawn_for_test()
172 -> crate::task::barrier_worker::EventSender<crate::task::barrier_worker::LocalActorOperation>
173 {
174 use std::sync::Arc;
175 use std::sync::atomic::AtomicU64;
176
177 use crate::executor::monitor::StreamingMetrics;
178 use crate::task::barrier_worker::{EventSender, LocalBarrierWorker};
179
180 let (tx, rx) = unbounded_channel();
181 let _join_handle = LocalBarrierWorker::spawn(
182 StreamEnvironment::for_test(),
183 Arc::new(StreamingMetrics::unused()),
184 None,
185 Arc::new(AtomicU64::new(0)),
186 rx,
187 );
188 EventSender(tx)
189 }
190}