risingwave_meta/hummock/manager/
checkpoint.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::ops::Bound::{Excluded, Included};
17use std::ops::{Deref, DerefMut};
18use std::sync::atomic::Ordering;
19
20use bytes::BytesMut;
21use risingwave_hummock_sdk::compaction_group::hummock_version_ext::version_object_size_map;
22use risingwave_hummock_sdk::version::HummockVersion;
23use risingwave_hummock_sdk::{HummockObjectId, HummockVersionId, get_stale_object_ids};
24use risingwave_pb::hummock::hummock_version_checkpoint::{PbStaleObjects, StaleObjects};
25use risingwave_pb::hummock::{
26    CheckpointCompressionAlgorithm, PbHummockVersion, PbHummockVersionArchive,
27    PbHummockVersionCheckpoint, PbHummockVersionCheckpointEnvelope, PbVectorIndexObject,
28    PbVectorIndexObjectType,
29};
30use thiserror_ext::AsReport;
31use tracing::warn;
32
33use crate::hummock::HummockManager;
34use crate::hummock::error::Result;
35use crate::hummock::manager::versioning::Versioning;
36use crate::hummock::metrics_utils::{trigger_gc_stat, trigger_split_stat};
37
38/// Computes xxhash64 checksum of the given data, using seed 0.
39/// This matches the xxhash64 used in block checksum (see `sstable/utils.rs`).
40pub(crate) fn xxhash64_checksum(data: &[u8]) -> u64 {
41    use std::hash::Hasher;
42    let mut hasher = twox_hash::XxHash64::with_seed(0);
43    hasher.write(data);
44    hasher.finish()
45}
46
/// An in-memory representation of a persisted hummock version checkpoint:
/// a snapshot of a hummock version plus the stale objects accumulated since
/// earlier checkpoints.
#[derive(Default)]
pub struct HummockVersionCheckpoint {
    /// The checkpointed hummock version.
    pub version: HummockVersion,

    /// stale objects of versions before the current checkpoint.
    ///
    /// Previously we stored the stale object of each single version.
    /// Currently we will merge the stale object between two checkpoints, and only the
    /// id of the checkpointed hummock version are included in the map.
    pub stale_objects: HashMap<HummockVersionId, PbStaleObjects>,
}
58
59impl HummockVersionCheckpoint {
60    pub fn from_protobuf(checkpoint: &PbHummockVersionCheckpoint) -> Self {
61        Self {
62            version: HummockVersion::from_persisted_protobuf(checkpoint.version.as_ref().unwrap()),
63            stale_objects: checkpoint
64                .stale_objects
65                .iter()
66                .map(|(version_id, objects)| (*version_id, objects.clone()))
67                .collect(),
68        }
69    }
70
71    /// Convert an owned `PbHummockVersionCheckpoint` to `HummockVersionCheckpoint`,
72    /// moving data instead of cloning for better performance on large checkpoints.
73    pub fn from_protobuf_owned(checkpoint: PbHummockVersionCheckpoint) -> Self {
74        Self {
75            version: HummockVersion::from_persisted_protobuf_owned(checkpoint.version.unwrap()),
76            stale_objects: checkpoint.stale_objects,
77        }
78    }
79
80    pub fn to_protobuf(&self) -> PbHummockVersionCheckpoint {
81        PbHummockVersionCheckpoint {
82            version: Some(PbHummockVersion::from(&self.version)),
83            stale_objects: self
84                .stale_objects
85                .iter()
86                .map(|(version_id, objects)| (*version_id, objects.clone()))
87                .collect(),
88        }
89    }
90}
91
92/// Decodes checkpoint data, supporting both envelope (compressed) and legacy (raw) formats.
93///
94/// Format detection: our writer always sets `checksum` in the envelope, so
95/// `checksum.is_some()` reliably distinguishes envelope from legacy format.
96/// Legacy bytes may happen to decode as an envelope (field 1 wire-type mismatch
97/// is skipped, field 2 LEN matches `payload`), but will never have a checksum.
98///
99/// Decoding logic:
100/// 1. Try to decode as `HummockVersionCheckpointEnvelope`
101/// 2. If `checksum.is_some()`:
102///    - Verify xxhash64 checksum
103///    - Decompress payload according to `compression_algorithm`
104///    - Decode decompressed bytes as `PbHummockVersionCheckpoint`
105/// 3. If decode fails or `checksum.is_none()`:
106///    - Decode bytes directly as legacy `PbHummockVersionCheckpoint`
107fn decode_checkpoint_data(data: bytes::Bytes) -> Result<PbHummockVersionCheckpoint> {
108    use anyhow::Context;
109    use prost::Message;
110
111    let data_size = data.len();
112
113    if let Ok(envelope) = PbHummockVersionCheckpointEnvelope::decode(data.clone())
114        && let Some(expected) = envelope.checksum
115    {
116        let actual = xxhash64_checksum(&envelope.payload);
117        if actual != expected {
118            return Err(anyhow::anyhow!(
119                "checkpoint checksum mismatch: expected {:#x}, got {:#x}",
120                expected,
121                actual
122            )
123            .into());
124        }
125
126        let algo = CheckpointCompressionAlgorithm::try_from(envelope.compression_algorithm)
127            .with_context(|| {
128                format!(
129                    "unknown checkpoint compression algorithm: {}",
130                    envelope.compression_algorithm
131                )
132            })?;
133
134        let decompressed = decompress_payload(algo, &envelope.payload)?;
135        let ckpt = PbHummockVersionCheckpoint::decode(decompressed.as_ref())
136            .context("failed to decode checkpoint envelope payload")?;
137        if ckpt.version.is_none() {
138            return Err(anyhow::anyhow!("checkpoint missing required field `version`").into());
139        }
140
141        tracing::info!(
142            compression = ?algo,
143            compressed_size = envelope.payload.len(),
144            decompressed_size = decompressed.len(),
145            compression_ratio =
146                format!("{:.2}x", decompressed.len() as f64 / envelope.payload.len().max(1) as f64),
147            checksum = format!("{expected:#x}"),
148            "decoded compressed checkpoint"
149        );
150        return Ok(ckpt);
151    }
152
153    // Legacy uncompressed format
154    tracing::info!(
155        data_size,
156        "decoding checkpoint in legacy uncompressed format"
157    );
158    let ckpt =
159        PbHummockVersionCheckpoint::decode(data).context("failed to decode legacy checkpoint")?;
160    if ckpt.version.is_none() {
161        return Err(anyhow::anyhow!("legacy checkpoint missing required field `version`").into());
162    }
163    Ok(ckpt)
164}
165
166fn decompress_payload(
167    algo: CheckpointCompressionAlgorithm,
168    payload: &[u8],
169) -> Result<std::borrow::Cow<'_, [u8]>> {
170    use anyhow::Context;
171
172    match algo {
173        CheckpointCompressionAlgorithm::CheckpointCompressionUnspecified => Ok(payload.into()),
174        CheckpointCompressionAlgorithm::CheckpointCompressionZstd => {
175            Ok(zstd::stream::decode_all(payload)
176                .map(std::borrow::Cow::Owned)
177                .context("zstd decompression failed")?)
178        }
179        CheckpointCompressionAlgorithm::CheckpointCompressionLz4 => {
180            let mut decoder = lz4::Decoder::new(payload).context("lz4 decoder init failed")?;
181            let mut decompressed = Vec::new();
182            std::io::Read::read_to_end(&mut decoder, &mut decompressed)
183                .context("lz4 decompression failed")?;
184            Ok(decompressed.into())
185        }
186    }
187}
188
189/// Compresses checkpoint data using the specified algorithm.
190pub(crate) fn compress_payload(
191    algo: risingwave_common::config::CheckpointCompression,
192    data: &[u8],
193) -> Result<Vec<u8>> {
194    use anyhow::Context;
195    use risingwave_common::config::CheckpointCompression;
196
197    match algo {
198        CheckpointCompression::None => Ok(data.to_vec()),
199        CheckpointCompression::Zstd => {
200            // Level 3: good balance between compression ratio and speed
201            Ok(zstd::stream::encode_all(data, 3).context("zstd compression failed")?)
202        }
203        CheckpointCompression::Lz4 => {
204            let mut compressed = Vec::new();
205            let mut encoder = lz4::EncoderBuilder::new()
206                .level(4)
207                .build(&mut compressed)
208                .context("lz4 encoder init failed")?;
209            std::io::Write::write_all(&mut encoder, data)
210                .context("lz4 compression write failed")?;
211            let (_writer, result) = encoder.finish();
212            result.context("lz4 compression finish failed")?;
213            Ok(compressed)
214        }
215    }
216}
217
/// Reads `total_size` bytes in pieces of at most `chunk_size` bytes via
/// `read_range`, keeping up to `max_in_flight_chunks` reads in flight, and
/// reassembles the chunks in offset order.
///
/// `buffered` yields results in submission order even though the underlying
/// reads may complete out of order, so `buf` is always appended in the right
/// sequence. Any chunk error aborts the whole read, annotated with the chunk
/// index and byte range that failed.
async fn read_bytes_in_chunks<F, Fut>(
    total_size: usize,
    chunk_size: usize,
    max_in_flight_chunks: usize,
    mut read_range: F,
) -> anyhow::Result<bytes::Bytes>
where
    F: FnMut(std::ops::Range<usize>) -> Fut,
    Fut: std::future::Future<Output = anyhow::Result<bytes::Bytes>>,
{
    use anyhow::Context;
    use futures::StreamExt;

    // `num_chunks` is only used to make the per-chunk error context readable.
    let num_chunks = total_size.div_ceil(chunk_size);
    let mut buf = BytesMut::with_capacity(total_size);

    let mut chunk_stream = futures::stream::iter((0..total_size).step_by(chunk_size))
        .enumerate()
        .map(|(chunk_idx, offset)| {
            // The last chunk may be shorter than `chunk_size`.
            let end = std::cmp::min(offset + chunk_size, total_size);
            let range = offset..end;
            // `read_range` is called when `buffered` pulls the next item, so at
            // most `max_in_flight_chunks` reads are started concurrently.
            let fut = read_range(range.clone());
            async move {
                fut.await.with_context(|| {
                    format!(
                        "read checkpoint chunk {}/{} range {}..{}",
                        chunk_idx + 1,
                        num_chunks,
                        range.start,
                        range.end
                    )
                })
            }
        })
        .buffered(max_in_flight_chunks);

    // Chunks arrive in order; propagate the first error, otherwise append.
    while let Some(chunk) = chunk_stream.next().await {
        let chunk = chunk?;
        buf.extend_from_slice(&chunk);
    }

    Ok(buf.freeze())
}
261
/// A hummock version checkpoint compacts previous hummock version delta logs, and stores stale
/// objects from those delta logs.
impl HummockManager {
    /// Returns Ok(None) if not found.
    ///
    /// Reads large checkpoints using bounded parallel chunked reads to avoid
    /// single-request timeout issues. Chunk size and concurrency are configurable
    /// via `checkpoint_read_chunk_size` and `checkpoint_read_max_in_flight_chunks`.
    /// Supports both compressed (envelope) and uncompressed (legacy) checkpoint formats.
    pub async fn try_read_checkpoint(&self) -> Result<Option<HummockVersionCheckpoint>> {
        // Fetch object metadata first: the size decides between a single read
        // and a chunked read, and a not-found error maps to Ok(None).
        let object_metadata = match self
            .object_store
            .metadata(&self.version_checkpoint_path)
            .await
        {
            Ok(metadata) => metadata,
            Err(e) => {
                if e.is_object_not_found_error() {
                    // No checkpoint written yet (e.g. fresh cluster).
                    return Ok(None);
                }
                return Err(e.into());
            }
        };
        let total_size = object_metadata.total_size;

        let chunk_size = self.env.opts.checkpoint_read_chunk_size;
        let max_in_flight_chunks = self.env.opts.checkpoint_read_max_in_flight_chunks;

        let download_start = std::time::Instant::now();
        let data = if total_size <= chunk_size {
            // Small object: one request is enough.
            self.object_store
                .read(&self.version_checkpoint_path, 0..total_size)
                .await?
        } else {
            // Large object: bounded-concurrency chunked download.
            let num_chunks = total_size.div_ceil(chunk_size);
            let data = read_bytes_in_chunks(
                total_size,
                chunk_size,
                max_in_flight_chunks,
                |range| async {
                    Ok(self
                        .object_store
                        .read(&self.version_checkpoint_path, range)
                        .await?)
                },
            )
            .await?;

            tracing::info!(
                total_size,
                num_chunks,
                chunk_size,
                max_in_flight_chunks,
                "chunked read complete"
            );
            data
        };
        let download_duration = download_start.elapsed();

        let decode_start = std::time::Instant::now();
        let ckpt = decode_checkpoint_data(data)?;
        let decode_duration = decode_start.elapsed();

        tracing::info!(
            total_size,
            download_ms = download_duration.as_millis() as u64,
            decode_ms = decode_duration.as_millis() as u64,
            "checkpoint read complete"
        );

        // Owned conversion avoids cloning the (potentially large) decoded pb.
        Ok(Some(HummockVersionCheckpoint::from_protobuf_owned(ckpt)))
    }

    /// Encodes, compresses (per `checkpoint_compression_algorithm`), checksums
    /// and uploads the checkpoint as an envelope to the object store.
    pub(super) async fn write_checkpoint(
        &self,
        checkpoint: &HummockVersionCheckpoint,
    ) -> Result<()> {
        use prost::Message;
        let raw_bytes = checkpoint.to_protobuf().encode_to_vec();
        let raw_size = raw_bytes.len();

        let compression = self.env.opts.checkpoint_compression_algorithm;
        let compressed = compress_payload(compression, &raw_bytes)?;
        // Checksum is computed over the *compressed* payload; the reader
        // verifies it before decompressing.
        let checksum = xxhash64_checksum(&compressed);

        tracing::info!(
            raw_size,
            compressed_size = compressed.len(),
            compression_ratio =
                format!("{:.2}x", raw_size as f64 / compressed.len().max(1) as f64),
            compression = ?compression,
            checksum = format!("{:#x}", checksum),
            "writing compressed checkpoint"
        );

        // `checksum: Some(..)` is what lets the reader distinguish this
        // envelope format from legacy raw checkpoints.
        let envelope = PbHummockVersionCheckpointEnvelope {
            compression_algorithm: compression as i32,
            payload: compressed,
            checksum: Some(checksum),
        };

        let buf = envelope.encode_to_vec();
        self.object_store
            .upload(&self.version_checkpoint_path, buf.into())
            .await?;
        Ok(())
    }

    /// Uploads a version archive to `{version_archive_dir}/{version_id}`.
    ///
    /// NOTE(review): panics if `archive.version` is unset — callers construct
    /// the archive with `version: Some(..)`.
    pub(super) async fn write_version_archive(
        &self,
        archive: &PbHummockVersionArchive,
    ) -> Result<()> {
        use prost::Message;
        let buf = archive.encode_to_vec();
        let archive_path = format!(
            "{}/{}",
            self.version_archive_dir,
            archive.version.as_ref().unwrap().id
        );
        self.object_store.upload(&archive_path, buf.into()).await?;
        Ok(())
    }

    /// Creates a hummock version checkpoint.
    /// Returns the diff between new and old checkpoint id.
    /// Note that this method must not be called concurrently, because internally it doesn't hold
    /// lock throughout the method.
    pub async fn create_version_checkpoint(&self, min_delta_log_num: u64) -> Result<u64> {
        let timer = self.metrics.version_checkpoint_latency.start_timer();
        // 1. hold read lock and create new checkpoint
        let versioning_guard = self.versioning.read().await;
        let versioning: &Versioning = versioning_guard.deref();
        let current_version: &HummockVersion = &versioning.current_version;
        let old_checkpoint: &HummockVersionCheckpoint = &versioning.checkpoint;
        let new_checkpoint_id = current_version.id;
        let old_checkpoint_id = old_checkpoint.version.id;
        // Skip if not enough delta logs have accumulated since the last checkpoint.
        if new_checkpoint_id < old_checkpoint_id + min_delta_log_num {
            return Ok(0);
        }
        // Test-only path: refresh gc stats even when there is nothing to checkpoint.
        if cfg!(test) && new_checkpoint_id == old_checkpoint_id {
            drop(versioning_guard);
            let versioning = self.versioning.read().await;
            let context_info = self.context_info.read().await;
            let min_pinned_version_id = context_info.min_pinned_version_id();
            trigger_gc_stat(
                &self.metrics,
                &versioning.checkpoint,
                min_pinned_version_id,
                &versioning.table_change_log,
            );
            return Ok(0);
        }
        assert!(new_checkpoint_id > old_checkpoint_id);
        let mut archive: Option<PbHummockVersionArchive> = None;
        // Start from the stale objects carried over from the old checkpoint.
        let mut stale_objects = old_checkpoint.stale_objects.clone();
        // `object_sizes` is used to calculate size of stale objects.
        let mut object_sizes = version_object_size_map(&old_checkpoint.version);
        // The set of object ids that once exist in any hummock version
        let mut versions_object_ids: HashSet<_> = old_checkpoint.version.get_object_ids().collect();
        // Replay all delta logs between the old and new checkpoint ids,
        // accumulating newly added objects and their sizes.
        for (_, version_delta) in versioning
            .hummock_version_deltas
            .range((Excluded(old_checkpoint_id), Included(new_checkpoint_id)))
        {
            // DO NOT REMOVE THIS LINE
            // This is to ensure that when adding new variant to `HummockObjectId`,
            // the compiler will warn us if we forget to handle it here.
            match HummockObjectId::Sstable(0.into()) {
                HummockObjectId::Sstable(_) => {}
                HummockObjectId::VectorFile(_) => {}
                HummockObjectId::HnswGraphFile(_) => {}
            };
            for (object_id, file_size) in version_delta
                .newly_added_sst_infos(false)
                .map(|sst| (HummockObjectId::Sstable(sst.object_id), sst.file_size))
                .chain(
                    version_delta
                        .vector_index_delta
                        .values()
                        .flat_map(|delta| delta.newly_added_objects()),
                )
            {
                object_sizes.insert(object_id, file_size);
                versions_object_ids.insert(object_id);
            }
        }

        // Object ids that once exist in any hummock version but not exist in the latest hummock version
        let current_version_object_ids = current_version
            .get_object_ids()
            .chain(
                versioning
                    .table_change_log
                    .values()
                    .flat_map(|l| l.get_object_ids()),
            )
            .collect();
        let removed_object_ids = &versions_object_ids - &current_version_object_ids;
        // Sum up the sizes of the removed objects; a missing size is logged and
        // counted as 0 rather than failing the checkpoint.
        let total_file_size = removed_object_ids
            .iter()
            .map(|t| {
                object_sizes.get(t).copied().unwrap_or_else(|| {
                    warn!(object_id = ?t, "unable to get size of removed object id");
                    0
                })
            })
            .sum::<u64>();
        // Record the removed objects as stale under the new checkpoint version id,
        // split by object kind (SSTs vs vector-index files).
        stale_objects.insert(current_version.id, {
            let mut sst_ids = vec![];
            let mut vector_files = vec![];
            for object_id in removed_object_ids {
                match object_id {
                    HummockObjectId::Sstable(sst_id) => sst_ids.push(sst_id),
                    HummockObjectId::VectorFile(vector_file_id) => {
                        vector_files.push(PbVectorIndexObject {
                            id: vector_file_id.as_raw(),
                            object_type: PbVectorIndexObjectType::VectorIndexObjectVector as _,
                        })
                    }
                    HummockObjectId::HnswGraphFile(graph_file_id) => {
                        vector_files.push(PbVectorIndexObject {
                            id: graph_file_id.as_raw(),
                            object_type: PbVectorIndexObjectType::VectorIndexObjectHnswGraph as _,
                        });
                    }
                }
            }
            StaleObjects {
                id: sst_ids,
                total_file_size,
                vector_files,
            }
        });
        if self.env.opts.enable_hummock_data_archive {
            archive = Some(PbHummockVersionArchive {
                version: Some(PbHummockVersion::from(&old_checkpoint.version)),
                version_deltas: versioning
                    .hummock_version_deltas
                    .range((Excluded(old_checkpoint_id), Included(new_checkpoint_id)))
                    .map(|(_, version_delta)| version_delta.into())
                    .collect(),
            });
        }
        // Stale objects of versions older than the min pinned version are safe
        // to hand to the GC manager; newer ones must be retained in the checkpoint.
        let min_pinned_version_id = self.context_info.read().await.min_pinned_version_id();
        let may_delete_object = stale_objects
            .iter()
            .filter_map(|(version_id, object_ids)| {
                if *version_id >= min_pinned_version_id {
                    return None;
                }
                Some(get_stale_object_ids(object_ids))
            })
            .flatten();
        self.gc_manager.add_may_delete_object_ids(may_delete_object);
        stale_objects.retain(|version_id, _| *version_id >= min_pinned_version_id);
        let new_checkpoint = HummockVersionCheckpoint {
            version: current_version.clone(),
            stale_objects,
        };
        // 2. persist the new checkpoint without holding the versioning lock.
        drop(versioning_guard);
        self.write_checkpoint(&new_checkpoint).await?;
        // Archive write failures are non-fatal: log and continue.
        if let Some(archive) = archive
            && let Err(e) = self.write_version_archive(&archive).await
        {
            tracing::warn!(
                error = %e.as_report(),
                "failed to write version archive {}",
                archive.version.as_ref().unwrap().id
            );
        }
        // 3. hold write lock and update in memory state
        let mut versioning_guard = self.versioning.write().await;
        let versioning = versioning_guard.deref_mut();
        assert!(new_checkpoint.version.id > versioning.checkpoint.version.id);
        versioning.checkpoint = new_checkpoint;
        let min_pinned_version_id = self.context_info.read().await.min_pinned_version_id();
        trigger_gc_stat(
            &self.metrics,
            &versioning.checkpoint,
            min_pinned_version_id,
            &versioning.table_change_log,
        );
        trigger_split_stat(&self.metrics, &versioning.current_version);
        drop(versioning_guard);
        timer.observe_duration();
        self.metrics
            .checkpoint_version_id
            .set(new_checkpoint_id.as_i64_id());

        Ok(new_checkpoint_id - old_checkpoint_id)
    }

    /// Stops the background checkpoint loop from taking new checkpoints.
    pub fn pause_version_checkpoint(&self) {
        self.pause_version_checkpoint.store(true, Ordering::Relaxed);
        tracing::info!("hummock version checkpoint is paused.");
    }

    /// Re-enables checkpointing after `pause_version_checkpoint`.
    pub fn resume_version_checkpoint(&self) {
        self.pause_version_checkpoint
            .store(false, Ordering::Relaxed);
        tracing::info!("hummock version checkpoint is resumed.");
    }

    /// Whether checkpointing is currently paused.
    pub fn is_version_checkpoint_paused(&self) -> bool {
        self.pause_version_checkpoint.load(Ordering::Relaxed)
    }

    /// Returns a clone of the currently checkpointed version.
    pub async fn get_checkpoint_version(&self) -> HummockVersion {
        let versioning_guard = self.versioning.read().await;
        versioning_guard.checkpoint.version.clone()
    }
}
572
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use prost::Message;
    use risingwave_common::config::CheckpointCompression;
    use risingwave_pb::hummock::hummock_version_checkpoint::StaleObjects;
    use risingwave_pb::hummock::{
        PbHummockVersion, PbHummockVersionCheckpoint, PbHummockVersionCheckpointEnvelope,
    };

    use super::{
        compress_payload, decode_checkpoint_data, read_bytes_in_chunks, xxhash64_checksum,
    };

    /// Builds a minimal `PbHummockVersion` fixture with the given id.
    #[allow(deprecated)]
    fn make_version(id: u64) -> PbHummockVersion {
        PbHummockVersion {
            id: id.into(),
            levels: Default::default(),
            max_committed_epoch: 0,
            table_watermarks: Default::default(),
            table_change_logs: Default::default(),
            state_table_info: Default::default(),
            vector_indexes: Default::default(),
        }
    }

    /// Builds a checkpoint fixture containing one stale-objects entry.
    fn make_checkpoint(version_id: u64) -> PbHummockVersionCheckpoint {
        let stale = StaleObjects {
            id: vec![1u64.into(), 2u64.into(), 3u64.into()],
            total_file_size: 123,
            vector_files: vec![],
        };

        PbHummockVersionCheckpoint {
            version: Some(make_version(version_id)),
            stale_objects: [(1u64.into(), stale)].into_iter().collect(),
        }
    }

    /// Encodes `checkpoint` as an envelope with the given compression.
    /// `checksum: None` means "compute the correct checksum"; pass `Some(..)`
    /// to inject a deliberately wrong one.
    fn make_envelope_bytes(
        checkpoint: &PbHummockVersionCheckpoint,
        compression: CheckpointCompression,
        checksum: Option<u64>,
    ) -> Bytes {
        let raw = checkpoint.encode_to_vec();
        let payload = compress_payload(compression, &raw)
            .expect("compress checkpoint payload should succeed");
        let checksum = checksum.unwrap_or_else(|| xxhash64_checksum(&payload));
        let envelope = PbHummockVersionCheckpointEnvelope {
            compression_algorithm: compression as i32,
            payload,
            checksum: Some(checksum),
        };
        Bytes::from(envelope.encode_to_vec())
    }

    /// Raw pb bytes (no envelope) must decode via the legacy path.
    #[test]
    fn decode_checkpoint_data_falls_back_to_legacy_format() {
        let checkpoint = make_checkpoint(42);
        let raw = Bytes::from(checkpoint.encode_to_vec());
        let decoded = decode_checkpoint_data(raw).expect("legacy checkpoint should decode");
        assert_eq!(decoded, checkpoint);
    }

    /// Envelope roundtrip must succeed for every supported compression.
    #[test]
    fn decode_checkpoint_data_roundtrips_envelope_with_checksum() {
        let checkpoint = make_checkpoint(42);
        for compression in [
            CheckpointCompression::None,
            CheckpointCompression::Zstd,
            CheckpointCompression::Lz4,
        ] {
            let data = make_envelope_bytes(&checkpoint, compression, None);
            let decoded = decode_checkpoint_data(data).expect("envelope checkpoint should decode");
            assert_eq!(decoded, checkpoint);
        }
    }

    /// Flipping one payload bit while keeping the old checksum must fail
    /// verification before any decompression is attempted.
    #[test]
    fn decode_checkpoint_data_returns_error_on_checksum_mismatch() {
        let checkpoint = make_checkpoint(42);
        let raw = checkpoint.encode_to_vec();
        let mut payload = compress_payload(CheckpointCompression::Zstd, &raw)
            .expect("compress checkpoint payload should succeed");
        let expected = xxhash64_checksum(&payload);
        payload[0] ^= 0x01;
        let envelope = PbHummockVersionCheckpointEnvelope {
            compression_algorithm: CheckpointCompression::Zstd as i32,
            payload,
            checksum: Some(expected),
        };
        let data = Bytes::from(envelope.encode_to_vec());
        let err = decode_checkpoint_data(data).expect_err("checksum mismatch should error");
        assert!(err.to_string().contains("checksum mismatch"), "{err:?}");
    }

    /// An out-of-range compression enum value must produce a descriptive error.
    #[test]
    fn decode_checkpoint_data_returns_error_on_unknown_compression_algorithm() {
        let checkpoint = make_checkpoint(42);
        let payload = checkpoint.encode_to_vec();
        let checksum = xxhash64_checksum(&payload);
        let envelope = PbHummockVersionCheckpointEnvelope {
            compression_algorithm: 123,
            payload,
            checksum: Some(checksum),
        };
        let data = Bytes::from(envelope.encode_to_vec());
        let err =
            decode_checkpoint_data(data).expect_err("unknown compression algorithm should error");
        assert!(
            err.to_string()
                .contains("unknown checkpoint compression algorithm"),
            "{err:?}"
        );
    }

    /// A legacy checkpoint without `version` must be rejected, not silently
    /// materialized as a default.
    #[test]
    fn decode_checkpoint_data_returns_error_on_legacy_missing_version() {
        let checkpoint = PbHummockVersionCheckpoint {
            version: None,
            stale_objects: Default::default(),
        };
        let data = Bytes::from(checkpoint.encode_to_vec());
        let err = decode_checkpoint_data(data).expect_err("missing version should error");
        assert!(
            err.to_string()
                .contains("legacy checkpoint missing required field `version`"),
            "{err:?}"
        );
    }

    /// A valid envelope wrapping garbage payload must fail at payload decode.
    #[test]
    fn decode_checkpoint_data_returns_error_on_corrupt_envelope_payload() {
        let garbage = b"not a valid protobuf";
        let checksum = xxhash64_checksum(garbage);
        let envelope = PbHummockVersionCheckpointEnvelope {
            compression_algorithm: CheckpointCompression::None as i32,
            payload: garbage.to_vec(),
            checksum: Some(checksum),
        };
        let data = Bytes::from(envelope.encode_to_vec());
        let err = decode_checkpoint_data(data).expect_err("corrupt envelope payload should error");
        assert!(
            err.to_string()
                .contains("failed to decode checkpoint envelope payload"),
            "{err:?}"
        );
    }

    /// Empty input decodes as a default (versionless) legacy message and is
    /// therefore rejected with the legacy missing-version error.
    #[test]
    fn decode_checkpoint_data_returns_error_on_empty_input() {
        let err = decode_checkpoint_data(Bytes::new()).expect_err("empty checkpoint should fail");
        assert!(
            err.to_string()
                .contains("legacy checkpoint missing required field `version`"),
            "{err:?}"
        );
    }

    /// An envelope whose decoded payload lacks `version` must be rejected with
    /// the envelope-specific error message.
    #[test]
    fn decode_checkpoint_data_returns_error_on_envelope_missing_version() {
        let checkpoint = PbHummockVersionCheckpoint {
            version: None,
            stale_objects: Default::default(),
        };
        let raw = checkpoint.encode_to_vec();
        let checksum = xxhash64_checksum(&raw);
        let envelope = PbHummockVersionCheckpointEnvelope {
            compression_algorithm: CheckpointCompression::None as i32,
            payload: raw,
            checksum: Some(checksum),
        };
        let data = Bytes::from(envelope.encode_to_vec());
        let err =
            decode_checkpoint_data(data).expect_err("envelope with missing version should error");
        assert!(
            err.to_string()
                .contains("checkpoint missing required field `version`"),
            "{err:?}"
        );
    }

    /// Verifies both ordering (output equals input bytes) and that observed
    /// concurrency never exceeds `max_in_flight` while still exceeding 1.
    #[tokio::test]
    async fn read_bytes_in_chunks_respects_concurrency_limit_and_reassembles() {
        use std::sync::Arc;
        use std::sync::atomic::{AtomicUsize, Ordering};

        use tokio::time::{Duration, sleep};

        let total_size = 100usize;
        let chunk_size = 10usize;
        let max_in_flight = 3usize;

        let data: Arc<Vec<u8>> = Arc::new((0..total_size).map(|i| (i % 256) as u8).collect());
        let in_flight = Arc::new(AtomicUsize::new(0));
        let max_seen = Arc::new(AtomicUsize::new(0));

        let out = read_bytes_in_chunks(total_size, chunk_size, max_in_flight, {
            let data = data.clone();
            let in_flight = in_flight.clone();
            let max_seen = max_seen.clone();
            move |range: std::ops::Range<usize>| {
                let data = data.clone();
                let in_flight = in_flight.clone();
                let max_seen = max_seen.clone();
                async move {
                    let cur = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
                    max_seen.fetch_max(cur, Ordering::SeqCst);

                    // Add a small delay to simulate real I/O and allow multiple reads
                    // to be in-flight concurrently. This tests that max_in_flight limit
                    // is respected (should not exceed 3 concurrent reads).
                    sleep(Duration::from_millis(30)).await;

                    let bytes = Bytes::copy_from_slice(&data[range]);
                    in_flight.fetch_sub(1, Ordering::SeqCst);
                    Ok(bytes)
                }
            }
        })
        .await
        .expect("chunked read should succeed");

        assert_eq!(out.as_ref(), data.as_slice());
        let max_seen = max_seen.load(Ordering::SeqCst);
        assert!(max_seen <= max_in_flight, "max_seen={max_seen}");
        assert!(
            max_seen > 1,
            "expected some concurrency, max_seen={max_seen}"
        );
    }

    /// Verifies that a failing chunk surfaces both the chunk/range context and
    /// the original cause in the error chain.
    #[tokio::test]
    async fn read_bytes_in_chunks_adds_range_context_on_error() {
        let total_size = 30usize;
        let chunk_size = 10usize;
        let max_in_flight = 2usize;

        let err = read_bytes_in_chunks(total_size, chunk_size, max_in_flight, |range| async move {
            if range.start == 10 {
                anyhow::bail!("boom");
            }
            Ok(Bytes::copy_from_slice(&vec![0u8; range.len()]))
        })
        .await
        .expect_err("should fail");

        let msg = err.to_string();
        assert!(
            msg.contains("read checkpoint chunk 2/3 range 10..20"),
            "unexpected error message: {msg}"
        );
        let msg_with_chain = format!("{err:#}");
        assert!(
            msg_with_chain.contains("boom"),
            "unexpected error message: {msg_with_chain}"
        );
    }
}