risingwave_meta/hummock/manager/
worker.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use risingwave_hummock_sdk::HummockVersionId;
16use sync_point::sync_point;
17use thiserror_ext::AsReport;
18use tokio::task::JoinHandle;
19
20use crate::hummock::{HummockManager, HummockManagerRef};
21use crate::manager::LocalNotification;
22
/// Sending half of the unbounded tokio channel that carries [`HummockManagerEvent`]s.
pub type HummockManagerEventSender = tokio::sync::mpsc::UnboundedSender<HummockManagerEvent>;
/// Receiving half of the unbounded tokio channel that carries [`HummockManagerEvent`]s.
///
/// `HummockManager::start_worker` takes ownership of it and polls it in the worker loop.
pub type HummockManagerEventReceiver = tokio::sync::mpsc::UnboundedReceiver<HummockManagerEvent>;
25
/// Events processed by the worker task started in `HummockManager::start_worker`.
pub enum HummockManagerEvent {
    /// Asks the worker to unregister the safe point identified by the given version id.
    DropSafePoint(HummockVersionId),
    /// Asks the worker to stop; handling this event makes the worker loop exit.
    // NOTE(review): presumably this variant is not constructed anywhere in the crate at the
    // moment, hence the lint suppression — confirm before removing the attribute.
    #[allow(dead_code)]
    Shutdown,
}
31
32impl HummockManager {
33    pub async fn start_worker(
34        self: &HummockManagerRef,
35        mut receiver: HummockManagerEventReceiver,
36    ) -> JoinHandle<()> {
37        let (local_notification_tx, mut local_notification_rx) =
38            tokio::sync::mpsc::unbounded_channel();
39        self.env
40            .notification_manager()
41            .insert_local_sender(local_notification_tx)
42            .await;
43        let hummock_manager = self.clone();
44        tokio::spawn(async move {
45            loop {
46                tokio::select! {
47                    notification = local_notification_rx.recv() => {
48                        match notification {
49                            Some(notification) => {
50                                hummock_manager
51                                    .handle_local_notification(notification)
52                                    .await;
53                            }
54                            None => {
55                                return;
56                            }
57                        }
58                    }
59                    hummock_manager_event = receiver.recv() => {
60                        match hummock_manager_event {
61                            Some(hummock_manager_event) => {
62                                if !hummock_manager
63                                    .handle_hummock_manager_event(hummock_manager_event)
64                                    .await {
65                                    return;
66                                }
67                            }
68                            None => {
69                                return;
70                            }
71                        }
72                    }
73                }
74            }
75        })
76    }
77
78    /// Returns false indicates to shutdown worker
79    async fn handle_hummock_manager_event(&self, event: HummockManagerEvent) -> bool {
80        match event {
81            HummockManagerEvent::DropSafePoint(id) => {
82                self.unregister_safe_point(id).await;
83                sync_point!("UNREGISTER_HUMMOCK_VERSION_SAFE_POINT");
84            }
85            HummockManagerEvent::Shutdown => {
86                tracing::info!("Hummock manager worker is stopped");
87                return false;
88            }
89        }
90        true
91    }
92
93    async fn handle_local_notification(&self, notification: LocalNotification) {
94        if let LocalNotification::WorkerNodeDeleted(worker_node) = notification {
95            self.release_contexts(vec![worker_node.id])
96                .await
97                .unwrap_or_else(|err| {
98                    panic!(
99                        "Failed to release hummock context {}, error={}",
100                        worker_node.id,
101                        err.as_report()
102                    )
103                });
104            tracing::info!("Released hummock context {}", worker_node.id);
105            sync_point!("AFTER_RELEASE_HUMMOCK_CONTEXTS_ASYNC");
106        }
107    }
108}