risingwave_meta/hummock/manager/
checkpoint.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::ops::Bound::{Excluded, Included};
17use std::ops::{Deref, DerefMut};
18use std::sync::atomic::Ordering;
19
20use risingwave_hummock_sdk::HummockVersionId;
21use risingwave_hummock_sdk::compaction_group::hummock_version_ext::object_size_map;
22use risingwave_hummock_sdk::version::{GroupDeltaCommon, HummockVersion};
23use risingwave_pb::hummock::hummock_version_checkpoint::{PbStaleObjects, StaleObjects};
24use risingwave_pb::hummock::{
25    PbHummockVersion, PbHummockVersionArchive, PbHummockVersionCheckpoint,
26};
27use thiserror_ext::AsReport;
28use tracing::warn;
29
30use crate::hummock::HummockManager;
31use crate::hummock::error::Result;
32use crate::hummock::manager::versioning::Versioning;
33use crate::hummock::metrics_utils::{trigger_gc_stat, trigger_split_stat};
34
35#[derive(Default)]
36pub struct HummockVersionCheckpoint {
37    pub version: HummockVersion,
38
39    /// stale objects of versions before the current checkpoint.
40    ///
41    /// Previously we stored the stale object of each single version.
42    /// Currently we will merge the stale object between two checkpoints, and only the
43    /// id of the checkpointed hummock version are included in the map.
44    pub stale_objects: HashMap<HummockVersionId, PbStaleObjects>,
45}
46
47impl HummockVersionCheckpoint {
48    pub fn from_protobuf(checkpoint: &PbHummockVersionCheckpoint) -> Self {
49        Self {
50            version: HummockVersion::from_persisted_protobuf(checkpoint.version.as_ref().unwrap()),
51            stale_objects: checkpoint
52                .stale_objects
53                .iter()
54                .map(|(version_id, objects)| (HummockVersionId::new(*version_id), objects.clone()))
55                .collect(),
56        }
57    }
58
59    pub fn to_protobuf(&self) -> PbHummockVersionCheckpoint {
60        PbHummockVersionCheckpoint {
61            version: Some(PbHummockVersion::from(&self.version)),
62            stale_objects: self
63                .stale_objects
64                .iter()
65                .map(|(version_id, objects)| (version_id.to_u64(), objects.clone()))
66                .collect(),
67        }
68    }
69}
70
71/// A hummock version checkpoint compacts previous hummock version delta logs, and stores stale
72/// objects from those delta logs.
73impl HummockManager {
74    /// Returns Ok(None) if not found.
75    pub async fn try_read_checkpoint(&self) -> Result<Option<HummockVersionCheckpoint>> {
76        use prost::Message;
77        let data = match self
78            .object_store
79            .read(&self.version_checkpoint_path, ..)
80            .await
81        {
82            Ok(data) => data,
83            Err(e) => {
84                if e.is_object_not_found_error() {
85                    return Ok(None);
86                }
87                return Err(e.into());
88            }
89        };
90        let ckpt = PbHummockVersionCheckpoint::decode(data).map_err(|e| anyhow::anyhow!(e))?;
91        Ok(Some(HummockVersionCheckpoint::from_protobuf(&ckpt)))
92    }
93
94    pub(super) async fn write_checkpoint(
95        &self,
96        checkpoint: &HummockVersionCheckpoint,
97    ) -> Result<()> {
98        use prost::Message;
99        let buf = checkpoint.to_protobuf().encode_to_vec();
100        self.object_store
101            .upload(&self.version_checkpoint_path, buf.into())
102            .await?;
103        Ok(())
104    }
105
106    pub(super) async fn write_version_archive(
107        &self,
108        archive: &PbHummockVersionArchive,
109    ) -> Result<()> {
110        use prost::Message;
111        let buf = archive.encode_to_vec();
112        let archive_path = format!(
113            "{}/{}",
114            self.version_archive_dir,
115            archive.version.as_ref().unwrap().id
116        );
117        self.object_store.upload(&archive_path, buf.into()).await?;
118        Ok(())
119    }
120
121    /// Creates a hummock version checkpoint.
122    /// Returns the diff between new and old checkpoint id.
123    /// Note that this method must not be called concurrently, because internally it doesn't hold
124    /// lock throughout the method.
125    pub async fn create_version_checkpoint(&self, min_delta_log_num: u64) -> Result<u64> {
126        let timer = self.metrics.version_checkpoint_latency.start_timer();
127        // 1. hold read lock and create new checkpoint
128        let versioning_guard = self.versioning.read().await;
129        let versioning: &Versioning = versioning_guard.deref();
130        let current_version: &HummockVersion = &versioning.current_version;
131        let old_checkpoint: &HummockVersionCheckpoint = &versioning.checkpoint;
132        let new_checkpoint_id = current_version.id;
133        let old_checkpoint_id = old_checkpoint.version.id;
134        if new_checkpoint_id < old_checkpoint_id + min_delta_log_num {
135            return Ok(0);
136        }
137        if cfg!(test) && new_checkpoint_id == old_checkpoint_id {
138            drop(versioning_guard);
139            let versioning = self.versioning.read().await;
140            let context_info = self.context_info.read().await;
141            let min_pinned_version_id = context_info.min_pinned_version_id();
142            trigger_gc_stat(&self.metrics, &versioning.checkpoint, min_pinned_version_id);
143            return Ok(0);
144        }
145        assert!(new_checkpoint_id > old_checkpoint_id);
146        let mut archive: Option<PbHummockVersionArchive> = None;
147        let mut stale_objects = old_checkpoint.stale_objects.clone();
148        // `object_sizes` is used to calculate size of stale objects.
149        let mut object_sizes = object_size_map(&old_checkpoint.version);
150        // The set of object ids that once exist in any hummock version
151        let mut versions_object_ids = old_checkpoint.version.get_object_ids(false);
152        for (_, version_delta) in versioning
153            .hummock_version_deltas
154            .range((Excluded(old_checkpoint_id), Included(new_checkpoint_id)))
155        {
156            for group_deltas in version_delta.group_deltas.values() {
157                object_sizes.extend(
158                    group_deltas
159                        .group_deltas
160                        .iter()
161                        .flat_map(|delta| {
162                            match delta {
163                                GroupDeltaCommon::IntraLevel(level_delta) => {
164                                    Some(level_delta.inserted_table_infos.iter())
165                                }
166                                GroupDeltaCommon::NewL0SubLevel(inserted_table_infos) => {
167                                    Some(inserted_table_infos.iter())
168                                }
169                                GroupDeltaCommon::GroupConstruct(_)
170                                | GroupDeltaCommon::GroupDestroy(_)
171                                | GroupDeltaCommon::GroupMerge(_) => None,
172                            }
173                            .into_iter()
174                            .flatten()
175                            .map(|t| (t.object_id, t.file_size))
176                        })
177                        .chain(
178                            version_delta
179                                .change_log_delta
180                                .values()
181                                .flat_map(|change_log| {
182                                    let new_log = &change_log.new_log;
183                                    new_log
184                                        .new_value
185                                        .iter()
186                                        .chain(new_log.old_value.iter())
187                                        .map(|t| (t.object_id, t.file_size))
188                                }),
189                        ),
190                );
191            }
192            versions_object_ids.extend(version_delta.newly_added_object_ids(false));
193        }
194
195        // Object ids that once exist in any hummock version but not exist in the latest hummock version
196        let removed_object_ids = &versions_object_ids - &current_version.get_object_ids(false);
197        let total_file_size = removed_object_ids
198            .iter()
199            .map(|t| {
200                object_sizes.get(t).copied().unwrap_or_else(|| {
201                    warn!(object_id = t, "unable to get size of removed object id");
202                    0
203                })
204            })
205            .sum::<u64>();
206        stale_objects.insert(
207            current_version.id,
208            StaleObjects {
209                id: removed_object_ids.into_iter().collect(),
210                total_file_size,
211            },
212        );
213        if self.env.opts.enable_hummock_data_archive {
214            archive = Some(PbHummockVersionArchive {
215                version: Some(PbHummockVersion::from(&old_checkpoint.version)),
216                version_deltas: versioning
217                    .hummock_version_deltas
218                    .range((Excluded(old_checkpoint_id), Included(new_checkpoint_id)))
219                    .map(|(_, version_delta)| version_delta.into())
220                    .collect(),
221            });
222        }
223        let min_pinned_version_id = self.context_info.read().await.min_pinned_version_id();
224        let may_delete_object = stale_objects
225            .iter()
226            .filter_map(|(version_id, object_ids)| {
227                if *version_id >= min_pinned_version_id {
228                    return None;
229                }
230                Some(object_ids.id.clone())
231            })
232            .flatten();
233        self.gc_manager.add_may_delete_object_ids(may_delete_object);
234        stale_objects.retain(|version_id, _| *version_id >= min_pinned_version_id);
235        let new_checkpoint = HummockVersionCheckpoint {
236            version: current_version.clone(),
237            stale_objects,
238        };
239        drop(versioning_guard);
240        // 2. persist the new checkpoint without holding lock
241        self.write_checkpoint(&new_checkpoint).await?;
242        if let Some(archive) = archive {
243            if let Err(e) = self.write_version_archive(&archive).await {
244                tracing::warn!(
245                    error = %e.as_report(),
246                    "failed to write version archive {}",
247                    archive.version.as_ref().unwrap().id
248                );
249            }
250        }
251        // 3. hold write lock and update in memory state
252        let mut versioning_guard = self.versioning.write().await;
253        let versioning = versioning_guard.deref_mut();
254        assert!(new_checkpoint.version.id > versioning.checkpoint.version.id);
255        versioning.checkpoint = new_checkpoint;
256        let min_pinned_version_id = self.context_info.read().await.min_pinned_version_id();
257        trigger_gc_stat(&self.metrics, &versioning.checkpoint, min_pinned_version_id);
258        trigger_split_stat(&self.metrics, &versioning.current_version);
259        drop(versioning_guard);
260        timer.observe_duration();
261        self.metrics
262            .checkpoint_version_id
263            .set(new_checkpoint_id.to_u64() as i64);
264
265        Ok(new_checkpoint_id - old_checkpoint_id)
266    }
267
268    pub fn pause_version_checkpoint(&self) {
269        self.pause_version_checkpoint.store(true, Ordering::Relaxed);
270        tracing::info!("hummock version checkpoint is paused.");
271    }
272
273    pub fn resume_version_checkpoint(&self) {
274        self.pause_version_checkpoint
275            .store(false, Ordering::Relaxed);
276        tracing::info!("hummock version checkpoint is resumed.");
277    }
278
279    pub fn is_version_checkpoint_paused(&self) -> bool {
280        self.pause_version_checkpoint.load(Ordering::Relaxed)
281    }
282
283    pub async fn get_checkpoint_version(&self) -> HummockVersion {
284        let versioning_guard = self.versioning.read().await;
285        versioning_guard.checkpoint.version.clone()
286    }
287}