// risingwave_meta/hummock/manager/worker.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use risingwave_common::system_param::reader::SystemParamsRead;
16use risingwave_hummock_sdk::HummockVersionId;
17use risingwave_meta_model::ObjectId;
18use risingwave_object_store::object::{ObjectResult, ObjectStoreRef};
19use sync_point::sync_point;
20use thiserror_ext::AsReport;
21use tokio::task::JoinHandle;
22
23use crate::hummock::{HummockManager, HummockManagerRef};
24use crate::manager::LocalNotification;
25
26pub type HummockManagerEventSender = tokio::sync::mpsc::UnboundedSender<HummockManagerEvent>;
27pub type HummockManagerEventReceiver = tokio::sync::mpsc::UnboundedReceiver<HummockManagerEvent>;
28
29pub enum HummockManagerEvent {
30    DropSafePoint(HummockVersionId),
31    #[allow(dead_code)]
32    Shutdown,
33}
34
35impl HummockManager {
36    pub fn start_worker(
37        self: &HummockManagerRef,
38        mut receiver: HummockManagerEventReceiver,
39    ) -> JoinHandle<()> {
40        let (local_notification_tx, mut local_notification_rx) =
41            tokio::sync::mpsc::unbounded_channel();
42        self.env
43            .notification_manager()
44            .insert_local_sender(local_notification_tx);
45        let hummock_manager = self.clone();
46        tokio::spawn(async move {
47            loop {
48                tokio::select! {
49                    notification = local_notification_rx.recv() => {
50                        match notification {
51                            Some(notification) => {
52                                hummock_manager
53                                    .handle_local_notification(notification)
54                                    .await;
55                            }
56                            None => {
57                                return;
58                            }
59                        }
60                    }
61                    hummock_manager_event = receiver.recv() => {
62                        match hummock_manager_event {
63                            Some(hummock_manager_event) => {
64                                if !hummock_manager
65                                    .handle_hummock_manager_event(hummock_manager_event)
66                                    .await {
67                                    return;
68                                }
69                            }
70                            None => {
71                                return;
72                            }
73                        }
74                    }
75                }
76            }
77        })
78    }
79
80    /// Returns false indicates to shutdown worker
81    async fn handle_hummock_manager_event(&self, event: HummockManagerEvent) -> bool {
82        match event {
83            HummockManagerEvent::DropSafePoint(id) => {
84                self.unregister_safe_point(id).await;
85                sync_point!("UNREGISTER_HUMMOCK_VERSION_SAFE_POINT");
86            }
87            HummockManagerEvent::Shutdown => {
88                tracing::info!("Hummock manager worker is stopped");
89                return false;
90            }
91        }
92        true
93    }
94
95    async fn handle_local_notification(&self, notification: LocalNotification) {
96        match notification {
97            LocalNotification::WorkerNodeDeleted(worker_node) => {
98                self.release_contexts(vec![worker_node.id])
99                    .await
100                    .unwrap_or_else(|err| {
101                        panic!(
102                            "Failed to release hummock context {}, error={}",
103                            worker_node.id,
104                            err.as_report()
105                        )
106                    });
107                tracing::info!("Released hummock context {}", worker_node.id);
108                sync_point!("AFTER_RELEASE_HUMMOCK_CONTEXTS_ASYNC");
109            }
110            LocalNotification::SourceDropped(source_id) => {
111                let object_store = self.object_store.clone();
112                let sys_params = self.env.system_params_reader().await;
113                let data_directory = sys_params.data_directory().to_owned();
114                // Best effort.
115                // The source_id may not belong to a CDC source, in which case this is a no-op.
116                tokio::spawn(async move {
117                    if let Err(e) = try_clean_up_cdc_source_schema_history(
118                        source_id,
119                        object_store,
120                        data_directory,
121                    )
122                    .await
123                    {
124                        use thiserror_ext::AsReport;
125                        tracing::error!(
126                            "Failed to clean up cdc source {source_id} schema history: {}.",
127                            e.as_report()
128                        )
129                    }
130                });
131            }
132            _ => {}
133        }
134    }
135}
136
137async fn try_clean_up_cdc_source_schema_history(
138    source_id: ObjectId,
139    object_store: ObjectStoreRef,
140    data_directory: String,
141) -> ObjectResult<()> {
142    // The path should follow that defined in OpenDalSchemaHIstory.java prefixed by data_directory.
143    let object_dir: String = format!("{data_directory}/rw-cdc-schema-history/source-{source_id}");
144    let mut keys: Vec<String> = Vec::new();
145    let mut stream = object_store.list(&object_dir, None, None).await?;
146    use futures::StreamExt;
147    while let Some(obj) = stream.next().await {
148        let obj = obj?;
149        keys.push(obj.key);
150    }
151    tracing::debug!(?keys, "Deleting schema history files");
152    object_store.delete_objects(&keys).await?;
153    Ok(())
154}