risingwave_meta/manager/
notification.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::collections::hash_map::Entry;
17use std::sync::Arc;
18
19use risingwave_common::id::JobId;
20use risingwave_common::system_param::reader::SystemParamsReader;
21use risingwave_meta_model::ObjectId;
22use risingwave_pb::common::{WorkerNode, WorkerType};
23use risingwave_pb::meta::object::PbObjectInfo;
24use risingwave_pb::meta::subscribe_response::{Info, Operation};
25use risingwave_pb::meta::{
26    MetaSnapshot, PbObject, PbObjectGroup, SubscribeResponse, SubscribeType,
27};
28use thiserror_ext::AsReport;
29use tokio::sync::Mutex;
30use tokio::sync::mpsc::{self, UnboundedSender};
31use tonic::Status;
32
33use crate::controller::SqlMetaStore;
34use crate::manager::WorkerKey;
35use crate::manager::notification_version::NotificationVersionGenerator;
36use crate::model::FragmentId;
37
/// Alias of [`tonic::Status`].
pub type MessageStatus = Status;
/// One message pushed to a remote subscriber: either a [`SubscribeResponse`] or an
/// error [`Status`].
pub type Notification = Result<SubscribeResponse, Status>;
/// Reference-counted handle to a [`NotificationManager`].
pub type NotificationManagerRef = Arc<NotificationManager>;
/// Version number attached to versioned notifications.
pub type NotificationVersion = u64;
/// NOTE(kwannoel): This is just ignored, used in background DDL
pub const IGNORED_NOTIFICATION_VERSION: u64 = 0;
44
/// Notification delivered to in-process ("local") subscribers, i.e. the senders
/// registered through `NotificationManager::insert_local_sender` and fed by
/// `NotificationManager::notify_local_subscribers`.
///
/// The type is `Clone` because every local subscriber receives its own copy.
#[derive(Clone, Debug)]
pub enum LocalNotification {
    WorkerNodeDeleted(WorkerNode),
    WorkerNodeActivated(WorkerNode),
    SystemParamsChange(SystemParamsReader),
    BatchParallelismChange,
    FragmentMappingsUpsert(Vec<FragmentId>),
    FragmentMappingsDelete(Vec<FragmentId>),
    SourceDropped(ObjectId),
    StreamingJobBackfillFinished(JobId),
}
56
/// Addressee of a notification: a subscriber category plus, optionally, one specific
/// worker within that category.
#[derive(Debug)]
struct Target {
    /// Selects which sender map the notification is delivered through.
    subscribe_type: SubscribeType,
    // `None` indicates sending to all subscribers of `subscribe_type`.
    worker_key: Option<WorkerKey>,
}
63
64impl From<SubscribeType> for Target {
65    fn from(value: SubscribeType) -> Self {
66        Self {
67            subscribe_type: value,
68            worker_key: None,
69        }
70    }
71}
72
/// A notification queued on the task channel, waiting to be turned into a
/// [`SubscribeResponse`] and delivered by the background task spawned in
/// `NotificationManager::new`.
#[derive(Debug)]
struct Task {
    /// Who should receive the notification.
    target: Target,
    /// Operation copied into the response (as its `i32` value).
    operation: Operation,
    /// Payload copied into the response.
    info: Info,
    /// Version to stamp on the response; `None` is sent as the default value `0`.
    version: Option<NotificationVersion>,
}
80
/// [`NotificationManager`] is used to send notifications to remote subscribers
/// (frontend, hummock, compactor and compute subscribe types) and to in-process
/// local subscribers.
///
/// Remote notifications are pushed onto an unbounded queue and delivered by a
/// background task; local notifications are delivered synchronously.
pub struct NotificationManager {
    /// Subscriber registry, shared with the background delivery task.
    core: Arc<parking_lot::Mutex<NotificationManagerCore>>,
    /// Sender used to add a notification into the waiting queue.
    task_tx: UnboundedSender<Task>,
    /// The current notification version generator.
    version_generator: Mutex<NotificationVersionGenerator>,
}
89
90impl NotificationManager {
91    pub async fn new(meta_store_impl: SqlMetaStore) -> Self {
92        // notification waiting queue.
93        let (task_tx, mut task_rx) = mpsc::unbounded_channel::<Task>();
94        let core = Arc::new(parking_lot::Mutex::new(NotificationManagerCore::new()));
95        let core_clone = core.clone();
96        let version_generator = NotificationVersionGenerator::new(meta_store_impl)
97            .await
98            .unwrap();
99
100        tokio::spawn(async move {
101            while let Some(task) = task_rx.recv().await {
102                let response = SubscribeResponse {
103                    status: None,
104                    operation: task.operation as i32,
105                    info: Some(task.info),
106                    version: task.version.unwrap_or_default(),
107                };
108                core.lock().notify(task.target, response);
109            }
110        });
111
112        Self {
113            core: core_clone,
114            task_tx,
115            version_generator: Mutex::new(version_generator),
116        }
117    }
118
119    pub fn abort_all(&self) {
120        let mut guard = self.core.lock();
121        *guard = NotificationManagerCore::new();
122        guard.exiting = true;
123    }
124
125    #[inline(always)]
126    fn notify(
127        &self,
128        target: Target,
129        operation: Operation,
130        info: Info,
131        version: Option<NotificationVersion>,
132    ) {
133        let task = Task {
134            target,
135            operation,
136            info,
137            version,
138        };
139        self.task_tx.send(task).unwrap();
140    }
141
142    /// Add a notification to the waiting queue and increase notification version.
143    async fn notify_with_version(
144        &self,
145        target: Target,
146        operation: Operation,
147        info: Info,
148    ) -> NotificationVersion {
149        let mut version_guard = self.version_generator.lock().await;
150        version_guard.increase_version().await;
151        let version = version_guard.current_version();
152        self.notify(target, operation, info, Some(version));
153        version
154    }
155
156    /// Add a notification to the waiting queue and return immediately
157    #[inline(always)]
158    fn notify_without_version(&self, target: Target, operation: Operation, info: Info) {
159        self.notify(target, operation, info, None);
160    }
161
162    pub fn notify_snapshot(
163        &self,
164        worker_key: WorkerKey,
165        subscribe_type: SubscribeType,
166        meta_snapshot: MetaSnapshot,
167    ) {
168        self.notify_without_version(
169            Target {
170                subscribe_type,
171                worker_key: Some(worker_key),
172            },
173            Operation::Snapshot,
174            Info::Snapshot(meta_snapshot),
175        )
176    }
177
178    pub fn notify_all_without_version(&self, operation: Operation, info: Info) {
179        for subscribe_type in [
180            SubscribeType::Frontend,
181            SubscribeType::Hummock,
182            SubscribeType::Compactor,
183            SubscribeType::Compute,
184        ] {
185            self.notify_without_version(subscribe_type.into(), operation, info.clone());
186        }
187    }
188
189    pub async fn notify_frontend(&self, operation: Operation, info: Info) -> NotificationVersion {
190        self.notify_with_version(SubscribeType::Frontend.into(), operation, info)
191            .await
192    }
193
194    pub async fn notify_frontend_object_info(
195        &self,
196        operation: Operation,
197        object_info: PbObjectInfo,
198    ) -> NotificationVersion {
199        self.notify_with_version(
200            SubscribeType::Frontend.into(),
201            operation,
202            Info::ObjectGroup(PbObjectGroup {
203                objects: vec![PbObject {
204                    object_info: object_info.into(),
205                }],
206                dependencies: vec![],
207            }),
208        )
209        .await
210    }
211
212    pub async fn notify_hummock(&self, operation: Operation, info: Info) -> NotificationVersion {
213        self.notify_with_version(SubscribeType::Hummock.into(), operation, info)
214            .await
215    }
216
217    pub async fn notify_compactor(&self, operation: Operation, info: Info) -> NotificationVersion {
218        self.notify_with_version(SubscribeType::Compactor.into(), operation, info)
219            .await
220    }
221
222    pub fn notify_compute_without_version(&self, operation: Operation, info: Info) {
223        self.notify_without_version(SubscribeType::Compute.into(), operation, info)
224    }
225
226    pub fn notify_frontend_without_version(&self, operation: Operation, info: Info) {
227        self.notify_without_version(SubscribeType::Frontend.into(), operation, info)
228    }
229
230    pub fn notify_hummock_without_version(&self, operation: Operation, info: Info) {
231        self.notify_without_version(SubscribeType::Hummock.into(), operation, info)
232    }
233
234    pub fn notify_compactor_without_version(&self, operation: Operation, info: Info) {
235        self.notify_without_version(SubscribeType::Compactor.into(), operation, info)
236    }
237
238    #[cfg(any(test, feature = "test"))]
239    pub fn notify_hummock_with_version(
240        &self,
241        operation: Operation,
242        info: Info,
243        version: Option<NotificationVersion>,
244    ) {
245        self.notify(SubscribeType::Hummock.into(), operation, info, version)
246    }
247
248    pub fn notify_local_subscribers(&self, notification: LocalNotification) {
249        let mut core_guard = self.core.lock();
250        core_guard.local_senders.retain(|sender| {
251            if let Err(err) = sender.send(notification.clone()) {
252                tracing::warn!(error = %err.as_report(), "Failed to notify local subscriber");
253                return false;
254            }
255            true
256        });
257    }
258
259    /// Tell `NotificationManagerCore` to delete sender.
260    pub fn delete_sender(&self, worker_type: WorkerType, worker_key: WorkerKey) {
261        let mut core_guard = self.core.lock();
262        // TODO: we may avoid passing the worker_type and remove the `worker_key` in all sender
263        // holders anyway
264        match worker_type {
265            WorkerType::Frontend => core_guard.frontend_senders.remove(&worker_key),
266            WorkerType::ComputeNode | WorkerType::RiseCtl => {
267                core_guard.hummock_senders.remove(&worker_key)
268            }
269            WorkerType::Compactor => core_guard.compactor_senders.remove(&worker_key),
270            _ => unreachable!(),
271        };
272    }
273
274    /// Tell `NotificationManagerCore` to insert sender by `worker_type`.
275    pub fn insert_sender(
276        &self,
277        subscribe_type: SubscribeType,
278        worker_key: WorkerKey,
279        sender: UnboundedSender<Notification>,
280    ) {
281        let mut core_guard = self.core.lock();
282        if core_guard.exiting {
283            tracing::warn!("notification manager exiting.");
284            return;
285        }
286        let senders = core_guard.senders_of(subscribe_type);
287
288        senders.insert(worker_key, sender);
289    }
290
291    pub fn insert_local_sender(&self, sender: UnboundedSender<LocalNotification>) {
292        let mut core_guard = self.core.lock();
293        if core_guard.exiting {
294            tracing::warn!("notification manager exiting.");
295            return;
296        }
297        core_guard.local_senders.push(sender);
298    }
299
300    #[cfg(test)]
301    pub fn clear_local_sender(&self) {
302        self.core.lock().local_senders.clear();
303    }
304
305    pub async fn current_version(&self) -> NotificationVersion {
306        let version_guard = self.version_generator.lock().await;
307        version_guard.current_version()
308    }
309}
310
/// Maps a worker key to the channel used to push [`Notification`]s to that worker.
type SenderMap = HashMap<WorkerKey, UnboundedSender<Notification>>;
312
/// Registry of all subscribers, guarded by the mutex in [`NotificationManager`].
struct NotificationManagerCore {
    /// The notification sender to frontends.
    frontend_senders: SenderMap,
    /// The notification sender to nodes that subscribe to hummock.
    hummock_senders: SenderMap,
    /// The notification sender to compactor nodes.
    compactor_senders: SenderMap,
    /// The notification sender to compute nodes.
    compute_senders: SenderMap,
    /// The notification sender to local subscribers.
    local_senders: Vec<UnboundedSender<LocalNotification>>,
    /// Set by `NotificationManager::abort_all`; while `true`, new remote and local
    /// sender registrations are refused.
    exiting: bool,
}
326
327impl NotificationManagerCore {
328    fn new() -> Self {
329        Self {
330            frontend_senders: HashMap::new(),
331            hummock_senders: HashMap::new(),
332            compactor_senders: HashMap::new(),
333            compute_senders: HashMap::new(),
334            local_senders: vec![],
335            exiting: false,
336        }
337    }
338
339    fn notify(&mut self, target: Target, response: SubscribeResponse) {
340        macro_rules! warn_send_failure {
341            ($subscribe_type:expr, $worker_key:expr, $err:expr) => {
342                tracing::warn!(
343                    "Failed to notify {:?} {:?}: {}",
344                    $subscribe_type,
345                    $worker_key,
346                    $err
347                );
348            };
349        }
350
351        let senders = self.senders_of(target.subscribe_type);
352
353        if let Some(worker_key) = target.worker_key {
354            match senders.entry(worker_key.clone()) {
355                Entry::Occupied(entry) => {
356                    let _ = entry.get().send(Ok(response)).inspect_err(|err| {
357                        warn_send_failure!(target.subscribe_type, &worker_key, err.as_report());
358                        entry.remove_entry();
359                    });
360                }
361                Entry::Vacant(_) => {
362                    tracing::warn!("Failed to find notification sender of {:?}", worker_key)
363                }
364            }
365        } else {
366            senders.retain(|worker_key, sender| {
367                sender
368                    .send(Ok(response.clone()))
369                    .inspect_err(|err| {
370                        warn_send_failure!(target.subscribe_type, &worker_key, err.as_report());
371                    })
372                    .is_ok()
373            });
374        }
375    }
376
377    fn senders_of(&mut self, subscribe_type: SubscribeType) -> &mut SenderMap {
378        match subscribe_type {
379            SubscribeType::Frontend => &mut self.frontend_senders,
380            SubscribeType::Hummock => &mut self.hummock_senders,
381            SubscribeType::Compactor => &mut self.compactor_senders,
382            SubscribeType::Compute => &mut self.compute_senders,
383            SubscribeType::Unspecified => unreachable!(),
384        }
385    }
386}
387
#[cfg(test)]
mod tests {
    use risingwave_common::id::JobId;
    use risingwave_pb::common::HostAddress;

    use super::*;
    use crate::manager::WorkerKey;

    /// One worker key may be registered under several subscribe types: a targeted
    /// snapshot reaches only the addressed (type, worker) pair, and a frontend broadcast
    /// reaches every frontend subscriber but no hummock subscriber.
    #[tokio::test]
    async fn test_multiple_subscribers_one_worker() {
        let manager = NotificationManager::new(SqlMetaStore::for_test().await).await;
        let first_worker = WorkerKey(HostAddress {
            host: "a".to_owned(),
            port: 1,
        });
        let second_worker = WorkerKey(HostAddress {
            host: "a".to_owned(),
            port: 2,
        });

        let (hummock_tx, mut hummock_rx) = mpsc::unbounded_channel();
        let (frontend_tx_a, mut frontend_rx_a) = mpsc::unbounded_channel();
        let (frontend_tx_b, mut frontend_rx_b) = mpsc::unbounded_channel();
        manager.insert_sender(SubscribeType::Hummock, first_worker.clone(), hummock_tx);
        manager.insert_sender(SubscribeType::Frontend, first_worker.clone(), frontend_tx_a);
        manager.insert_sender(SubscribeType::Frontend, second_worker, frontend_tx_b);

        // Targeted snapshot: only the hummock subscriber of the first worker sees it.
        manager.notify_snapshot(
            first_worker.clone(),
            SubscribeType::Hummock,
            MetaSnapshot::default(),
        );
        assert!(hummock_rx.recv().await.is_some());
        assert!(frontend_rx_a.try_recv().is_err());
        assert!(frontend_rx_b.try_recv().is_err());

        // Frontend broadcast: both frontend subscribers see it, hummock does not.
        manager
            .notify_frontend(Operation::Add, Info::Database(Default::default()))
            .await;
        assert!(hummock_rx.try_recv().is_err());
        assert!(frontend_rx_a.recv().await.is_some());
        assert!(frontend_rx_b.recv().await.is_some());
    }

    /// A local subscriber receives the backfill-finished notification with the same
    /// job id that was sent.
    #[tokio::test]
    async fn test_local_notification_backfill_finished() {
        let manager = NotificationManager::new(SqlMetaStore::for_test().await).await;
        let (local_tx, mut local_rx) = mpsc::unbounded_channel();
        manager.insert_local_sender(local_tx);

        let job_id = JobId::new(42);
        manager.notify_local_subscribers(LocalNotification::StreamingJobBackfillFinished(job_id));

        let notification = local_rx.recv().await.expect("should receive notification");
        match notification {
            LocalNotification::StreamingJobBackfillFinished(received) => {
                assert_eq!(received, job_id);
            }
            other => panic!("unexpected notification: {other:?}"),
        }
    }
}