risingwave_meta/hummock/manager/
checkpoint.rs1use std::collections::HashMap;
16use std::ops::Bound::{Excluded, Included};
17use std::ops::{Deref, DerefMut};
18use std::sync::atomic::Ordering;
19
20use risingwave_hummock_sdk::compaction_group::hummock_version_ext::object_size_map;
21use risingwave_hummock_sdk::version::HummockVersion;
22use risingwave_hummock_sdk::{HummockObjectId, HummockVersionId, get_stale_object_ids};
23use risingwave_pb::hummock::hummock_version_checkpoint::{PbStaleObjects, StaleObjects};
24use risingwave_pb::hummock::{
25 PbHummockVersion, PbHummockVersionArchive, PbHummockVersionCheckpoint,
26};
27use thiserror_ext::AsReport;
28use tracing::warn;
29
30use crate::hummock::HummockManager;
31use crate::hummock::error::Result;
32use crate::hummock::manager::versioning::Versioning;
33use crate::hummock::metrics_utils::{trigger_gc_stat, trigger_split_stat};
34
/// In-memory form of a hummock version checkpoint.
///
/// Converted from / to `PbHummockVersionCheckpoint` via `from_protobuf` and `to_protobuf`.
#[derive(Default)]
pub struct HummockVersionCheckpoint {
    /// The hummock version captured by this checkpoint.
    pub version: HummockVersion,

    /// Stale objects, keyed by hummock version id.
    ///
    /// `create_version_checkpoint` inserts one entry under the id of the version being
    /// checkpointed, holding the objects that were referenced since the previous checkpoint
    /// but are absent from that version. Entries whose key is below the minimum pinned
    /// version id are handed to the GC manager and removed from the map.
    pub stale_objects: HashMap<HummockVersionId, PbStaleObjects>,
}
46
47impl HummockVersionCheckpoint {
48 pub fn from_protobuf(checkpoint: &PbHummockVersionCheckpoint) -> Self {
49 Self {
50 version: HummockVersion::from_persisted_protobuf(checkpoint.version.as_ref().unwrap()),
51 stale_objects: checkpoint
52 .stale_objects
53 .iter()
54 .map(|(version_id, objects)| (HummockVersionId::new(*version_id), objects.clone()))
55 .collect(),
56 }
57 }
58
59 pub fn to_protobuf(&self) -> PbHummockVersionCheckpoint {
60 PbHummockVersionCheckpoint {
61 version: Some(PbHummockVersion::from(&self.version)),
62 stale_objects: self
63 .stale_objects
64 .iter()
65 .map(|(version_id, objects)| (version_id.to_u64(), objects.clone()))
66 .collect(),
67 }
68 }
69}
70
71impl HummockManager {
74 pub async fn try_read_checkpoint(&self) -> Result<Option<HummockVersionCheckpoint>> {
76 use prost::Message;
77 let data = match self
78 .object_store
79 .read(&self.version_checkpoint_path, ..)
80 .await
81 {
82 Ok(data) => data,
83 Err(e) => {
84 if e.is_object_not_found_error() {
85 return Ok(None);
86 }
87 return Err(e.into());
88 }
89 };
90 let ckpt = PbHummockVersionCheckpoint::decode(data).map_err(|e| anyhow::anyhow!(e))?;
91 Ok(Some(HummockVersionCheckpoint::from_protobuf(&ckpt)))
92 }
93
94 pub(super) async fn write_checkpoint(
95 &self,
96 checkpoint: &HummockVersionCheckpoint,
97 ) -> Result<()> {
98 use prost::Message;
99 let buf = checkpoint.to_protobuf().encode_to_vec();
100 self.object_store
101 .upload(&self.version_checkpoint_path, buf.into())
102 .await?;
103 Ok(())
104 }
105
106 pub(super) async fn write_version_archive(
107 &self,
108 archive: &PbHummockVersionArchive,
109 ) -> Result<()> {
110 use prost::Message;
111 let buf = archive.encode_to_vec();
112 let archive_path = format!(
113 "{}/{}",
114 self.version_archive_dir,
115 archive.version.as_ref().unwrap().id
116 );
117 self.object_store.upload(&archive_path, buf.into()).await?;
118 Ok(())
119 }
120
121 pub async fn create_version_checkpoint(&self, min_delta_log_num: u64) -> Result<u64> {
126 let timer = self.metrics.version_checkpoint_latency.start_timer();
127 let versioning_guard = self.versioning.read().await;
129 let versioning: &Versioning = versioning_guard.deref();
130 let current_version: &HummockVersion = &versioning.current_version;
131 let old_checkpoint: &HummockVersionCheckpoint = &versioning.checkpoint;
132 let new_checkpoint_id = current_version.id;
133 let old_checkpoint_id = old_checkpoint.version.id;
134 if new_checkpoint_id < old_checkpoint_id + min_delta_log_num {
135 return Ok(0);
136 }
137 if cfg!(test) && new_checkpoint_id == old_checkpoint_id {
138 drop(versioning_guard);
139 let versioning = self.versioning.read().await;
140 let context_info = self.context_info.read().await;
141 let min_pinned_version_id = context_info.min_pinned_version_id();
142 trigger_gc_stat(&self.metrics, &versioning.checkpoint, min_pinned_version_id);
143 return Ok(0);
144 }
145 assert!(new_checkpoint_id > old_checkpoint_id);
146 let mut archive: Option<PbHummockVersionArchive> = None;
147 let mut stale_objects = old_checkpoint.stale_objects.clone();
148 let mut object_sizes = object_size_map(&old_checkpoint.version);
150 let mut versions_object_ids = old_checkpoint.version.get_object_ids(false);
152 for (_, version_delta) in versioning
153 .hummock_version_deltas
154 .range((Excluded(old_checkpoint_id), Included(new_checkpoint_id)))
155 {
156 match HummockObjectId::Sstable(0.into()) {
160 HummockObjectId::Sstable(_) => {}
161 };
162 for sst in version_delta.newly_added_sst_infos(false) {
163 let object_id = HummockObjectId::Sstable(sst.object_id);
164 object_sizes.insert(object_id, sst.file_size);
165 versions_object_ids.insert(object_id);
166 }
167 }
168
169 let removed_object_ids = &versions_object_ids - ¤t_version.get_object_ids(false);
171 let total_file_size = removed_object_ids
172 .iter()
173 .map(|t| {
174 object_sizes.get(t).copied().unwrap_or_else(|| {
175 warn!(object_id = ?t, "unable to get size of removed object id");
176 0
177 })
178 })
179 .sum::<u64>();
180 stale_objects.insert(
181 current_version.id,
182 StaleObjects {
183 id: removed_object_ids
184 .into_iter()
185 .map(|object_id| {
186 let HummockObjectId::Sstable(sst_id) = object_id;
187 sst_id.inner()
188 })
189 .collect(),
190 total_file_size,
191 },
192 );
193 if self.env.opts.enable_hummock_data_archive {
194 archive = Some(PbHummockVersionArchive {
195 version: Some(PbHummockVersion::from(&old_checkpoint.version)),
196 version_deltas: versioning
197 .hummock_version_deltas
198 .range((Excluded(old_checkpoint_id), Included(new_checkpoint_id)))
199 .map(|(_, version_delta)| version_delta.into())
200 .collect(),
201 });
202 }
203 let min_pinned_version_id = self.context_info.read().await.min_pinned_version_id();
204 let may_delete_object = stale_objects
205 .iter()
206 .filter_map(|(version_id, object_ids)| {
207 if *version_id >= min_pinned_version_id {
208 return None;
209 }
210 Some(get_stale_object_ids(object_ids))
211 })
212 .flatten();
213 self.gc_manager.add_may_delete_object_ids(may_delete_object);
214 stale_objects.retain(|version_id, _| *version_id >= min_pinned_version_id);
215 let new_checkpoint = HummockVersionCheckpoint {
216 version: current_version.clone(),
217 stale_objects,
218 };
219 drop(versioning_guard);
220 self.write_checkpoint(&new_checkpoint).await?;
222 if let Some(archive) = archive {
223 if let Err(e) = self.write_version_archive(&archive).await {
224 tracing::warn!(
225 error = %e.as_report(),
226 "failed to write version archive {}",
227 archive.version.as_ref().unwrap().id
228 );
229 }
230 }
231 let mut versioning_guard = self.versioning.write().await;
233 let versioning = versioning_guard.deref_mut();
234 assert!(new_checkpoint.version.id > versioning.checkpoint.version.id);
235 versioning.checkpoint = new_checkpoint;
236 let min_pinned_version_id = self.context_info.read().await.min_pinned_version_id();
237 trigger_gc_stat(&self.metrics, &versioning.checkpoint, min_pinned_version_id);
238 trigger_split_stat(&self.metrics, &versioning.current_version);
239 drop(versioning_guard);
240 timer.observe_duration();
241 self.metrics
242 .checkpoint_version_id
243 .set(new_checkpoint_id.to_u64() as i64);
244
245 Ok(new_checkpoint_id - old_checkpoint_id)
246 }
247
248 pub fn pause_version_checkpoint(&self) {
249 self.pause_version_checkpoint.store(true, Ordering::Relaxed);
250 tracing::info!("hummock version checkpoint is paused.");
251 }
252
253 pub fn resume_version_checkpoint(&self) {
254 self.pause_version_checkpoint
255 .store(false, Ordering::Relaxed);
256 tracing::info!("hummock version checkpoint is resumed.");
257 }
258
259 pub fn is_version_checkpoint_paused(&self) -> bool {
260 self.pause_version_checkpoint.load(Ordering::Relaxed)
261 }
262
263 pub async fn get_checkpoint_version(&self) -> HummockVersion {
264 let versioning_guard = self.versioning.read().await;
265 versioning_guard.checkpoint.version.clone()
266 }
267}