risingwave_meta/hummock/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14pub mod compaction;
15pub mod compactor_manager;
16pub mod error;
17mod manager;
18
19pub use manager::*;
20use thiserror_ext::AsReport;
21
22mod level_handler;
23mod metrics_utils;
24#[cfg(any(test, feature = "test"))]
25pub mod mock_hummock_meta_client;
26pub mod model;
27pub mod test_utils;
28use std::time::Duration;
29
30pub use compactor_manager::*;
31#[cfg(any(test, feature = "test"))]
32pub use mock_hummock_meta_client::MockHummockMetaClient;
33use tokio::sync::oneshot::Sender;
34use tokio::task::JoinHandle;
35
36use crate::MetaOpts;
37use crate::backup_restore::BackupManagerRef;
38
39/// Start hummock's asynchronous tasks.
40pub fn start_hummock_workers(
41    hummock_manager: HummockManagerRef,
42    backup_manager: BackupManagerRef,
43    meta_opts: &MetaOpts,
44) -> Vec<(JoinHandle<()>, Sender<()>)> {
45    // These critical tasks are put in their own timer loop deliberately, to avoid long-running ones
46    // from blocking others.
47    let workers = vec![
48        start_checkpoint_loop(
49            hummock_manager.clone(),
50            backup_manager,
51            Duration::from_secs(meta_opts.hummock_version_checkpoint_interval_sec),
52            meta_opts.min_delta_log_num_for_hummock_version_checkpoint,
53        ),
54        start_vacuum_metadata_loop(
55            hummock_manager.clone(),
56            Duration::from_secs(meta_opts.vacuum_interval_sec),
57        ),
58        start_vacuum_time_travel_metadata_loop(
59            hummock_manager.clone(),
60            Duration::from_secs(meta_opts.time_travel_vacuum_interval_sec),
61        ),
62    ];
63    workers
64}
65
66/// Starts a task to periodically vacuum stale metadata.
67pub fn start_vacuum_metadata_loop(
68    hummock_manager: HummockManagerRef,
69    interval: Duration,
70) -> (JoinHandle<()>, Sender<()>) {
71    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
72    let join_handle = tokio::spawn(async move {
73        let mut min_trigger_interval = tokio::time::interval(interval);
74        min_trigger_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
75        loop {
76            tokio::select! {
77                // Wait for interval
78                _ = min_trigger_interval.tick() => {},
79                // Shutdown vacuum
80                _ = &mut shutdown_rx => {
81                    tracing::info!("Vacuum metadata loop is stopped");
82                    return;
83                }
84            }
85            if let Err(err) = hummock_manager.delete_version_deltas().await {
86                tracing::warn!(error = %err.as_report(), "Vacuum metadata error");
87            }
88        }
89    });
90    (join_handle, shutdown_tx)
91}
92
93pub fn start_vacuum_time_travel_metadata_loop(
94    hummock_manager: HummockManagerRef,
95    interval: Duration,
96) -> (JoinHandle<()>, Sender<()>) {
97    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
98    let join_handle = tokio::spawn(async move {
99        let mut min_trigger_interval = tokio::time::interval(interval);
100        min_trigger_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
101        loop {
102            tokio::select! {
103                // Wait for interval
104                _ = min_trigger_interval.tick() => {},
105                // Shutdown vacuum
106                _ = &mut shutdown_rx => {
107                    tracing::info!("Vacuum time travel metadata loop is stopped");
108                    return;
109                }
110            }
111            if let Err(err) = hummock_manager.delete_time_travel_metadata().await {
112                tracing::warn!(error = %err.as_report(), "Vacuum time travel metadata error");
113            }
114        }
115    });
116    (join_handle, shutdown_tx)
117}
118
119pub fn start_checkpoint_loop(
120    hummock_manager: HummockManagerRef,
121    backup_manager: BackupManagerRef,
122    interval: Duration,
123    min_delta_log_num: u64,
124) -> (JoinHandle<()>, Sender<()>) {
125    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
126    let join_handle = tokio::spawn(async move {
127        let mut min_trigger_interval = tokio::time::interval(interval);
128        min_trigger_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
129        loop {
130            tokio::select! {
131                // Wait for interval
132                _ = min_trigger_interval.tick() => {},
133                // Shutdown checkpoint
134                _ = &mut shutdown_rx => {
135                    tracing::info!("Hummock version checkpoint is stopped");
136                    return;
137                }
138            }
139            if hummock_manager.is_version_checkpoint_paused()
140                || hummock_manager.env.opts.compaction_deterministic_test
141            {
142                continue;
143            }
144            match hummock_manager
145                .create_version_checkpoint(min_delta_log_num)
146                .await
147            {
148                Err(err) => {
149                    tracing::warn!(error = %err.as_report(), "Hummock version checkpoint error.");
150                }
151                _ => {
152                    let backup_manager_2 = backup_manager.clone();
153                    let hummock_manager_2 = hummock_manager.clone();
154                    tokio::task::spawn(async move {
155                        let _ = hummock_manager_2
156                            .try_start_minor_gc(backup_manager_2)
157                            .await
158                            .inspect_err(|err| {
159                                tracing::warn!(error = %err.as_report(), "Hummock minor GC error.");
160                            });
161                    });
162                }
163            }
164        }
165    });
166    (join_handle, shutdown_tx)
167}