risingwave_meta/hummock/manager/
checkpoint.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::ops::Bound::{Excluded, Included};
17use std::ops::{Deref, DerefMut};
18use std::sync::atomic::Ordering;
19
20use risingwave_hummock_sdk::compaction_group::hummock_version_ext::object_size_map;
21use risingwave_hummock_sdk::version::HummockVersion;
22use risingwave_hummock_sdk::{HummockObjectId, HummockVersionId, get_stale_object_ids};
23use risingwave_pb::hummock::hummock_version_checkpoint::{PbStaleObjects, StaleObjects};
24use risingwave_pb::hummock::{
25    PbHummockVersion, PbHummockVersionArchive, PbHummockVersionCheckpoint,
26};
27use thiserror_ext::AsReport;
28use tracing::warn;
29
30use crate::hummock::HummockManager;
31use crate::hummock::error::Result;
32use crate::hummock::manager::versioning::Versioning;
33use crate::hummock::metrics_utils::{trigger_gc_stat, trigger_split_stat};
34
35#[derive(Default)]
36pub struct HummockVersionCheckpoint {
37    pub version: HummockVersion,
38
39    /// stale objects of versions before the current checkpoint.
40    ///
41    /// Previously we stored the stale object of each single version.
42    /// Currently we will merge the stale object between two checkpoints, and only the
43    /// id of the checkpointed hummock version are included in the map.
44    pub stale_objects: HashMap<HummockVersionId, PbStaleObjects>,
45}
46
47impl HummockVersionCheckpoint {
48    pub fn from_protobuf(checkpoint: &PbHummockVersionCheckpoint) -> Self {
49        Self {
50            version: HummockVersion::from_persisted_protobuf(checkpoint.version.as_ref().unwrap()),
51            stale_objects: checkpoint
52                .stale_objects
53                .iter()
54                .map(|(version_id, objects)| (HummockVersionId::new(*version_id), objects.clone()))
55                .collect(),
56        }
57    }
58
59    pub fn to_protobuf(&self) -> PbHummockVersionCheckpoint {
60        PbHummockVersionCheckpoint {
61            version: Some(PbHummockVersion::from(&self.version)),
62            stale_objects: self
63                .stale_objects
64                .iter()
65                .map(|(version_id, objects)| (version_id.to_u64(), objects.clone()))
66                .collect(),
67        }
68    }
69}
70
71/// A hummock version checkpoint compacts previous hummock version delta logs, and stores stale
72/// objects from those delta logs.
73impl HummockManager {
74    /// Returns Ok(None) if not found.
75    pub async fn try_read_checkpoint(&self) -> Result<Option<HummockVersionCheckpoint>> {
76        use prost::Message;
77        let data = match self
78            .object_store
79            .read(&self.version_checkpoint_path, ..)
80            .await
81        {
82            Ok(data) => data,
83            Err(e) => {
84                if e.is_object_not_found_error() {
85                    return Ok(None);
86                }
87                return Err(e.into());
88            }
89        };
90        let ckpt = PbHummockVersionCheckpoint::decode(data).map_err(|e| anyhow::anyhow!(e))?;
91        Ok(Some(HummockVersionCheckpoint::from_protobuf(&ckpt)))
92    }
93
94    pub(super) async fn write_checkpoint(
95        &self,
96        checkpoint: &HummockVersionCheckpoint,
97    ) -> Result<()> {
98        use prost::Message;
99        let buf = checkpoint.to_protobuf().encode_to_vec();
100        self.object_store
101            .upload(&self.version_checkpoint_path, buf.into())
102            .await?;
103        Ok(())
104    }
105
106    pub(super) async fn write_version_archive(
107        &self,
108        archive: &PbHummockVersionArchive,
109    ) -> Result<()> {
110        use prost::Message;
111        let buf = archive.encode_to_vec();
112        let archive_path = format!(
113            "{}/{}",
114            self.version_archive_dir,
115            archive.version.as_ref().unwrap().id
116        );
117        self.object_store.upload(&archive_path, buf.into()).await?;
118        Ok(())
119    }
120
121    /// Creates a hummock version checkpoint.
122    /// Returns the diff between new and old checkpoint id.
123    /// Note that this method must not be called concurrently, because internally it doesn't hold
124    /// lock throughout the method.
125    pub async fn create_version_checkpoint(&self, min_delta_log_num: u64) -> Result<u64> {
126        let timer = self.metrics.version_checkpoint_latency.start_timer();
127        // 1. hold read lock and create new checkpoint
128        let versioning_guard = self.versioning.read().await;
129        let versioning: &Versioning = versioning_guard.deref();
130        let current_version: &HummockVersion = &versioning.current_version;
131        let old_checkpoint: &HummockVersionCheckpoint = &versioning.checkpoint;
132        let new_checkpoint_id = current_version.id;
133        let old_checkpoint_id = old_checkpoint.version.id;
134        if new_checkpoint_id < old_checkpoint_id + min_delta_log_num {
135            return Ok(0);
136        }
137        if cfg!(test) && new_checkpoint_id == old_checkpoint_id {
138            drop(versioning_guard);
139            let versioning = self.versioning.read().await;
140            let context_info = self.context_info.read().await;
141            let min_pinned_version_id = context_info.min_pinned_version_id();
142            trigger_gc_stat(&self.metrics, &versioning.checkpoint, min_pinned_version_id);
143            return Ok(0);
144        }
145        assert!(new_checkpoint_id > old_checkpoint_id);
146        let mut archive: Option<PbHummockVersionArchive> = None;
147        let mut stale_objects = old_checkpoint.stale_objects.clone();
148        // `object_sizes` is used to calculate size of stale objects.
149        let mut object_sizes = object_size_map(&old_checkpoint.version);
150        // The set of object ids that once exist in any hummock version
151        let mut versions_object_ids = old_checkpoint.version.get_object_ids(false);
152        for (_, version_delta) in versioning
153            .hummock_version_deltas
154            .range((Excluded(old_checkpoint_id), Included(new_checkpoint_id)))
155        {
156            // DO NOT REMOVE THIS LINE
157            // This is to ensure that when adding new variant to `HummockObjectId`,
158            // the compiler will warn us if we forget to handle it here.
159            match HummockObjectId::Sstable(0.into()) {
160                HummockObjectId::Sstable(_) => {}
161            };
162            for sst in version_delta.newly_added_sst_infos(false) {
163                let object_id = HummockObjectId::Sstable(sst.object_id);
164                object_sizes.insert(object_id, sst.file_size);
165                versions_object_ids.insert(object_id);
166            }
167        }
168
169        // Object ids that once exist in any hummock version but not exist in the latest hummock version
170        let removed_object_ids = &versions_object_ids - &current_version.get_object_ids(false);
171        let total_file_size = removed_object_ids
172            .iter()
173            .map(|t| {
174                object_sizes.get(t).copied().unwrap_or_else(|| {
175                    warn!(object_id = ?t, "unable to get size of removed object id");
176                    0
177                })
178            })
179            .sum::<u64>();
180        stale_objects.insert(
181            current_version.id,
182            StaleObjects {
183                id: removed_object_ids
184                    .into_iter()
185                    .map(|object_id| {
186                        let HummockObjectId::Sstable(sst_id) = object_id;
187                        sst_id.inner()
188                    })
189                    .collect(),
190                total_file_size,
191            },
192        );
193        if self.env.opts.enable_hummock_data_archive {
194            archive = Some(PbHummockVersionArchive {
195                version: Some(PbHummockVersion::from(&old_checkpoint.version)),
196                version_deltas: versioning
197                    .hummock_version_deltas
198                    .range((Excluded(old_checkpoint_id), Included(new_checkpoint_id)))
199                    .map(|(_, version_delta)| version_delta.into())
200                    .collect(),
201            });
202        }
203        let min_pinned_version_id = self.context_info.read().await.min_pinned_version_id();
204        let may_delete_object = stale_objects
205            .iter()
206            .filter_map(|(version_id, object_ids)| {
207                if *version_id >= min_pinned_version_id {
208                    return None;
209                }
210                Some(get_stale_object_ids(object_ids))
211            })
212            .flatten();
213        self.gc_manager.add_may_delete_object_ids(may_delete_object);
214        stale_objects.retain(|version_id, _| *version_id >= min_pinned_version_id);
215        let new_checkpoint = HummockVersionCheckpoint {
216            version: current_version.clone(),
217            stale_objects,
218        };
219        drop(versioning_guard);
220        // 2. persist the new checkpoint without holding lock
221        self.write_checkpoint(&new_checkpoint).await?;
222        if let Some(archive) = archive {
223            if let Err(e) = self.write_version_archive(&archive).await {
224                tracing::warn!(
225                    error = %e.as_report(),
226                    "failed to write version archive {}",
227                    archive.version.as_ref().unwrap().id
228                );
229            }
230        }
231        // 3. hold write lock and update in memory state
232        let mut versioning_guard = self.versioning.write().await;
233        let versioning = versioning_guard.deref_mut();
234        assert!(new_checkpoint.version.id > versioning.checkpoint.version.id);
235        versioning.checkpoint = new_checkpoint;
236        let min_pinned_version_id = self.context_info.read().await.min_pinned_version_id();
237        trigger_gc_stat(&self.metrics, &versioning.checkpoint, min_pinned_version_id);
238        trigger_split_stat(&self.metrics, &versioning.current_version);
239        drop(versioning_guard);
240        timer.observe_duration();
241        self.metrics
242            .checkpoint_version_id
243            .set(new_checkpoint_id.to_u64() as i64);
244
245        Ok(new_checkpoint_id - old_checkpoint_id)
246    }
247
248    pub fn pause_version_checkpoint(&self) {
249        self.pause_version_checkpoint.store(true, Ordering::Relaxed);
250        tracing::info!("hummock version checkpoint is paused.");
251    }
252
253    pub fn resume_version_checkpoint(&self) {
254        self.pause_version_checkpoint
255            .store(false, Ordering::Relaxed);
256        tracing::info!("hummock version checkpoint is resumed.");
257    }
258
259    pub fn is_version_checkpoint_paused(&self) -> bool {
260        self.pause_version_checkpoint.load(Ordering::Relaxed)
261    }
262
263    pub async fn get_checkpoint_version(&self) -> HummockVersion {
264        let versioning_guard = self.versioning.read().await;
265        versioning_guard.checkpoint.version.clone()
266    }
267}