1use std::collections::HashMap;
16use std::collections::hash_map::Entry;
17use std::sync::Arc;
18
19use risingwave_common::id::JobId;
20use risingwave_common::system_param::reader::SystemParamsReader;
21use risingwave_meta_model::ObjectId;
22use risingwave_pb::common::{WorkerNode, WorkerType};
23use risingwave_pb::meta::object::PbObjectInfo;
24use risingwave_pb::meta::subscribe_response::{Info, Operation};
25use risingwave_pb::meta::{
26 MetaSnapshot, PbObject, PbObjectGroup, SubscribeResponse, SubscribeType,
27};
28use thiserror_ext::AsReport;
29use tokio::sync::Mutex;
30use tokio::sync::mpsc::{self, UnboundedSender};
31use tonic::Status;
32
33use crate::controller::SqlMetaStore;
34use crate::manager::WorkerKey;
35use crate::manager::notification_version::NotificationVersionGenerator;
36use crate::model::FragmentId;
37
38pub type MessageStatus = Status;
39pub type Notification = Result<SubscribeResponse, Status>;
40pub type NotificationManagerRef = Arc<NotificationManager>;
41pub type NotificationVersion = u64;
42pub const IGNORED_NOTIFICATION_VERSION: u64 = 0;
44
/// In-process notifications. `NotificationManager::notify_local_subscribers` delivers each
/// one to every sender registered through `NotificationManager::insert_local_sender`.
///
/// NOTE(review): the per-variant descriptions below are inferred from variant names and
/// payload types; the code that emits them is outside this file, so confirm there.
#[derive(Clone, Debug)]
pub enum LocalNotification {
    /// Presumably: a worker node was removed from the cluster (carries that node).
    WorkerNodeDeleted(WorkerNode),
    /// Presumably: a worker node became active (carries that node).
    WorkerNodeActivated(WorkerNode),
    /// Presumably: system parameters changed (carries a reader over the new values).
    SystemParamsChange(SystemParamsReader),
    /// Presumably: batch parallelism settings changed (no payload).
    BatchParallelismChange,
    /// Presumably: mappings of the listed fragments were inserted or updated.
    FragmentMappingsUpsert(Vec<FragmentId>),
    /// Presumably: mappings of the listed fragments were deleted.
    FragmentMappingsDelete(Vec<FragmentId>),
    /// Presumably: the source with this object id was dropped.
    SourceDropped(ObjectId),
    /// Presumably: backfill of the given streaming job has finished.
    StreamingJobBackfillFinished(JobId),
}
56
/// Addressee of a queued notification.
#[derive(Debug)]
struct Target {
    /// Class of subscribers whose sender map the notification is routed through.
    subscribe_type: SubscribeType,
    /// `Some(key)` delivers only to that worker's sender; `None` broadcasts to every sender
    /// registered for `subscribe_type`.
    worker_key: Option<WorkerKey>,
}
63
64impl From<SubscribeType> for Target {
65 fn from(value: SubscribeType) -> Self {
66 Self {
67 subscribe_type: value,
68 worker_key: None,
69 }
70 }
71}
72
/// A notification queued on `NotificationManager::task_tx`. It waits there until the
/// background task spawned in `NotificationManager::new` turns it into a `SubscribeResponse`.
#[derive(Debug)]
struct Task {
    /// Recipient(s) of the notification.
    target: Target,
    /// Operation written into the response (as its `i32` value).
    operation: Operation,
    /// Payload written into the response.
    info: Info,
    /// Version written into the response; `None` is sent as the default version (0).
    version: Option<NotificationVersion>,
}
80
/// Dispatches notifications to two kinds of receivers:
/// - remote subscribers (frontend, hummock, compactor, compute), which are reached through
///   per-worker channels;
/// - in-process local listeners.
pub struct NotificationManager {
    /// Registered senders; shared with the background delivery task.
    core: Arc<parking_lot::Mutex<NotificationManagerCore>>,
    /// Queue feeding the background delivery task.
    task_tx: UnboundedSender<Task>,
    /// Allocates versions for versioned notifications. This is an async (tokio) mutex because
    /// the guard is held across `.await` in `notify_with_version`.
    version_generator: Mutex<NotificationVersionGenerator>,
}
89
90impl NotificationManager {
91 pub async fn new(meta_store_impl: SqlMetaStore) -> Self {
92 let (task_tx, mut task_rx) = mpsc::unbounded_channel::<Task>();
94 let core = Arc::new(parking_lot::Mutex::new(NotificationManagerCore::new()));
95 let core_clone = core.clone();
96 let version_generator = NotificationVersionGenerator::new(meta_store_impl)
97 .await
98 .unwrap();
99
100 tokio::spawn(async move {
101 while let Some(task) = task_rx.recv().await {
102 let response = SubscribeResponse {
103 status: None,
104 operation: task.operation as i32,
105 info: Some(task.info),
106 version: task.version.unwrap_or_default(),
107 };
108 core.lock().notify(task.target, response);
109 }
110 });
111
112 Self {
113 core: core_clone,
114 task_tx,
115 version_generator: Mutex::new(version_generator),
116 }
117 }
118
119 pub fn abort_all(&self) {
120 let mut guard = self.core.lock();
121 *guard = NotificationManagerCore::new();
122 guard.exiting = true;
123 }
124
125 #[inline(always)]
126 fn notify(
127 &self,
128 target: Target,
129 operation: Operation,
130 info: Info,
131 version: Option<NotificationVersion>,
132 ) {
133 let task = Task {
134 target,
135 operation,
136 info,
137 version,
138 };
139 self.task_tx.send(task).unwrap();
140 }
141
142 async fn notify_with_version(
144 &self,
145 target: Target,
146 operation: Operation,
147 info: Info,
148 ) -> NotificationVersion {
149 let mut version_guard = self.version_generator.lock().await;
150 version_guard.increase_version().await;
151 let version = version_guard.current_version();
152 self.notify(target, operation, info, Some(version));
153 version
154 }
155
156 #[inline(always)]
158 fn notify_without_version(&self, target: Target, operation: Operation, info: Info) {
159 self.notify(target, operation, info, None);
160 }
161
162 pub fn notify_snapshot(
163 &self,
164 worker_key: WorkerKey,
165 subscribe_type: SubscribeType,
166 meta_snapshot: MetaSnapshot,
167 ) {
168 self.notify_without_version(
169 Target {
170 subscribe_type,
171 worker_key: Some(worker_key),
172 },
173 Operation::Snapshot,
174 Info::Snapshot(meta_snapshot),
175 )
176 }
177
178 pub fn notify_all_without_version(&self, operation: Operation, info: Info) {
179 for subscribe_type in [
180 SubscribeType::Frontend,
181 SubscribeType::Hummock,
182 SubscribeType::Compactor,
183 SubscribeType::Compute,
184 ] {
185 self.notify_without_version(subscribe_type.into(), operation, info.clone());
186 }
187 }
188
189 pub async fn notify_frontend(&self, operation: Operation, info: Info) -> NotificationVersion {
190 self.notify_with_version(SubscribeType::Frontend.into(), operation, info)
191 .await
192 }
193
194 pub async fn notify_frontend_object_info(
195 &self,
196 operation: Operation,
197 object_info: PbObjectInfo,
198 ) -> NotificationVersion {
199 self.notify_with_version(
200 SubscribeType::Frontend.into(),
201 operation,
202 Info::ObjectGroup(PbObjectGroup {
203 objects: vec![PbObject {
204 object_info: object_info.into(),
205 }],
206 }),
207 )
208 .await
209 }
210
211 pub async fn notify_hummock(&self, operation: Operation, info: Info) -> NotificationVersion {
212 self.notify_with_version(SubscribeType::Hummock.into(), operation, info)
213 .await
214 }
215
216 pub async fn notify_compactor(&self, operation: Operation, info: Info) -> NotificationVersion {
217 self.notify_with_version(SubscribeType::Compactor.into(), operation, info)
218 .await
219 }
220
221 pub fn notify_compute_without_version(&self, operation: Operation, info: Info) {
222 self.notify_without_version(SubscribeType::Compute.into(), operation, info)
223 }
224
225 pub fn notify_frontend_without_version(&self, operation: Operation, info: Info) {
226 self.notify_without_version(SubscribeType::Frontend.into(), operation, info)
227 }
228
229 pub fn notify_hummock_without_version(&self, operation: Operation, info: Info) {
230 self.notify_without_version(SubscribeType::Hummock.into(), operation, info)
231 }
232
233 pub fn notify_compactor_without_version(&self, operation: Operation, info: Info) {
234 self.notify_without_version(SubscribeType::Compactor.into(), operation, info)
235 }
236
237 #[cfg(any(test, feature = "test"))]
238 pub fn notify_hummock_with_version(
239 &self,
240 operation: Operation,
241 info: Info,
242 version: Option<NotificationVersion>,
243 ) {
244 self.notify(SubscribeType::Hummock.into(), operation, info, version)
245 }
246
247 pub fn notify_local_subscribers(&self, notification: LocalNotification) {
248 let mut core_guard = self.core.lock();
249 core_guard.local_senders.retain(|sender| {
250 if let Err(err) = sender.send(notification.clone()) {
251 tracing::warn!(error = %err.as_report(), "Failed to notify local subscriber");
252 return false;
253 }
254 true
255 });
256 }
257
258 pub fn delete_sender(&self, worker_type: WorkerType, worker_key: WorkerKey) {
260 let mut core_guard = self.core.lock();
261 match worker_type {
264 WorkerType::Frontend => core_guard.frontend_senders.remove(&worker_key),
265 WorkerType::ComputeNode | WorkerType::RiseCtl => {
266 core_guard.hummock_senders.remove(&worker_key)
267 }
268 WorkerType::Compactor => core_guard.compactor_senders.remove(&worker_key),
269 _ => unreachable!(),
270 };
271 }
272
273 pub fn insert_sender(
275 &self,
276 subscribe_type: SubscribeType,
277 worker_key: WorkerKey,
278 sender: UnboundedSender<Notification>,
279 ) {
280 let mut core_guard = self.core.lock();
281 if core_guard.exiting {
282 tracing::warn!("notification manager exiting.");
283 return;
284 }
285 let senders = core_guard.senders_of(subscribe_type);
286
287 senders.insert(worker_key, sender);
288 }
289
290 pub fn insert_local_sender(&self, sender: UnboundedSender<LocalNotification>) {
291 let mut core_guard = self.core.lock();
292 if core_guard.exiting {
293 tracing::warn!("notification manager exiting.");
294 return;
295 }
296 core_guard.local_senders.push(sender);
297 }
298
299 #[cfg(test)]
300 pub fn clear_local_sender(&self) {
301 self.core.lock().local_senders.clear();
302 }
303
304 pub async fn current_version(&self) -> NotificationVersion {
305 let version_guard = self.version_generator.lock().await;
306 version_guard.current_version()
307 }
308}
309
310type SenderMap = HashMap<WorkerKey, UnboundedSender<Notification>>;
311
312struct NotificationManagerCore {
313 frontend_senders: SenderMap,
315 hummock_senders: SenderMap,
317 compactor_senders: SenderMap,
319 compute_senders: HashMap<WorkerKey, UnboundedSender<Notification>>,
321 local_senders: Vec<UnboundedSender<LocalNotification>>,
323 exiting: bool,
324}
325
326impl NotificationManagerCore {
327 fn new() -> Self {
328 Self {
329 frontend_senders: HashMap::new(),
330 hummock_senders: HashMap::new(),
331 compactor_senders: HashMap::new(),
332 compute_senders: HashMap::new(),
333 local_senders: vec![],
334 exiting: false,
335 }
336 }
337
338 fn notify(&mut self, target: Target, response: SubscribeResponse) {
339 macro_rules! warn_send_failure {
340 ($subscribe_type:expr, $worker_key:expr, $err:expr) => {
341 tracing::warn!(
342 "Failed to notify {:?} {:?}: {}",
343 $subscribe_type,
344 $worker_key,
345 $err
346 );
347 };
348 }
349
350 let senders = self.senders_of(target.subscribe_type);
351
352 if let Some(worker_key) = target.worker_key {
353 match senders.entry(worker_key.clone()) {
354 Entry::Occupied(entry) => {
355 let _ = entry.get().send(Ok(response)).inspect_err(|err| {
356 warn_send_failure!(target.subscribe_type, &worker_key, err.as_report());
357 entry.remove_entry();
358 });
359 }
360 Entry::Vacant(_) => {
361 tracing::warn!("Failed to find notification sender of {:?}", worker_key)
362 }
363 }
364 } else {
365 senders.retain(|worker_key, sender| {
366 sender
367 .send(Ok(response.clone()))
368 .inspect_err(|err| {
369 warn_send_failure!(target.subscribe_type, &worker_key, err.as_report());
370 })
371 .is_ok()
372 });
373 }
374 }
375
376 fn senders_of(&mut self, subscribe_type: SubscribeType) -> &mut SenderMap {
377 match subscribe_type {
378 SubscribeType::Frontend => &mut self.frontend_senders,
379 SubscribeType::Hummock => &mut self.hummock_senders,
380 SubscribeType::Compactor => &mut self.compactor_senders,
381 SubscribeType::Compute => &mut self.compute_senders,
382 SubscribeType::Unspecified => unreachable!(),
383 }
384 }
385}
386
#[cfg(test)]
mod tests {
    // `JobId`, `WorkerKey`, `WorkerType` and `mpsc` come from the parent module via the glob
    // import, so they are not re-imported here.
    use risingwave_pb::common::HostAddress;

    use super::*;

    /// Targeted and broadcast notifications only reach the subscribers they address.
    #[tokio::test]
    async fn test_multiple_subscribers_one_worker() {
        let mgr = NotificationManager::new(SqlMetaStore::for_test().await).await;
        let worker_key1 = WorkerKey(HostAddress {
            host: "a".to_owned(),
            port: 1,
        });
        let worker_key2 = WorkerKey(HostAddress {
            host: "a".to_owned(),
            port: 2,
        });
        let (tx1, mut rx1) = mpsc::unbounded_channel();
        let (tx2, mut rx2) = mpsc::unbounded_channel();
        let (tx3, mut rx3) = mpsc::unbounded_channel();
        mgr.insert_sender(SubscribeType::Hummock, worker_key1.clone(), tx1);
        mgr.insert_sender(SubscribeType::Frontend, worker_key1.clone(), tx2);
        mgr.insert_sender(SubscribeType::Frontend, worker_key2, tx3);
        mgr.notify_snapshot(
            worker_key1.clone(),
            SubscribeType::Hummock,
            MetaSnapshot::default(),
        );
        assert!(rx1.recv().await.is_some());
        assert!(rx2.try_recv().is_err());
        assert!(rx3.try_recv().is_err());

        mgr.notify_frontend(Operation::Add, Info::Database(Default::default()))
            .await;
        assert!(rx1.try_recv().is_err());
        assert!(rx2.recv().await.is_some());
        assert!(rx3.recv().await.is_some());
    }

    /// Deleting a compute node drops its hummock sender, which closes the channel.
    #[tokio::test]
    async fn test_delete_sender_drops_hummock_sender() {
        let mgr = NotificationManager::new(SqlMetaStore::for_test().await).await;
        let worker_key = WorkerKey(HostAddress {
            host: "a".to_owned(),
            port: 1,
        });
        let (tx, mut rx) = mpsc::unbounded_channel();
        mgr.insert_sender(SubscribeType::Hummock, worker_key.clone(), tx);
        mgr.delete_sender(WorkerType::ComputeNode, worker_key);
        // The manager held the only sender, so the channel closes once it is removed.
        assert!(rx.recv().await.is_none());
    }

    /// Local subscribers receive `StreamingJobBackfillFinished` with the original job id.
    #[tokio::test]
    async fn test_local_notification_backfill_finished() {
        let mgr = NotificationManager::new(SqlMetaStore::for_test().await).await;
        let (tx, mut rx) = mpsc::unbounded_channel();
        mgr.insert_local_sender(tx);

        let job_id = JobId::new(42);
        mgr.notify_local_subscribers(LocalNotification::StreamingJobBackfillFinished(job_id));

        match rx.recv().await.expect("should receive notification") {
            LocalNotification::StreamingJobBackfillFinished(received) => {
                assert_eq!(received, job_id);
            }
            other => panic!("unexpected notification: {other:?}"),
        }
    }
}