risingwave_meta/hummock/manager/
worker.rs1use risingwave_hummock_sdk::HummockVersionId;
16use risingwave_pb::common::WorkerType;
17use sync_point::sync_point;
18use thiserror_ext::AsReport;
19use tokio::task::JoinHandle;
20
21use crate::hummock::{HummockManager, HummockManagerRef};
22use crate::manager::LocalNotification;
23
24pub type HummockManagerEventSender = tokio::sync::mpsc::UnboundedSender<HummockManagerEvent>;
25pub type HummockManagerEventReceiver = tokio::sync::mpsc::UnboundedReceiver<HummockManagerEvent>;
26
27pub enum HummockManagerEvent {
28 DropSafePoint(HummockVersionId),
29 #[allow(dead_code)]
30 Shutdown,
31}
32
33impl HummockManager {
34 pub async fn start_worker(
35 self: &HummockManagerRef,
36 mut receiver: HummockManagerEventReceiver,
37 ) -> JoinHandle<()> {
38 let (local_notification_tx, mut local_notification_rx) =
39 tokio::sync::mpsc::unbounded_channel();
40 self.env
41 .notification_manager()
42 .insert_local_sender(local_notification_tx)
43 .await;
44 let hummock_manager = self.clone();
45 tokio::spawn(async move {
46 loop {
47 tokio::select! {
48 notification = local_notification_rx.recv() => {
49 match notification {
50 Some(notification) => {
51 hummock_manager
52 .handle_local_notification(notification)
53 .await;
54 }
55 None => {
56 return;
57 }
58 }
59 }
60 hummock_manager_event = receiver.recv() => {
61 match hummock_manager_event {
62 Some(hummock_manager_event) => {
63 if !hummock_manager
64 .handle_hummock_manager_event(hummock_manager_event)
65 .await {
66 return;
67 }
68 }
69 None => {
70 return;
71 }
72 }
73 }
74 }
75 }
76 })
77 }
78
79 async fn handle_hummock_manager_event(&self, event: HummockManagerEvent) -> bool {
81 match event {
82 HummockManagerEvent::DropSafePoint(id) => {
83 self.unregister_safe_point(id).await;
84 sync_point!("UNREGISTER_HUMMOCK_VERSION_SAFE_POINT");
85 }
86 HummockManagerEvent::Shutdown => {
87 tracing::info!("Hummock manager worker is stopped");
88 return false;
89 }
90 }
91 true
92 }
93
94 async fn handle_local_notification(&self, notification: LocalNotification) {
95 if let LocalNotification::WorkerNodeDeleted(worker_node) = notification {
96 if worker_node.get_type().unwrap() == WorkerType::Compactor {
97 self.compactor_manager.remove_compactor(worker_node.id);
98 }
99 self.release_contexts(vec![worker_node.id])
100 .await
101 .unwrap_or_else(|err| {
102 panic!(
103 "Failed to release hummock context {}, error={}",
104 worker_node.id,
105 err.as_report()
106 )
107 });
108 tracing::info!("Released hummock context {}", worker_node.id);
109 sync_point!("AFTER_RELEASE_HUMMOCK_CONTEXTS_ASYNC");
110 }
111 }
112}