1use std::collections::HashMap;
16use std::collections::hash_map::Entry;
17use std::sync::Arc;
18
19use risingwave_common::system_param::reader::SystemParamsReader;
20use risingwave_meta_model::ObjectId;
21use risingwave_pb::common::{WorkerNode, WorkerType};
22use risingwave_pb::meta::object::PbObjectInfo;
23use risingwave_pb::meta::subscribe_response::{Info, Operation};
24use risingwave_pb::meta::{
25 MetaSnapshot, PbObject, PbObjectGroup, SubscribeResponse, SubscribeType,
26};
27use thiserror_ext::AsReport;
28use tokio::sync::Mutex;
29use tokio::sync::mpsc::{self, UnboundedSender};
30use tonic::Status;
31
32use crate::controller::SqlMetaStore;
33use crate::manager::WorkerKey;
34use crate::manager::notification_version::NotificationVersionGenerator;
35use crate::model::FragmentId;
36
37pub type MessageStatus = Status;
38pub type Notification = Result<SubscribeResponse, Status>;
39pub type NotificationManagerRef = Arc<NotificationManager>;
40pub type NotificationVersion = u64;
41pub const IGNORED_NOTIFICATION_VERSION: u64 = 0;
43
44#[derive(Clone, Debug)]
45pub enum LocalNotification {
46 WorkerNodeDeleted(WorkerNode),
47 WorkerNodeActivated(WorkerNode),
48 SystemParamsChange(SystemParamsReader),
49 BatchParallelismChange,
50 FragmentMappingsUpsert(Vec<FragmentId>),
51 FragmentMappingsDelete(Vec<FragmentId>),
52 SourceDropped(ObjectId),
53}
54
55#[derive(Debug)]
56struct Target {
57 subscribe_type: SubscribeType,
58 worker_key: Option<WorkerKey>,
60}
61
62impl From<SubscribeType> for Target {
63 fn from(value: SubscribeType) -> Self {
64 Self {
65 subscribe_type: value,
66 worker_key: None,
67 }
68 }
69}
70
71#[derive(Debug)]
72struct Task {
73 target: Target,
74 operation: Operation,
75 info: Info,
76 version: Option<NotificationVersion>,
77}
78
79pub struct NotificationManager {
81 core: Arc<parking_lot::Mutex<NotificationManagerCore>>,
82 task_tx: UnboundedSender<Task>,
84 version_generator: Mutex<NotificationVersionGenerator>,
86}
87
88impl NotificationManager {
89 pub async fn new(meta_store_impl: SqlMetaStore) -> Self {
90 let (task_tx, mut task_rx) = mpsc::unbounded_channel::<Task>();
92 let core = Arc::new(parking_lot::Mutex::new(NotificationManagerCore::new()));
93 let core_clone = core.clone();
94 let version_generator = NotificationVersionGenerator::new(meta_store_impl)
95 .await
96 .unwrap();
97
98 tokio::spawn(async move {
99 while let Some(task) = task_rx.recv().await {
100 let response = SubscribeResponse {
101 status: None,
102 operation: task.operation as i32,
103 info: Some(task.info),
104 version: task.version.unwrap_or_default(),
105 };
106 core.lock().notify(task.target, response);
107 }
108 });
109
110 Self {
111 core: core_clone,
112 task_tx,
113 version_generator: Mutex::new(version_generator),
114 }
115 }
116
117 pub fn abort_all(&self) {
118 let mut guard = self.core.lock();
119 *guard = NotificationManagerCore::new();
120 guard.exiting = true;
121 }
122
123 #[inline(always)]
124 fn notify(
125 &self,
126 target: Target,
127 operation: Operation,
128 info: Info,
129 version: Option<NotificationVersion>,
130 ) {
131 let task = Task {
132 target,
133 operation,
134 info,
135 version,
136 };
137 self.task_tx.send(task).unwrap();
138 }
139
140 async fn notify_with_version(
142 &self,
143 target: Target,
144 operation: Operation,
145 info: Info,
146 ) -> NotificationVersion {
147 let mut version_guard = self.version_generator.lock().await;
148 version_guard.increase_version().await;
149 let version = version_guard.current_version();
150 self.notify(target, operation, info, Some(version));
151 version
152 }
153
154 #[inline(always)]
156 fn notify_without_version(&self, target: Target, operation: Operation, info: Info) {
157 self.notify(target, operation, info, None);
158 }
159
160 pub fn notify_snapshot(
161 &self,
162 worker_key: WorkerKey,
163 subscribe_type: SubscribeType,
164 meta_snapshot: MetaSnapshot,
165 ) {
166 self.notify_without_version(
167 Target {
168 subscribe_type,
169 worker_key: Some(worker_key),
170 },
171 Operation::Snapshot,
172 Info::Snapshot(meta_snapshot),
173 )
174 }
175
176 pub fn notify_all_without_version(&self, operation: Operation, info: Info) {
177 for subscribe_type in [
178 SubscribeType::Frontend,
179 SubscribeType::Hummock,
180 SubscribeType::Compactor,
181 SubscribeType::Compute,
182 ] {
183 self.notify_without_version(subscribe_type.into(), operation, info.clone());
184 }
185 }
186
187 pub async fn notify_frontend(&self, operation: Operation, info: Info) -> NotificationVersion {
188 self.notify_with_version(SubscribeType::Frontend.into(), operation, info)
189 .await
190 }
191
192 pub async fn notify_frontend_object_info(
193 &self,
194 operation: Operation,
195 object_info: PbObjectInfo,
196 ) -> NotificationVersion {
197 self.notify_with_version(
198 SubscribeType::Frontend.into(),
199 operation,
200 Info::ObjectGroup(PbObjectGroup {
201 objects: vec![PbObject {
202 object_info: object_info.into(),
203 }],
204 }),
205 )
206 .await
207 }
208
209 pub async fn notify_hummock(&self, operation: Operation, info: Info) -> NotificationVersion {
210 self.notify_with_version(SubscribeType::Hummock.into(), operation, info)
211 .await
212 }
213
214 pub async fn notify_compactor(&self, operation: Operation, info: Info) -> NotificationVersion {
215 self.notify_with_version(SubscribeType::Compactor.into(), operation, info)
216 .await
217 }
218
219 pub fn notify_compute_without_version(&self, operation: Operation, info: Info) {
220 self.notify_without_version(SubscribeType::Compute.into(), operation, info)
221 }
222
223 pub fn notify_frontend_without_version(&self, operation: Operation, info: Info) {
224 self.notify_without_version(SubscribeType::Frontend.into(), operation, info)
225 }
226
227 pub fn notify_hummock_without_version(&self, operation: Operation, info: Info) {
228 self.notify_without_version(SubscribeType::Hummock.into(), operation, info)
229 }
230
231 pub fn notify_compactor_without_version(&self, operation: Operation, info: Info) {
232 self.notify_without_version(SubscribeType::Compactor.into(), operation, info)
233 }
234
235 #[cfg(any(test, feature = "test"))]
236 pub fn notify_hummock_with_version(
237 &self,
238 operation: Operation,
239 info: Info,
240 version: Option<NotificationVersion>,
241 ) {
242 self.notify(SubscribeType::Hummock.into(), operation, info, version)
243 }
244
245 pub fn notify_local_subscribers(&self, notification: LocalNotification) {
246 let mut core_guard = self.core.lock();
247 core_guard.local_senders.retain(|sender| {
248 if let Err(err) = sender.send(notification.clone()) {
249 tracing::warn!(error = %err.as_report(), "Failed to notify local subscriber");
250 return false;
251 }
252 true
253 });
254 }
255
256 pub fn delete_sender(&self, worker_type: WorkerType, worker_key: WorkerKey) {
258 let mut core_guard = self.core.lock();
259 match worker_type {
262 WorkerType::Frontend => core_guard.frontend_senders.remove(&worker_key),
263 WorkerType::ComputeNode | WorkerType::RiseCtl => {
264 core_guard.hummock_senders.remove(&worker_key)
265 }
266 WorkerType::Compactor => core_guard.compactor_senders.remove(&worker_key),
267 _ => unreachable!(),
268 };
269 }
270
271 pub fn insert_sender(
273 &self,
274 subscribe_type: SubscribeType,
275 worker_key: WorkerKey,
276 sender: UnboundedSender<Notification>,
277 ) {
278 let mut core_guard = self.core.lock();
279 if core_guard.exiting {
280 tracing::warn!("notification manager exiting.");
281 return;
282 }
283 let senders = core_guard.senders_of(subscribe_type);
284
285 senders.insert(worker_key, sender);
286 }
287
288 pub fn insert_local_sender(&self, sender: UnboundedSender<LocalNotification>) {
289 let mut core_guard = self.core.lock();
290 if core_guard.exiting {
291 tracing::warn!("notification manager exiting.");
292 return;
293 }
294 core_guard.local_senders.push(sender);
295 }
296
297 #[cfg(test)]
298 pub fn clear_local_sender(&self) {
299 self.core.lock().local_senders.clear();
300 }
301
302 pub async fn current_version(&self) -> NotificationVersion {
303 let version_guard = self.version_generator.lock().await;
304 version_guard.current_version()
305 }
306}
307
308type SenderMap = HashMap<WorkerKey, UnboundedSender<Notification>>;
309
310struct NotificationManagerCore {
311 frontend_senders: SenderMap,
313 hummock_senders: SenderMap,
315 compactor_senders: SenderMap,
317 compute_senders: HashMap<WorkerKey, UnboundedSender<Notification>>,
319 local_senders: Vec<UnboundedSender<LocalNotification>>,
321 exiting: bool,
322}
323
324impl NotificationManagerCore {
325 fn new() -> Self {
326 Self {
327 frontend_senders: HashMap::new(),
328 hummock_senders: HashMap::new(),
329 compactor_senders: HashMap::new(),
330 compute_senders: HashMap::new(),
331 local_senders: vec![],
332 exiting: false,
333 }
334 }
335
336 fn notify(&mut self, target: Target, response: SubscribeResponse) {
337 macro_rules! warn_send_failure {
338 ($subscribe_type:expr, $worker_key:expr, $err:expr) => {
339 tracing::warn!(
340 "Failed to notify {:?} {:?}: {}",
341 $subscribe_type,
342 $worker_key,
343 $err
344 );
345 };
346 }
347
348 let senders = self.senders_of(target.subscribe_type);
349
350 if let Some(worker_key) = target.worker_key {
351 match senders.entry(worker_key.clone()) {
352 Entry::Occupied(entry) => {
353 let _ = entry.get().send(Ok(response)).inspect_err(|err| {
354 warn_send_failure!(target.subscribe_type, &worker_key, err.as_report());
355 entry.remove_entry();
356 });
357 }
358 Entry::Vacant(_) => {
359 tracing::warn!("Failed to find notification sender of {:?}", worker_key)
360 }
361 }
362 } else {
363 senders.retain(|worker_key, sender| {
364 sender
365 .send(Ok(response.clone()))
366 .inspect_err(|err| {
367 warn_send_failure!(target.subscribe_type, &worker_key, err.as_report());
368 })
369 .is_ok()
370 });
371 }
372 }
373
374 fn senders_of(&mut self, subscribe_type: SubscribeType) -> &mut SenderMap {
375 match subscribe_type {
376 SubscribeType::Frontend => &mut self.frontend_senders,
377 SubscribeType::Hummock => &mut self.hummock_senders,
378 SubscribeType::Compactor => &mut self.compactor_senders,
379 SubscribeType::Compute => &mut self.compute_senders,
380 SubscribeType::Unspecified => unreachable!(),
381 }
382 }
383}
384
#[cfg(test)]
mod tests {
    use risingwave_pb::common::HostAddress;

    use super::*;
    use crate::manager::WorkerKey;

    /// One worker key may be registered under several subscribe types, and a
    /// notification must reach only the subscribers it is addressed to.
    #[tokio::test]
    async fn test_multiple_subscribers_one_worker() {
        let key_on_port = |port| {
            WorkerKey(HostAddress {
                host: "a".to_owned(),
                port,
            })
        };
        let first_worker = key_on_port(1);
        let second_worker = key_on_port(2);

        let mgr = NotificationManager::new(SqlMetaStore::for_test().await).await;

        let (hummock_tx, mut hummock_rx) = mpsc::unbounded_channel();
        let (frontend_tx_a, mut frontend_rx_a) = mpsc::unbounded_channel();
        let (frontend_tx_b, mut frontend_rx_b) = mpsc::unbounded_channel();
        mgr.insert_sender(SubscribeType::Hummock, first_worker.clone(), hummock_tx);
        mgr.insert_sender(SubscribeType::Frontend, first_worker.clone(), frontend_tx_a);
        mgr.insert_sender(SubscribeType::Frontend, second_worker, frontend_tx_b);

        // A snapshot addressed to the first worker's hummock subscription
        // reaches only that subscriber.
        mgr.notify_snapshot(
            first_worker.clone(),
            SubscribeType::Hummock,
            MetaSnapshot::default(),
        );
        assert!(hummock_rx.recv().await.is_some());
        assert!(frontend_rx_a.try_recv().is_err());
        assert!(frontend_rx_b.try_recv().is_err());

        // A frontend broadcast reaches both frontend subscribers and not the
        // hummock subscriber.
        mgr.notify_frontend(Operation::Add, Info::Database(Default::default()))
            .await;
        assert!(hummock_rx.try_recv().is_err());
        assert!(frontend_rx_a.recv().await.is_some());
        assert!(frontend_rx_b.recv().await.is_some());
    }
}