risingwave_meta/hummock/manager/
worker.rs1use risingwave_hummock_sdk::HummockVersionId;
16use sync_point::sync_point;
17use thiserror_ext::AsReport;
18use tokio::task::JoinHandle;
19
20use crate::hummock::{HummockManager, HummockManagerRef};
21use crate::manager::LocalNotification;
22
23pub type HummockManagerEventSender = tokio::sync::mpsc::UnboundedSender<HummockManagerEvent>;
24pub type HummockManagerEventReceiver = tokio::sync::mpsc::UnboundedReceiver<HummockManagerEvent>;
25
26pub enum HummockManagerEvent {
27 DropSafePoint(HummockVersionId),
28 #[allow(dead_code)]
29 Shutdown,
30}
31
32impl HummockManager {
33 pub fn start_worker(
34 self: &HummockManagerRef,
35 mut receiver: HummockManagerEventReceiver,
36 ) -> JoinHandle<()> {
37 let (local_notification_tx, mut local_notification_rx) =
38 tokio::sync::mpsc::unbounded_channel();
39 self.env
40 .notification_manager()
41 .insert_local_sender(local_notification_tx);
42 let hummock_manager = self.clone();
43 tokio::spawn(async move {
44 loop {
45 tokio::select! {
46 notification = local_notification_rx.recv() => {
47 match notification {
48 Some(notification) => {
49 hummock_manager
50 .handle_local_notification(notification)
51 .await;
52 }
53 None => {
54 return;
55 }
56 }
57 }
58 hummock_manager_event = receiver.recv() => {
59 match hummock_manager_event {
60 Some(hummock_manager_event) => {
61 if !hummock_manager
62 .handle_hummock_manager_event(hummock_manager_event)
63 .await {
64 return;
65 }
66 }
67 None => {
68 return;
69 }
70 }
71 }
72 }
73 }
74 })
75 }
76
77 async fn handle_hummock_manager_event(&self, event: HummockManagerEvent) -> bool {
79 match event {
80 HummockManagerEvent::DropSafePoint(id) => {
81 self.unregister_safe_point(id).await;
82 sync_point!("UNREGISTER_HUMMOCK_VERSION_SAFE_POINT");
83 }
84 HummockManagerEvent::Shutdown => {
85 tracing::info!("Hummock manager worker is stopped");
86 return false;
87 }
88 }
89 true
90 }
91
92 async fn handle_local_notification(&self, notification: LocalNotification) {
93 if let LocalNotification::WorkerNodeDeleted(worker_node) = notification {
94 self.release_contexts(vec![worker_node.id])
95 .await
96 .unwrap_or_else(|err| {
97 panic!(
98 "Failed to release hummock context {}, error={}",
99 worker_node.id,
100 err.as_report()
101 )
102 });
103 tracing::info!("Released hummock context {}", worker_node.id);
104 sync_point!("AFTER_RELEASE_HUMMOCK_CONTEXTS_ASYNC");
105 }
106 }
107}