1use std::collections::HashMap;
16use std::collections::hash_map::Entry;
17use std::sync::Arc;
18
19use risingwave_common::system_param::reader::SystemParamsReader;
20use risingwave_pb::common::{WorkerNode, WorkerType};
21use risingwave_pb::meta::object::PbObjectInfo;
22use risingwave_pb::meta::subscribe_response::{Info, Operation};
23use risingwave_pb::meta::{
24 MetaSnapshot, PbObject, PbObjectGroup, SubscribeResponse, SubscribeType,
25};
26use thiserror_ext::AsReport;
27use tokio::sync::Mutex;
28use tokio::sync::mpsc::{self, UnboundedSender};
29use tonic::Status;
30
31use crate::controller::SqlMetaStore;
32use crate::manager::WorkerKey;
33use crate::manager::notification_version::NotificationVersionGenerator;
34use crate::model::FragmentId;
35
36pub type MessageStatus = Status;
37pub type Notification = Result<SubscribeResponse, Status>;
38pub type NotificationManagerRef = Arc<NotificationManager>;
39pub type NotificationVersion = u64;
40pub const IGNORED_NOTIFICATION_VERSION: u64 = 0;
42
/// Notifications delivered in-process to local subscribers registered via
/// `NotificationManager::insert_local_sender`.
///
/// `Clone` is needed because `NotificationManager::notify_local_subscribers` sends one clone of
/// each notification to every registered local sender.
#[derive(Clone, Debug)]
pub enum LocalNotification {
    WorkerNodeDeleted(WorkerNode),
    WorkerNodeActivated(WorkerNode),
    SystemParamsChange(SystemParamsReader),
    BatchParallelismChange,
    FragmentMappingsUpsert(Vec<FragmentId>),
    FragmentMappingsDelete(Vec<FragmentId>),
}
52
/// Selects the recipients of a notification.
#[derive(Debug)]
struct Target {
    /// Which sender map (frontend, hummock, compactor or compute) to deliver to.
    subscribe_type: SubscribeType,
    /// `Some` delivers only to this worker's sender of `subscribe_type`; `None` broadcasts to
    /// every sender of `subscribe_type`.
    worker_key: Option<WorkerKey>,
}
59
60impl From<SubscribeType> for Target {
61 fn from(value: SubscribeType) -> Self {
62 Self {
63 subscribe_type: value,
64 worker_key: None,
65 }
66 }
67}
68
/// A queued notification, consumed by the background task spawned in
/// `NotificationManager::new`.
#[derive(Debug)]
struct Task {
    /// Recipients of the notification.
    target: Target,
    /// Operation reported in the resulting `SubscribeResponse`.
    operation: Operation,
    /// Payload of the resulting `SubscribeResponse`.
    info: Info,
    /// Version stamped on the response; `None` is sent as the default value (0).
    version: Option<NotificationVersion>,
}
76
/// Fans out notifications to subscribers registered per `SubscribeType`, and to in-process
/// local subscribers.
pub struct NotificationManager {
    /// Registered senders; shared with the background delivery task.
    core: Arc<Mutex<NotificationManagerCore>>,
    /// Queue feeding the background delivery task.
    task_tx: UnboundedSender<Task>,
    /// Source of notification versions. The lock is held while a versioned notification is
    /// enqueued, so versioned notifications are enqueued in version order.
    version_generator: Mutex<NotificationVersionGenerator>,
}
85
86impl NotificationManager {
87 pub async fn new(meta_store_impl: SqlMetaStore) -> Self {
88 let (task_tx, mut task_rx) = mpsc::unbounded_channel::<Task>();
90 let core = Arc::new(Mutex::new(NotificationManagerCore::new()));
91 let core_clone = core.clone();
92 let version_generator = NotificationVersionGenerator::new(meta_store_impl)
93 .await
94 .unwrap();
95
96 tokio::spawn(async move {
97 while let Some(task) = task_rx.recv().await {
98 let response = SubscribeResponse {
99 status: None,
100 operation: task.operation as i32,
101 info: Some(task.info),
102 version: task.version.unwrap_or_default(),
103 };
104 core.lock().await.notify(task.target, response);
105 }
106 });
107
108 Self {
109 core: core_clone,
110 task_tx,
111 version_generator: Mutex::new(version_generator),
112 }
113 }
114
115 pub async fn abort_all(&self) {
116 let mut guard = self.core.lock().await;
117 *guard = NotificationManagerCore::new();
118 guard.exiting = true;
119 }
120
121 #[inline(always)]
122 fn notify(
123 &self,
124 target: Target,
125 operation: Operation,
126 info: Info,
127 version: Option<NotificationVersion>,
128 ) {
129 let task = Task {
130 target,
131 operation,
132 info,
133 version,
134 };
135 self.task_tx.send(task).unwrap();
136 }
137
138 async fn notify_with_version(
140 &self,
141 target: Target,
142 operation: Operation,
143 info: Info,
144 ) -> NotificationVersion {
145 let mut version_guard = self.version_generator.lock().await;
146 version_guard.increase_version().await;
147 let version = version_guard.current_version();
148 self.notify(target, operation, info, Some(version));
149 version
150 }
151
152 #[inline(always)]
154 fn notify_without_version(&self, target: Target, operation: Operation, info: Info) {
155 self.notify(target, operation, info, None);
156 }
157
158 pub fn notify_snapshot(
159 &self,
160 worker_key: WorkerKey,
161 subscribe_type: SubscribeType,
162 meta_snapshot: MetaSnapshot,
163 ) {
164 self.notify_without_version(
165 Target {
166 subscribe_type,
167 worker_key: Some(worker_key),
168 },
169 Operation::Snapshot,
170 Info::Snapshot(meta_snapshot),
171 )
172 }
173
174 pub fn notify_all_without_version(&self, operation: Operation, info: Info) {
175 for subscribe_type in [
176 SubscribeType::Frontend,
177 SubscribeType::Hummock,
178 SubscribeType::Compactor,
179 SubscribeType::Compute,
180 ] {
181 self.notify_without_version(subscribe_type.into(), operation, info.clone());
182 }
183 }
184
185 pub async fn notify_frontend(&self, operation: Operation, info: Info) -> NotificationVersion {
186 self.notify_with_version(SubscribeType::Frontend.into(), operation, info)
187 .await
188 }
189
190 pub async fn notify_frontend_object_info(
191 &self,
192 operation: Operation,
193 object_info: PbObjectInfo,
194 ) -> NotificationVersion {
195 self.notify_with_version(
196 SubscribeType::Frontend.into(),
197 operation,
198 Info::ObjectGroup(PbObjectGroup {
199 objects: vec![PbObject {
200 object_info: object_info.into(),
201 }],
202 }),
203 )
204 .await
205 }
206
207 pub async fn notify_hummock(&self, operation: Operation, info: Info) -> NotificationVersion {
208 self.notify_with_version(SubscribeType::Hummock.into(), operation, info)
209 .await
210 }
211
212 pub async fn notify_hummock_object_info(
213 &self,
214 operation: Operation,
215 object_info: PbObjectInfo,
216 ) -> NotificationVersion {
217 self.notify_with_version(
218 SubscribeType::Hummock.into(),
219 operation,
220 Info::ObjectGroup(PbObjectGroup {
221 objects: vec![PbObject {
222 object_info: object_info.into(),
223 }],
224 }),
225 )
226 .await
227 }
228
229 pub async fn notify_compactor(&self, operation: Operation, info: Info) -> NotificationVersion {
230 self.notify_with_version(SubscribeType::Compactor.into(), operation, info)
231 .await
232 }
233
234 pub async fn notify_compactor_object_info(
235 &self,
236 operation: Operation,
237 object_info: PbObjectInfo,
238 ) -> NotificationVersion {
239 self.notify_with_version(
240 SubscribeType::Compactor.into(),
241 operation,
242 Info::ObjectGroup(PbObjectGroup {
243 objects: vec![PbObject {
244 object_info: object_info.into(),
245 }],
246 }),
247 )
248 .await
249 }
250
251 pub async fn notify_compute(&self, operation: Operation, info: Info) -> NotificationVersion {
252 self.notify_with_version(SubscribeType::Compute.into(), operation, info)
253 .await
254 }
255
256 pub fn notify_compute_without_version(&self, operation: Operation, info: Info) {
257 self.notify_without_version(SubscribeType::Compute.into(), operation, info)
258 }
259
260 pub fn notify_frontend_without_version(&self, operation: Operation, info: Info) {
261 self.notify_without_version(SubscribeType::Frontend.into(), operation, info)
262 }
263
264 pub fn notify_hummock_without_version(&self, operation: Operation, info: Info) {
265 self.notify_without_version(SubscribeType::Hummock.into(), operation, info)
266 }
267
268 pub fn notify_compactor_without_version(&self, operation: Operation, info: Info) {
269 self.notify_without_version(SubscribeType::Compactor.into(), operation, info)
270 }
271
272 #[cfg(any(test, feature = "test"))]
273 pub fn notify_hummock_with_version(
274 &self,
275 operation: Operation,
276 info: Info,
277 version: Option<NotificationVersion>,
278 ) {
279 self.notify(SubscribeType::Hummock.into(), operation, info, version)
280 }
281
282 pub async fn notify_local_subscribers(&self, notification: LocalNotification) {
283 let mut core_guard = self.core.lock().await;
284 core_guard.local_senders.retain(|sender| {
285 if let Err(err) = sender.send(notification.clone()) {
286 tracing::warn!(error = %err.as_report(), "Failed to notify local subscriber");
287 return false;
288 }
289 true
290 });
291 }
292
293 pub async fn delete_sender(&self, worker_type: WorkerType, worker_key: WorkerKey) {
295 let mut core_guard = self.core.lock().await;
296 match worker_type {
299 WorkerType::Frontend => core_guard.frontend_senders.remove(&worker_key),
300 WorkerType::ComputeNode | WorkerType::RiseCtl => {
301 core_guard.hummock_senders.remove(&worker_key)
302 }
303 WorkerType::Compactor => core_guard.compactor_senders.remove(&worker_key),
304 _ => unreachable!(),
305 };
306 }
307
308 pub async fn insert_sender(
310 &self,
311 subscribe_type: SubscribeType,
312 worker_key: WorkerKey,
313 sender: UnboundedSender<Notification>,
314 ) {
315 let mut core_guard = self.core.lock().await;
316 if core_guard.exiting {
317 tracing::warn!("notification manager exiting.");
318 return;
319 }
320 let senders = core_guard.senders_of(subscribe_type);
321
322 senders.insert(worker_key, sender);
323 }
324
325 pub async fn insert_local_sender(&self, sender: UnboundedSender<LocalNotification>) {
326 let mut core_guard = self.core.lock().await;
327 if core_guard.exiting {
328 tracing::warn!("notification manager exiting.");
329 return;
330 }
331 core_guard.local_senders.push(sender);
332 }
333
334 #[cfg(test)]
335 pub async fn clear_local_sender(&self) {
336 self.core.lock().await.local_senders.clear();
337 }
338
339 pub async fn current_version(&self) -> NotificationVersion {
340 let version_guard = self.version_generator.lock().await;
341 version_guard.current_version()
342 }
343}
344
345type SenderMap = HashMap<WorkerKey, UnboundedSender<Notification>>;
346
347struct NotificationManagerCore {
348 frontend_senders: SenderMap,
350 hummock_senders: SenderMap,
352 compactor_senders: SenderMap,
354 compute_senders: HashMap<WorkerKey, UnboundedSender<Notification>>,
356 local_senders: Vec<UnboundedSender<LocalNotification>>,
358 exiting: bool,
359}
360
361impl NotificationManagerCore {
362 fn new() -> Self {
363 Self {
364 frontend_senders: HashMap::new(),
365 hummock_senders: HashMap::new(),
366 compactor_senders: HashMap::new(),
367 compute_senders: HashMap::new(),
368 local_senders: vec![],
369 exiting: false,
370 }
371 }
372
373 fn notify(&mut self, target: Target, response: SubscribeResponse) {
374 macro_rules! warn_send_failure {
375 ($subscribe_type:expr, $worker_key:expr, $err:expr) => {
376 tracing::warn!(
377 "Failed to notify {:?} {:?}: {}",
378 $subscribe_type,
379 $worker_key,
380 $err
381 );
382 };
383 }
384
385 let senders = self.senders_of(target.subscribe_type);
386
387 if let Some(worker_key) = target.worker_key {
388 match senders.entry(worker_key.clone()) {
389 Entry::Occupied(entry) => {
390 let _ = entry.get().send(Ok(response)).inspect_err(|err| {
391 warn_send_failure!(target.subscribe_type, &worker_key, err.as_report());
392 entry.remove_entry();
393 });
394 }
395 Entry::Vacant(_) => {
396 tracing::warn!("Failed to find notification sender of {:?}", worker_key)
397 }
398 }
399 } else {
400 senders.retain(|worker_key, sender| {
401 sender
402 .send(Ok(response.clone()))
403 .inspect_err(|err| {
404 warn_send_failure!(target.subscribe_type, &worker_key, err.as_report());
405 })
406 .is_ok()
407 });
408 }
409 }
410
411 fn senders_of(&mut self, subscribe_type: SubscribeType) -> &mut SenderMap {
412 match subscribe_type {
413 SubscribeType::Frontend => &mut self.frontend_senders,
414 SubscribeType::Hummock => &mut self.hummock_senders,
415 SubscribeType::Compactor => &mut self.compactor_senders,
416 SubscribeType::Compute => &mut self.compute_senders,
417 SubscribeType::Unspecified => unreachable!(),
418 }
419 }
420}
421
#[cfg(test)]
mod tests {
    use risingwave_pb::common::HostAddress;

    use super::*;
    use crate::manager::WorkerKey;

    /// Checks that a targeted snapshot reaches only its worker's hummock subscription. A
    /// frontend broadcast must then reach every frontend subscriber and no hummock subscriber.
    #[tokio::test]
    async fn test_multiple_subscribers_one_worker() {
        let mgr = NotificationManager::new(SqlMetaStore::for_test().await).await;
        let key_for = |port| {
            WorkerKey(HostAddress {
                host: "a".to_owned(),
                port,
            })
        };
        let shared_key = key_for(1);
        let other_key = key_for(2);

        let (hummock_tx, mut hummock_rx) = mpsc::unbounded_channel();
        let (frontend_tx, mut frontend_rx) = mpsc::unbounded_channel();
        let (other_frontend_tx, mut other_frontend_rx) = mpsc::unbounded_channel();

        // `shared_key` subscribes twice (hummock + frontend); `other_key` only as frontend.
        mgr.insert_sender(SubscribeType::Hummock, shared_key.clone(), hummock_tx)
            .await;
        mgr.insert_sender(SubscribeType::Frontend, shared_key.clone(), frontend_tx)
            .await;
        mgr.insert_sender(SubscribeType::Frontend, other_key, other_frontend_tx)
            .await;

        // A snapshot addressed to the hummock subscription of `shared_key` only.
        mgr.notify_snapshot(shared_key, SubscribeType::Hummock, MetaSnapshot::default());
        assert!(hummock_rx.recv().await.is_some());
        assert!(frontend_rx.try_recv().is_err());
        assert!(other_frontend_rx.try_recv().is_err());

        // A versioned broadcast to all frontend subscribers.
        mgr.notify_frontend(Operation::Add, Info::Database(Default::default()))
            .await;
        assert!(hummock_rx.try_recv().is_err());
        assert!(frontend_rx.recv().await.is_some());
        assert!(other_frontend_rx.recv().await.is_some());
    }
}