risingwave_meta/manager/
notification.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::HashMap;
16use std::collections::hash_map::Entry;
17use std::sync::Arc;
18
19use risingwave_common::system_param::reader::SystemParamsReader;
20use risingwave_meta_model::ObjectId;
21use risingwave_pb::common::{WorkerNode, WorkerType};
22use risingwave_pb::meta::object::PbObjectInfo;
23use risingwave_pb::meta::subscribe_response::{Info, Operation};
24use risingwave_pb::meta::{
25    MetaSnapshot, PbObject, PbObjectGroup, SubscribeResponse, SubscribeType,
26};
27use thiserror_ext::AsReport;
28use tokio::sync::Mutex;
29use tokio::sync::mpsc::{self, UnboundedSender};
30use tonic::Status;
31
32use crate::controller::SqlMetaStore;
33use crate::manager::WorkerKey;
34use crate::manager::notification_version::NotificationVersionGenerator;
35use crate::model::FragmentId;
36
37pub type MessageStatus = Status;
38pub type Notification = Result<SubscribeResponse, Status>;
39pub type NotificationManagerRef = Arc<NotificationManager>;
40pub type NotificationVersion = u64;
/// Placeholder notification version that is never inspected.
///
/// NOTE(kwannoel): This is just ignored, used in background DDL.
pub const IGNORED_NOTIFICATION_VERSION: u64 = 0;
43
44#[derive(Clone, Debug)]
45pub enum LocalNotification {
46    WorkerNodeDeleted(WorkerNode),
47    WorkerNodeActivated(WorkerNode),
48    SystemParamsChange(SystemParamsReader),
49    BatchParallelismChange,
50    FragmentMappingsUpsert(Vec<FragmentId>),
51    FragmentMappingsDelete(Vec<FragmentId>),
52    SourceDropped(ObjectId),
53}
54
55#[derive(Debug)]
56struct Target {
57    subscribe_type: SubscribeType,
58    // `None` indicates sending to all subscribers of `subscribe_type`.
59    worker_key: Option<WorkerKey>,
60}
61
62impl From<SubscribeType> for Target {
63    fn from(value: SubscribeType) -> Self {
64        Self {
65            subscribe_type: value,
66            worker_key: None,
67        }
68    }
69}
70
71#[derive(Debug)]
72struct Task {
73    target: Target,
74    operation: Operation,
75    info: Info,
76    version: Option<NotificationVersion>,
77}
78
79/// [`NotificationManager`] is used to send notification to frontends and compute nodes.
80pub struct NotificationManager {
81    core: Arc<parking_lot::Mutex<NotificationManagerCore>>,
82    /// Sender used to add a notification into the waiting queue.
83    task_tx: UnboundedSender<Task>,
84    /// The current notification version generator.
85    version_generator: Mutex<NotificationVersionGenerator>,
86}
87
88impl NotificationManager {
89    pub async fn new(meta_store_impl: SqlMetaStore) -> Self {
90        // notification waiting queue.
91        let (task_tx, mut task_rx) = mpsc::unbounded_channel::<Task>();
92        let core = Arc::new(parking_lot::Mutex::new(NotificationManagerCore::new()));
93        let core_clone = core.clone();
94        let version_generator = NotificationVersionGenerator::new(meta_store_impl)
95            .await
96            .unwrap();
97
98        tokio::spawn(async move {
99            while let Some(task) = task_rx.recv().await {
100                let response = SubscribeResponse {
101                    status: None,
102                    operation: task.operation as i32,
103                    info: Some(task.info),
104                    version: task.version.unwrap_or_default(),
105                };
106                core.lock().notify(task.target, response);
107            }
108        });
109
110        Self {
111            core: core_clone,
112            task_tx,
113            version_generator: Mutex::new(version_generator),
114        }
115    }
116
117    pub fn abort_all(&self) {
118        let mut guard = self.core.lock();
119        *guard = NotificationManagerCore::new();
120        guard.exiting = true;
121    }
122
123    #[inline(always)]
124    fn notify(
125        &self,
126        target: Target,
127        operation: Operation,
128        info: Info,
129        version: Option<NotificationVersion>,
130    ) {
131        let task = Task {
132            target,
133            operation,
134            info,
135            version,
136        };
137        self.task_tx.send(task).unwrap();
138    }
139
140    /// Add a notification to the waiting queue and increase notification version.
141    async fn notify_with_version(
142        &self,
143        target: Target,
144        operation: Operation,
145        info: Info,
146    ) -> NotificationVersion {
147        let mut version_guard = self.version_generator.lock().await;
148        version_guard.increase_version().await;
149        let version = version_guard.current_version();
150        self.notify(target, operation, info, Some(version));
151        version
152    }
153
154    /// Add a notification to the waiting queue and return immediately
155    #[inline(always)]
156    fn notify_without_version(&self, target: Target, operation: Operation, info: Info) {
157        self.notify(target, operation, info, None);
158    }
159
160    pub fn notify_snapshot(
161        &self,
162        worker_key: WorkerKey,
163        subscribe_type: SubscribeType,
164        meta_snapshot: MetaSnapshot,
165    ) {
166        self.notify_without_version(
167            Target {
168                subscribe_type,
169                worker_key: Some(worker_key),
170            },
171            Operation::Snapshot,
172            Info::Snapshot(meta_snapshot),
173        )
174    }
175
176    pub fn notify_all_without_version(&self, operation: Operation, info: Info) {
177        for subscribe_type in [
178            SubscribeType::Frontend,
179            SubscribeType::Hummock,
180            SubscribeType::Compactor,
181            SubscribeType::Compute,
182        ] {
183            self.notify_without_version(subscribe_type.into(), operation, info.clone());
184        }
185    }
186
187    pub async fn notify_frontend(&self, operation: Operation, info: Info) -> NotificationVersion {
188        self.notify_with_version(SubscribeType::Frontend.into(), operation, info)
189            .await
190    }
191
192    pub async fn notify_frontend_object_info(
193        &self,
194        operation: Operation,
195        object_info: PbObjectInfo,
196    ) -> NotificationVersion {
197        self.notify_with_version(
198            SubscribeType::Frontend.into(),
199            operation,
200            Info::ObjectGroup(PbObjectGroup {
201                objects: vec![PbObject {
202                    object_info: object_info.into(),
203                }],
204            }),
205        )
206        .await
207    }
208
209    pub async fn notify_hummock(&self, operation: Operation, info: Info) -> NotificationVersion {
210        self.notify_with_version(SubscribeType::Hummock.into(), operation, info)
211            .await
212    }
213
214    pub async fn notify_compactor(&self, operation: Operation, info: Info) -> NotificationVersion {
215        self.notify_with_version(SubscribeType::Compactor.into(), operation, info)
216            .await
217    }
218
219    pub fn notify_compute_without_version(&self, operation: Operation, info: Info) {
220        self.notify_without_version(SubscribeType::Compute.into(), operation, info)
221    }
222
223    pub fn notify_frontend_without_version(&self, operation: Operation, info: Info) {
224        self.notify_without_version(SubscribeType::Frontend.into(), operation, info)
225    }
226
227    pub fn notify_hummock_without_version(&self, operation: Operation, info: Info) {
228        self.notify_without_version(SubscribeType::Hummock.into(), operation, info)
229    }
230
231    pub fn notify_compactor_without_version(&self, operation: Operation, info: Info) {
232        self.notify_without_version(SubscribeType::Compactor.into(), operation, info)
233    }
234
235    #[cfg(any(test, feature = "test"))]
236    pub fn notify_hummock_with_version(
237        &self,
238        operation: Operation,
239        info: Info,
240        version: Option<NotificationVersion>,
241    ) {
242        self.notify(SubscribeType::Hummock.into(), operation, info, version)
243    }
244
245    pub fn notify_local_subscribers(&self, notification: LocalNotification) {
246        let mut core_guard = self.core.lock();
247        core_guard.local_senders.retain(|sender| {
248            if let Err(err) = sender.send(notification.clone()) {
249                tracing::warn!(error = %err.as_report(), "Failed to notify local subscriber");
250                return false;
251            }
252            true
253        });
254    }
255
256    /// Tell `NotificationManagerCore` to delete sender.
257    pub fn delete_sender(&self, worker_type: WorkerType, worker_key: WorkerKey) {
258        let mut core_guard = self.core.lock();
259        // TODO: we may avoid passing the worker_type and remove the `worker_key` in all sender
260        // holders anyway
261        match worker_type {
262            WorkerType::Frontend => core_guard.frontend_senders.remove(&worker_key),
263            WorkerType::ComputeNode | WorkerType::RiseCtl => {
264                core_guard.hummock_senders.remove(&worker_key)
265            }
266            WorkerType::Compactor => core_guard.compactor_senders.remove(&worker_key),
267            _ => unreachable!(),
268        };
269    }
270
271    /// Tell `NotificationManagerCore` to insert sender by `worker_type`.
272    pub fn insert_sender(
273        &self,
274        subscribe_type: SubscribeType,
275        worker_key: WorkerKey,
276        sender: UnboundedSender<Notification>,
277    ) {
278        let mut core_guard = self.core.lock();
279        if core_guard.exiting {
280            tracing::warn!("notification manager exiting.");
281            return;
282        }
283        let senders = core_guard.senders_of(subscribe_type);
284
285        senders.insert(worker_key, sender);
286    }
287
288    pub fn insert_local_sender(&self, sender: UnboundedSender<LocalNotification>) {
289        let mut core_guard = self.core.lock();
290        if core_guard.exiting {
291            tracing::warn!("notification manager exiting.");
292            return;
293        }
294        core_guard.local_senders.push(sender);
295    }
296
297    #[cfg(test)]
298    pub fn clear_local_sender(&self) {
299        self.core.lock().local_senders.clear();
300    }
301
302    pub async fn current_version(&self) -> NotificationVersion {
303        let version_guard = self.version_generator.lock().await;
304        version_guard.current_version()
305    }
306}
307
308type SenderMap = HashMap<WorkerKey, UnboundedSender<Notification>>;
309
310struct NotificationManagerCore {
311    /// The notification sender to frontends.
312    frontend_senders: SenderMap,
313    /// The notification sender to nodes that subscribes the hummock.
314    hummock_senders: SenderMap,
315    /// The notification sender to compactor nodes.
316    compactor_senders: SenderMap,
317    /// The notification sender to compute nodes.
318    compute_senders: HashMap<WorkerKey, UnboundedSender<Notification>>,
319    /// The notification sender to local subscribers.
320    local_senders: Vec<UnboundedSender<LocalNotification>>,
321    exiting: bool,
322}
323
324impl NotificationManagerCore {
325    fn new() -> Self {
326        Self {
327            frontend_senders: HashMap::new(),
328            hummock_senders: HashMap::new(),
329            compactor_senders: HashMap::new(),
330            compute_senders: HashMap::new(),
331            local_senders: vec![],
332            exiting: false,
333        }
334    }
335
336    fn notify(&mut self, target: Target, response: SubscribeResponse) {
337        macro_rules! warn_send_failure {
338            ($subscribe_type:expr, $worker_key:expr, $err:expr) => {
339                tracing::warn!(
340                    "Failed to notify {:?} {:?}: {}",
341                    $subscribe_type,
342                    $worker_key,
343                    $err
344                );
345            };
346        }
347
348        let senders = self.senders_of(target.subscribe_type);
349
350        if let Some(worker_key) = target.worker_key {
351            match senders.entry(worker_key.clone()) {
352                Entry::Occupied(entry) => {
353                    let _ = entry.get().send(Ok(response)).inspect_err(|err| {
354                        warn_send_failure!(target.subscribe_type, &worker_key, err.as_report());
355                        entry.remove_entry();
356                    });
357                }
358                Entry::Vacant(_) => {
359                    tracing::warn!("Failed to find notification sender of {:?}", worker_key)
360                }
361            }
362        } else {
363            senders.retain(|worker_key, sender| {
364                sender
365                    .send(Ok(response.clone()))
366                    .inspect_err(|err| {
367                        warn_send_failure!(target.subscribe_type, &worker_key, err.as_report());
368                    })
369                    .is_ok()
370            });
371        }
372    }
373
374    fn senders_of(&mut self, subscribe_type: SubscribeType) -> &mut SenderMap {
375        match subscribe_type {
376            SubscribeType::Frontend => &mut self.frontend_senders,
377            SubscribeType::Hummock => &mut self.hummock_senders,
378            SubscribeType::Compactor => &mut self.compactor_senders,
379            SubscribeType::Compute => &mut self.compute_senders,
380            SubscribeType::Unspecified => unreachable!(),
381        }
382    }
383}
384
#[cfg(test)]
mod tests {
    use risingwave_pb::common::HostAddress;

    use super::*;
    use crate::manager::WorkerKey;

    /// A targeted snapshot reaches only its (subscribe type, worker) pair, while a frontend
    /// broadcast reaches every frontend subscriber and no hummock subscriber.
    #[tokio::test]
    async fn test_multiple_subscribers_one_worker() {
        let manager = NotificationManager::new(SqlMetaStore::for_test().await).await;
        let make_key = |port| {
            WorkerKey(HostAddress {
                host: "a".to_owned(),
                port,
            })
        };
        let first_key = make_key(1);
        let second_key = make_key(2);

        let (hummock_tx, mut hummock_rx) = mpsc::unbounded_channel();
        let (first_frontend_tx, mut first_frontend_rx) = mpsc::unbounded_channel();
        let (second_frontend_tx, mut second_frontend_rx) = mpsc::unbounded_channel();
        manager.insert_sender(SubscribeType::Hummock, first_key.clone(), hummock_tx);
        manager.insert_sender(SubscribeType::Frontend, first_key.clone(), first_frontend_tx);
        manager.insert_sender(SubscribeType::Frontend, second_key, second_frontend_tx);

        // Targeted snapshot: only the hummock subscriber of `first_key` receives it.
        manager.notify_snapshot(first_key, SubscribeType::Hummock, MetaSnapshot::default());
        assert!(hummock_rx.recv().await.is_some());
        assert!(first_frontend_rx.try_recv().is_err());
        assert!(second_frontend_rx.try_recv().is_err());

        // Frontend broadcast: both frontend subscribers receive it, hummock does not.
        manager
            .notify_frontend(Operation::Add, Info::Database(Default::default()))
            .await;
        assert!(hummock_rx.try_recv().is_err());
        assert!(first_frontend_rx.recv().await.is_some());
        assert!(second_frontend_rx.recv().await.is_some());
    }
}