risingwave_meta/hummock/manager/
checkpoint.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::ops::Bound::{Excluded, Included};
17use std::ops::{Deref, DerefMut};
18use std::sync::atomic::Ordering;
19
20use risingwave_hummock_sdk::compaction_group::hummock_version_ext::object_size_map;
21use risingwave_hummock_sdk::version::HummockVersion;
22use risingwave_hummock_sdk::{HummockObjectId, HummockVersionId, get_stale_object_ids};
23use risingwave_pb::hummock::hummock_version_checkpoint::{PbStaleObjects, StaleObjects};
24use risingwave_pb::hummock::{
25    PbHummockVersion, PbHummockVersionArchive, PbHummockVersionCheckpoint, PbVectorIndexObject,
26    PbVectorIndexObjectType,
27};
28use thiserror_ext::AsReport;
29use tracing::warn;
30
31use crate::hummock::HummockManager;
32use crate::hummock::error::Result;
33use crate::hummock::manager::versioning::Versioning;
34use crate::hummock::metrics_utils::{trigger_gc_stat, trigger_split_stat};
35
36#[derive(Default)]
37pub struct HummockVersionCheckpoint {
38    pub version: HummockVersion,
39
40    /// stale objects of versions before the current checkpoint.
41    ///
42    /// Previously we stored the stale object of each single version.
43    /// Currently we will merge the stale object between two checkpoints, and only the
44    /// id of the checkpointed hummock version are included in the map.
45    pub stale_objects: HashMap<HummockVersionId, PbStaleObjects>,
46}
47
48impl HummockVersionCheckpoint {
49    pub fn from_protobuf(checkpoint: &PbHummockVersionCheckpoint) -> Self {
50        Self {
51            version: HummockVersion::from_persisted_protobuf(checkpoint.version.as_ref().unwrap()),
52            stale_objects: checkpoint
53                .stale_objects
54                .iter()
55                .map(|(version_id, objects)| (HummockVersionId::new(*version_id), objects.clone()))
56                .collect(),
57        }
58    }
59
60    pub fn to_protobuf(&self) -> PbHummockVersionCheckpoint {
61        PbHummockVersionCheckpoint {
62            version: Some(PbHummockVersion::from(&self.version)),
63            stale_objects: self
64                .stale_objects
65                .iter()
66                .map(|(version_id, objects)| (version_id.to_u64(), objects.clone()))
67                .collect(),
68        }
69    }
70}
71
72/// A hummock version checkpoint compacts previous hummock version delta logs, and stores stale
73/// objects from those delta logs.
74impl HummockManager {
75    /// Returns Ok(None) if not found.
76    pub async fn try_read_checkpoint(&self) -> Result<Option<HummockVersionCheckpoint>> {
77        use prost::Message;
78        let data = match self
79            .object_store
80            .read(&self.version_checkpoint_path, ..)
81            .await
82        {
83            Ok(data) => data,
84            Err(e) => {
85                if e.is_object_not_found_error() {
86                    return Ok(None);
87                }
88                return Err(e.into());
89            }
90        };
91        let ckpt = PbHummockVersionCheckpoint::decode(data).map_err(|e| anyhow::anyhow!(e))?;
92        Ok(Some(HummockVersionCheckpoint::from_protobuf(&ckpt)))
93    }
94
95    pub(super) async fn write_checkpoint(
96        &self,
97        checkpoint: &HummockVersionCheckpoint,
98    ) -> Result<()> {
99        use prost::Message;
100        let buf = checkpoint.to_protobuf().encode_to_vec();
101        self.object_store
102            .upload(&self.version_checkpoint_path, buf.into())
103            .await?;
104        Ok(())
105    }
106
107    pub(super) async fn write_version_archive(
108        &self,
109        archive: &PbHummockVersionArchive,
110    ) -> Result<()> {
111        use prost::Message;
112        let buf = archive.encode_to_vec();
113        let archive_path = format!(
114            "{}/{}",
115            self.version_archive_dir,
116            archive.version.as_ref().unwrap().id
117        );
118        self.object_store.upload(&archive_path, buf.into()).await?;
119        Ok(())
120    }
121
122    /// Creates a hummock version checkpoint.
123    /// Returns the diff between new and old checkpoint id.
124    /// Note that this method must not be called concurrently, because internally it doesn't hold
125    /// lock throughout the method.
126    pub async fn create_version_checkpoint(&self, min_delta_log_num: u64) -> Result<u64> {
127        let timer = self.metrics.version_checkpoint_latency.start_timer();
128        // 1. hold read lock and create new checkpoint
129        let versioning_guard = self.versioning.read().await;
130        let versioning: &Versioning = versioning_guard.deref();
131        let current_version: &HummockVersion = &versioning.current_version;
132        let old_checkpoint: &HummockVersionCheckpoint = &versioning.checkpoint;
133        let new_checkpoint_id = current_version.id;
134        let old_checkpoint_id = old_checkpoint.version.id;
135        if new_checkpoint_id < old_checkpoint_id + min_delta_log_num {
136            return Ok(0);
137        }
138        if cfg!(test) && new_checkpoint_id == old_checkpoint_id {
139            drop(versioning_guard);
140            let versioning = self.versioning.read().await;
141            let context_info = self.context_info.read().await;
142            let min_pinned_version_id = context_info.min_pinned_version_id();
143            trigger_gc_stat(&self.metrics, &versioning.checkpoint, min_pinned_version_id);
144            return Ok(0);
145        }
146        assert!(new_checkpoint_id > old_checkpoint_id);
147        let mut archive: Option<PbHummockVersionArchive> = None;
148        let mut stale_objects = old_checkpoint.stale_objects.clone();
149        // `object_sizes` is used to calculate size of stale objects.
150        let mut object_sizes = object_size_map(&old_checkpoint.version);
151        // The set of object ids that once exist in any hummock version
152        let mut versions_object_ids: HashSet<_> =
153            old_checkpoint.version.get_object_ids(false).collect();
154        for (_, version_delta) in versioning
155            .hummock_version_deltas
156            .range((Excluded(old_checkpoint_id), Included(new_checkpoint_id)))
157        {
158            // DO NOT REMOVE THIS LINE
159            // This is to ensure that when adding new variant to `HummockObjectId`,
160            // the compiler will warn us if we forget to handle it here.
161            match HummockObjectId::Sstable(0.into()) {
162                HummockObjectId::Sstable(_) => {}
163                HummockObjectId::VectorFile(_) => {}
164            };
165            for (object_id, file_size) in version_delta
166                .newly_added_sst_infos(false)
167                .map(|sst| (HummockObjectId::Sstable(sst.object_id), sst.file_size))
168                .chain(
169                    version_delta
170                        .vector_index_delta
171                        .values()
172                        .flat_map(|delta| delta.newly_added_objects()),
173                )
174            {
175                object_sizes.insert(object_id, file_size);
176                versions_object_ids.insert(object_id);
177            }
178        }
179
180        // Object ids that once exist in any hummock version but not exist in the latest hummock version
181        let removed_object_ids =
182            &versions_object_ids - &current_version.get_object_ids(false).collect();
183        let total_file_size = removed_object_ids
184            .iter()
185            .map(|t| {
186                object_sizes.get(t).copied().unwrap_or_else(|| {
187                    warn!(object_id = ?t, "unable to get size of removed object id");
188                    0
189                })
190            })
191            .sum::<u64>();
192        stale_objects.insert(current_version.id, {
193            let mut sst_ids = vec![];
194            let mut vector_files = vec![];
195            for object_id in removed_object_ids {
196                match object_id {
197                    HummockObjectId::Sstable(sst_id) => sst_ids.push(sst_id.inner()),
198                    HummockObjectId::VectorFile(vector_file_id) => {
199                        vector_files.push(PbVectorIndexObject {
200                            id: vector_file_id.inner(),
201                            object_type: PbVectorIndexObjectType::VectorIndexObjectVector as _,
202                        })
203                    }
204                }
205            }
206            StaleObjects {
207                id: sst_ids,
208                total_file_size,
209                vector_files,
210            }
211        });
212        if self.env.opts.enable_hummock_data_archive {
213            archive = Some(PbHummockVersionArchive {
214                version: Some(PbHummockVersion::from(&old_checkpoint.version)),
215                version_deltas: versioning
216                    .hummock_version_deltas
217                    .range((Excluded(old_checkpoint_id), Included(new_checkpoint_id)))
218                    .map(|(_, version_delta)| version_delta.into())
219                    .collect(),
220            });
221        }
222        let min_pinned_version_id = self.context_info.read().await.min_pinned_version_id();
223        let may_delete_object = stale_objects
224            .iter()
225            .filter_map(|(version_id, object_ids)| {
226                if *version_id >= min_pinned_version_id {
227                    return None;
228                }
229                Some(get_stale_object_ids(object_ids))
230            })
231            .flatten();
232        self.gc_manager.add_may_delete_object_ids(may_delete_object);
233        stale_objects.retain(|version_id, _| *version_id >= min_pinned_version_id);
234        let new_checkpoint = HummockVersionCheckpoint {
235            version: current_version.clone(),
236            stale_objects,
237        };
238        drop(versioning_guard);
239        // 2. persist the new checkpoint without holding lock
240        self.write_checkpoint(&new_checkpoint).await?;
241        if let Some(archive) = archive {
242            if let Err(e) = self.write_version_archive(&archive).await {
243                tracing::warn!(
244                    error = %e.as_report(),
245                    "failed to write version archive {}",
246                    archive.version.as_ref().unwrap().id
247                );
248            }
249        }
250        // 3. hold write lock and update in memory state
251        let mut versioning_guard = self.versioning.write().await;
252        let versioning = versioning_guard.deref_mut();
253        assert!(new_checkpoint.version.id > versioning.checkpoint.version.id);
254        versioning.checkpoint = new_checkpoint;
255        let min_pinned_version_id = self.context_info.read().await.min_pinned_version_id();
256        trigger_gc_stat(&self.metrics, &versioning.checkpoint, min_pinned_version_id);
257        trigger_split_stat(&self.metrics, &versioning.current_version);
258        drop(versioning_guard);
259        timer.observe_duration();
260        self.metrics
261            .checkpoint_version_id
262            .set(new_checkpoint_id.to_u64() as i64);
263
264        Ok(new_checkpoint_id - old_checkpoint_id)
265    }
266
267    pub fn pause_version_checkpoint(&self) {
268        self.pause_version_checkpoint.store(true, Ordering::Relaxed);
269        tracing::info!("hummock version checkpoint is paused.");
270    }
271
272    pub fn resume_version_checkpoint(&self) {
273        self.pause_version_checkpoint
274            .store(false, Ordering::Relaxed);
275        tracing::info!("hummock version checkpoint is resumed.");
276    }
277
278    pub fn is_version_checkpoint_paused(&self) -> bool {
279        self.pause_version_checkpoint.load(Ordering::Relaxed)
280    }
281
282    pub async fn get_checkpoint_version(&self) -> HummockVersion {
283        let versioning_guard = self.versioning.read().await;
284        versioning_guard.checkpoint.version.clone()
285    }
286}