1use std::collections::HashMap;
16use std::collections::hash_map::Entry;
17use std::sync::Arc;
18
19use risingwave_common::id::JobId;
20use risingwave_common::system_param::reader::SystemParamsReader;
21use risingwave_meta_model::ObjectId;
22use risingwave_pb::common::{WorkerNode, WorkerType};
23use risingwave_pb::meta::object::PbObjectInfo;
24use risingwave_pb::meta::subscribe_response::{Info, Operation};
25use risingwave_pb::meta::{
26 MetaSnapshot, PbObject, PbObjectGroup, SubscribeResponse, SubscribeType,
27};
28use thiserror_ext::AsReport;
29use tokio::sync::Mutex;
30use tokio::sync::mpsc::{self, UnboundedSender};
31use tonic::Status;
32
33use crate::controller::SqlMetaStore;
34use crate::manager::WorkerKey;
35use crate::manager::notification_version::NotificationVersionGenerator;
36use crate::model::FragmentId;
37
/// Alias of [`tonic::Status`].
pub type MessageStatus = Status;
/// An item pushed to a remote subscriber: a [`SubscribeResponse`] on success, or an error
/// [`Status`].
pub type Notification = Result<SubscribeResponse, Status>;
/// Shared, reference-counted handle to a [`NotificationManager`].
pub type NotificationManagerRef = Arc<NotificationManager>;
/// Version number attached to versioned notifications.
pub type NotificationVersion = u64;
/// Version value `0`. This equals the `u64` default that `NotificationManager::new` writes into
/// the response of any notification that was queued without a version.
// NOTE(review): presumably subscribers skip version handling for this value — confirm.
pub const IGNORED_NOTIFICATION_VERSION: u64 = 0;
44
/// Notifications delivered to in-process subscribers, i.e. the channels registered through
/// `NotificationManager::insert_local_sender` and fed by
/// `NotificationManager::notify_local_subscribers`.
///
/// NOTE(review): the per-variant descriptions below are inferred from the variant names and
/// payload types only — confirm against the code that emits them.
#[derive(Clone, Debug)]
pub enum LocalNotification {
    /// Carries the `WorkerNode` that was (presumably) deleted.
    WorkerNodeDeleted(WorkerNode),
    /// Carries the `WorkerNode` that was (presumably) activated.
    WorkerNodeActivated(WorkerNode),
    /// Carries a `SystemParamsReader`, presumably reflecting the changed system parameters.
    SystemParamsChange(SystemParamsReader),
    /// Carries no payload.
    BatchParallelismChange,
    /// Carries the ids of the fragments concerned by a mapping upsert.
    FragmentMappingsUpsert(Vec<FragmentId>),
    /// Carries the ids of the fragments concerned by a mapping deletion.
    FragmentMappingsDelete(Vec<FragmentId>),
    /// Carries the `ObjectId` of the source concerned.
    SourceDropped(ObjectId),
    /// Carries the `JobId` of the streaming job concerned.
    StreamingJobBackfillFinished(JobId),
}
56
/// Addressing information for one outgoing notification.
#[derive(Debug)]
struct Target {
    /// Selects which sender map of `NotificationManagerCore` is used (see `senders_of`).
    subscribe_type: SubscribeType,
    /// `Some(key)`: deliver only to the sender registered under `key`.
    /// `None`: deliver to every sender registered for `subscribe_type`.
    worker_key: Option<WorkerKey>,
}
63
64impl From<SubscribeType> for Target {
65 fn from(value: SubscribeType) -> Self {
66 Self {
67 subscribe_type: value,
68 worker_key: None,
69 }
70 }
71}
72
/// One queued notification, sent over the internal channel from `NotificationManager::notify`
/// to the dispatcher task spawned in `NotificationManager::new`.
#[derive(Debug)]
struct Task {
    /// Who should receive the notification.
    target: Target,
    /// Operation written (as `i32`) into the resulting `SubscribeResponse`.
    operation: Operation,
    /// Payload written into the resulting `SubscribeResponse`.
    info: Info,
    /// Version to attach; `None` becomes the `u64` default (`0`) in the response.
    version: Option<NotificationVersion>,
}
80
/// Fans notifications out to remote subscribers (grouped by `SubscribeType`) and to in-process
/// subscribers of `LocalNotification`.
pub struct NotificationManager {
    /// Sender registry, shared with the dispatcher task spawned in `new`.
    core: Arc<parking_lot::Mutex<NotificationManagerCore>>,
    /// Producer side of the task queue consumed by the dispatcher task.
    task_tx: UnboundedSender<Task>,
    /// Source of notification versions; an async mutex because `notify_with_version` holds it
    /// across an `.await`.
    version_generator: Mutex<NotificationVersionGenerator>,
}
89
90impl NotificationManager {
91 pub async fn new(meta_store_impl: SqlMetaStore) -> Self {
92 let (task_tx, mut task_rx) = mpsc::unbounded_channel::<Task>();
94 let core = Arc::new(parking_lot::Mutex::new(NotificationManagerCore::new()));
95 let core_clone = core.clone();
96 let version_generator = NotificationVersionGenerator::new(meta_store_impl)
97 .await
98 .unwrap();
99
100 tokio::spawn(async move {
101 while let Some(task) = task_rx.recv().await {
102 let response = SubscribeResponse {
103 status: None,
104 operation: task.operation as i32,
105 info: Some(task.info),
106 version: task.version.unwrap_or_default(),
107 };
108 core.lock().notify(task.target, response);
109 }
110 });
111
112 Self {
113 core: core_clone,
114 task_tx,
115 version_generator: Mutex::new(version_generator),
116 }
117 }
118
119 pub fn abort_all(&self) {
120 let mut guard = self.core.lock();
121 *guard = NotificationManagerCore::new();
122 guard.exiting = true;
123 }
124
125 #[inline(always)]
126 fn notify(
127 &self,
128 target: Target,
129 operation: Operation,
130 info: Info,
131 version: Option<NotificationVersion>,
132 ) {
133 let task = Task {
134 target,
135 operation,
136 info,
137 version,
138 };
139 self.task_tx.send(task).unwrap();
140 }
141
142 async fn notify_with_version(
144 &self,
145 target: Target,
146 operation: Operation,
147 info: Info,
148 ) -> NotificationVersion {
149 let mut version_guard = self.version_generator.lock().await;
150 version_guard.increase_version().await;
151 let version = version_guard.current_version();
152 self.notify(target, operation, info, Some(version));
153 version
154 }
155
156 #[inline(always)]
158 fn notify_without_version(&self, target: Target, operation: Operation, info: Info) {
159 self.notify(target, operation, info, None);
160 }
161
162 pub fn notify_snapshot(
163 &self,
164 worker_key: WorkerKey,
165 subscribe_type: SubscribeType,
166 meta_snapshot: MetaSnapshot,
167 ) {
168 self.notify_without_version(
169 Target {
170 subscribe_type,
171 worker_key: Some(worker_key),
172 },
173 Operation::Snapshot,
174 Info::Snapshot(meta_snapshot),
175 )
176 }
177
178 pub fn notify_all_without_version(&self, operation: Operation, info: Info) {
179 for subscribe_type in [
180 SubscribeType::Frontend,
181 SubscribeType::Hummock,
182 SubscribeType::Compactor,
183 SubscribeType::Compute,
184 ] {
185 self.notify_without_version(subscribe_type.into(), operation, info.clone());
186 }
187 }
188
189 pub async fn notify_frontend(&self, operation: Operation, info: Info) -> NotificationVersion {
190 self.notify_with_version(SubscribeType::Frontend.into(), operation, info)
191 .await
192 }
193
194 pub async fn notify_frontend_object_info(
195 &self,
196 operation: Operation,
197 object_info: PbObjectInfo,
198 ) -> NotificationVersion {
199 self.notify_with_version(
200 SubscribeType::Frontend.into(),
201 operation,
202 Info::ObjectGroup(PbObjectGroup {
203 objects: vec![PbObject {
204 object_info: object_info.into(),
205 }],
206 dependencies: vec![],
207 }),
208 )
209 .await
210 }
211
212 pub async fn notify_hummock(&self, operation: Operation, info: Info) -> NotificationVersion {
213 self.notify_with_version(SubscribeType::Hummock.into(), operation, info)
214 .await
215 }
216
217 pub async fn notify_compactor(&self, operation: Operation, info: Info) -> NotificationVersion {
218 self.notify_with_version(SubscribeType::Compactor.into(), operation, info)
219 .await
220 }
221
222 pub fn notify_compute_without_version(&self, operation: Operation, info: Info) {
223 self.notify_without_version(SubscribeType::Compute.into(), operation, info)
224 }
225
226 pub fn notify_frontend_without_version(&self, operation: Operation, info: Info) {
227 self.notify_without_version(SubscribeType::Frontend.into(), operation, info)
228 }
229
230 pub fn notify_hummock_without_version(&self, operation: Operation, info: Info) {
231 self.notify_without_version(SubscribeType::Hummock.into(), operation, info)
232 }
233
234 pub fn notify_compactor_without_version(&self, operation: Operation, info: Info) {
235 self.notify_without_version(SubscribeType::Compactor.into(), operation, info)
236 }
237
238 #[cfg(any(test, feature = "test"))]
239 pub fn notify_hummock_with_version(
240 &self,
241 operation: Operation,
242 info: Info,
243 version: Option<NotificationVersion>,
244 ) {
245 self.notify(SubscribeType::Hummock.into(), operation, info, version)
246 }
247
248 pub fn notify_local_subscribers(&self, notification: LocalNotification) {
249 let mut core_guard = self.core.lock();
250 core_guard.local_senders.retain(|sender| {
251 if let Err(err) = sender.send(notification.clone()) {
252 tracing::warn!(error = %err.as_report(), "Failed to notify local subscriber");
253 return false;
254 }
255 true
256 });
257 }
258
259 pub fn delete_sender(&self, worker_type: WorkerType, worker_key: WorkerKey) {
261 let mut core_guard = self.core.lock();
262 match worker_type {
265 WorkerType::Frontend => core_guard.frontend_senders.remove(&worker_key),
266 WorkerType::ComputeNode | WorkerType::RiseCtl => {
267 core_guard.hummock_senders.remove(&worker_key)
268 }
269 WorkerType::Compactor => core_guard.compactor_senders.remove(&worker_key),
270 _ => unreachable!(),
271 };
272 }
273
274 pub fn insert_sender(
276 &self,
277 subscribe_type: SubscribeType,
278 worker_key: WorkerKey,
279 sender: UnboundedSender<Notification>,
280 ) {
281 let mut core_guard = self.core.lock();
282 if core_guard.exiting {
283 tracing::warn!("notification manager exiting.");
284 return;
285 }
286 let senders = core_guard.senders_of(subscribe_type);
287
288 senders.insert(worker_key, sender);
289 }
290
291 pub fn insert_local_sender(&self, sender: UnboundedSender<LocalNotification>) {
292 let mut core_guard = self.core.lock();
293 if core_guard.exiting {
294 tracing::warn!("notification manager exiting.");
295 return;
296 }
297 core_guard.local_senders.push(sender);
298 }
299
300 #[cfg(test)]
301 pub fn clear_local_sender(&self) {
302 self.core.lock().local_senders.clear();
303 }
304
305 pub async fn current_version(&self) -> NotificationVersion {
306 let version_guard = self.version_generator.lock().await;
307 version_guard.current_version()
308 }
309}
310
/// Registered remote senders of one subscribe type, keyed by the subscribing worker.
type SenderMap = HashMap<WorkerKey, UnboundedSender<Notification>>;
312
/// Registry of all notification senders, guarded by the mutex in `NotificationManager::core`.
struct NotificationManagerCore {
    /// Senders registered with `SubscribeType::Frontend`.
    frontend_senders: SenderMap,
    /// Senders registered with `SubscribeType::Hummock`.
    hummock_senders: SenderMap,
    /// Senders registered with `SubscribeType::Compactor`.
    compactor_senders: SenderMap,
    /// Senders registered with `SubscribeType::Compute`.
    compute_senders: SenderMap,
    /// In-process subscribers of `LocalNotification`.
    local_senders: Vec<UnboundedSender<LocalNotification>>,
    /// Set by `NotificationManager::abort_all`; while `true`, new senders are rejected.
    exiting: bool,
}
326
327impl NotificationManagerCore {
328 fn new() -> Self {
329 Self {
330 frontend_senders: HashMap::new(),
331 hummock_senders: HashMap::new(),
332 compactor_senders: HashMap::new(),
333 compute_senders: HashMap::new(),
334 local_senders: vec![],
335 exiting: false,
336 }
337 }
338
339 fn notify(&mut self, target: Target, response: SubscribeResponse) {
340 macro_rules! warn_send_failure {
341 ($subscribe_type:expr, $worker_key:expr, $err:expr) => {
342 tracing::warn!(
343 "Failed to notify {:?} {:?}: {}",
344 $subscribe_type,
345 $worker_key,
346 $err
347 );
348 };
349 }
350
351 let senders = self.senders_of(target.subscribe_type);
352
353 if let Some(worker_key) = target.worker_key {
354 match senders.entry(worker_key.clone()) {
355 Entry::Occupied(entry) => {
356 let _ = entry.get().send(Ok(response)).inspect_err(|err| {
357 warn_send_failure!(target.subscribe_type, &worker_key, err.as_report());
358 entry.remove_entry();
359 });
360 }
361 Entry::Vacant(_) => {
362 tracing::warn!("Failed to find notification sender of {:?}", worker_key)
363 }
364 }
365 } else {
366 senders.retain(|worker_key, sender| {
367 sender
368 .send(Ok(response.clone()))
369 .inspect_err(|err| {
370 warn_send_failure!(target.subscribe_type, &worker_key, err.as_report());
371 })
372 .is_ok()
373 });
374 }
375 }
376
377 fn senders_of(&mut self, subscribe_type: SubscribeType) -> &mut SenderMap {
378 match subscribe_type {
379 SubscribeType::Frontend => &mut self.frontend_senders,
380 SubscribeType::Hummock => &mut self.hummock_senders,
381 SubscribeType::Compactor => &mut self.compactor_senders,
382 SubscribeType::Compute => &mut self.compute_senders,
383 SubscribeType::Unspecified => unreachable!(),
384 }
385 }
386}
387
#[cfg(test)]
mod tests {
    use risingwave_common::id::JobId;
    use risingwave_pb::common::HostAddress;

    use super::*;
    use crate::manager::WorkerKey;

    /// One worker key may be registered under several subscribe types; targeted and broadcast
    /// notifications must reach only the senders of the addressed subscribe type.
    #[tokio::test]
    async fn test_multiple_subscribers_one_worker() {
        let mgr = NotificationManager::new(SqlMetaStore::for_test().await).await;
        let worker_key1 = WorkerKey(HostAddress {
            host: "a".to_owned(),
            port: 1,
        });
        let worker_key2 = WorkerKey(HostAddress {
            host: "a".to_owned(),
            port: 2,
        });
        let (tx1, mut rx1) = mpsc::unbounded_channel();
        let (tx2, mut rx2) = mpsc::unbounded_channel();
        let (tx3, mut rx3) = mpsc::unbounded_channel();
        // worker 1 subscribes as Hummock and as Frontend; worker 2 only as Frontend.
        mgr.insert_sender(SubscribeType::Hummock, worker_key1.clone(), tx1);
        mgr.insert_sender(SubscribeType::Frontend, worker_key1.clone(), tx2);
        mgr.insert_sender(SubscribeType::Frontend, worker_key2, tx3);
        // A snapshot addressed to worker 1's Hummock subscription...
        mgr.notify_snapshot(
            worker_key1.clone(),
            SubscribeType::Hummock,
            MetaSnapshot::default(),
        );
        // ...arrives on rx1 only; both Frontend receivers stay empty.
        assert!(rx1.recv().await.is_some());
        assert!(rx2.try_recv().is_err());
        assert!(rx3.try_recv().is_err());

        // A Frontend broadcast reaches both Frontend receivers but not the Hummock one.
        mgr.notify_frontend(Operation::Add, Info::Database(Default::default()))
            .await;
        assert!(rx1.try_recv().is_err());
        assert!(rx2.recv().await.is_some());
        assert!(rx3.recv().await.is_some());
    }

    /// A local subscriber receives `StreamingJobBackfillFinished` with the original job id.
    #[tokio::test]
    async fn test_local_notification_backfill_finished() {
        let mgr = NotificationManager::new(SqlMetaStore::for_test().await).await;
        let (tx, mut rx) = mpsc::unbounded_channel();
        mgr.insert_local_sender(tx);

        let job_id = JobId::new(42);
        mgr.notify_local_subscribers(LocalNotification::StreamingJobBackfillFinished(job_id));

        match rx.recv().await.expect("should receive notification") {
            LocalNotification::StreamingJobBackfillFinished(received) => {
                assert_eq!(received, job_id);
            }
            other => panic!("unexpected notification: {other:?}"),
        }
    }
}