risingwave_meta/hummock/manager/
checkpoint.rs1use std::collections::HashMap;
16use std::ops::Bound::{Excluded, Included};
17use std::ops::{Deref, DerefMut};
18use std::sync::atomic::Ordering;
19
20use risingwave_hummock_sdk::HummockVersionId;
21use risingwave_hummock_sdk::compaction_group::hummock_version_ext::object_size_map;
22use risingwave_hummock_sdk::version::{GroupDeltaCommon, HummockVersion};
23use risingwave_pb::hummock::hummock_version_checkpoint::{PbStaleObjects, StaleObjects};
24use risingwave_pb::hummock::{
25 PbHummockVersion, PbHummockVersionArchive, PbHummockVersionCheckpoint,
26};
27use thiserror_ext::AsReport;
28use tracing::warn;
29
30use crate::hummock::HummockManager;
31use crate::hummock::error::Result;
32use crate::hummock::manager::versioning::Versioning;
33use crate::hummock::metrics_utils::{trigger_gc_stat, trigger_split_stat};
34
35#[derive(Default)]
36pub struct HummockVersionCheckpoint {
37 pub version: HummockVersion,
38
39 pub stale_objects: HashMap<HummockVersionId, PbStaleObjects>,
45}
46
47impl HummockVersionCheckpoint {
48 pub fn from_protobuf(checkpoint: &PbHummockVersionCheckpoint) -> Self {
49 Self {
50 version: HummockVersion::from_persisted_protobuf(checkpoint.version.as_ref().unwrap()),
51 stale_objects: checkpoint
52 .stale_objects
53 .iter()
54 .map(|(version_id, objects)| (HummockVersionId::new(*version_id), objects.clone()))
55 .collect(),
56 }
57 }
58
59 pub fn to_protobuf(&self) -> PbHummockVersionCheckpoint {
60 PbHummockVersionCheckpoint {
61 version: Some(PbHummockVersion::from(&self.version)),
62 stale_objects: self
63 .stale_objects
64 .iter()
65 .map(|(version_id, objects)| (version_id.to_u64(), objects.clone()))
66 .collect(),
67 }
68 }
69}
70
71impl HummockManager {
74 pub async fn try_read_checkpoint(&self) -> Result<Option<HummockVersionCheckpoint>> {
76 use prost::Message;
77 let data = match self
78 .object_store
79 .read(&self.version_checkpoint_path, ..)
80 .await
81 {
82 Ok(data) => data,
83 Err(e) => {
84 if e.is_object_not_found_error() {
85 return Ok(None);
86 }
87 return Err(e.into());
88 }
89 };
90 let ckpt = PbHummockVersionCheckpoint::decode(data).map_err(|e| anyhow::anyhow!(e))?;
91 Ok(Some(HummockVersionCheckpoint::from_protobuf(&ckpt)))
92 }
93
94 pub(super) async fn write_checkpoint(
95 &self,
96 checkpoint: &HummockVersionCheckpoint,
97 ) -> Result<()> {
98 use prost::Message;
99 let buf = checkpoint.to_protobuf().encode_to_vec();
100 self.object_store
101 .upload(&self.version_checkpoint_path, buf.into())
102 .await?;
103 Ok(())
104 }
105
106 pub(super) async fn write_version_archive(
107 &self,
108 archive: &PbHummockVersionArchive,
109 ) -> Result<()> {
110 use prost::Message;
111 let buf = archive.encode_to_vec();
112 let archive_path = format!(
113 "{}/{}",
114 self.version_archive_dir,
115 archive.version.as_ref().unwrap().id
116 );
117 self.object_store.upload(&archive_path, buf.into()).await?;
118 Ok(())
119 }
120
121 pub async fn create_version_checkpoint(&self, min_delta_log_num: u64) -> Result<u64> {
126 let timer = self.metrics.version_checkpoint_latency.start_timer();
127 let versioning_guard = self.versioning.read().await;
129 let versioning: &Versioning = versioning_guard.deref();
130 let current_version: &HummockVersion = &versioning.current_version;
131 let old_checkpoint: &HummockVersionCheckpoint = &versioning.checkpoint;
132 let new_checkpoint_id = current_version.id;
133 let old_checkpoint_id = old_checkpoint.version.id;
134 if new_checkpoint_id < old_checkpoint_id + min_delta_log_num {
135 return Ok(0);
136 }
137 if cfg!(test) && new_checkpoint_id == old_checkpoint_id {
138 drop(versioning_guard);
139 let versioning = self.versioning.read().await;
140 let context_info = self.context_info.read().await;
141 let min_pinned_version_id = context_info.min_pinned_version_id();
142 trigger_gc_stat(&self.metrics, &versioning.checkpoint, min_pinned_version_id);
143 return Ok(0);
144 }
145 assert!(new_checkpoint_id > old_checkpoint_id);
146 let mut archive: Option<PbHummockVersionArchive> = None;
147 let mut stale_objects = old_checkpoint.stale_objects.clone();
148 let mut object_sizes = object_size_map(&old_checkpoint.version);
150 let mut versions_object_ids = old_checkpoint.version.get_object_ids(false);
152 for (_, version_delta) in versioning
153 .hummock_version_deltas
154 .range((Excluded(old_checkpoint_id), Included(new_checkpoint_id)))
155 {
156 for group_deltas in version_delta.group_deltas.values() {
157 object_sizes.extend(
158 group_deltas
159 .group_deltas
160 .iter()
161 .flat_map(|delta| {
162 match delta {
163 GroupDeltaCommon::IntraLevel(level_delta) => {
164 Some(level_delta.inserted_table_infos.iter())
165 }
166 GroupDeltaCommon::NewL0SubLevel(inserted_table_infos) => {
167 Some(inserted_table_infos.iter())
168 }
169 GroupDeltaCommon::GroupConstruct(_)
170 | GroupDeltaCommon::GroupDestroy(_)
171 | GroupDeltaCommon::GroupMerge(_) => None,
172 }
173 .into_iter()
174 .flatten()
175 .map(|t| (t.object_id, t.file_size))
176 })
177 .chain(
178 version_delta
179 .change_log_delta
180 .values()
181 .flat_map(|change_log| {
182 let new_log = &change_log.new_log;
183 new_log
184 .new_value
185 .iter()
186 .chain(new_log.old_value.iter())
187 .map(|t| (t.object_id, t.file_size))
188 }),
189 ),
190 );
191 }
192 versions_object_ids.extend(version_delta.newly_added_object_ids(false));
193 }
194
195 let removed_object_ids = &versions_object_ids - ¤t_version.get_object_ids(false);
197 let total_file_size = removed_object_ids
198 .iter()
199 .map(|t| {
200 object_sizes.get(t).copied().unwrap_or_else(|| {
201 warn!(object_id = t, "unable to get size of removed object id");
202 0
203 })
204 })
205 .sum::<u64>();
206 stale_objects.insert(
207 current_version.id,
208 StaleObjects {
209 id: removed_object_ids.into_iter().collect(),
210 total_file_size,
211 },
212 );
213 if self.env.opts.enable_hummock_data_archive {
214 archive = Some(PbHummockVersionArchive {
215 version: Some(PbHummockVersion::from(&old_checkpoint.version)),
216 version_deltas: versioning
217 .hummock_version_deltas
218 .range((Excluded(old_checkpoint_id), Included(new_checkpoint_id)))
219 .map(|(_, version_delta)| version_delta.into())
220 .collect(),
221 });
222 }
223 let min_pinned_version_id = self.context_info.read().await.min_pinned_version_id();
224 let may_delete_object = stale_objects
225 .iter()
226 .filter_map(|(version_id, object_ids)| {
227 if *version_id >= min_pinned_version_id {
228 return None;
229 }
230 Some(object_ids.id.clone())
231 })
232 .flatten();
233 self.gc_manager.add_may_delete_object_ids(may_delete_object);
234 stale_objects.retain(|version_id, _| *version_id >= min_pinned_version_id);
235 let new_checkpoint = HummockVersionCheckpoint {
236 version: current_version.clone(),
237 stale_objects,
238 };
239 drop(versioning_guard);
240 self.write_checkpoint(&new_checkpoint).await?;
242 if let Some(archive) = archive {
243 if let Err(e) = self.write_version_archive(&archive).await {
244 tracing::warn!(
245 error = %e.as_report(),
246 "failed to write version archive {}",
247 archive.version.as_ref().unwrap().id
248 );
249 }
250 }
251 let mut versioning_guard = self.versioning.write().await;
253 let versioning = versioning_guard.deref_mut();
254 assert!(new_checkpoint.version.id > versioning.checkpoint.version.id);
255 versioning.checkpoint = new_checkpoint;
256 let min_pinned_version_id = self.context_info.read().await.min_pinned_version_id();
257 trigger_gc_stat(&self.metrics, &versioning.checkpoint, min_pinned_version_id);
258 trigger_split_stat(&self.metrics, &versioning.current_version);
259 drop(versioning_guard);
260 timer.observe_duration();
261 self.metrics
262 .checkpoint_version_id
263 .set(new_checkpoint_id.to_u64() as i64);
264
265 Ok(new_checkpoint_id - old_checkpoint_id)
266 }
267
268 pub fn pause_version_checkpoint(&self) {
269 self.pause_version_checkpoint.store(true, Ordering::Relaxed);
270 tracing::info!("hummock version checkpoint is paused.");
271 }
272
273 pub fn resume_version_checkpoint(&self) {
274 self.pause_version_checkpoint
275 .store(false, Ordering::Relaxed);
276 tracing::info!("hummock version checkpoint is resumed.");
277 }
278
279 pub fn is_version_checkpoint_paused(&self) -> bool {
280 self.pause_version_checkpoint.load(Ordering::Relaxed)
281 }
282
283 pub async fn get_checkpoint_version(&self) -> HummockVersion {
284 let versioning_guard = self.versioning.read().await;
285 versioning_guard.checkpoint.version.clone()
286 }
287}