risingwave_meta/manager/
notification.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::collections::hash_map::Entry;
17use std::sync::Arc;
18
19use risingwave_common::id::JobId;
20use risingwave_common::system_param::reader::SystemParamsReader;
21use risingwave_meta_model::ObjectId;
22use risingwave_pb::common::{WorkerNode, WorkerType};
23use risingwave_pb::meta::object::PbObjectInfo;
24use risingwave_pb::meta::subscribe_response::{Info, Operation};
25use risingwave_pb::meta::{
26    MetaSnapshot, PbObject, PbObjectGroup, SubscribeResponse, SubscribeType,
27};
28use thiserror_ext::AsReport;
29use tokio::sync::Mutex;
30use tokio::sync::mpsc::{self, UnboundedSender};
31use tonic::Status;
32
33use crate::controller::SqlMetaStore;
34use crate::manager::WorkerKey;
35use crate::manager::notification_version::NotificationVersionGenerator;
36use crate::model::FragmentId;
37
38pub type MessageStatus = Status;
39pub type Notification = Result<SubscribeResponse, Status>;
40pub type NotificationManagerRef = Arc<NotificationManager>;
41pub type NotificationVersion = u64;
42/// NOTE(kwannoel): This is just ignored, used in background DDL
43pub const IGNORED_NOTIFICATION_VERSION: u64 = 0;
44
45#[derive(Clone, Debug)]
46pub enum LocalNotification {
47    WorkerNodeDeleted(WorkerNode),
48    WorkerNodeActivated(WorkerNode),
49    SystemParamsChange(SystemParamsReader),
50    BatchParallelismChange,
51    FragmentMappingsUpsert(Vec<FragmentId>),
52    FragmentMappingsDelete(Vec<FragmentId>),
53    SourceDropped(ObjectId),
54    StreamingJobBackfillFinished(JobId),
55}
56
57#[derive(Debug)]
58struct Target {
59    subscribe_type: SubscribeType,
60    // `None` indicates sending to all subscribers of `subscribe_type`.
61    worker_key: Option<WorkerKey>,
62}
63
64impl From<SubscribeType> for Target {
65    fn from(value: SubscribeType) -> Self {
66        Self {
67            subscribe_type: value,
68            worker_key: None,
69        }
70    }
71}
72
73#[derive(Debug)]
74struct Task {
75    target: Target,
76    operation: Operation,
77    info: Info,
78    version: Option<NotificationVersion>,
79}
80
81/// [`NotificationManager`] is used to send notification to frontends and compute nodes.
82pub struct NotificationManager {
83    core: Arc<parking_lot::Mutex<NotificationManagerCore>>,
84    /// Sender used to add a notification into the waiting queue.
85    task_tx: UnboundedSender<Task>,
86    /// The current notification version generator.
87    version_generator: Mutex<NotificationVersionGenerator>,
88}
89
90impl NotificationManager {
91    pub async fn new(meta_store_impl: SqlMetaStore) -> Self {
92        // notification waiting queue.
93        let (task_tx, mut task_rx) = mpsc::unbounded_channel::<Task>();
94        let core = Arc::new(parking_lot::Mutex::new(NotificationManagerCore::new()));
95        let core_clone = core.clone();
96        let version_generator = NotificationVersionGenerator::new(meta_store_impl)
97            .await
98            .unwrap();
99
100        tokio::spawn(async move {
101            while let Some(task) = task_rx.recv().await {
102                let response = SubscribeResponse {
103                    status: None,
104                    operation: task.operation as i32,
105                    info: Some(task.info),
106                    version: task.version.unwrap_or_default(),
107                };
108                core.lock().notify(task.target, response);
109            }
110        });
111
112        Self {
113            core: core_clone,
114            task_tx,
115            version_generator: Mutex::new(version_generator),
116        }
117    }
118
119    pub fn abort_all(&self) {
120        let mut guard = self.core.lock();
121        *guard = NotificationManagerCore::new();
122        guard.exiting = true;
123    }
124
125    #[inline(always)]
126    fn notify(
127        &self,
128        target: Target,
129        operation: Operation,
130        info: Info,
131        version: Option<NotificationVersion>,
132    ) {
133        let task = Task {
134            target,
135            operation,
136            info,
137            version,
138        };
139        self.task_tx.send(task).unwrap();
140    }
141
142    /// Add a notification to the waiting queue and increase notification version.
143    async fn notify_with_version(
144        &self,
145        target: Target,
146        operation: Operation,
147        info: Info,
148    ) -> NotificationVersion {
149        let mut version_guard = self.version_generator.lock().await;
150        version_guard.increase_version().await;
151        let version = version_guard.current_version();
152        self.notify(target, operation, info, Some(version));
153        version
154    }
155
156    /// Add a notification to the waiting queue and return immediately
157    #[inline(always)]
158    fn notify_without_version(&self, target: Target, operation: Operation, info: Info) {
159        self.notify(target, operation, info, None);
160    }
161
162    pub fn notify_snapshot(
163        &self,
164        worker_key: WorkerKey,
165        subscribe_type: SubscribeType,
166        meta_snapshot: MetaSnapshot,
167    ) {
168        self.notify_without_version(
169            Target {
170                subscribe_type,
171                worker_key: Some(worker_key),
172            },
173            Operation::Snapshot,
174            Info::Snapshot(meta_snapshot),
175        )
176    }
177
178    pub fn notify_all_without_version(&self, operation: Operation, info: Info) {
179        for subscribe_type in [
180            SubscribeType::Frontend,
181            SubscribeType::Hummock,
182            SubscribeType::Compactor,
183            SubscribeType::Compute,
184        ] {
185            self.notify_without_version(subscribe_type.into(), operation, info.clone());
186        }
187    }
188
189    pub async fn notify_frontend(&self, operation: Operation, info: Info) -> NotificationVersion {
190        self.notify_with_version(SubscribeType::Frontend.into(), operation, info)
191            .await
192    }
193
194    pub async fn notify_frontend_object_info(
195        &self,
196        operation: Operation,
197        object_info: PbObjectInfo,
198    ) -> NotificationVersion {
199        self.notify_with_version(
200            SubscribeType::Frontend.into(),
201            operation,
202            Info::ObjectGroup(PbObjectGroup {
203                objects: vec![PbObject {
204                    object_info: object_info.into(),
205                }],
206            }),
207        )
208        .await
209    }
210
211    pub async fn notify_hummock(&self, operation: Operation, info: Info) -> NotificationVersion {
212        self.notify_with_version(SubscribeType::Hummock.into(), operation, info)
213            .await
214    }
215
216    pub async fn notify_compactor(&self, operation: Operation, info: Info) -> NotificationVersion {
217        self.notify_with_version(SubscribeType::Compactor.into(), operation, info)
218            .await
219    }
220
221    pub fn notify_compute_without_version(&self, operation: Operation, info: Info) {
222        self.notify_without_version(SubscribeType::Compute.into(), operation, info)
223    }
224
225    pub fn notify_frontend_without_version(&self, operation: Operation, info: Info) {
226        self.notify_without_version(SubscribeType::Frontend.into(), operation, info)
227    }
228
229    pub fn notify_hummock_without_version(&self, operation: Operation, info: Info) {
230        self.notify_without_version(SubscribeType::Hummock.into(), operation, info)
231    }
232
233    pub fn notify_compactor_without_version(&self, operation: Operation, info: Info) {
234        self.notify_without_version(SubscribeType::Compactor.into(), operation, info)
235    }
236
237    #[cfg(any(test, feature = "test"))]
238    pub fn notify_hummock_with_version(
239        &self,
240        operation: Operation,
241        info: Info,
242        version: Option<NotificationVersion>,
243    ) {
244        self.notify(SubscribeType::Hummock.into(), operation, info, version)
245    }
246
247    pub fn notify_local_subscribers(&self, notification: LocalNotification) {
248        let mut core_guard = self.core.lock();
249        core_guard.local_senders.retain(|sender| {
250            if let Err(err) = sender.send(notification.clone()) {
251                tracing::warn!(error = %err.as_report(), "Failed to notify local subscriber");
252                return false;
253            }
254            true
255        });
256    }
257
258    /// Tell `NotificationManagerCore` to delete sender.
259    pub fn delete_sender(&self, worker_type: WorkerType, worker_key: WorkerKey) {
260        let mut core_guard = self.core.lock();
261        // TODO: we may avoid passing the worker_type and remove the `worker_key` in all sender
262        // holders anyway
263        match worker_type {
264            WorkerType::Frontend => core_guard.frontend_senders.remove(&worker_key),
265            WorkerType::ComputeNode | WorkerType::RiseCtl => {
266                core_guard.hummock_senders.remove(&worker_key)
267            }
268            WorkerType::Compactor => core_guard.compactor_senders.remove(&worker_key),
269            _ => unreachable!(),
270        };
271    }
272
273    /// Tell `NotificationManagerCore` to insert sender by `worker_type`.
274    pub fn insert_sender(
275        &self,
276        subscribe_type: SubscribeType,
277        worker_key: WorkerKey,
278        sender: UnboundedSender<Notification>,
279    ) {
280        let mut core_guard = self.core.lock();
281        if core_guard.exiting {
282            tracing::warn!("notification manager exiting.");
283            return;
284        }
285        let senders = core_guard.senders_of(subscribe_type);
286
287        senders.insert(worker_key, sender);
288    }
289
290    pub fn insert_local_sender(&self, sender: UnboundedSender<LocalNotification>) {
291        let mut core_guard = self.core.lock();
292        if core_guard.exiting {
293            tracing::warn!("notification manager exiting.");
294            return;
295        }
296        core_guard.local_senders.push(sender);
297    }
298
299    #[cfg(test)]
300    pub fn clear_local_sender(&self) {
301        self.core.lock().local_senders.clear();
302    }
303
304    pub async fn current_version(&self) -> NotificationVersion {
305        let version_guard = self.version_generator.lock().await;
306        version_guard.current_version()
307    }
308}
309
310type SenderMap = HashMap<WorkerKey, UnboundedSender<Notification>>;
311
312struct NotificationManagerCore {
313    /// The notification sender to frontends.
314    frontend_senders: SenderMap,
315    /// The notification sender to nodes that subscribes the hummock.
316    hummock_senders: SenderMap,
317    /// The notification sender to compactor nodes.
318    compactor_senders: SenderMap,
319    /// The notification sender to compute nodes.
320    compute_senders: HashMap<WorkerKey, UnboundedSender<Notification>>,
321    /// The notification sender to local subscribers.
322    local_senders: Vec<UnboundedSender<LocalNotification>>,
323    exiting: bool,
324}
325
326impl NotificationManagerCore {
327    fn new() -> Self {
328        Self {
329            frontend_senders: HashMap::new(),
330            hummock_senders: HashMap::new(),
331            compactor_senders: HashMap::new(),
332            compute_senders: HashMap::new(),
333            local_senders: vec![],
334            exiting: false,
335        }
336    }
337
338    fn notify(&mut self, target: Target, response: SubscribeResponse) {
339        macro_rules! warn_send_failure {
340            ($subscribe_type:expr, $worker_key:expr, $err:expr) => {
341                tracing::warn!(
342                    "Failed to notify {:?} {:?}: {}",
343                    $subscribe_type,
344                    $worker_key,
345                    $err
346                );
347            };
348        }
349
350        let senders = self.senders_of(target.subscribe_type);
351
352        if let Some(worker_key) = target.worker_key {
353            match senders.entry(worker_key.clone()) {
354                Entry::Occupied(entry) => {
355                    let _ = entry.get().send(Ok(response)).inspect_err(|err| {
356                        warn_send_failure!(target.subscribe_type, &worker_key, err.as_report());
357                        entry.remove_entry();
358                    });
359                }
360                Entry::Vacant(_) => {
361                    tracing::warn!("Failed to find notification sender of {:?}", worker_key)
362                }
363            }
364        } else {
365            senders.retain(|worker_key, sender| {
366                sender
367                    .send(Ok(response.clone()))
368                    .inspect_err(|err| {
369                        warn_send_failure!(target.subscribe_type, &worker_key, err.as_report());
370                    })
371                    .is_ok()
372            });
373        }
374    }
375
376    fn senders_of(&mut self, subscribe_type: SubscribeType) -> &mut SenderMap {
377        match subscribe_type {
378            SubscribeType::Frontend => &mut self.frontend_senders,
379            SubscribeType::Hummock => &mut self.hummock_senders,
380            SubscribeType::Compactor => &mut self.compactor_senders,
381            SubscribeType::Compute => &mut self.compute_senders,
382            SubscribeType::Unspecified => unreachable!(),
383        }
384    }
385}
386
#[cfg(test)]
mod tests {
    use risingwave_pb::common::HostAddress;

    // `super::*` also brings in the parent's imports (`JobId`, `WorkerKey`, `mpsc`, ...).
    use super::*;

    #[tokio::test]
    async fn test_multiple_subscribers_one_worker() {
        let mgr = NotificationManager::new(SqlMetaStore::for_test().await).await;
        let make_key = |port| {
            WorkerKey(HostAddress {
                host: "a".to_owned(),
                port,
            })
        };
        let (worker_key1, worker_key2) = (make_key(1), make_key(2));

        let (hummock_tx, mut hummock_rx) = mpsc::unbounded_channel();
        let (frontend1_tx, mut frontend1_rx) = mpsc::unbounded_channel();
        let (frontend2_tx, mut frontend2_rx) = mpsc::unbounded_channel();
        mgr.insert_sender(SubscribeType::Hummock, worker_key1.clone(), hummock_tx);
        mgr.insert_sender(SubscribeType::Frontend, worker_key1.clone(), frontend1_tx);
        mgr.insert_sender(SubscribeType::Frontend, worker_key2, frontend2_tx);

        // A targeted snapshot reaches only the hummock subscriber of worker 1.
        mgr.notify_snapshot(worker_key1, SubscribeType::Hummock, MetaSnapshot::default());
        assert!(hummock_rx.recv().await.is_some());
        assert!(frontend1_rx.try_recv().is_err());
        assert!(frontend2_rx.try_recv().is_err());

        // A frontend broadcast reaches every frontend subscriber but not hummock.
        mgr.notify_frontend(Operation::Add, Info::Database(Default::default()))
            .await;
        assert!(hummock_rx.try_recv().is_err());
        assert!(frontend1_rx.recv().await.is_some());
        assert!(frontend2_rx.recv().await.is_some());
    }

    #[tokio::test]
    async fn test_local_notification_backfill_finished() {
        let mgr = NotificationManager::new(SqlMetaStore::for_test().await).await;
        let (local_tx, mut local_rx) = mpsc::unbounded_channel();
        mgr.insert_local_sender(local_tx);

        let job_id = JobId::new(42);
        mgr.notify_local_subscribers(LocalNotification::StreamingJobBackfillFinished(job_id));

        let received = local_rx.recv().await.expect("should receive notification");
        let LocalNotification::StreamingJobBackfillFinished(received_id) = &received else {
            panic!("unexpected notification: {received:?}");
        };
        assert_eq!(*received_id, job_id);
    }
}
444}