risingwave_meta/hummock/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14pub mod compaction;
15pub mod compactor_manager;
16pub mod error;
17mod manager;
18
19pub use manager::*;
20use thiserror_ext::AsReport;
21
22mod level_handler;
23mod metrics_utils;
24#[cfg(any(test, feature = "test"))]
25pub mod mock_hummock_meta_client;
26pub mod model;
27pub mod test_utils;
28mod utils;
29use std::time::Duration;
30
31pub use compactor_manager::*;
32#[cfg(any(test, feature = "test"))]
33pub use mock_hummock_meta_client::MockHummockMetaClient;
34use tokio::sync::oneshot::Sender;
35use tokio::task::JoinHandle;
36
37use crate::MetaOpts;
38use crate::backup_restore::BackupManagerRef;
39
40/// Start hummock's asynchronous tasks.
41pub fn start_hummock_workers(
42    hummock_manager: HummockManagerRef,
43    backup_manager: BackupManagerRef,
44    meta_opts: &MetaOpts,
45) -> Vec<(JoinHandle<()>, Sender<()>)> {
46    // These critical tasks are put in their own timer loop deliberately, to avoid long-running ones
47    // from blocking others.
48    let workers = vec![
49        start_checkpoint_loop(
50            hummock_manager.clone(),
51            backup_manager,
52            Duration::from_secs(meta_opts.hummock_version_checkpoint_interval_sec),
53            meta_opts.min_delta_log_num_for_hummock_version_checkpoint,
54        ),
55        start_vacuum_metadata_loop(
56            hummock_manager.clone(),
57            Duration::from_secs(meta_opts.vacuum_interval_sec),
58        ),
59        start_vacuum_time_travel_metadata_loop(
60            hummock_manager.clone(),
61            Duration::from_secs(meta_opts.time_travel_vacuum_interval_sec),
62        ),
63    ];
64    workers
65}
66
67/// Starts a task to periodically vacuum stale metadata.
68pub fn start_vacuum_metadata_loop(
69    hummock_manager: HummockManagerRef,
70    interval: Duration,
71) -> (JoinHandle<()>, Sender<()>) {
72    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
73    let join_handle = tokio::spawn(async move {
74        let mut min_trigger_interval = tokio::time::interval(interval);
75        min_trigger_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
76        loop {
77            tokio::select! {
78                // Wait for interval
79                _ = min_trigger_interval.tick() => {},
80                // Shutdown vacuum
81                _ = &mut shutdown_rx => {
82                    tracing::info!("Vacuum metadata loop is stopped");
83                    return;
84                }
85            }
86            if let Err(err) = hummock_manager.delete_version_deltas().await {
87                tracing::warn!(error = %err.as_report(), "Vacuum metadata error");
88            }
89        }
90    });
91    (join_handle, shutdown_tx)
92}
93
94pub fn start_vacuum_time_travel_metadata_loop(
95    hummock_manager: HummockManagerRef,
96    interval: Duration,
97) -> (JoinHandle<()>, Sender<()>) {
98    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
99    let join_handle = tokio::spawn(async move {
100        let mut min_trigger_interval = tokio::time::interval(interval);
101        min_trigger_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
102        loop {
103            tokio::select! {
104                // Wait for interval
105                _ = min_trigger_interval.tick() => {},
106                // Shutdown vacuum
107                _ = &mut shutdown_rx => {
108                    tracing::info!("Vacuum time travel metadata loop is stopped");
109                    return;
110                }
111            }
112            if let Err(err) = hummock_manager.delete_time_travel_metadata().await {
113                tracing::warn!(error = %err.as_report(), "Vacuum time travel metadata error");
114            }
115        }
116    });
117    (join_handle, shutdown_tx)
118}
119
120pub fn start_checkpoint_loop(
121    hummock_manager: HummockManagerRef,
122    backup_manager: BackupManagerRef,
123    interval: Duration,
124    min_delta_log_num: u64,
125) -> (JoinHandle<()>, Sender<()>) {
126    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
127    let join_handle = tokio::spawn(async move {
128        let mut min_trigger_interval = tokio::time::interval(interval);
129        min_trigger_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
130        loop {
131            tokio::select! {
132                // Wait for interval
133                _ = min_trigger_interval.tick() => {},
134                // Shutdown checkpoint
135                _ = &mut shutdown_rx => {
136                    tracing::info!("Hummock version checkpoint is stopped");
137                    return;
138                }
139            }
140            if hummock_manager.is_version_checkpoint_paused()
141                || hummock_manager.env.opts.compaction_deterministic_test
142            {
143                continue;
144            }
145            match hummock_manager
146                .create_version_checkpoint(min_delta_log_num)
147                .await
148            {
149                Err(err) => {
150                    tracing::warn!(error = %err.as_report(), "Hummock version checkpoint error.");
151                }
152                _ => {
153                    let backup_manager_2 = backup_manager.clone();
154                    let hummock_manager_2 = hummock_manager.clone();
155                    tokio::task::spawn(async move {
156                        let _ = hummock_manager_2
157                            .try_start_minor_gc(backup_manager_2)
158                            .await
159                            .inspect_err(|err| {
160                                tracing::warn!(error = %err.as_report(), "Hummock minor GC error.");
161                            });
162                    });
163                }
164            }
165        }
166    });
167    (join_handle, shutdown_tx)
168}