1use std::collections::HashMap;
16use std::collections::hash_map::Entry;
17use std::sync::Arc;
18
19use risingwave_common::system_param::reader::SystemParamsReader;
20use risingwave_pb::common::{WorkerNode, WorkerType};
21use risingwave_pb::meta::object::PbObjectInfo;
22use risingwave_pb::meta::subscribe_response::{Info, Operation};
23use risingwave_pb::meta::{
24 MetaSnapshot, PbObject, PbObjectGroup, SubscribeResponse, SubscribeType,
25};
26use thiserror_ext::AsReport;
27use tokio::sync::Mutex;
28use tokio::sync::mpsc::{self, UnboundedSender};
29use tonic::Status;
30
31use crate::controller::SqlMetaStore;
32use crate::manager::WorkerKey;
33use crate::manager::notification_version::NotificationVersionGenerator;
34use crate::model::FragmentId;
35
/// Status type used in the error arm of a [`Notification`]; an alias of `tonic::Status`.
pub type MessageStatus = Status;
/// Message pushed to a remote subscriber: either a [`SubscribeResponse`] or an error [`Status`].
pub type Notification = Result<SubscribeResponse, Status>;
/// Shared, reference-counted handle to a [`NotificationManager`].
pub type NotificationManagerRef = Arc<NotificationManager>;
/// Version number attached to versioned notifications.
pub type NotificationVersion = u64;
/// Sentinel version `0`.
///
/// NOTE(review): this equals the version written into responses that are sent without a
/// version (`None` falls back to `u64::default()`); presumably callers use it when the version
/// is meant to be ignored — confirm against the call sites.
pub const IGNORED_NOTIFICATION_VERSION: u64 = 0;
42
/// Notification delivered to in-process subscribers registered through
/// `NotificationManager::insert_local_sender`.
///
/// NOTE(review): the per-variant descriptions below are inferred from the variant names and
/// payload types only — confirm against the code that emits them.
#[derive(Clone, Debug)]
pub enum LocalNotification {
    /// Carries the [`WorkerNode`] that was (presumably) removed.
    WorkerNodeDeleted(WorkerNode),
    /// Carries the [`WorkerNode`] that was (presumably) activated.
    WorkerNodeActivated(WorkerNode),
    /// Carries a [`SystemParamsReader`], presumably over the updated parameters.
    SystemParamsChange(SystemParamsReader),
    /// Payload-free signal; presumably the batch parallelism changed.
    BatchParallelismChange,
    /// Carries the affected fragment ids; presumably their mappings were inserted or updated.
    FragmentMappingsUpsert(Vec<FragmentId>),
    /// Carries the affected fragment ids; presumably their mappings were removed.
    FragmentMappingsDelete(Vec<FragmentId>),
}
52
/// Addressing information for a notification.
#[derive(Debug)]
struct Target {
    /// Selects which group of subscribers (sender map) receives the notification.
    subscribe_type: SubscribeType,
    /// `Some(key)` addresses only the sender registered under `key`;
    /// `None` addresses every sender of `subscribe_type`.
    worker_key: Option<WorkerKey>,
}
59
60impl From<SubscribeType> for Target {
61 fn from(value: SubscribeType) -> Self {
62 Self {
63 subscribe_type: value,
64 worker_key: None,
65 }
66 }
67}
68
/// One queued notification, handed from the public `notify_*` methods to the background
/// delivery task through the manager's task channel.
#[derive(Debug)]
struct Task {
    /// Who should receive the notification.
    target: Target,
    /// Operation recorded in the outgoing [`SubscribeResponse`].
    operation: Operation,
    /// Payload of the outgoing [`SubscribeResponse`].
    info: Info,
    /// Version to attach; `None` results in the default version `0` being sent.
    version: Option<NotificationVersion>,
}
76
/// Entry point for sending notifications to remote subscribers (frontend, hummock, compactor,
/// compute) and to in-process local subscribers.
///
/// Remote notifications are queued on `task_tx` and delivered by a background task spawned in
/// [`NotificationManager::new`]; local notifications are delivered synchronously.
pub struct NotificationManager {
    /// Registry of all senders, shared with the background delivery task.
    core: Arc<parking_lot::Mutex<NotificationManagerCore>>,
    /// Queue feeding the background delivery task.
    task_tx: UnboundedSender<Task>,
    /// Source of notification versions; guarded by an async mutex because the guard is held
    /// across `.await` while the version is increased.
    version_generator: Mutex<NotificationVersionGenerator>,
}
85
86impl NotificationManager {
87 pub async fn new(meta_store_impl: SqlMetaStore) -> Self {
88 let (task_tx, mut task_rx) = mpsc::unbounded_channel::<Task>();
90 let core = Arc::new(parking_lot::Mutex::new(NotificationManagerCore::new()));
91 let core_clone = core.clone();
92 let version_generator = NotificationVersionGenerator::new(meta_store_impl)
93 .await
94 .unwrap();
95
96 tokio::spawn(async move {
97 while let Some(task) = task_rx.recv().await {
98 let response = SubscribeResponse {
99 status: None,
100 operation: task.operation as i32,
101 info: Some(task.info),
102 version: task.version.unwrap_or_default(),
103 };
104 core.lock().notify(task.target, response);
105 }
106 });
107
108 Self {
109 core: core_clone,
110 task_tx,
111 version_generator: Mutex::new(version_generator),
112 }
113 }
114
115 pub fn abort_all(&self) {
116 let mut guard = self.core.lock();
117 *guard = NotificationManagerCore::new();
118 guard.exiting = true;
119 }
120
121 #[inline(always)]
122 fn notify(
123 &self,
124 target: Target,
125 operation: Operation,
126 info: Info,
127 version: Option<NotificationVersion>,
128 ) {
129 let task = Task {
130 target,
131 operation,
132 info,
133 version,
134 };
135 self.task_tx.send(task).unwrap();
136 }
137
138 async fn notify_with_version(
140 &self,
141 target: Target,
142 operation: Operation,
143 info: Info,
144 ) -> NotificationVersion {
145 let mut version_guard = self.version_generator.lock().await;
146 version_guard.increase_version().await;
147 let version = version_guard.current_version();
148 self.notify(target, operation, info, Some(version));
149 version
150 }
151
152 #[inline(always)]
154 fn notify_without_version(&self, target: Target, operation: Operation, info: Info) {
155 self.notify(target, operation, info, None);
156 }
157
158 pub fn notify_snapshot(
159 &self,
160 worker_key: WorkerKey,
161 subscribe_type: SubscribeType,
162 meta_snapshot: MetaSnapshot,
163 ) {
164 self.notify_without_version(
165 Target {
166 subscribe_type,
167 worker_key: Some(worker_key),
168 },
169 Operation::Snapshot,
170 Info::Snapshot(meta_snapshot),
171 )
172 }
173
174 pub fn notify_all_without_version(&self, operation: Operation, info: Info) {
175 for subscribe_type in [
176 SubscribeType::Frontend,
177 SubscribeType::Hummock,
178 SubscribeType::Compactor,
179 SubscribeType::Compute,
180 ] {
181 self.notify_without_version(subscribe_type.into(), operation, info.clone());
182 }
183 }
184
185 pub async fn notify_frontend(&self, operation: Operation, info: Info) -> NotificationVersion {
186 self.notify_with_version(SubscribeType::Frontend.into(), operation, info)
187 .await
188 }
189
190 pub async fn notify_frontend_object_info(
191 &self,
192 operation: Operation,
193 object_info: PbObjectInfo,
194 ) -> NotificationVersion {
195 self.notify_with_version(
196 SubscribeType::Frontend.into(),
197 operation,
198 Info::ObjectGroup(PbObjectGroup {
199 objects: vec![PbObject {
200 object_info: object_info.into(),
201 }],
202 }),
203 )
204 .await
205 }
206
207 pub async fn notify_hummock(&self, operation: Operation, info: Info) -> NotificationVersion {
208 self.notify_with_version(SubscribeType::Hummock.into(), operation, info)
209 .await
210 }
211
212 pub async fn notify_compactor(&self, operation: Operation, info: Info) -> NotificationVersion {
213 self.notify_with_version(SubscribeType::Compactor.into(), operation, info)
214 .await
215 }
216
217 pub fn notify_compute_without_version(&self, operation: Operation, info: Info) {
218 self.notify_without_version(SubscribeType::Compute.into(), operation, info)
219 }
220
221 pub fn notify_frontend_without_version(&self, operation: Operation, info: Info) {
222 self.notify_without_version(SubscribeType::Frontend.into(), operation, info)
223 }
224
225 pub fn notify_hummock_without_version(&self, operation: Operation, info: Info) {
226 self.notify_without_version(SubscribeType::Hummock.into(), operation, info)
227 }
228
229 pub fn notify_compactor_without_version(&self, operation: Operation, info: Info) {
230 self.notify_without_version(SubscribeType::Compactor.into(), operation, info)
231 }
232
233 #[cfg(any(test, feature = "test"))]
234 pub fn notify_hummock_with_version(
235 &self,
236 operation: Operation,
237 info: Info,
238 version: Option<NotificationVersion>,
239 ) {
240 self.notify(SubscribeType::Hummock.into(), operation, info, version)
241 }
242
243 pub fn notify_local_subscribers(&self, notification: LocalNotification) {
244 let mut core_guard = self.core.lock();
245 core_guard.local_senders.retain(|sender| {
246 if let Err(err) = sender.send(notification.clone()) {
247 tracing::warn!(error = %err.as_report(), "Failed to notify local subscriber");
248 return false;
249 }
250 true
251 });
252 }
253
254 pub fn delete_sender(&self, worker_type: WorkerType, worker_key: WorkerKey) {
256 let mut core_guard = self.core.lock();
257 match worker_type {
260 WorkerType::Frontend => core_guard.frontend_senders.remove(&worker_key),
261 WorkerType::ComputeNode | WorkerType::RiseCtl => {
262 core_guard.hummock_senders.remove(&worker_key)
263 }
264 WorkerType::Compactor => core_guard.compactor_senders.remove(&worker_key),
265 _ => unreachable!(),
266 };
267 }
268
269 pub fn insert_sender(
271 &self,
272 subscribe_type: SubscribeType,
273 worker_key: WorkerKey,
274 sender: UnboundedSender<Notification>,
275 ) {
276 let mut core_guard = self.core.lock();
277 if core_guard.exiting {
278 tracing::warn!("notification manager exiting.");
279 return;
280 }
281 let senders = core_guard.senders_of(subscribe_type);
282
283 senders.insert(worker_key, sender);
284 }
285
286 pub fn insert_local_sender(&self, sender: UnboundedSender<LocalNotification>) {
287 let mut core_guard = self.core.lock();
288 if core_guard.exiting {
289 tracing::warn!("notification manager exiting.");
290 return;
291 }
292 core_guard.local_senders.push(sender);
293 }
294
295 #[cfg(test)]
296 pub fn clear_local_sender(&self) {
297 self.core.lock().local_senders.clear();
298 }
299
300 pub async fn current_version(&self) -> NotificationVersion {
301 let version_guard = self.version_generator.lock().await;
302 version_guard.current_version()
303 }
304}
305
/// Senders of one subscriber group, keyed by the worker they belong to.
type SenderMap = HashMap<WorkerKey, UnboundedSender<Notification>>;
307
308struct NotificationManagerCore {
309 frontend_senders: SenderMap,
311 hummock_senders: SenderMap,
313 compactor_senders: SenderMap,
315 compute_senders: HashMap<WorkerKey, UnboundedSender<Notification>>,
317 local_senders: Vec<UnboundedSender<LocalNotification>>,
319 exiting: bool,
320}
321
322impl NotificationManagerCore {
323 fn new() -> Self {
324 Self {
325 frontend_senders: HashMap::new(),
326 hummock_senders: HashMap::new(),
327 compactor_senders: HashMap::new(),
328 compute_senders: HashMap::new(),
329 local_senders: vec![],
330 exiting: false,
331 }
332 }
333
334 fn notify(&mut self, target: Target, response: SubscribeResponse) {
335 macro_rules! warn_send_failure {
336 ($subscribe_type:expr, $worker_key:expr, $err:expr) => {
337 tracing::warn!(
338 "Failed to notify {:?} {:?}: {}",
339 $subscribe_type,
340 $worker_key,
341 $err
342 );
343 };
344 }
345
346 let senders = self.senders_of(target.subscribe_type);
347
348 if let Some(worker_key) = target.worker_key {
349 match senders.entry(worker_key.clone()) {
350 Entry::Occupied(entry) => {
351 let _ = entry.get().send(Ok(response)).inspect_err(|err| {
352 warn_send_failure!(target.subscribe_type, &worker_key, err.as_report());
353 entry.remove_entry();
354 });
355 }
356 Entry::Vacant(_) => {
357 tracing::warn!("Failed to find notification sender of {:?}", worker_key)
358 }
359 }
360 } else {
361 senders.retain(|worker_key, sender| {
362 sender
363 .send(Ok(response.clone()))
364 .inspect_err(|err| {
365 warn_send_failure!(target.subscribe_type, &worker_key, err.as_report());
366 })
367 .is_ok()
368 });
369 }
370 }
371
372 fn senders_of(&mut self, subscribe_type: SubscribeType) -> &mut SenderMap {
373 match subscribe_type {
374 SubscribeType::Frontend => &mut self.frontend_senders,
375 SubscribeType::Hummock => &mut self.hummock_senders,
376 SubscribeType::Compactor => &mut self.compactor_senders,
377 SubscribeType::Compute => &mut self.compute_senders,
378 SubscribeType::Unspecified => unreachable!(),
379 }
380 }
381}
382
#[cfg(test)]
mod tests {
    use risingwave_pb::common::HostAddress;

    use super::*;
    use crate::manager::WorkerKey;

    /// One worker key may be registered under several subscribe types; a notification must
    /// reach only the senders matching both its subscribe type and (if given) its worker key.
    #[tokio::test]
    async fn test_multiple_subscribers_one_worker() {
        let manager = NotificationManager::new(SqlMetaStore::for_test().await).await;

        let key_on_port = |port| {
            WorkerKey(HostAddress {
                host: "a".to_owned(),
                port,
            })
        };
        let first_worker = key_on_port(1);
        let second_worker = key_on_port(2);

        let (hummock_tx, mut hummock_rx) = mpsc::unbounded_channel();
        let (frontend_tx_a, mut frontend_rx_a) = mpsc::unbounded_channel();
        let (frontend_tx_b, mut frontend_rx_b) = mpsc::unbounded_channel();
        manager.insert_sender(SubscribeType::Hummock, first_worker.clone(), hummock_tx);
        manager.insert_sender(SubscribeType::Frontend, first_worker.clone(), frontend_tx_a);
        manager.insert_sender(SubscribeType::Frontend, second_worker, frontend_tx_b);

        // A snapshot addressed to (Hummock, first worker) reaches only that sender.
        manager.notify_snapshot(
            first_worker.clone(),
            SubscribeType::Hummock,
            MetaSnapshot::default(),
        );
        assert!(hummock_rx.recv().await.is_some());
        assert!(frontend_rx_a.try_recv().is_err());
        assert!(frontend_rx_b.try_recv().is_err());

        // A frontend broadcast reaches both frontend senders and not the hummock one.
        manager
            .notify_frontend(Operation::Add, Info::Database(Default::default()))
            .await;
        assert!(hummock_rx.try_recv().is_err());
        assert!(frontend_rx_a.recv().await.is_some());
        assert!(frontend_rx_b.recv().await.is_some());
    }
}
422}