risingwave_meta/manager/
notification.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::collections::hash_map::Entry;
17use std::sync::Arc;
18
19use risingwave_common::system_param::reader::SystemParamsReader;
20use risingwave_pb::common::{WorkerNode, WorkerType};
21use risingwave_pb::meta::object::PbObjectInfo;
22use risingwave_pb::meta::subscribe_response::{Info, Operation};
23use risingwave_pb::meta::{
24    MetaSnapshot, PbObject, PbObjectGroup, SubscribeResponse, SubscribeType,
25};
26use thiserror_ext::AsReport;
27use tokio::sync::Mutex;
28use tokio::sync::mpsc::{self, UnboundedSender};
29use tonic::Status;
30
31use crate::controller::SqlMetaStore;
32use crate::manager::WorkerKey;
33use crate::manager::notification_version::NotificationVersionGenerator;
34use crate::model::FragmentId;
35
36pub type MessageStatus = Status;
37pub type Notification = Result<SubscribeResponse, Status>;
38pub type NotificationManagerRef = Arc<NotificationManager>;
39pub type NotificationVersion = u64;
40/// NOTE(kwannoel): This is just ignored, used in background DDL
41pub const IGNORED_NOTIFICATION_VERSION: u64 = 0;
42
43#[derive(Clone, Debug)]
44pub enum LocalNotification {
45    WorkerNodeDeleted(WorkerNode),
46    WorkerNodeActivated(WorkerNode),
47    SystemParamsChange(SystemParamsReader),
48    FragmentMappingsUpsert(Vec<FragmentId>),
49    FragmentMappingsDelete(Vec<FragmentId>),
50}
51
52#[derive(Debug)]
53struct Target {
54    subscribe_type: SubscribeType,
55    // `None` indicates sending to all subscribers of `subscribe_type`.
56    worker_key: Option<WorkerKey>,
57}
58
59impl From<SubscribeType> for Target {
60    fn from(value: SubscribeType) -> Self {
61        Self {
62            subscribe_type: value,
63            worker_key: None,
64        }
65    }
66}
67
68#[derive(Debug)]
69struct Task {
70    target: Target,
71    operation: Operation,
72    info: Info,
73    version: Option<NotificationVersion>,
74}
75
76/// [`NotificationManager`] is used to send notification to frontends and compute nodes.
77pub struct NotificationManager {
78    core: Arc<Mutex<NotificationManagerCore>>,
79    /// Sender used to add a notification into the waiting queue.
80    task_tx: UnboundedSender<Task>,
81    /// The current notification version generator.
82    version_generator: Mutex<NotificationVersionGenerator>,
83}
84
85impl NotificationManager {
86    pub async fn new(meta_store_impl: SqlMetaStore) -> Self {
87        // notification waiting queue.
88        let (task_tx, mut task_rx) = mpsc::unbounded_channel::<Task>();
89        let core = Arc::new(Mutex::new(NotificationManagerCore::new()));
90        let core_clone = core.clone();
91        let version_generator = NotificationVersionGenerator::new(meta_store_impl)
92            .await
93            .unwrap();
94
95        tokio::spawn(async move {
96            while let Some(task) = task_rx.recv().await {
97                let response = SubscribeResponse {
98                    status: None,
99                    operation: task.operation as i32,
100                    info: Some(task.info),
101                    version: task.version.unwrap_or_default(),
102                };
103                core.lock().await.notify(task.target, response);
104            }
105        });
106
107        Self {
108            core: core_clone,
109            task_tx,
110            version_generator: Mutex::new(version_generator),
111        }
112    }
113
114    pub async fn abort_all(&self) {
115        let mut guard = self.core.lock().await;
116        *guard = NotificationManagerCore::new();
117        guard.exiting = true;
118    }
119
120    #[inline(always)]
121    fn notify(
122        &self,
123        target: Target,
124        operation: Operation,
125        info: Info,
126        version: Option<NotificationVersion>,
127    ) {
128        let task = Task {
129            target,
130            operation,
131            info,
132            version,
133        };
134        self.task_tx.send(task).unwrap();
135    }
136
137    /// Add a notification to the waiting queue and increase notification version.
138    async fn notify_with_version(
139        &self,
140        target: Target,
141        operation: Operation,
142        info: Info,
143    ) -> NotificationVersion {
144        let mut version_guard = self.version_generator.lock().await;
145        version_guard.increase_version().await;
146        let version = version_guard.current_version();
147        self.notify(target, operation, info, Some(version));
148        version
149    }
150
151    /// Add a notification to the waiting queue and return immediately
152    #[inline(always)]
153    fn notify_without_version(&self, target: Target, operation: Operation, info: Info) {
154        self.notify(target, operation, info, None);
155    }
156
157    pub fn notify_snapshot(
158        &self,
159        worker_key: WorkerKey,
160        subscribe_type: SubscribeType,
161        meta_snapshot: MetaSnapshot,
162    ) {
163        self.notify_without_version(
164            Target {
165                subscribe_type,
166                worker_key: Some(worker_key),
167            },
168            Operation::Snapshot,
169            Info::Snapshot(meta_snapshot),
170        )
171    }
172
173    pub fn notify_all_without_version(&self, operation: Operation, info: Info) {
174        for subscribe_type in [
175            SubscribeType::Frontend,
176            SubscribeType::Hummock,
177            SubscribeType::Compactor,
178            SubscribeType::Compute,
179        ] {
180            self.notify_without_version(subscribe_type.into(), operation, info.clone());
181        }
182    }
183
184    pub async fn notify_frontend(&self, operation: Operation, info: Info) -> NotificationVersion {
185        self.notify_with_version(SubscribeType::Frontend.into(), operation, info)
186            .await
187    }
188
189    pub async fn notify_frontend_object_info(
190        &self,
191        operation: Operation,
192        object_info: PbObjectInfo,
193    ) -> NotificationVersion {
194        self.notify_with_version(
195            SubscribeType::Frontend.into(),
196            operation,
197            Info::ObjectGroup(PbObjectGroup {
198                objects: vec![PbObject {
199                    object_info: object_info.into(),
200                }],
201            }),
202        )
203        .await
204    }
205
206    pub async fn notify_hummock(&self, operation: Operation, info: Info) -> NotificationVersion {
207        self.notify_with_version(SubscribeType::Hummock.into(), operation, info)
208            .await
209    }
210
211    pub async fn notify_hummock_object_info(
212        &self,
213        operation: Operation,
214        object_info: PbObjectInfo,
215    ) -> NotificationVersion {
216        self.notify_with_version(
217            SubscribeType::Hummock.into(),
218            operation,
219            Info::ObjectGroup(PbObjectGroup {
220                objects: vec![PbObject {
221                    object_info: object_info.into(),
222                }],
223            }),
224        )
225        .await
226    }
227
228    pub async fn notify_compactor(&self, operation: Operation, info: Info) -> NotificationVersion {
229        self.notify_with_version(SubscribeType::Compactor.into(), operation, info)
230            .await
231    }
232
233    pub async fn notify_compactor_object_info(
234        &self,
235        operation: Operation,
236        object_info: PbObjectInfo,
237    ) -> NotificationVersion {
238        self.notify_with_version(
239            SubscribeType::Compactor.into(),
240            operation,
241            Info::ObjectGroup(PbObjectGroup {
242                objects: vec![PbObject {
243                    object_info: object_info.into(),
244                }],
245            }),
246        )
247        .await
248    }
249
250    pub async fn notify_compute(&self, operation: Operation, info: Info) -> NotificationVersion {
251        self.notify_with_version(SubscribeType::Compute.into(), operation, info)
252            .await
253    }
254
255    pub fn notify_compute_without_version(&self, operation: Operation, info: Info) {
256        self.notify_without_version(SubscribeType::Compute.into(), operation, info)
257    }
258
259    pub fn notify_frontend_without_version(&self, operation: Operation, info: Info) {
260        self.notify_without_version(SubscribeType::Frontend.into(), operation, info)
261    }
262
263    pub fn notify_hummock_without_version(&self, operation: Operation, info: Info) {
264        self.notify_without_version(SubscribeType::Hummock.into(), operation, info)
265    }
266
267    pub fn notify_compactor_without_version(&self, operation: Operation, info: Info) {
268        self.notify_without_version(SubscribeType::Compactor.into(), operation, info)
269    }
270
271    #[cfg(any(test, feature = "test"))]
272    pub fn notify_hummock_with_version(
273        &self,
274        operation: Operation,
275        info: Info,
276        version: Option<NotificationVersion>,
277    ) {
278        self.notify(SubscribeType::Hummock.into(), operation, info, version)
279    }
280
281    pub async fn notify_local_subscribers(&self, notification: LocalNotification) {
282        let mut core_guard = self.core.lock().await;
283        core_guard.local_senders.retain(|sender| {
284            if let Err(err) = sender.send(notification.clone()) {
285                tracing::warn!(error = %err.as_report(), "Failed to notify local subscriber");
286                return false;
287            }
288            true
289        });
290    }
291
292    /// Tell `NotificationManagerCore` to delete sender.
293    pub async fn delete_sender(&self, worker_type: WorkerType, worker_key: WorkerKey) {
294        let mut core_guard = self.core.lock().await;
295        // TODO: we may avoid passing the worker_type and remove the `worker_key` in all sender
296        // holders anyway
297        match worker_type {
298            WorkerType::Frontend => core_guard.frontend_senders.remove(&worker_key),
299            WorkerType::ComputeNode | WorkerType::RiseCtl => {
300                core_guard.hummock_senders.remove(&worker_key)
301            }
302            WorkerType::Compactor => core_guard.compactor_senders.remove(&worker_key),
303            _ => unreachable!(),
304        };
305    }
306
307    /// Tell `NotificationManagerCore` to insert sender by `worker_type`.
308    pub async fn insert_sender(
309        &self,
310        subscribe_type: SubscribeType,
311        worker_key: WorkerKey,
312        sender: UnboundedSender<Notification>,
313    ) {
314        let mut core_guard = self.core.lock().await;
315        if core_guard.exiting {
316            tracing::warn!("notification manager exiting.");
317            return;
318        }
319        let senders = core_guard.senders_of(subscribe_type);
320
321        senders.insert(worker_key, sender);
322    }
323
324    pub async fn insert_local_sender(&self, sender: UnboundedSender<LocalNotification>) {
325        let mut core_guard = self.core.lock().await;
326        if core_guard.exiting {
327            tracing::warn!("notification manager exiting.");
328            return;
329        }
330        core_guard.local_senders.push(sender);
331    }
332
333    #[cfg(test)]
334    pub async fn clear_local_sender(&self) {
335        self.core.lock().await.local_senders.clear();
336    }
337
338    pub async fn current_version(&self) -> NotificationVersion {
339        let version_guard = self.version_generator.lock().await;
340        version_guard.current_version()
341    }
342}
343
344type SenderMap = HashMap<WorkerKey, UnboundedSender<Notification>>;
345
346struct NotificationManagerCore {
347    /// The notification sender to frontends.
348    frontend_senders: SenderMap,
349    /// The notification sender to nodes that subscribes the hummock.
350    hummock_senders: SenderMap,
351    /// The notification sender to compactor nodes.
352    compactor_senders: SenderMap,
353    /// The notification sender to compute nodes.
354    compute_senders: HashMap<WorkerKey, UnboundedSender<Notification>>,
355    /// The notification sender to local subscribers.
356    local_senders: Vec<UnboundedSender<LocalNotification>>,
357    exiting: bool,
358}
359
360impl NotificationManagerCore {
361    fn new() -> Self {
362        Self {
363            frontend_senders: HashMap::new(),
364            hummock_senders: HashMap::new(),
365            compactor_senders: HashMap::new(),
366            compute_senders: HashMap::new(),
367            local_senders: vec![],
368            exiting: false,
369        }
370    }
371
372    fn notify(&mut self, target: Target, response: SubscribeResponse) {
373        macro_rules! warn_send_failure {
374            ($subscribe_type:expr, $worker_key:expr, $err:expr) => {
375                tracing::warn!(
376                    "Failed to notify {:?} {:?}: {}",
377                    $subscribe_type,
378                    $worker_key,
379                    $err
380                );
381            };
382        }
383
384        let senders = self.senders_of(target.subscribe_type);
385
386        if let Some(worker_key) = target.worker_key {
387            match senders.entry(worker_key.clone()) {
388                Entry::Occupied(entry) => {
389                    let _ = entry.get().send(Ok(response)).inspect_err(|err| {
390                        warn_send_failure!(target.subscribe_type, &worker_key, err.as_report());
391                        entry.remove_entry();
392                    });
393                }
394                Entry::Vacant(_) => {
395                    tracing::warn!("Failed to find notification sender of {:?}", worker_key)
396                }
397            }
398        } else {
399            senders.retain(|worker_key, sender| {
400                sender
401                    .send(Ok(response.clone()))
402                    .inspect_err(|err| {
403                        warn_send_failure!(target.subscribe_type, &worker_key, err.as_report());
404                    })
405                    .is_ok()
406            });
407        }
408    }
409
410    fn senders_of(&mut self, subscribe_type: SubscribeType) -> &mut SenderMap {
411        match subscribe_type {
412            SubscribeType::Frontend => &mut self.frontend_senders,
413            SubscribeType::Hummock => &mut self.hummock_senders,
414            SubscribeType::Compactor => &mut self.compactor_senders,
415            SubscribeType::Compute => &mut self.compute_senders,
416            SubscribeType::Unspecified => unreachable!(),
417        }
418    }
419}
420
#[cfg(test)]
mod tests {
    use risingwave_pb::common::HostAddress;

    use super::*;
    use crate::manager::WorkerKey;

    /// One worker key may be registered under several subscribe types at once; a
    /// notification must reach only the senders of the targeted type (and worker).
    #[tokio::test]
    async fn test_multiple_subscribers_one_worker() {
        let worker_on_port = |port| {
            WorkerKey(HostAddress {
                host: "a".to_owned(),
                port,
            })
        };

        let mgr = NotificationManager::new(SqlMetaStore::for_test().await).await;
        let first_worker = worker_on_port(1);
        let second_worker = worker_on_port(2);

        let (hummock_tx, mut hummock_rx) = mpsc::unbounded_channel();
        let (frontend_tx_a, mut frontend_rx_a) = mpsc::unbounded_channel();
        let (frontend_tx_b, mut frontend_rx_b) = mpsc::unbounded_channel();

        // The first worker subscribes twice (hummock + frontend), the second only as frontend.
        mgr.insert_sender(SubscribeType::Hummock, first_worker.clone(), hummock_tx)
            .await;
        mgr.insert_sender(SubscribeType::Frontend, first_worker.clone(), frontend_tx_a)
            .await;
        mgr.insert_sender(SubscribeType::Frontend, second_worker, frontend_tx_b)
            .await;

        // A snapshot addressed to the first worker's hummock subscription reaches only it.
        mgr.notify_snapshot(
            first_worker.clone(),
            SubscribeType::Hummock,
            MetaSnapshot::default(),
        );
        assert!(hummock_rx.recv().await.is_some());
        assert!(frontend_rx_a.try_recv().is_err());
        assert!(frontend_rx_b.try_recv().is_err());

        // A frontend broadcast reaches both frontend subscriptions and not the hummock one.
        mgr.notify_frontend(Operation::Add, Info::Database(Default::default()))
            .await;
        assert!(hummock_rx.try_recv().is_err());
        assert!(frontend_rx_a.recv().await.is_some());
        assert!(frontend_rx_b.recv().await.is_some());
    }
}