risingwave_meta/hummock/
mod.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod compaction;
16pub mod compactor_manager;
17pub mod error;
18mod manager;
19
20pub use manager::*;
21use thiserror_ext::AsReport;
22
23mod level_handler;
24mod metrics_utils;
25#[cfg(any(test, feature = "test"))]
26pub mod mock_hummock_meta_client;
27pub mod model;
28pub mod test_utils;
29use std::time::Duration;
30
31pub use compactor_manager::*;
32use futures::future::BoxFuture;
33#[cfg(any(test, feature = "test"))]
34pub use mock_hummock_meta_client::MockHummockMetaClient;
35use tokio::sync::oneshot::Sender;
36use tokio::task::JoinHandle;
37
38use crate::MetaOpts;
39use crate::backup_restore::BackupManagerRef;
40
41/// Start hummock's asynchronous tasks.
42pub fn start_hummock_workers(
43    hummock_manager: HummockManagerRef,
44    backup_manager: BackupManagerRef,
45    meta_opts: &MetaOpts,
46    should_pause_vacuum_time_travel: Box<dyn Fn() -> BoxFuture<'static, bool> + Send>,
47) -> Vec<(JoinHandle<()>, Sender<()>)> {
48    // These critical tasks are put in their own timer loop deliberately, to avoid long-running ones
49    // from blocking others.
50    let workers = vec![
51        start_checkpoint_loop(
52            hummock_manager.clone(),
53            backup_manager,
54            Duration::from_secs(meta_opts.hummock_version_checkpoint_interval_sec),
55            meta_opts.min_delta_log_num_for_hummock_version_checkpoint,
56        ),
57        start_vacuum_metadata_loop(
58            hummock_manager.clone(),
59            Duration::from_secs(meta_opts.vacuum_interval_sec),
60        ),
61        start_vacuum_time_travel_metadata_loop(
62            hummock_manager,
63            Duration::from_secs(meta_opts.time_travel_vacuum_interval_sec),
64            should_pause_vacuum_time_travel,
65        ),
66    ];
67    workers
68}
69
70/// Starts a task to periodically vacuum stale metadata.
71pub fn start_vacuum_metadata_loop(
72    hummock_manager: HummockManagerRef,
73    interval: Duration,
74) -> (JoinHandle<()>, Sender<()>) {
75    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
76    let join_handle = tokio::spawn(async move {
77        let mut min_trigger_interval = tokio::time::interval(interval);
78        min_trigger_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
79        loop {
80            tokio::select! {
81                // Wait for interval
82                _ = min_trigger_interval.tick() => {},
83                // Shutdown vacuum
84                _ = &mut shutdown_rx => {
85                    tracing::info!("Vacuum metadata loop is stopped");
86                    return;
87                }
88            }
89            if let Err(err) = hummock_manager.delete_version_deltas().await {
90                tracing::warn!(error = %err.as_report(), "Vacuum metadata error");
91            }
92        }
93    });
94    (join_handle, shutdown_tx)
95}
96
97pub fn start_vacuum_time_travel_metadata_loop(
98    hummock_manager: HummockManagerRef,
99    interval: Duration,
100    should_pause_vacuum_time_travel: Box<dyn Fn() -> BoxFuture<'static, bool> + Send>,
101) -> (JoinHandle<()>, Sender<()>) {
102    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
103    let join_handle = tokio::spawn(async move {
104        let mut min_trigger_interval = tokio::time::interval(interval);
105        min_trigger_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
106        loop {
107            tokio::select! {
108                // Wait for interval
109                _ = min_trigger_interval.tick() => {},
110                // Shutdown vacuum
111                _ = &mut shutdown_rx => {
112                    tracing::info!("Vacuum time travel metadata loop is stopped");
113                    return;
114                }
115            }
116            if should_pause_vacuum_time_travel().await {
117                tracing::warn!("time travel vacuum paused");
118                continue;
119            }
120            if let Err(err) = hummock_manager.delete_time_travel_metadata().await {
121                tracing::warn!(error = %err.as_report(), "Vacuum time travel metadata error");
122            }
123        }
124    });
125    (join_handle, shutdown_tx)
126}
127
128pub fn start_checkpoint_loop(
129    hummock_manager: HummockManagerRef,
130    backup_manager: BackupManagerRef,
131    interval: Duration,
132    min_delta_log_num: u64,
133) -> (JoinHandle<()>, Sender<()>) {
134    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
135    let join_handle = tokio::spawn(async move {
136        let mut min_trigger_interval = tokio::time::interval(interval);
137        min_trigger_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
138        loop {
139            tokio::select! {
140                // Wait for interval
141                _ = min_trigger_interval.tick() => {},
142                // Shutdown checkpoint
143                _ = &mut shutdown_rx => {
144                    tracing::info!("Hummock version checkpoint is stopped");
145                    return;
146                }
147            }
148            if hummock_manager.is_version_checkpoint_paused()
149                || hummock_manager.env.opts.compaction_deterministic_test
150            {
151                continue;
152            }
153            match hummock_manager
154                .create_version_checkpoint(min_delta_log_num)
155                .await
156            {
157                Err(err) => {
158                    tracing::warn!(error = %err.as_report(), "Hummock version checkpoint error.");
159                }
160                _ => {
161                    let backup_manager_2 = backup_manager.clone();
162                    let hummock_manager_2 = hummock_manager.clone();
163                    tokio::task::spawn(async move {
164                        let _ = hummock_manager_2
165                            .try_start_minor_gc(backup_manager_2)
166                            .await
167                            .inspect_err(|err| {
168                                tracing::warn!(error = %err.as_report(), "Hummock minor GC error.");
169                            });
170                    });
171                }
172            }
173        }
174    });
175    (join_handle, shutdown_tx)
176}