risingwave_meta/hummock/manager/
worker.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_hummock_sdk::HummockVersionId;
16use sync_point::sync_point;
17use thiserror_ext::AsReport;
18use tokio::task::JoinHandle;
19
20use crate::hummock::{HummockManager, HummockManagerRef};
21use crate::manager::LocalNotification;
22
23pub type HummockManagerEventSender = tokio::sync::mpsc::UnboundedSender<HummockManagerEvent>;
24pub type HummockManagerEventReceiver = tokio::sync::mpsc::UnboundedReceiver<HummockManagerEvent>;
25
26pub enum HummockManagerEvent {
27    DropSafePoint(HummockVersionId),
28    #[allow(dead_code)]
29    Shutdown,
30}
31
32impl HummockManager {
33    pub fn start_worker(
34        self: &HummockManagerRef,
35        mut receiver: HummockManagerEventReceiver,
36    ) -> JoinHandle<()> {
37        let (local_notification_tx, mut local_notification_rx) =
38            tokio::sync::mpsc::unbounded_channel();
39        self.env
40            .notification_manager()
41            .insert_local_sender(local_notification_tx);
42        let hummock_manager = self.clone();
43        tokio::spawn(async move {
44            loop {
45                tokio::select! {
46                    notification = local_notification_rx.recv() => {
47                        match notification {
48                            Some(notification) => {
49                                hummock_manager
50                                    .handle_local_notification(notification)
51                                    .await;
52                            }
53                            None => {
54                                return;
55                            }
56                        }
57                    }
58                    hummock_manager_event = receiver.recv() => {
59                        match hummock_manager_event {
60                            Some(hummock_manager_event) => {
61                                if !hummock_manager
62                                    .handle_hummock_manager_event(hummock_manager_event)
63                                    .await {
64                                    return;
65                                }
66                            }
67                            None => {
68                                return;
69                            }
70                        }
71                    }
72                }
73            }
74        })
75    }
76
77    /// Returns false indicates to shutdown worker
78    async fn handle_hummock_manager_event(&self, event: HummockManagerEvent) -> bool {
79        match event {
80            HummockManagerEvent::DropSafePoint(id) => {
81                self.unregister_safe_point(id).await;
82                sync_point!("UNREGISTER_HUMMOCK_VERSION_SAFE_POINT");
83            }
84            HummockManagerEvent::Shutdown => {
85                tracing::info!("Hummock manager worker is stopped");
86                return false;
87            }
88        }
89        true
90    }
91
92    async fn handle_local_notification(&self, notification: LocalNotification) {
93        if let LocalNotification::WorkerNodeDeleted(worker_node) = notification {
94            self.release_contexts(vec![worker_node.id])
95                .await
96                .unwrap_or_else(|err| {
97                    panic!(
98                        "Failed to release hummock context {}, error={}",
99                        worker_node.id,
100                        err.as_report()
101                    )
102                });
103            tracing::info!("Released hummock context {}", worker_node.id);
104            sync_point!("AFTER_RELEASE_HUMMOCK_CONTEXTS_ASYNC");
105        }
106    }
107}