risingwave_meta/hummock/manager/
worker.rs1use risingwave_hummock_sdk::HummockVersionId;
16use sync_point::sync_point;
17use thiserror_ext::AsReport;
18use tokio::task::JoinHandle;
19
20use crate::hummock::{HummockManager, HummockManagerRef};
21use crate::manager::LocalNotification;
22
23pub type HummockManagerEventSender = tokio::sync::mpsc::UnboundedSender<HummockManagerEvent>;
24pub type HummockManagerEventReceiver = tokio::sync::mpsc::UnboundedReceiver<HummockManagerEvent>;
25
26pub enum HummockManagerEvent {
27 DropSafePoint(HummockVersionId),
28 #[allow(dead_code)]
29 Shutdown,
30}
31
32impl HummockManager {
33 pub async fn start_worker(
34 self: &HummockManagerRef,
35 mut receiver: HummockManagerEventReceiver,
36 ) -> JoinHandle<()> {
37 let (local_notification_tx, mut local_notification_rx) =
38 tokio::sync::mpsc::unbounded_channel();
39 self.env
40 .notification_manager()
41 .insert_local_sender(local_notification_tx)
42 .await;
43 let hummock_manager = self.clone();
44 tokio::spawn(async move {
45 loop {
46 tokio::select! {
47 notification = local_notification_rx.recv() => {
48 match notification {
49 Some(notification) => {
50 hummock_manager
51 .handle_local_notification(notification)
52 .await;
53 }
54 None => {
55 return;
56 }
57 }
58 }
59 hummock_manager_event = receiver.recv() => {
60 match hummock_manager_event {
61 Some(hummock_manager_event) => {
62 if !hummock_manager
63 .handle_hummock_manager_event(hummock_manager_event)
64 .await {
65 return;
66 }
67 }
68 None => {
69 return;
70 }
71 }
72 }
73 }
74 }
75 })
76 }
77
78 async fn handle_hummock_manager_event(&self, event: HummockManagerEvent) -> bool {
80 match event {
81 HummockManagerEvent::DropSafePoint(id) => {
82 self.unregister_safe_point(id).await;
83 sync_point!("UNREGISTER_HUMMOCK_VERSION_SAFE_POINT");
84 }
85 HummockManagerEvent::Shutdown => {
86 tracing::info!("Hummock manager worker is stopped");
87 return false;
88 }
89 }
90 true
91 }
92
93 async fn handle_local_notification(&self, notification: LocalNotification) {
94 if let LocalNotification::WorkerNodeDeleted(worker_node) = notification {
95 self.release_contexts(vec![worker_node.id])
96 .await
97 .unwrap_or_else(|err| {
98 panic!(
99 "Failed to release hummock context {}, error={}",
100 worker_node.id,
101 err.as_report()
102 )
103 });
104 tracing::info!("Released hummock context {}", worker_node.id);
105 sync_point!("AFTER_RELEASE_HUMMOCK_CONTEXTS_ASYNC");
106 }
107 }
108}