risingwave_meta/hummock/
mod.rs1pub mod compaction;
16pub mod compactor_manager;
17pub mod error;
18mod manager;
19
20pub use manager::*;
21use thiserror_ext::AsReport;
22
23mod level_handler;
24mod metrics_utils;
25#[cfg(any(test, feature = "test"))]
26pub mod mock_hummock_meta_client;
27pub mod model;
28pub mod test_utils;
29use std::time::Duration;
30
31pub use compactor_manager::*;
32use futures::future::BoxFuture;
33#[cfg(any(test, feature = "test"))]
34pub use mock_hummock_meta_client::MockHummockMetaClient;
35use tokio::sync::oneshot::Sender;
36use tokio::task::JoinHandle;
37
38use crate::MetaOpts;
39use crate::backup_restore::BackupManagerRef;
40
41pub fn start_hummock_workers(
43 hummock_manager: HummockManagerRef,
44 backup_manager: BackupManagerRef,
45 meta_opts: &MetaOpts,
46 should_pause_vacuum_time_travel: Box<dyn Fn() -> BoxFuture<'static, bool> + Send>,
47) -> Vec<(JoinHandle<()>, Sender<()>)> {
48 let workers = vec![
51 start_checkpoint_loop(
52 hummock_manager.clone(),
53 backup_manager,
54 Duration::from_secs(meta_opts.hummock_version_checkpoint_interval_sec),
55 meta_opts.min_delta_log_num_for_hummock_version_checkpoint,
56 ),
57 start_vacuum_metadata_loop(
58 hummock_manager.clone(),
59 Duration::from_secs(meta_opts.vacuum_interval_sec),
60 ),
61 start_vacuum_time_travel_metadata_loop(
62 hummock_manager,
63 Duration::from_secs(meta_opts.time_travel_vacuum_interval_sec),
64 should_pause_vacuum_time_travel,
65 ),
66 ];
67 workers
68}
69
70pub fn start_vacuum_metadata_loop(
72 hummock_manager: HummockManagerRef,
73 interval: Duration,
74) -> (JoinHandle<()>, Sender<()>) {
75 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
76 let join_handle = tokio::spawn(async move {
77 let mut min_trigger_interval = tokio::time::interval(interval);
78 min_trigger_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
79 loop {
80 tokio::select! {
81 _ = min_trigger_interval.tick() => {},
83 _ = &mut shutdown_rx => {
85 tracing::info!("Vacuum metadata loop is stopped");
86 return;
87 }
88 }
89 if let Err(err) = hummock_manager.delete_version_deltas().await {
90 tracing::warn!(error = %err.as_report(), "Vacuum metadata error");
91 }
92 }
93 });
94 (join_handle, shutdown_tx)
95}
96
97pub fn start_vacuum_time_travel_metadata_loop(
98 hummock_manager: HummockManagerRef,
99 interval: Duration,
100 should_pause_vacuum_time_travel: Box<dyn Fn() -> BoxFuture<'static, bool> + Send>,
101) -> (JoinHandle<()>, Sender<()>) {
102 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
103 let join_handle = tokio::spawn(async move {
104 let mut min_trigger_interval = tokio::time::interval(interval);
105 min_trigger_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
106 loop {
107 tokio::select! {
108 _ = min_trigger_interval.tick() => {},
110 _ = &mut shutdown_rx => {
112 tracing::info!("Vacuum time travel metadata loop is stopped");
113 return;
114 }
115 }
116 if should_pause_vacuum_time_travel().await {
117 tracing::warn!("time travel vacuum paused");
118 continue;
119 }
120 if let Err(err) = hummock_manager.delete_time_travel_metadata().await {
121 tracing::warn!(error = %err.as_report(), "Vacuum time travel metadata error");
122 }
123 }
124 });
125 (join_handle, shutdown_tx)
126}
127
128pub fn start_checkpoint_loop(
129 hummock_manager: HummockManagerRef,
130 backup_manager: BackupManagerRef,
131 interval: Duration,
132 min_delta_log_num: u64,
133) -> (JoinHandle<()>, Sender<()>) {
134 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
135 let join_handle = tokio::spawn(async move {
136 let mut min_trigger_interval = tokio::time::interval(interval);
137 min_trigger_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
138 loop {
139 tokio::select! {
140 _ = min_trigger_interval.tick() => {},
142 _ = &mut shutdown_rx => {
144 tracing::info!("Hummock version checkpoint is stopped");
145 return;
146 }
147 }
148 if hummock_manager.is_version_checkpoint_paused()
149 || hummock_manager.env.opts.compaction_deterministic_test
150 {
151 continue;
152 }
153 match hummock_manager
154 .create_version_checkpoint(min_delta_log_num)
155 .await
156 {
157 Err(err) => {
158 tracing::warn!(error = %err.as_report(), "Hummock version checkpoint error.");
159 }
160 _ => {
161 let backup_manager_2 = backup_manager.clone();
162 let hummock_manager_2 = hummock_manager.clone();
163 tokio::task::spawn(async move {
164 let _ = hummock_manager_2
165 .try_start_minor_gc(backup_manager_2)
166 .await
167 .inspect_err(|err| {
168 tracing::warn!(error = %err.as_report(), "Hummock minor GC error.");
169 });
170 });
171 }
172 }
173 }
174 });
175 (join_handle, shutdown_tx)
176}