risingwave_meta/hummock/
mod.rs1pub mod compaction;
15pub mod compactor_manager;
16pub mod error;
17mod manager;
18
19pub use manager::*;
20use thiserror_ext::AsReport;
21
22mod level_handler;
23mod metrics_utils;
24#[cfg(any(test, feature = "test"))]
25pub mod mock_hummock_meta_client;
26pub mod model;
27pub mod test_utils;
28use std::time::Duration;
29
30pub use compactor_manager::*;
31#[cfg(any(test, feature = "test"))]
32pub use mock_hummock_meta_client::MockHummockMetaClient;
33use tokio::sync::oneshot::Sender;
34use tokio::task::JoinHandle;
35
36use crate::MetaOpts;
37use crate::backup_restore::BackupManagerRef;
38
39pub fn start_hummock_workers(
41 hummock_manager: HummockManagerRef,
42 backup_manager: BackupManagerRef,
43 meta_opts: &MetaOpts,
44) -> Vec<(JoinHandle<()>, Sender<()>)> {
45 let workers = vec![
48 start_checkpoint_loop(
49 hummock_manager.clone(),
50 backup_manager,
51 Duration::from_secs(meta_opts.hummock_version_checkpoint_interval_sec),
52 meta_opts.min_delta_log_num_for_hummock_version_checkpoint,
53 ),
54 start_vacuum_metadata_loop(
55 hummock_manager.clone(),
56 Duration::from_secs(meta_opts.vacuum_interval_sec),
57 ),
58 start_vacuum_time_travel_metadata_loop(
59 hummock_manager.clone(),
60 Duration::from_secs(meta_opts.time_travel_vacuum_interval_sec),
61 ),
62 ];
63 workers
64}
65
66pub fn start_vacuum_metadata_loop(
68 hummock_manager: HummockManagerRef,
69 interval: Duration,
70) -> (JoinHandle<()>, Sender<()>) {
71 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
72 let join_handle = tokio::spawn(async move {
73 let mut min_trigger_interval = tokio::time::interval(interval);
74 min_trigger_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
75 loop {
76 tokio::select! {
77 _ = min_trigger_interval.tick() => {},
79 _ = &mut shutdown_rx => {
81 tracing::info!("Vacuum metadata loop is stopped");
82 return;
83 }
84 }
85 if let Err(err) = hummock_manager.delete_version_deltas().await {
86 tracing::warn!(error = %err.as_report(), "Vacuum metadata error");
87 }
88 }
89 });
90 (join_handle, shutdown_tx)
91}
92
93pub fn start_vacuum_time_travel_metadata_loop(
94 hummock_manager: HummockManagerRef,
95 interval: Duration,
96) -> (JoinHandle<()>, Sender<()>) {
97 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
98 let join_handle = tokio::spawn(async move {
99 let mut min_trigger_interval = tokio::time::interval(interval);
100 min_trigger_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
101 loop {
102 tokio::select! {
103 _ = min_trigger_interval.tick() => {},
105 _ = &mut shutdown_rx => {
107 tracing::info!("Vacuum time travel metadata loop is stopped");
108 return;
109 }
110 }
111 if let Err(err) = hummock_manager.delete_time_travel_metadata().await {
112 tracing::warn!(error = %err.as_report(), "Vacuum time travel metadata error");
113 }
114 }
115 });
116 (join_handle, shutdown_tx)
117}
118
119pub fn start_checkpoint_loop(
120 hummock_manager: HummockManagerRef,
121 backup_manager: BackupManagerRef,
122 interval: Duration,
123 min_delta_log_num: u64,
124) -> (JoinHandle<()>, Sender<()>) {
125 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
126 let join_handle = tokio::spawn(async move {
127 let mut min_trigger_interval = tokio::time::interval(interval);
128 min_trigger_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
129 loop {
130 tokio::select! {
131 _ = min_trigger_interval.tick() => {},
133 _ = &mut shutdown_rx => {
135 tracing::info!("Hummock version checkpoint is stopped");
136 return;
137 }
138 }
139 if hummock_manager.is_version_checkpoint_paused()
140 || hummock_manager.env.opts.compaction_deterministic_test
141 {
142 continue;
143 }
144 match hummock_manager
145 .create_version_checkpoint(min_delta_log_num)
146 .await
147 {
148 Err(err) => {
149 tracing::warn!(error = %err.as_report(), "Hummock version checkpoint error.");
150 }
151 _ => {
152 let backup_manager_2 = backup_manager.clone();
153 let hummock_manager_2 = hummock_manager.clone();
154 tokio::task::spawn(async move {
155 let _ = hummock_manager_2
156 .try_start_minor_gc(backup_manager_2)
157 .await
158 .inspect_err(|err| {
159 tracing::warn!(error = %err.as_report(), "Hummock minor GC error.");
160 });
161 });
162 }
163 }
164 }
165 });
166 (join_handle, shutdown_tx)
167}