1use std::collections::HashMap;
16use std::collections::hash_map::Entry;
17use std::sync::Arc;
18
19use risingwave_common::system_param::reader::SystemParamsReader;
20use risingwave_pb::common::{WorkerNode, WorkerType};
21use risingwave_pb::meta::object::PbObjectInfo;
22use risingwave_pb::meta::subscribe_response::{Info, Operation};
23use risingwave_pb::meta::{
24 MetaSnapshot, PbObject, PbObjectGroup, SubscribeResponse, SubscribeType,
25};
26use thiserror_ext::AsReport;
27use tokio::sync::Mutex;
28use tokio::sync::mpsc::{self, UnboundedSender};
29use tonic::Status;
30
31use crate::controller::SqlMetaStore;
32use crate::manager::WorkerKey;
33use crate::manager::notification_version::NotificationVersionGenerator;
34use crate::model::FragmentId;
35
36pub type MessageStatus = Status;
37pub type Notification = Result<SubscribeResponse, Status>;
38pub type NotificationManagerRef = Arc<NotificationManager>;
39pub type NotificationVersion = u64;
40pub const IGNORED_NOTIFICATION_VERSION: u64 = 0;
42
43#[derive(Clone, Debug)]
44pub enum LocalNotification {
45 WorkerNodeDeleted(WorkerNode),
46 WorkerNodeActivated(WorkerNode),
47 SystemParamsChange(SystemParamsReader),
48 FragmentMappingsUpsert(Vec<FragmentId>),
49 FragmentMappingsDelete(Vec<FragmentId>),
50}
51
52#[derive(Debug)]
53struct Target {
54 subscribe_type: SubscribeType,
55 worker_key: Option<WorkerKey>,
57}
58
59impl From<SubscribeType> for Target {
60 fn from(value: SubscribeType) -> Self {
61 Self {
62 subscribe_type: value,
63 worker_key: None,
64 }
65 }
66}
67
68#[derive(Debug)]
69struct Task {
70 target: Target,
71 operation: Operation,
72 info: Info,
73 version: Option<NotificationVersion>,
74}
75
76pub struct NotificationManager {
78 core: Arc<Mutex<NotificationManagerCore>>,
79 task_tx: UnboundedSender<Task>,
81 version_generator: Mutex<NotificationVersionGenerator>,
83}
84
85impl NotificationManager {
86 pub async fn new(meta_store_impl: SqlMetaStore) -> Self {
87 let (task_tx, mut task_rx) = mpsc::unbounded_channel::<Task>();
89 let core = Arc::new(Mutex::new(NotificationManagerCore::new()));
90 let core_clone = core.clone();
91 let version_generator = NotificationVersionGenerator::new(meta_store_impl)
92 .await
93 .unwrap();
94
95 tokio::spawn(async move {
96 while let Some(task) = task_rx.recv().await {
97 let response = SubscribeResponse {
98 status: None,
99 operation: task.operation as i32,
100 info: Some(task.info),
101 version: task.version.unwrap_or_default(),
102 };
103 core.lock().await.notify(task.target, response);
104 }
105 });
106
107 Self {
108 core: core_clone,
109 task_tx,
110 version_generator: Mutex::new(version_generator),
111 }
112 }
113
114 pub async fn abort_all(&self) {
115 let mut guard = self.core.lock().await;
116 *guard = NotificationManagerCore::new();
117 guard.exiting = true;
118 }
119
120 #[inline(always)]
121 fn notify(
122 &self,
123 target: Target,
124 operation: Operation,
125 info: Info,
126 version: Option<NotificationVersion>,
127 ) {
128 let task = Task {
129 target,
130 operation,
131 info,
132 version,
133 };
134 self.task_tx.send(task).unwrap();
135 }
136
137 async fn notify_with_version(
139 &self,
140 target: Target,
141 operation: Operation,
142 info: Info,
143 ) -> NotificationVersion {
144 let mut version_guard = self.version_generator.lock().await;
145 version_guard.increase_version().await;
146 let version = version_guard.current_version();
147 self.notify(target, operation, info, Some(version));
148 version
149 }
150
151 #[inline(always)]
153 fn notify_without_version(&self, target: Target, operation: Operation, info: Info) {
154 self.notify(target, operation, info, None);
155 }
156
157 pub fn notify_snapshot(
158 &self,
159 worker_key: WorkerKey,
160 subscribe_type: SubscribeType,
161 meta_snapshot: MetaSnapshot,
162 ) {
163 self.notify_without_version(
164 Target {
165 subscribe_type,
166 worker_key: Some(worker_key),
167 },
168 Operation::Snapshot,
169 Info::Snapshot(meta_snapshot),
170 )
171 }
172
173 pub fn notify_all_without_version(&self, operation: Operation, info: Info) {
174 for subscribe_type in [
175 SubscribeType::Frontend,
176 SubscribeType::Hummock,
177 SubscribeType::Compactor,
178 SubscribeType::Compute,
179 ] {
180 self.notify_without_version(subscribe_type.into(), operation, info.clone());
181 }
182 }
183
184 pub async fn notify_frontend(&self, operation: Operation, info: Info) -> NotificationVersion {
185 self.notify_with_version(SubscribeType::Frontend.into(), operation, info)
186 .await
187 }
188
189 pub async fn notify_frontend_object_info(
190 &self,
191 operation: Operation,
192 object_info: PbObjectInfo,
193 ) -> NotificationVersion {
194 self.notify_with_version(
195 SubscribeType::Frontend.into(),
196 operation,
197 Info::ObjectGroup(PbObjectGroup {
198 objects: vec![PbObject {
199 object_info: object_info.into(),
200 }],
201 }),
202 )
203 .await
204 }
205
206 pub async fn notify_hummock(&self, operation: Operation, info: Info) -> NotificationVersion {
207 self.notify_with_version(SubscribeType::Hummock.into(), operation, info)
208 .await
209 }
210
211 pub async fn notify_hummock_object_info(
212 &self,
213 operation: Operation,
214 object_info: PbObjectInfo,
215 ) -> NotificationVersion {
216 self.notify_with_version(
217 SubscribeType::Hummock.into(),
218 operation,
219 Info::ObjectGroup(PbObjectGroup {
220 objects: vec![PbObject {
221 object_info: object_info.into(),
222 }],
223 }),
224 )
225 .await
226 }
227
228 pub async fn notify_compactor(&self, operation: Operation, info: Info) -> NotificationVersion {
229 self.notify_with_version(SubscribeType::Compactor.into(), operation, info)
230 .await
231 }
232
233 pub async fn notify_compactor_object_info(
234 &self,
235 operation: Operation,
236 object_info: PbObjectInfo,
237 ) -> NotificationVersion {
238 self.notify_with_version(
239 SubscribeType::Compactor.into(),
240 operation,
241 Info::ObjectGroup(PbObjectGroup {
242 objects: vec![PbObject {
243 object_info: object_info.into(),
244 }],
245 }),
246 )
247 .await
248 }
249
250 pub async fn notify_compute(&self, operation: Operation, info: Info) -> NotificationVersion {
251 self.notify_with_version(SubscribeType::Compute.into(), operation, info)
252 .await
253 }
254
255 pub fn notify_compute_without_version(&self, operation: Operation, info: Info) {
256 self.notify_without_version(SubscribeType::Compute.into(), operation, info)
257 }
258
259 pub fn notify_frontend_without_version(&self, operation: Operation, info: Info) {
260 self.notify_without_version(SubscribeType::Frontend.into(), operation, info)
261 }
262
263 pub fn notify_hummock_without_version(&self, operation: Operation, info: Info) {
264 self.notify_without_version(SubscribeType::Hummock.into(), operation, info)
265 }
266
267 pub fn notify_compactor_without_version(&self, operation: Operation, info: Info) {
268 self.notify_without_version(SubscribeType::Compactor.into(), operation, info)
269 }
270
271 #[cfg(any(test, feature = "test"))]
272 pub fn notify_hummock_with_version(
273 &self,
274 operation: Operation,
275 info: Info,
276 version: Option<NotificationVersion>,
277 ) {
278 self.notify(SubscribeType::Hummock.into(), operation, info, version)
279 }
280
281 pub async fn notify_local_subscribers(&self, notification: LocalNotification) {
282 let mut core_guard = self.core.lock().await;
283 core_guard.local_senders.retain(|sender| {
284 if let Err(err) = sender.send(notification.clone()) {
285 tracing::warn!(error = %err.as_report(), "Failed to notify local subscriber");
286 return false;
287 }
288 true
289 });
290 }
291
292 pub async fn delete_sender(&self, worker_type: WorkerType, worker_key: WorkerKey) {
294 let mut core_guard = self.core.lock().await;
295 match worker_type {
298 WorkerType::Frontend => core_guard.frontend_senders.remove(&worker_key),
299 WorkerType::ComputeNode | WorkerType::RiseCtl => {
300 core_guard.hummock_senders.remove(&worker_key)
301 }
302 WorkerType::Compactor => core_guard.compactor_senders.remove(&worker_key),
303 _ => unreachable!(),
304 };
305 }
306
307 pub async fn insert_sender(
309 &self,
310 subscribe_type: SubscribeType,
311 worker_key: WorkerKey,
312 sender: UnboundedSender<Notification>,
313 ) {
314 let mut core_guard = self.core.lock().await;
315 if core_guard.exiting {
316 tracing::warn!("notification manager exiting.");
317 return;
318 }
319 let senders = core_guard.senders_of(subscribe_type);
320
321 senders.insert(worker_key, sender);
322 }
323
324 pub async fn insert_local_sender(&self, sender: UnboundedSender<LocalNotification>) {
325 let mut core_guard = self.core.lock().await;
326 if core_guard.exiting {
327 tracing::warn!("notification manager exiting.");
328 return;
329 }
330 core_guard.local_senders.push(sender);
331 }
332
333 #[cfg(test)]
334 pub async fn clear_local_sender(&self) {
335 self.core.lock().await.local_senders.clear();
336 }
337
338 pub async fn current_version(&self) -> NotificationVersion {
339 let version_guard = self.version_generator.lock().await;
340 version_guard.current_version()
341 }
342}
343
344type SenderMap = HashMap<WorkerKey, UnboundedSender<Notification>>;
345
346struct NotificationManagerCore {
347 frontend_senders: SenderMap,
349 hummock_senders: SenderMap,
351 compactor_senders: SenderMap,
353 compute_senders: HashMap<WorkerKey, UnboundedSender<Notification>>,
355 local_senders: Vec<UnboundedSender<LocalNotification>>,
357 exiting: bool,
358}
359
360impl NotificationManagerCore {
361 fn new() -> Self {
362 Self {
363 frontend_senders: HashMap::new(),
364 hummock_senders: HashMap::new(),
365 compactor_senders: HashMap::new(),
366 compute_senders: HashMap::new(),
367 local_senders: vec![],
368 exiting: false,
369 }
370 }
371
372 fn notify(&mut self, target: Target, response: SubscribeResponse) {
373 macro_rules! warn_send_failure {
374 ($subscribe_type:expr, $worker_key:expr, $err:expr) => {
375 tracing::warn!(
376 "Failed to notify {:?} {:?}: {}",
377 $subscribe_type,
378 $worker_key,
379 $err
380 );
381 };
382 }
383
384 let senders = self.senders_of(target.subscribe_type);
385
386 if let Some(worker_key) = target.worker_key {
387 match senders.entry(worker_key.clone()) {
388 Entry::Occupied(entry) => {
389 let _ = entry.get().send(Ok(response)).inspect_err(|err| {
390 warn_send_failure!(target.subscribe_type, &worker_key, err.as_report());
391 entry.remove_entry();
392 });
393 }
394 Entry::Vacant(_) => {
395 tracing::warn!("Failed to find notification sender of {:?}", worker_key)
396 }
397 }
398 } else {
399 senders.retain(|worker_key, sender| {
400 sender
401 .send(Ok(response.clone()))
402 .inspect_err(|err| {
403 warn_send_failure!(target.subscribe_type, &worker_key, err.as_report());
404 })
405 .is_ok()
406 });
407 }
408 }
409
410 fn senders_of(&mut self, subscribe_type: SubscribeType) -> &mut SenderMap {
411 match subscribe_type {
412 SubscribeType::Frontend => &mut self.frontend_senders,
413 SubscribeType::Hummock => &mut self.hummock_senders,
414 SubscribeType::Compactor => &mut self.compactor_senders,
415 SubscribeType::Compute => &mut self.compute_senders,
416 SubscribeType::Unspecified => unreachable!(),
417 }
418 }
419}
420
#[cfg(test)]
mod tests {
    use risingwave_pb::common::HostAddress;

    use super::*;
    use crate::manager::WorkerKey;

    /// A targeted snapshot reaches only the addressed (worker, type) pair, while a
    /// frontend broadcast reaches every frontend subscriber and nothing else.
    #[tokio::test]
    async fn test_multiple_subscribers_one_worker() {
        let mgr = NotificationManager::new(SqlMetaStore::for_test().await).await;
        let key_on_port = |port| {
            WorkerKey(HostAddress {
                host: "a".to_owned(),
                port,
            })
        };
        let first_worker = key_on_port(1);
        let second_worker = key_on_port(2);

        let (hummock_tx, mut hummock_rx) = mpsc::unbounded_channel();
        let (frontend_tx_first, mut frontend_rx_first) = mpsc::unbounded_channel();
        let (frontend_tx_second, mut frontend_rx_second) = mpsc::unbounded_channel();

        // Worker 1 subscribes as Hummock and as Frontend; worker 2 only as Frontend.
        mgr.insert_sender(SubscribeType::Hummock, first_worker.clone(), hummock_tx)
            .await;
        mgr.insert_sender(SubscribeType::Frontend, first_worker.clone(), frontend_tx_first)
            .await;
        mgr.insert_sender(SubscribeType::Frontend, second_worker, frontend_tx_second)
            .await;

        // Snapshot addressed to worker 1's Hummock subscription only.
        mgr.notify_snapshot(
            first_worker,
            SubscribeType::Hummock,
            MetaSnapshot::default(),
        );
        assert!(hummock_rx.recv().await.is_some());
        assert!(frontend_rx_first.try_recv().is_err());
        assert!(frontend_rx_second.try_recv().is_err());

        // Frontend broadcast: both frontend subscribers, not the Hummock one.
        mgr.notify_frontend(Operation::Add, Info::Database(Default::default()))
            .await;
        assert!(hummock_rx.try_recv().is_err());
        assert!(frontend_rx_first.recv().await.is_some());
        assert!(frontend_rx_second.recv().await.is_some());
    }
}
463}