// risingwave_meta/hummock/manager/checkpoint.rs
use std::collections::{HashMap, HashSet};
16use std::ops::Bound::{Excluded, Included};
17use std::ops::{Deref, DerefMut};
18use std::sync::atomic::Ordering;
19
20use risingwave_hummock_sdk::compaction_group::hummock_version_ext::object_size_map;
21use risingwave_hummock_sdk::version::HummockVersion;
22use risingwave_hummock_sdk::{HummockObjectId, HummockVersionId, get_stale_object_ids};
23use risingwave_pb::hummock::hummock_version_checkpoint::{PbStaleObjects, StaleObjects};
24use risingwave_pb::hummock::{
25 PbHummockVersion, PbHummockVersionArchive, PbHummockVersionCheckpoint, PbVectorIndexObject,
26 PbVectorIndexObjectType,
27};
28use thiserror_ext::AsReport;
29use tracing::warn;
30
31use crate::hummock::HummockManager;
32use crate::hummock::error::Result;
33use crate::hummock::manager::versioning::Versioning;
34use crate::hummock::metrics_utils::{trigger_gc_stat, trigger_split_stat};
35
36#[derive(Default)]
37pub struct HummockVersionCheckpoint {
38 pub version: HummockVersion,
39
40 pub stale_objects: HashMap<HummockVersionId, PbStaleObjects>,
46}
47
48impl HummockVersionCheckpoint {
49 pub fn from_protobuf(checkpoint: &PbHummockVersionCheckpoint) -> Self {
50 Self {
51 version: HummockVersion::from_persisted_protobuf(checkpoint.version.as_ref().unwrap()),
52 stale_objects: checkpoint
53 .stale_objects
54 .iter()
55 .map(|(version_id, objects)| (HummockVersionId::new(*version_id), objects.clone()))
56 .collect(),
57 }
58 }
59
60 pub fn to_protobuf(&self) -> PbHummockVersionCheckpoint {
61 PbHummockVersionCheckpoint {
62 version: Some(PbHummockVersion::from(&self.version)),
63 stale_objects: self
64 .stale_objects
65 .iter()
66 .map(|(version_id, objects)| (version_id.to_u64(), objects.clone()))
67 .collect(),
68 }
69 }
70}
71
72impl HummockManager {
75 pub async fn try_read_checkpoint(&self) -> Result<Option<HummockVersionCheckpoint>> {
77 use prost::Message;
78 let data = match self
79 .object_store
80 .read(&self.version_checkpoint_path, ..)
81 .await
82 {
83 Ok(data) => data,
84 Err(e) => {
85 if e.is_object_not_found_error() {
86 return Ok(None);
87 }
88 return Err(e.into());
89 }
90 };
91 let ckpt = PbHummockVersionCheckpoint::decode(data).map_err(|e| anyhow::anyhow!(e))?;
92 Ok(Some(HummockVersionCheckpoint::from_protobuf(&ckpt)))
93 }
94
95 pub(super) async fn write_checkpoint(
96 &self,
97 checkpoint: &HummockVersionCheckpoint,
98 ) -> Result<()> {
99 use prost::Message;
100 let buf = checkpoint.to_protobuf().encode_to_vec();
101 self.object_store
102 .upload(&self.version_checkpoint_path, buf.into())
103 .await?;
104 Ok(())
105 }
106
107 pub(super) async fn write_version_archive(
108 &self,
109 archive: &PbHummockVersionArchive,
110 ) -> Result<()> {
111 use prost::Message;
112 let buf = archive.encode_to_vec();
113 let archive_path = format!(
114 "{}/{}",
115 self.version_archive_dir,
116 archive.version.as_ref().unwrap().id
117 );
118 self.object_store.upload(&archive_path, buf.into()).await?;
119 Ok(())
120 }
121
122 pub async fn create_version_checkpoint(&self, min_delta_log_num: u64) -> Result<u64> {
127 let timer = self.metrics.version_checkpoint_latency.start_timer();
128 let versioning_guard = self.versioning.read().await;
130 let versioning: &Versioning = versioning_guard.deref();
131 let current_version: &HummockVersion = &versioning.current_version;
132 let old_checkpoint: &HummockVersionCheckpoint = &versioning.checkpoint;
133 let new_checkpoint_id = current_version.id;
134 let old_checkpoint_id = old_checkpoint.version.id;
135 if new_checkpoint_id < old_checkpoint_id + min_delta_log_num {
136 return Ok(0);
137 }
138 if cfg!(test) && new_checkpoint_id == old_checkpoint_id {
139 drop(versioning_guard);
140 let versioning = self.versioning.read().await;
141 let context_info = self.context_info.read().await;
142 let min_pinned_version_id = context_info.min_pinned_version_id();
143 trigger_gc_stat(&self.metrics, &versioning.checkpoint, min_pinned_version_id);
144 return Ok(0);
145 }
146 assert!(new_checkpoint_id > old_checkpoint_id);
147 let mut archive: Option<PbHummockVersionArchive> = None;
148 let mut stale_objects = old_checkpoint.stale_objects.clone();
149 let mut object_sizes = object_size_map(&old_checkpoint.version);
151 let mut versions_object_ids: HashSet<_> =
153 old_checkpoint.version.get_object_ids(false).collect();
154 for (_, version_delta) in versioning
155 .hummock_version_deltas
156 .range((Excluded(old_checkpoint_id), Included(new_checkpoint_id)))
157 {
158 match HummockObjectId::Sstable(0.into()) {
162 HummockObjectId::Sstable(_) => {}
163 HummockObjectId::VectorFile(_) => {}
164 HummockObjectId::HnswGraphFile(_) => {}
165 };
166 for (object_id, file_size) in version_delta
167 .newly_added_sst_infos(false)
168 .map(|sst| (HummockObjectId::Sstable(sst.object_id), sst.file_size))
169 .chain(
170 version_delta
171 .vector_index_delta
172 .values()
173 .flat_map(|delta| delta.newly_added_objects()),
174 )
175 {
176 object_sizes.insert(object_id, file_size);
177 versions_object_ids.insert(object_id);
178 }
179 }
180
181 let removed_object_ids =
183 &versions_object_ids - ¤t_version.get_object_ids(false).collect();
184 let total_file_size = removed_object_ids
185 .iter()
186 .map(|t| {
187 object_sizes.get(t).copied().unwrap_or_else(|| {
188 warn!(object_id = ?t, "unable to get size of removed object id");
189 0
190 })
191 })
192 .sum::<u64>();
193 stale_objects.insert(current_version.id, {
194 let mut sst_ids = vec![];
195 let mut vector_files = vec![];
196 for object_id in removed_object_ids {
197 match object_id {
198 HummockObjectId::Sstable(sst_id) => sst_ids.push(sst_id.inner()),
199 HummockObjectId::VectorFile(vector_file_id) => {
200 vector_files.push(PbVectorIndexObject {
201 id: vector_file_id.inner(),
202 object_type: PbVectorIndexObjectType::VectorIndexObjectVector as _,
203 })
204 }
205 HummockObjectId::HnswGraphFile(graph_file_id) => {
206 vector_files.push(PbVectorIndexObject {
207 id: graph_file_id.inner(),
208 object_type: PbVectorIndexObjectType::VectorIndexObjectHnswGraph as _,
209 });
210 }
211 }
212 }
213 StaleObjects {
214 id: sst_ids,
215 total_file_size,
216 vector_files,
217 }
218 });
219 if self.env.opts.enable_hummock_data_archive {
220 archive = Some(PbHummockVersionArchive {
221 version: Some(PbHummockVersion::from(&old_checkpoint.version)),
222 version_deltas: versioning
223 .hummock_version_deltas
224 .range((Excluded(old_checkpoint_id), Included(new_checkpoint_id)))
225 .map(|(_, version_delta)| version_delta.into())
226 .collect(),
227 });
228 }
229 let min_pinned_version_id = self.context_info.read().await.min_pinned_version_id();
230 let may_delete_object = stale_objects
231 .iter()
232 .filter_map(|(version_id, object_ids)| {
233 if *version_id >= min_pinned_version_id {
234 return None;
235 }
236 Some(get_stale_object_ids(object_ids))
237 })
238 .flatten();
239 self.gc_manager.add_may_delete_object_ids(may_delete_object);
240 stale_objects.retain(|version_id, _| *version_id >= min_pinned_version_id);
241 let new_checkpoint = HummockVersionCheckpoint {
242 version: current_version.clone(),
243 stale_objects,
244 };
245 drop(versioning_guard);
246 self.write_checkpoint(&new_checkpoint).await?;
248 if let Some(archive) = archive
249 && let Err(e) = self.write_version_archive(&archive).await
250 {
251 tracing::warn!(
252 error = %e.as_report(),
253 "failed to write version archive {}",
254 archive.version.as_ref().unwrap().id
255 );
256 }
257 let mut versioning_guard = self.versioning.write().await;
259 let versioning = versioning_guard.deref_mut();
260 assert!(new_checkpoint.version.id > versioning.checkpoint.version.id);
261 versioning.checkpoint = new_checkpoint;
262 let min_pinned_version_id = self.context_info.read().await.min_pinned_version_id();
263 trigger_gc_stat(&self.metrics, &versioning.checkpoint, min_pinned_version_id);
264 trigger_split_stat(&self.metrics, &versioning.current_version);
265 drop(versioning_guard);
266 timer.observe_duration();
267 self.metrics
268 .checkpoint_version_id
269 .set(new_checkpoint_id.to_u64() as i64);
270
271 Ok(new_checkpoint_id - old_checkpoint_id)
272 }
273
274 pub fn pause_version_checkpoint(&self) {
275 self.pause_version_checkpoint.store(true, Ordering::Relaxed);
276 tracing::info!("hummock version checkpoint is paused.");
277 }
278
279 pub fn resume_version_checkpoint(&self) {
280 self.pause_version_checkpoint
281 .store(false, Ordering::Relaxed);
282 tracing::info!("hummock version checkpoint is resumed.");
283 }
284
285 pub fn is_version_checkpoint_paused(&self) -> bool {
286 self.pause_version_checkpoint.load(Ordering::Relaxed)
287 }
288
289 pub async fn get_checkpoint_version(&self) -> HummockVersion {
290 let versioning_guard = self.versioning.read().await;
291 versioning_guard.checkpoint.version.clone()
292 }
293}