risingwave_meta/manager/
notification.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::HashMap;
16use std::collections::hash_map::Entry;
17use std::sync::Arc;
18
19use risingwave_common::system_param::reader::SystemParamsReader;
20use risingwave_pb::common::{WorkerNode, WorkerType};
21use risingwave_pb::meta::object::PbObjectInfo;
22use risingwave_pb::meta::subscribe_response::{Info, Operation};
23use risingwave_pb::meta::{
24    MetaSnapshot, PbObject, PbObjectGroup, SubscribeResponse, SubscribeType,
25};
26use thiserror_ext::AsReport;
27use tokio::sync::Mutex;
28use tokio::sync::mpsc::{self, UnboundedSender};
29use tonic::Status;
30
31use crate::controller::SqlMetaStore;
32use crate::manager::WorkerKey;
33use crate::manager::notification_version::NotificationVersionGenerator;
34use crate::model::FragmentId;
35
36pub type MessageStatus = Status;
37pub type Notification = Result<SubscribeResponse, Status>;
38pub type NotificationManagerRef = Arc<NotificationManager>;
39pub type NotificationVersion = u64;
40/// NOTE(kwannoel): This is just ignored, used in background DDL
41pub const IGNORED_NOTIFICATION_VERSION: u64 = 0;
42
/// In-process notification fanned out to local subscribers registered through
/// `NotificationManager::insert_local_sender`.
///
/// `NotificationManager::notify_local_subscribers` sends one clone of the notification to
/// every registered local subscriber, which is why this type derives `Clone`.
#[derive(Clone, Debug)]
pub enum LocalNotification {
    WorkerNodeDeleted(WorkerNode),
    WorkerNodeActivated(WorkerNode),
    SystemParamsChange(SystemParamsReader),
    BatchParallelismChange,
    FragmentMappingsUpsert(Vec<FragmentId>),
    FragmentMappingsDelete(Vec<FragmentId>),
}
52
/// Addressing of a queued notification: which sender map to deliver through and, optionally,
/// which single worker within that map.
#[derive(Debug)]
struct Target {
    /// Selects the sender map (see `NotificationManagerCore::senders_of`).
    subscribe_type: SubscribeType,
    // `None` indicates sending to all subscribers of `subscribe_type`.
    worker_key: Option<WorkerKey>,
}
59
60impl From<SubscribeType> for Target {
61    fn from(value: SubscribeType) -> Self {
62        Self {
63            subscribe_type: value,
64            worker_key: None,
65        }
66    }
67}
68
/// A notification waiting in the queue drained by the background task spawned in
/// `NotificationManager::new`.
#[derive(Debug)]
struct Task {
    /// Which subscriber(s) should receive the notification.
    target: Target,
    operation: Operation,
    info: Info,
    /// `None` for unversioned notifications; those are delivered with version `0`.
    version: Option<NotificationVersion>,
}
76
/// [`NotificationManager`] is used to send notifications to subscribers (frontends, compute
/// nodes, compactors and `Hummock` subscribers) as well as to in-process local subscribers.
///
/// Remote notifications are queued through `task_tx` and delivered asynchronously by a
/// background task spawned in [`NotificationManager::new`].
pub struct NotificationManager {
    /// Registered subscriber senders; shared with the background delivery task.
    core: Arc<Mutex<NotificationManagerCore>>,
    /// Sender used to add a notification into the waiting queue.
    task_tx: UnboundedSender<Task>,
    /// The current notification version generator. `notify_with_version` holds this lock
    /// while enqueuing, so versioned notifications are queued in version order.
    version_generator: Mutex<NotificationVersionGenerator>,
}
85
86impl NotificationManager {
87    pub async fn new(meta_store_impl: SqlMetaStore) -> Self {
88        // notification waiting queue.
89        let (task_tx, mut task_rx) = mpsc::unbounded_channel::<Task>();
90        let core = Arc::new(Mutex::new(NotificationManagerCore::new()));
91        let core_clone = core.clone();
92        let version_generator = NotificationVersionGenerator::new(meta_store_impl)
93            .await
94            .unwrap();
95
96        tokio::spawn(async move {
97            while let Some(task) = task_rx.recv().await {
98                let response = SubscribeResponse {
99                    status: None,
100                    operation: task.operation as i32,
101                    info: Some(task.info),
102                    version: task.version.unwrap_or_default(),
103                };
104                core.lock().await.notify(task.target, response);
105            }
106        });
107
108        Self {
109            core: core_clone,
110            task_tx,
111            version_generator: Mutex::new(version_generator),
112        }
113    }
114
115    pub async fn abort_all(&self) {
116        let mut guard = self.core.lock().await;
117        *guard = NotificationManagerCore::new();
118        guard.exiting = true;
119    }
120
121    #[inline(always)]
122    fn notify(
123        &self,
124        target: Target,
125        operation: Operation,
126        info: Info,
127        version: Option<NotificationVersion>,
128    ) {
129        let task = Task {
130            target,
131            operation,
132            info,
133            version,
134        };
135        self.task_tx.send(task).unwrap();
136    }
137
138    /// Add a notification to the waiting queue and increase notification version.
139    async fn notify_with_version(
140        &self,
141        target: Target,
142        operation: Operation,
143        info: Info,
144    ) -> NotificationVersion {
145        let mut version_guard = self.version_generator.lock().await;
146        version_guard.increase_version().await;
147        let version = version_guard.current_version();
148        self.notify(target, operation, info, Some(version));
149        version
150    }
151
152    /// Add a notification to the waiting queue and return immediately
153    #[inline(always)]
154    fn notify_without_version(&self, target: Target, operation: Operation, info: Info) {
155        self.notify(target, operation, info, None);
156    }
157
158    pub fn notify_snapshot(
159        &self,
160        worker_key: WorkerKey,
161        subscribe_type: SubscribeType,
162        meta_snapshot: MetaSnapshot,
163    ) {
164        self.notify_without_version(
165            Target {
166                subscribe_type,
167                worker_key: Some(worker_key),
168            },
169            Operation::Snapshot,
170            Info::Snapshot(meta_snapshot),
171        )
172    }
173
174    pub fn notify_all_without_version(&self, operation: Operation, info: Info) {
175        for subscribe_type in [
176            SubscribeType::Frontend,
177            SubscribeType::Hummock,
178            SubscribeType::Compactor,
179            SubscribeType::Compute,
180        ] {
181            self.notify_without_version(subscribe_type.into(), operation, info.clone());
182        }
183    }
184
185    pub async fn notify_frontend(&self, operation: Operation, info: Info) -> NotificationVersion {
186        self.notify_with_version(SubscribeType::Frontend.into(), operation, info)
187            .await
188    }
189
190    pub async fn notify_frontend_object_info(
191        &self,
192        operation: Operation,
193        object_info: PbObjectInfo,
194    ) -> NotificationVersion {
195        self.notify_with_version(
196            SubscribeType::Frontend.into(),
197            operation,
198            Info::ObjectGroup(PbObjectGroup {
199                objects: vec![PbObject {
200                    object_info: object_info.into(),
201                }],
202            }),
203        )
204        .await
205    }
206
207    pub async fn notify_hummock(&self, operation: Operation, info: Info) -> NotificationVersion {
208        self.notify_with_version(SubscribeType::Hummock.into(), operation, info)
209            .await
210    }
211
212    pub async fn notify_hummock_object_info(
213        &self,
214        operation: Operation,
215        object_info: PbObjectInfo,
216    ) -> NotificationVersion {
217        self.notify_with_version(
218            SubscribeType::Hummock.into(),
219            operation,
220            Info::ObjectGroup(PbObjectGroup {
221                objects: vec![PbObject {
222                    object_info: object_info.into(),
223                }],
224            }),
225        )
226        .await
227    }
228
229    pub async fn notify_compactor(&self, operation: Operation, info: Info) -> NotificationVersion {
230        self.notify_with_version(SubscribeType::Compactor.into(), operation, info)
231            .await
232    }
233
234    pub async fn notify_compactor_object_info(
235        &self,
236        operation: Operation,
237        object_info: PbObjectInfo,
238    ) -> NotificationVersion {
239        self.notify_with_version(
240            SubscribeType::Compactor.into(),
241            operation,
242            Info::ObjectGroup(PbObjectGroup {
243                objects: vec![PbObject {
244                    object_info: object_info.into(),
245                }],
246            }),
247        )
248        .await
249    }
250
251    pub async fn notify_compute(&self, operation: Operation, info: Info) -> NotificationVersion {
252        self.notify_with_version(SubscribeType::Compute.into(), operation, info)
253            .await
254    }
255
256    pub fn notify_compute_without_version(&self, operation: Operation, info: Info) {
257        self.notify_without_version(SubscribeType::Compute.into(), operation, info)
258    }
259
260    pub fn notify_frontend_without_version(&self, operation: Operation, info: Info) {
261        self.notify_without_version(SubscribeType::Frontend.into(), operation, info)
262    }
263
264    pub fn notify_hummock_without_version(&self, operation: Operation, info: Info) {
265        self.notify_without_version(SubscribeType::Hummock.into(), operation, info)
266    }
267
268    pub fn notify_compactor_without_version(&self, operation: Operation, info: Info) {
269        self.notify_without_version(SubscribeType::Compactor.into(), operation, info)
270    }
271
272    #[cfg(any(test, feature = "test"))]
273    pub fn notify_hummock_with_version(
274        &self,
275        operation: Operation,
276        info: Info,
277        version: Option<NotificationVersion>,
278    ) {
279        self.notify(SubscribeType::Hummock.into(), operation, info, version)
280    }
281
282    pub async fn notify_local_subscribers(&self, notification: LocalNotification) {
283        let mut core_guard = self.core.lock().await;
284        core_guard.local_senders.retain(|sender| {
285            if let Err(err) = sender.send(notification.clone()) {
286                tracing::warn!(error = %err.as_report(), "Failed to notify local subscriber");
287                return false;
288            }
289            true
290        });
291    }
292
293    /// Tell `NotificationManagerCore` to delete sender.
294    pub async fn delete_sender(&self, worker_type: WorkerType, worker_key: WorkerKey) {
295        let mut core_guard = self.core.lock().await;
296        // TODO: we may avoid passing the worker_type and remove the `worker_key` in all sender
297        // holders anyway
298        match worker_type {
299            WorkerType::Frontend => core_guard.frontend_senders.remove(&worker_key),
300            WorkerType::ComputeNode | WorkerType::RiseCtl => {
301                core_guard.hummock_senders.remove(&worker_key)
302            }
303            WorkerType::Compactor => core_guard.compactor_senders.remove(&worker_key),
304            _ => unreachable!(),
305        };
306    }
307
308    /// Tell `NotificationManagerCore` to insert sender by `worker_type`.
309    pub async fn insert_sender(
310        &self,
311        subscribe_type: SubscribeType,
312        worker_key: WorkerKey,
313        sender: UnboundedSender<Notification>,
314    ) {
315        let mut core_guard = self.core.lock().await;
316        if core_guard.exiting {
317            tracing::warn!("notification manager exiting.");
318            return;
319        }
320        let senders = core_guard.senders_of(subscribe_type);
321
322        senders.insert(worker_key, sender);
323    }
324
325    pub async fn insert_local_sender(&self, sender: UnboundedSender<LocalNotification>) {
326        let mut core_guard = self.core.lock().await;
327        if core_guard.exiting {
328            tracing::warn!("notification manager exiting.");
329            return;
330        }
331        core_guard.local_senders.push(sender);
332    }
333
334    #[cfg(test)]
335    pub async fn clear_local_sender(&self) {
336        self.core.lock().await.local_senders.clear();
337    }
338
339    pub async fn current_version(&self) -> NotificationVersion {
340        let version_guard = self.version_generator.lock().await;
341        version_guard.current_version()
342    }
343}
344
/// Registered subscriber channels for one subscribe type, keyed by worker.
type SenderMap = HashMap<WorkerKey, UnboundedSender<Notification>>;
346
347struct NotificationManagerCore {
348    /// The notification sender to frontends.
349    frontend_senders: SenderMap,
350    /// The notification sender to nodes that subscribes the hummock.
351    hummock_senders: SenderMap,
352    /// The notification sender to compactor nodes.
353    compactor_senders: SenderMap,
354    /// The notification sender to compute nodes.
355    compute_senders: HashMap<WorkerKey, UnboundedSender<Notification>>,
356    /// The notification sender to local subscribers.
357    local_senders: Vec<UnboundedSender<LocalNotification>>,
358    exiting: bool,
359}
360
361impl NotificationManagerCore {
362    fn new() -> Self {
363        Self {
364            frontend_senders: HashMap::new(),
365            hummock_senders: HashMap::new(),
366            compactor_senders: HashMap::new(),
367            compute_senders: HashMap::new(),
368            local_senders: vec![],
369            exiting: false,
370        }
371    }
372
373    fn notify(&mut self, target: Target, response: SubscribeResponse) {
374        macro_rules! warn_send_failure {
375            ($subscribe_type:expr, $worker_key:expr, $err:expr) => {
376                tracing::warn!(
377                    "Failed to notify {:?} {:?}: {}",
378                    $subscribe_type,
379                    $worker_key,
380                    $err
381                );
382            };
383        }
384
385        let senders = self.senders_of(target.subscribe_type);
386
387        if let Some(worker_key) = target.worker_key {
388            match senders.entry(worker_key.clone()) {
389                Entry::Occupied(entry) => {
390                    let _ = entry.get().send(Ok(response)).inspect_err(|err| {
391                        warn_send_failure!(target.subscribe_type, &worker_key, err.as_report());
392                        entry.remove_entry();
393                    });
394                }
395                Entry::Vacant(_) => {
396                    tracing::warn!("Failed to find notification sender of {:?}", worker_key)
397                }
398            }
399        } else {
400            senders.retain(|worker_key, sender| {
401                sender
402                    .send(Ok(response.clone()))
403                    .inspect_err(|err| {
404                        warn_send_failure!(target.subscribe_type, &worker_key, err.as_report());
405                    })
406                    .is_ok()
407            });
408        }
409    }
410
411    fn senders_of(&mut self, subscribe_type: SubscribeType) -> &mut SenderMap {
412        match subscribe_type {
413            SubscribeType::Frontend => &mut self.frontend_senders,
414            SubscribeType::Hummock => &mut self.hummock_senders,
415            SubscribeType::Compactor => &mut self.compactor_senders,
416            SubscribeType::Compute => &mut self.compute_senders,
417            SubscribeType::Unspecified => unreachable!(),
418        }
419    }
420}
421
#[cfg(test)]
mod tests {
    use risingwave_pb::common::HostAddress;

    use super::*;
    use crate::manager::WorkerKey;

    /// Builds the worker key for `host:port`.
    fn key_of(host: &str, port: i32) -> WorkerKey {
        WorkerKey(HostAddress {
            host: host.to_owned(),
            port,
        })
    }

    /// A snapshot aimed at one (worker, subscribe type) pair reaches only that sender, while a
    /// versioned frontend broadcast reaches every frontend sender and nothing else.
    #[tokio::test]
    async fn test_multiple_subscribers_one_worker() {
        let mgr = NotificationManager::new(SqlMetaStore::for_test().await).await;
        let shared_key = key_of("a", 1);
        let other_key = key_of("a", 2);

        let (hummock_tx, mut hummock_rx) = mpsc::unbounded_channel();
        let (frontend_tx_a, mut frontend_rx_a) = mpsc::unbounded_channel();
        let (frontend_tx_b, mut frontend_rx_b) = mpsc::unbounded_channel();
        mgr.insert_sender(SubscribeType::Hummock, shared_key.clone(), hummock_tx)
            .await;
        mgr.insert_sender(SubscribeType::Frontend, shared_key.clone(), frontend_tx_a)
            .await;
        mgr.insert_sender(SubscribeType::Frontend, other_key, frontend_tx_b)
            .await;

        // Targeted snapshot: only the Hummock subscriber of `shared_key` is notified.
        mgr.notify_snapshot(shared_key, SubscribeType::Hummock, MetaSnapshot::default());
        assert!(hummock_rx.recv().await.is_some());
        assert!(frontend_rx_a.try_recv().is_err());
        assert!(frontend_rx_b.try_recv().is_err());

        // Frontend broadcast: both frontend subscribers are notified, the Hummock one is not.
        mgr.notify_frontend(Operation::Add, Info::Database(Default::default()))
            .await;
        assert!(hummock_rx.try_recv().is_err());
        assert!(frontend_rx_a.recv().await.is_some());
        assert!(frontend_rx_b.recv().await.is_some());
    }
}