risingwave_meta/hummock/
mod.rs1pub mod compaction;
15pub mod compactor_manager;
16pub mod error;
17mod manager;
18
19pub use manager::*;
20use thiserror_ext::AsReport;
21
22mod level_handler;
23mod metrics_utils;
24#[cfg(any(test, feature = "test"))]
25pub mod mock_hummock_meta_client;
26pub mod model;
27pub mod test_utils;
28mod utils;
29use std::time::Duration;
30
31pub use compactor_manager::*;
32#[cfg(any(test, feature = "test"))]
33pub use mock_hummock_meta_client::MockHummockMetaClient;
34use tokio::sync::oneshot::Sender;
35use tokio::task::JoinHandle;
36
37use crate::MetaOpts;
38use crate::backup_restore::BackupManagerRef;
39
40pub fn start_hummock_workers(
42 hummock_manager: HummockManagerRef,
43 backup_manager: BackupManagerRef,
44 meta_opts: &MetaOpts,
45) -> Vec<(JoinHandle<()>, Sender<()>)> {
46 let workers = vec![
49 start_checkpoint_loop(
50 hummock_manager.clone(),
51 backup_manager,
52 Duration::from_secs(meta_opts.hummock_version_checkpoint_interval_sec),
53 meta_opts.min_delta_log_num_for_hummock_version_checkpoint,
54 ),
55 start_vacuum_metadata_loop(
56 hummock_manager.clone(),
57 Duration::from_secs(meta_opts.vacuum_interval_sec),
58 ),
59 start_vacuum_time_travel_metadata_loop(
60 hummock_manager.clone(),
61 Duration::from_secs(meta_opts.time_travel_vacuum_interval_sec),
62 ),
63 ];
64 workers
65}
66
67pub fn start_vacuum_metadata_loop(
69 hummock_manager: HummockManagerRef,
70 interval: Duration,
71) -> (JoinHandle<()>, Sender<()>) {
72 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
73 let join_handle = tokio::spawn(async move {
74 let mut min_trigger_interval = tokio::time::interval(interval);
75 min_trigger_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
76 loop {
77 tokio::select! {
78 _ = min_trigger_interval.tick() => {},
80 _ = &mut shutdown_rx => {
82 tracing::info!("Vacuum metadata loop is stopped");
83 return;
84 }
85 }
86 if let Err(err) = hummock_manager.delete_version_deltas().await {
87 tracing::warn!(error = %err.as_report(), "Vacuum metadata error");
88 }
89 }
90 });
91 (join_handle, shutdown_tx)
92}
93
94pub fn start_vacuum_time_travel_metadata_loop(
95 hummock_manager: HummockManagerRef,
96 interval: Duration,
97) -> (JoinHandle<()>, Sender<()>) {
98 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
99 let join_handle = tokio::spawn(async move {
100 let mut min_trigger_interval = tokio::time::interval(interval);
101 min_trigger_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
102 loop {
103 tokio::select! {
104 _ = min_trigger_interval.tick() => {},
106 _ = &mut shutdown_rx => {
108 tracing::info!("Vacuum time travel metadata loop is stopped");
109 return;
110 }
111 }
112 if let Err(err) = hummock_manager.delete_time_travel_metadata().await {
113 tracing::warn!(error = %err.as_report(), "Vacuum time travel metadata error");
114 }
115 }
116 });
117 (join_handle, shutdown_tx)
118}
119
120pub fn start_checkpoint_loop(
121 hummock_manager: HummockManagerRef,
122 backup_manager: BackupManagerRef,
123 interval: Duration,
124 min_delta_log_num: u64,
125) -> (JoinHandle<()>, Sender<()>) {
126 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
127 let join_handle = tokio::spawn(async move {
128 let mut min_trigger_interval = tokio::time::interval(interval);
129 min_trigger_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
130 loop {
131 tokio::select! {
132 _ = min_trigger_interval.tick() => {},
134 _ = &mut shutdown_rx => {
136 tracing::info!("Hummock version checkpoint is stopped");
137 return;
138 }
139 }
140 if hummock_manager.is_version_checkpoint_paused()
141 || hummock_manager.env.opts.compaction_deterministic_test
142 {
143 continue;
144 }
145 match hummock_manager
146 .create_version_checkpoint(min_delta_log_num)
147 .await
148 {
149 Err(err) => {
150 tracing::warn!(error = %err.as_report(), "Hummock version checkpoint error.");
151 }
152 _ => {
153 let backup_manager_2 = backup_manager.clone();
154 let hummock_manager_2 = hummock_manager.clone();
155 tokio::task::spawn(async move {
156 let _ = hummock_manager_2
157 .try_start_minor_gc(backup_manager_2)
158 .await
159 .inspect_err(|err| {
160 tracing::warn!(error = %err.as_report(), "Hummock minor GC error.");
161 });
162 });
163 }
164 }
165 }
166 });
167 (join_handle, shutdown_tx)
168}