risingwave_meta/hummock/manager/
worker.rs1use risingwave_common::system_param::reader::SystemParamsRead;
16use risingwave_hummock_sdk::HummockVersionId;
17use risingwave_meta_model::ObjectId;
18use risingwave_object_store::object::{ObjectResult, ObjectStoreRef};
19use sync_point::sync_point;
20use thiserror_ext::AsReport;
21use tokio::task::JoinHandle;
22
23use crate::hummock::{HummockManager, HummockManagerRef};
24use crate::manager::LocalNotification;
25
26pub type HummockManagerEventSender = tokio::sync::mpsc::UnboundedSender<HummockManagerEvent>;
27pub type HummockManagerEventReceiver = tokio::sync::mpsc::UnboundedReceiver<HummockManagerEvent>;
28
29pub enum HummockManagerEvent {
30 DropSafePoint(HummockVersionId),
31 #[allow(dead_code)]
32 Shutdown,
33}
34
35impl HummockManager {
36 pub fn start_worker(
37 self: &HummockManagerRef,
38 mut receiver: HummockManagerEventReceiver,
39 ) -> JoinHandle<()> {
40 let (local_notification_tx, mut local_notification_rx) =
41 tokio::sync::mpsc::unbounded_channel();
42 self.env
43 .notification_manager()
44 .insert_local_sender(local_notification_tx);
45 let hummock_manager = self.clone();
46 tokio::spawn(async move {
47 loop {
48 tokio::select! {
49 notification = local_notification_rx.recv() => {
50 match notification {
51 Some(notification) => {
52 hummock_manager
53 .handle_local_notification(notification)
54 .await;
55 }
56 None => {
57 return;
58 }
59 }
60 }
61 hummock_manager_event = receiver.recv() => {
62 match hummock_manager_event {
63 Some(hummock_manager_event) => {
64 if !hummock_manager
65 .handle_hummock_manager_event(hummock_manager_event)
66 .await {
67 return;
68 }
69 }
70 None => {
71 return;
72 }
73 }
74 }
75 }
76 }
77 })
78 }
79
80 async fn handle_hummock_manager_event(&self, event: HummockManagerEvent) -> bool {
82 match event {
83 HummockManagerEvent::DropSafePoint(id) => {
84 self.unregister_safe_point(id).await;
85 sync_point!("UNREGISTER_HUMMOCK_VERSION_SAFE_POINT");
86 }
87 HummockManagerEvent::Shutdown => {
88 tracing::info!("Hummock manager worker is stopped");
89 return false;
90 }
91 }
92 true
93 }
94
95 async fn handle_local_notification(&self, notification: LocalNotification) {
96 match notification {
97 LocalNotification::WorkerNodeDeleted(worker_node) => {
98 self.release_contexts(vec![worker_node.id])
99 .await
100 .unwrap_or_else(|err| {
101 panic!(
102 "Failed to release hummock context {}, error={}",
103 worker_node.id,
104 err.as_report()
105 )
106 });
107 tracing::info!("Released hummock context {}", worker_node.id);
108 sync_point!("AFTER_RELEASE_HUMMOCK_CONTEXTS_ASYNC");
109 }
110 LocalNotification::SourceDropped(source_id) => {
111 let object_store = self.object_store.clone();
112 let sys_params = self.env.system_params_reader().await;
113 let data_directory = sys_params.data_directory().to_owned();
114 tokio::spawn(async move {
117 if let Err(e) = try_clean_up_cdc_source_schema_history(
118 source_id,
119 object_store,
120 data_directory,
121 )
122 .await
123 {
124 use thiserror_ext::AsReport;
125 tracing::error!(
126 "Failed to clean up cdc source {source_id} schema history: {}.",
127 e.as_report()
128 )
129 }
130 });
131 }
132 _ => {}
133 }
134 }
135}
136
137async fn try_clean_up_cdc_source_schema_history(
138 source_id: ObjectId,
139 object_store: ObjectStoreRef,
140 data_directory: String,
141) -> ObjectResult<()> {
142 let object_dir: String = format!("{data_directory}/rw-cdc-schema-history/source-{source_id}");
144 let mut keys: Vec<String> = Vec::new();
145 let mut stream = object_store.list(&object_dir, None, None).await?;
146 use futures::StreamExt;
147 while let Some(obj) = stream.next().await {
148 let obj = obj?;
149 keys.push(obj.key);
150 }
151 tracing::debug!(?keys, "Deleting schema history files");
152 object_store.delete_objects(&keys).await?;
153 Ok(())
154}