// risingwave_meta/hummock/manager/checkpoint.rs
use std::collections::{HashMap, HashSet};
16use std::ops::Bound::{Excluded, Included};
17use std::ops::{Deref, DerefMut};
18use std::sync::atomic::Ordering;
19
20use risingwave_hummock_sdk::compaction_group::hummock_version_ext::object_size_map;
21use risingwave_hummock_sdk::version::HummockVersion;
22use risingwave_hummock_sdk::{HummockObjectId, HummockVersionId, get_stale_object_ids};
23use risingwave_pb::hummock::hummock_version_checkpoint::{PbStaleObjects, StaleObjects};
24use risingwave_pb::hummock::{
25 PbHummockVersion, PbHummockVersionArchive, PbHummockVersionCheckpoint, PbVectorIndexObject,
26 PbVectorIndexObjectType,
27};
28use thiserror_ext::AsReport;
29use tracing::warn;
30
31use crate::hummock::HummockManager;
32use crate::hummock::error::Result;
33use crate::hummock::manager::versioning::Versioning;
34use crate::hummock::metrics_utils::{trigger_gc_stat, trigger_split_stat};
35
36#[derive(Default)]
37pub struct HummockVersionCheckpoint {
38 pub version: HummockVersion,
39
40 pub stale_objects: HashMap<HummockVersionId, PbStaleObjects>,
46}
47
48impl HummockVersionCheckpoint {
49 pub fn from_protobuf(checkpoint: &PbHummockVersionCheckpoint) -> Self {
50 Self {
51 version: HummockVersion::from_persisted_protobuf(checkpoint.version.as_ref().unwrap()),
52 stale_objects: checkpoint
53 .stale_objects
54 .iter()
55 .map(|(version_id, objects)| (HummockVersionId::new(*version_id), objects.clone()))
56 .collect(),
57 }
58 }
59
60 pub fn to_protobuf(&self) -> PbHummockVersionCheckpoint {
61 PbHummockVersionCheckpoint {
62 version: Some(PbHummockVersion::from(&self.version)),
63 stale_objects: self
64 .stale_objects
65 .iter()
66 .map(|(version_id, objects)| (version_id.to_u64(), objects.clone()))
67 .collect(),
68 }
69 }
70}
71
72impl HummockManager {
75 pub async fn try_read_checkpoint(&self) -> Result<Option<HummockVersionCheckpoint>> {
77 use prost::Message;
78 let data = match self
79 .object_store
80 .read(&self.version_checkpoint_path, ..)
81 .await
82 {
83 Ok(data) => data,
84 Err(e) => {
85 if e.is_object_not_found_error() {
86 return Ok(None);
87 }
88 return Err(e.into());
89 }
90 };
91 let ckpt = PbHummockVersionCheckpoint::decode(data).map_err(|e| anyhow::anyhow!(e))?;
92 Ok(Some(HummockVersionCheckpoint::from_protobuf(&ckpt)))
93 }
94
95 pub(super) async fn write_checkpoint(
96 &self,
97 checkpoint: &HummockVersionCheckpoint,
98 ) -> Result<()> {
99 use prost::Message;
100 let buf = checkpoint.to_protobuf().encode_to_vec();
101 self.object_store
102 .upload(&self.version_checkpoint_path, buf.into())
103 .await?;
104 Ok(())
105 }
106
107 pub(super) async fn write_version_archive(
108 &self,
109 archive: &PbHummockVersionArchive,
110 ) -> Result<()> {
111 use prost::Message;
112 let buf = archive.encode_to_vec();
113 let archive_path = format!(
114 "{}/{}",
115 self.version_archive_dir,
116 archive.version.as_ref().unwrap().id
117 );
118 self.object_store.upload(&archive_path, buf.into()).await?;
119 Ok(())
120 }
121
122 pub async fn create_version_checkpoint(&self, min_delta_log_num: u64) -> Result<u64> {
127 let timer = self.metrics.version_checkpoint_latency.start_timer();
128 let versioning_guard = self.versioning.read().await;
130 let versioning: &Versioning = versioning_guard.deref();
131 let current_version: &HummockVersion = &versioning.current_version;
132 let old_checkpoint: &HummockVersionCheckpoint = &versioning.checkpoint;
133 let new_checkpoint_id = current_version.id;
134 let old_checkpoint_id = old_checkpoint.version.id;
135 if new_checkpoint_id < old_checkpoint_id + min_delta_log_num {
136 return Ok(0);
137 }
138 if cfg!(test) && new_checkpoint_id == old_checkpoint_id {
139 drop(versioning_guard);
140 let versioning = self.versioning.read().await;
141 let context_info = self.context_info.read().await;
142 let min_pinned_version_id = context_info.min_pinned_version_id();
143 trigger_gc_stat(&self.metrics, &versioning.checkpoint, min_pinned_version_id);
144 return Ok(0);
145 }
146 assert!(new_checkpoint_id > old_checkpoint_id);
147 let mut archive: Option<PbHummockVersionArchive> = None;
148 let mut stale_objects = old_checkpoint.stale_objects.clone();
149 let mut object_sizes = object_size_map(&old_checkpoint.version);
151 let mut versions_object_ids: HashSet<_> =
153 old_checkpoint.version.get_object_ids(false).collect();
154 for (_, version_delta) in versioning
155 .hummock_version_deltas
156 .range((Excluded(old_checkpoint_id), Included(new_checkpoint_id)))
157 {
158 match HummockObjectId::Sstable(0.into()) {
162 HummockObjectId::Sstable(_) => {}
163 HummockObjectId::VectorFile(_) => {}
164 };
165 for (object_id, file_size) in version_delta
166 .newly_added_sst_infos(false)
167 .map(|sst| (HummockObjectId::Sstable(sst.object_id), sst.file_size))
168 .chain(
169 version_delta
170 .vector_index_delta
171 .values()
172 .flat_map(|delta| delta.newly_added_objects()),
173 )
174 {
175 object_sizes.insert(object_id, file_size);
176 versions_object_ids.insert(object_id);
177 }
178 }
179
180 let removed_object_ids =
182 &versions_object_ids - ¤t_version.get_object_ids(false).collect();
183 let total_file_size = removed_object_ids
184 .iter()
185 .map(|t| {
186 object_sizes.get(t).copied().unwrap_or_else(|| {
187 warn!(object_id = ?t, "unable to get size of removed object id");
188 0
189 })
190 })
191 .sum::<u64>();
192 stale_objects.insert(current_version.id, {
193 let mut sst_ids = vec![];
194 let mut vector_files = vec![];
195 for object_id in removed_object_ids {
196 match object_id {
197 HummockObjectId::Sstable(sst_id) => sst_ids.push(sst_id.inner()),
198 HummockObjectId::VectorFile(vector_file_id) => {
199 vector_files.push(PbVectorIndexObject {
200 id: vector_file_id.inner(),
201 object_type: PbVectorIndexObjectType::VectorIndexObjectVector as _,
202 })
203 }
204 }
205 }
206 StaleObjects {
207 id: sst_ids,
208 total_file_size,
209 vector_files,
210 }
211 });
212 if self.env.opts.enable_hummock_data_archive {
213 archive = Some(PbHummockVersionArchive {
214 version: Some(PbHummockVersion::from(&old_checkpoint.version)),
215 version_deltas: versioning
216 .hummock_version_deltas
217 .range((Excluded(old_checkpoint_id), Included(new_checkpoint_id)))
218 .map(|(_, version_delta)| version_delta.into())
219 .collect(),
220 });
221 }
222 let min_pinned_version_id = self.context_info.read().await.min_pinned_version_id();
223 let may_delete_object = stale_objects
224 .iter()
225 .filter_map(|(version_id, object_ids)| {
226 if *version_id >= min_pinned_version_id {
227 return None;
228 }
229 Some(get_stale_object_ids(object_ids))
230 })
231 .flatten();
232 self.gc_manager.add_may_delete_object_ids(may_delete_object);
233 stale_objects.retain(|version_id, _| *version_id >= min_pinned_version_id);
234 let new_checkpoint = HummockVersionCheckpoint {
235 version: current_version.clone(),
236 stale_objects,
237 };
238 drop(versioning_guard);
239 self.write_checkpoint(&new_checkpoint).await?;
241 if let Some(archive) = archive {
242 if let Err(e) = self.write_version_archive(&archive).await {
243 tracing::warn!(
244 error = %e.as_report(),
245 "failed to write version archive {}",
246 archive.version.as_ref().unwrap().id
247 );
248 }
249 }
250 let mut versioning_guard = self.versioning.write().await;
252 let versioning = versioning_guard.deref_mut();
253 assert!(new_checkpoint.version.id > versioning.checkpoint.version.id);
254 versioning.checkpoint = new_checkpoint;
255 let min_pinned_version_id = self.context_info.read().await.min_pinned_version_id();
256 trigger_gc_stat(&self.metrics, &versioning.checkpoint, min_pinned_version_id);
257 trigger_split_stat(&self.metrics, &versioning.current_version);
258 drop(versioning_guard);
259 timer.observe_duration();
260 self.metrics
261 .checkpoint_version_id
262 .set(new_checkpoint_id.to_u64() as i64);
263
264 Ok(new_checkpoint_id - old_checkpoint_id)
265 }
266
267 pub fn pause_version_checkpoint(&self) {
268 self.pause_version_checkpoint.store(true, Ordering::Relaxed);
269 tracing::info!("hummock version checkpoint is paused.");
270 }
271
272 pub fn resume_version_checkpoint(&self) {
273 self.pause_version_checkpoint
274 .store(false, Ordering::Relaxed);
275 tracing::info!("hummock version checkpoint is resumed.");
276 }
277
278 pub fn is_version_checkpoint_paused(&self) -> bool {
279 self.pause_version_checkpoint.load(Ordering::Relaxed)
280 }
281
282 pub async fn get_checkpoint_version(&self) -> HummockVersion {
283 let versioning_guard = self.versioning.read().await;
284 versioning_guard.checkpoint.version.clone()
285 }
286}