1use std::collections::{HashMap, HashSet};
16use std::ops::Bound::{Excluded, Included};
17use std::ops::{Deref, DerefMut};
18use std::sync::atomic::Ordering;
19
20use bytes::BytesMut;
21use risingwave_hummock_sdk::compaction_group::hummock_version_ext::version_object_size_map;
22use risingwave_hummock_sdk::version::HummockVersion;
23use risingwave_hummock_sdk::{HummockObjectId, HummockVersionId, get_stale_object_ids};
24use risingwave_pb::hummock::hummock_version_checkpoint::{PbStaleObjects, StaleObjects};
25use risingwave_pb::hummock::{
26 CheckpointCompressionAlgorithm, PbHummockVersion, PbHummockVersionArchive,
27 PbHummockVersionCheckpoint, PbHummockVersionCheckpointEnvelope, PbVectorIndexObject,
28 PbVectorIndexObjectType,
29};
30use thiserror_ext::AsReport;
31use tracing::warn;
32
33use crate::hummock::HummockManager;
34use crate::hummock::error::Result;
35use crate::hummock::manager::versioning::Versioning;
36use crate::hummock::metrics_utils::{trigger_gc_stat, trigger_split_stat};
37
38pub(crate) fn xxhash64_checksum(data: &[u8]) -> u64 {
41 use std::hash::Hasher;
42 let mut hasher = twox_hash::XxHash64::with_seed(0);
43 hasher.write(data);
44 hasher.finish()
45}
46
/// In-memory form of the persisted Hummock version checkpoint: a full version
/// snapshot plus the stale objects accumulated between checkpoints.
#[derive(Default)]
pub struct HummockVersionCheckpoint {
    /// The version this checkpoint was taken at.
    pub version: HummockVersion,

    /// Objects that became stale, keyed by the version id whose checkpointing
    /// detected them (see `create_version_checkpoint`, which inserts under
    /// `current_version.id` and retains only entries with
    /// `version_id >= min_pinned_version_id`).
    pub stale_objects: HashMap<HummockVersionId, PbStaleObjects>,
}
58
59impl HummockVersionCheckpoint {
60 pub fn from_protobuf(checkpoint: &PbHummockVersionCheckpoint) -> Self {
61 Self {
62 version: HummockVersion::from_persisted_protobuf(checkpoint.version.as_ref().unwrap()),
63 stale_objects: checkpoint
64 .stale_objects
65 .iter()
66 .map(|(version_id, objects)| (*version_id, objects.clone()))
67 .collect(),
68 }
69 }
70
71 pub fn from_protobuf_owned(checkpoint: PbHummockVersionCheckpoint) -> Self {
74 Self {
75 version: HummockVersion::from_persisted_protobuf_owned(checkpoint.version.unwrap()),
76 stale_objects: checkpoint.stale_objects,
77 }
78 }
79
80 pub fn to_protobuf(&self) -> PbHummockVersionCheckpoint {
81 PbHummockVersionCheckpoint {
82 version: Some(PbHummockVersion::from(&self.version)),
83 stale_objects: self
84 .stale_objects
85 .iter()
86 .map(|(version_id, objects)| (*version_id, objects.clone()))
87 .collect(),
88 }
89 }
90}
91
/// Decodes checkpoint bytes read from object store.
///
/// Two on-disk formats are supported:
/// - envelope format: a `PbHummockVersionCheckpointEnvelope` carrying a
///   compressed payload plus an xxhash64 checksum of that payload;
/// - legacy format: a bare, uncompressed `PbHummockVersionCheckpoint`.
///
/// The envelope path is taken only when the bytes decode as an envelope AND the
/// `checksum` field is present; anything else falls through to legacy decoding.
///
/// # Errors
/// Fails on checksum mismatch, unknown compression algorithm, decompression
/// failure, undecodable payload, or a checkpoint missing its `version` field.
fn decode_checkpoint_data(data: bytes::Bytes) -> Result<PbHummockVersionCheckpoint> {
    use anyhow::Context;
    use prost::Message;

    let data_size = data.len();

    // `data.clone()` is a cheap ref-count bump (`bytes::Bytes`), not a copy.
    // A missing checksum means these bytes were never written as an envelope,
    // so we must not trust an accidental successful decode.
    if let Ok(envelope) = PbHummockVersionCheckpointEnvelope::decode(data.clone())
        && let Some(expected) = envelope.checksum
    {
        // Checksum is verified over the still-compressed payload, matching
        // how `write_checkpoint` computes it before upload.
        let actual = xxhash64_checksum(&envelope.payload);
        if actual != expected {
            return Err(anyhow::anyhow!(
                "checkpoint checksum mismatch: expected {:#x}, got {:#x}",
                expected,
                actual
            )
            .into());
        }

        let algo = CheckpointCompressionAlgorithm::try_from(envelope.compression_algorithm)
            .with_context(|| {
                format!(
                    "unknown checkpoint compression algorithm: {}",
                    envelope.compression_algorithm
                )
            })?;

        let decompressed = decompress_payload(algo, &envelope.payload)?;
        let ckpt = PbHummockVersionCheckpoint::decode(decompressed.as_ref())
            .context("failed to decode checkpoint envelope payload")?;
        // `version` is required downstream (`from_protobuf_owned` unwraps it).
        if ckpt.version.is_none() {
            return Err(anyhow::anyhow!("checkpoint missing required field `version`").into());
        }

        tracing::info!(
            compression = ?algo,
            compressed_size = envelope.payload.len(),
            decompressed_size = decompressed.len(),
            compression_ratio =
                format!("{:.2}x", decompressed.len() as f64 / envelope.payload.len().max(1) as f64),
            checksum = format!("{expected:#x}"),
            "decoded compressed checkpoint"
        );
        return Ok(ckpt);
    }

    // Fallback: bytes predate the envelope format.
    tracing::info!(
        data_size,
        "decoding checkpoint in legacy uncompressed format"
    );
    let ckpt =
        PbHummockVersionCheckpoint::decode(data).context("failed to decode legacy checkpoint")?;
    if ckpt.version.is_none() {
        return Err(anyhow::anyhow!("legacy checkpoint missing required field `version`").into());
    }
    Ok(ckpt)
}
165
166fn decompress_payload(
167 algo: CheckpointCompressionAlgorithm,
168 payload: &[u8],
169) -> Result<std::borrow::Cow<'_, [u8]>> {
170 use anyhow::Context;
171
172 match algo {
173 CheckpointCompressionAlgorithm::CheckpointCompressionUnspecified => Ok(payload.into()),
174 CheckpointCompressionAlgorithm::CheckpointCompressionZstd => {
175 Ok(zstd::stream::decode_all(payload)
176 .map(std::borrow::Cow::Owned)
177 .context("zstd decompression failed")?)
178 }
179 CheckpointCompressionAlgorithm::CheckpointCompressionLz4 => {
180 let mut decoder = lz4::Decoder::new(payload).context("lz4 decoder init failed")?;
181 let mut decompressed = Vec::new();
182 std::io::Read::read_to_end(&mut decoder, &mut decompressed)
183 .context("lz4 decompression failed")?;
184 Ok(decompressed.into())
185 }
186 }
187}
188
189pub(crate) fn compress_payload(
191 algo: risingwave_common::config::CheckpointCompression,
192 data: &[u8],
193) -> Result<Vec<u8>> {
194 use anyhow::Context;
195 use risingwave_common::config::CheckpointCompression;
196
197 match algo {
198 CheckpointCompression::None => Ok(data.to_vec()),
199 CheckpointCompression::Zstd => {
200 Ok(zstd::stream::encode_all(data, 3).context("zstd compression failed")?)
202 }
203 CheckpointCompression::Lz4 => {
204 let mut compressed = Vec::new();
205 let mut encoder = lz4::EncoderBuilder::new()
206 .level(4)
207 .build(&mut compressed)
208 .context("lz4 encoder init failed")?;
209 std::io::Write::write_all(&mut encoder, data)
210 .context("lz4 compression write failed")?;
211 let (_writer, result) = encoder.finish();
212 result.context("lz4 compression finish failed")?;
213 Ok(compressed)
214 }
215 }
216}
217
/// Reads `total_size` bytes by issuing ranged `read_range` requests of at most
/// `chunk_size` bytes each, with up to `max_in_flight_chunks` requests polled
/// concurrently, and reassembles the chunks in offset order.
///
/// `read_range` is called synchronously inside the stream's `map` closure (one
/// call per chunk, in offset order) to *create* each future; only the returned
/// futures run concurrently under `buffered`. This is what lets `F` be a plain
/// `FnMut` — it is never borrowed by two in-flight futures at once. Do not move
/// the `read_range(...)` call into the `async move` block.
async fn read_bytes_in_chunks<F, Fut>(
    total_size: usize,
    chunk_size: usize,
    max_in_flight_chunks: usize,
    mut read_range: F,
) -> anyhow::Result<bytes::Bytes>
where
    F: FnMut(std::ops::Range<usize>) -> Fut,
    Fut: std::future::Future<Output = anyhow::Result<bytes::Bytes>>,
{
    use anyhow::Context;
    use futures::StreamExt;

    // Only used for 1-based "chunk i/n" error context below.
    let num_chunks = total_size.div_ceil(chunk_size);
    let mut buf = BytesMut::with_capacity(total_size);

    let mut chunk_stream = futures::stream::iter((0..total_size).step_by(chunk_size))
        .enumerate()
        .map(|(chunk_idx, offset)| {
            // Last chunk may be shorter than `chunk_size`.
            let end = std::cmp::min(offset + chunk_size, total_size);
            let range = offset..end;
            let fut = read_range(range.clone());
            async move {
                fut.await.with_context(|| {
                    format!(
                        "read checkpoint chunk {}/{} range {}..{}",
                        chunk_idx + 1,
                        num_chunks,
                        range.start,
                        range.end
                    )
                })
            }
        })
        .buffered(max_in_flight_chunks);

    // `buffered` yields results in submission order, so sequential appends
    // reconstruct the contiguous byte range. First error aborts the read.
    while let Some(chunk) = chunk_stream.next().await {
        let chunk = chunk?;
        buf.extend_from_slice(&chunk);
    }

    Ok(buf.freeze())
}
261
262impl HummockManager {
265 pub async fn try_read_checkpoint(&self) -> Result<Option<HummockVersionCheckpoint>> {
272 let object_metadata = match self
273 .object_store
274 .metadata(&self.version_checkpoint_path)
275 .await
276 {
277 Ok(metadata) => metadata,
278 Err(e) => {
279 if e.is_object_not_found_error() {
280 return Ok(None);
281 }
282 return Err(e.into());
283 }
284 };
285 let total_size = object_metadata.total_size;
286
287 let chunk_size = self.env.opts.checkpoint_read_chunk_size;
288 let max_in_flight_chunks = self.env.opts.checkpoint_read_max_in_flight_chunks;
289
290 let download_start = std::time::Instant::now();
291 let data = if total_size <= chunk_size {
292 self.object_store
293 .read(&self.version_checkpoint_path, 0..total_size)
294 .await?
295 } else {
296 let num_chunks = total_size.div_ceil(chunk_size);
297 let data = read_bytes_in_chunks(
298 total_size,
299 chunk_size,
300 max_in_flight_chunks,
301 |range| async {
302 Ok(self
303 .object_store
304 .read(&self.version_checkpoint_path, range)
305 .await?)
306 },
307 )
308 .await?;
309
310 tracing::info!(
311 total_size,
312 num_chunks,
313 chunk_size,
314 max_in_flight_chunks,
315 "chunked read complete"
316 );
317 data
318 };
319 let download_duration = download_start.elapsed();
320
321 let decode_start = std::time::Instant::now();
322 let ckpt = decode_checkpoint_data(data)?;
323 let decode_duration = decode_start.elapsed();
324
325 tracing::info!(
326 total_size,
327 download_ms = download_duration.as_millis() as u64,
328 decode_ms = decode_duration.as_millis() as u64,
329 "checkpoint read complete"
330 );
331
332 Ok(Some(HummockVersionCheckpoint::from_protobuf_owned(ckpt)))
333 }
334
335 pub(super) async fn write_checkpoint(
336 &self,
337 checkpoint: &HummockVersionCheckpoint,
338 ) -> Result<()> {
339 use prost::Message;
340 let raw_bytes = checkpoint.to_protobuf().encode_to_vec();
341 let raw_size = raw_bytes.len();
342
343 let compression = self.env.opts.checkpoint_compression_algorithm;
344 let compressed = compress_payload(compression, &raw_bytes)?;
345 let checksum = xxhash64_checksum(&compressed);
346
347 tracing::info!(
348 raw_size,
349 compressed_size = compressed.len(),
350 compression_ratio =
351 format!("{:.2}x", raw_size as f64 / compressed.len().max(1) as f64),
352 compression = ?compression,
353 checksum = format!("{:#x}", checksum),
354 "writing compressed checkpoint"
355 );
356
357 let envelope = PbHummockVersionCheckpointEnvelope {
358 compression_algorithm: compression as i32,
359 payload: compressed,
360 checksum: Some(checksum),
361 };
362
363 let buf = envelope.encode_to_vec();
364 self.object_store
365 .upload(&self.version_checkpoint_path, buf.into())
366 .await?;
367 Ok(())
368 }
369
370 pub(super) async fn write_version_archive(
371 &self,
372 archive: &PbHummockVersionArchive,
373 ) -> Result<()> {
374 use prost::Message;
375 let buf = archive.encode_to_vec();
376 let archive_path = format!(
377 "{}/{}",
378 self.version_archive_dir,
379 archive.version.as_ref().unwrap().id
380 );
381 self.object_store.upload(&archive_path, buf.into()).await?;
382 Ok(())
383 }
384
385 pub async fn create_version_checkpoint(&self, min_delta_log_num: u64) -> Result<u64> {
390 let timer = self.metrics.version_checkpoint_latency.start_timer();
391 let versioning_guard = self.versioning.read().await;
393 let versioning: &Versioning = versioning_guard.deref();
394 let current_version: &HummockVersion = &versioning.current_version;
395 let old_checkpoint: &HummockVersionCheckpoint = &versioning.checkpoint;
396 let new_checkpoint_id = current_version.id;
397 let old_checkpoint_id = old_checkpoint.version.id;
398 if new_checkpoint_id < old_checkpoint_id + min_delta_log_num {
399 return Ok(0);
400 }
401 if cfg!(test) && new_checkpoint_id == old_checkpoint_id {
402 drop(versioning_guard);
403 let versioning = self.versioning.read().await;
404 let context_info = self.context_info.read().await;
405 let min_pinned_version_id = context_info.min_pinned_version_id();
406 trigger_gc_stat(
407 &self.metrics,
408 &versioning.checkpoint,
409 min_pinned_version_id,
410 &versioning.table_change_log,
411 );
412 return Ok(0);
413 }
414 assert!(new_checkpoint_id > old_checkpoint_id);
415 let mut archive: Option<PbHummockVersionArchive> = None;
416 let mut stale_objects = old_checkpoint.stale_objects.clone();
417 let mut object_sizes = version_object_size_map(&old_checkpoint.version);
419 let mut versions_object_ids: HashSet<_> = old_checkpoint.version.get_object_ids().collect();
421 for (_, version_delta) in versioning
422 .hummock_version_deltas
423 .range((Excluded(old_checkpoint_id), Included(new_checkpoint_id)))
424 {
425 match HummockObjectId::Sstable(0.into()) {
429 HummockObjectId::Sstable(_) => {}
430 HummockObjectId::VectorFile(_) => {}
431 HummockObjectId::HnswGraphFile(_) => {}
432 };
433 for (object_id, file_size) in version_delta
434 .newly_added_sst_infos(false)
435 .map(|sst| (HummockObjectId::Sstable(sst.object_id), sst.file_size))
436 .chain(
437 version_delta
438 .vector_index_delta
439 .values()
440 .flat_map(|delta| delta.newly_added_objects()),
441 )
442 {
443 object_sizes.insert(object_id, file_size);
444 versions_object_ids.insert(object_id);
445 }
446 }
447
448 let current_version_object_ids = current_version
450 .get_object_ids()
451 .chain(
452 versioning
453 .table_change_log
454 .values()
455 .flat_map(|l| l.get_object_ids()),
456 )
457 .collect();
458 let removed_object_ids = &versions_object_ids - ¤t_version_object_ids;
459 let total_file_size = removed_object_ids
460 .iter()
461 .map(|t| {
462 object_sizes.get(t).copied().unwrap_or_else(|| {
463 warn!(object_id = ?t, "unable to get size of removed object id");
464 0
465 })
466 })
467 .sum::<u64>();
468 stale_objects.insert(current_version.id, {
469 let mut sst_ids = vec![];
470 let mut vector_files = vec![];
471 for object_id in removed_object_ids {
472 match object_id {
473 HummockObjectId::Sstable(sst_id) => sst_ids.push(sst_id),
474 HummockObjectId::VectorFile(vector_file_id) => {
475 vector_files.push(PbVectorIndexObject {
476 id: vector_file_id.as_raw(),
477 object_type: PbVectorIndexObjectType::VectorIndexObjectVector as _,
478 })
479 }
480 HummockObjectId::HnswGraphFile(graph_file_id) => {
481 vector_files.push(PbVectorIndexObject {
482 id: graph_file_id.as_raw(),
483 object_type: PbVectorIndexObjectType::VectorIndexObjectHnswGraph as _,
484 });
485 }
486 }
487 }
488 StaleObjects {
489 id: sst_ids,
490 total_file_size,
491 vector_files,
492 }
493 });
494 if self.env.opts.enable_hummock_data_archive {
495 archive = Some(PbHummockVersionArchive {
496 version: Some(PbHummockVersion::from(&old_checkpoint.version)),
497 version_deltas: versioning
498 .hummock_version_deltas
499 .range((Excluded(old_checkpoint_id), Included(new_checkpoint_id)))
500 .map(|(_, version_delta)| version_delta.into())
501 .collect(),
502 });
503 }
504 let min_pinned_version_id = self.context_info.read().await.min_pinned_version_id();
505 let may_delete_object = stale_objects
506 .iter()
507 .filter_map(|(version_id, object_ids)| {
508 if *version_id >= min_pinned_version_id {
509 return None;
510 }
511 Some(get_stale_object_ids(object_ids))
512 })
513 .flatten();
514 self.gc_manager.add_may_delete_object_ids(may_delete_object);
515 stale_objects.retain(|version_id, _| *version_id >= min_pinned_version_id);
516 let new_checkpoint = HummockVersionCheckpoint {
517 version: current_version.clone(),
518 stale_objects,
519 };
520 drop(versioning_guard);
521 self.write_checkpoint(&new_checkpoint).await?;
522 if let Some(archive) = archive
523 && let Err(e) = self.write_version_archive(&archive).await
524 {
525 tracing::warn!(
526 error = %e.as_report(),
527 "failed to write version archive {}",
528 archive.version.as_ref().unwrap().id
529 );
530 }
531 let mut versioning_guard = self.versioning.write().await;
532 let versioning = versioning_guard.deref_mut();
533 assert!(new_checkpoint.version.id > versioning.checkpoint.version.id);
534 versioning.checkpoint = new_checkpoint;
535 let min_pinned_version_id = self.context_info.read().await.min_pinned_version_id();
536 trigger_gc_stat(
537 &self.metrics,
538 &versioning.checkpoint,
539 min_pinned_version_id,
540 &versioning.table_change_log,
541 );
542 trigger_split_stat(&self.metrics, &versioning.current_version);
543 drop(versioning_guard);
544 timer.observe_duration();
545 self.metrics
546 .checkpoint_version_id
547 .set(new_checkpoint_id.as_i64_id());
548
549 Ok(new_checkpoint_id - old_checkpoint_id)
550 }
551
552 pub fn pause_version_checkpoint(&self) {
553 self.pause_version_checkpoint.store(true, Ordering::Relaxed);
554 tracing::info!("hummock version checkpoint is paused.");
555 }
556
557 pub fn resume_version_checkpoint(&self) {
558 self.pause_version_checkpoint
559 .store(false, Ordering::Relaxed);
560 tracing::info!("hummock version checkpoint is resumed.");
561 }
562
563 pub fn is_version_checkpoint_paused(&self) -> bool {
564 self.pause_version_checkpoint.load(Ordering::Relaxed)
565 }
566
567 pub async fn get_checkpoint_version(&self) -> HummockVersion {
568 let versioning_guard = self.versioning.read().await;
569 versioning_guard.checkpoint.version.clone()
570 }
571}
572
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use prost::Message;
    use risingwave_common::config::CheckpointCompression;
    use risingwave_pb::hummock::hummock_version_checkpoint::StaleObjects;
    use risingwave_pb::hummock::{
        PbHummockVersion, PbHummockVersionCheckpoint, PbHummockVersionCheckpointEnvelope,
    };

    use super::{
        compress_payload, decode_checkpoint_data, read_bytes_in_chunks, xxhash64_checksum,
    };

    // Minimal version fixture; `max_committed_epoch` is deprecated in the pb
    // schema, hence the allow.
    #[allow(deprecated)]
    fn make_version(id: u64) -> PbHummockVersion {
        PbHummockVersion {
            id: id.into(),
            levels: Default::default(),
            max_committed_epoch: 0,
            table_watermarks: Default::default(),
            table_change_logs: Default::default(),
            state_table_info: Default::default(),
            vector_indexes: Default::default(),
        }
    }

    // Checkpoint fixture with one stale-objects entry, so that map fields
    // round-trip too.
    fn make_checkpoint(version_id: u64) -> PbHummockVersionCheckpoint {
        let stale = StaleObjects {
            id: vec![1u64.into(), 2u64.into(), 3u64.into()],
            total_file_size: 123,
            vector_files: vec![],
        };

        PbHummockVersionCheckpoint {
            version: Some(make_version(version_id)),
            stale_objects: [(1u64.into(), stale)].into_iter().collect(),
        }
    }

    // Encodes `checkpoint` as an envelope; `checksum: None` means "compute the
    // correct checksum", `Some(v)` injects an arbitrary (possibly wrong) one.
    fn make_envelope_bytes(
        checkpoint: &PbHummockVersionCheckpoint,
        compression: CheckpointCompression,
        checksum: Option<u64>,
    ) -> Bytes {
        let raw = checkpoint.encode_to_vec();
        let payload = compress_payload(compression, &raw)
            .expect("compress checkpoint payload should succeed");
        let checksum = checksum.unwrap_or_else(|| xxhash64_checksum(&payload));
        let envelope = PbHummockVersionCheckpointEnvelope {
            compression_algorithm: compression as i32,
            payload,
            checksum: Some(checksum),
        };
        Bytes::from(envelope.encode_to_vec())
    }

    // A bare checkpoint proto (no envelope) must decode via the legacy path.
    #[test]
    fn decode_checkpoint_data_falls_back_to_legacy_format() {
        let checkpoint = make_checkpoint(42);
        let raw = Bytes::from(checkpoint.encode_to_vec());
        let decoded = decode_checkpoint_data(raw).expect("legacy checkpoint should decode");
        assert_eq!(decoded, checkpoint);
    }

    // Envelope round-trip for every supported compression algorithm.
    #[test]
    fn decode_checkpoint_data_roundtrips_envelope_with_checksum() {
        let checkpoint = make_checkpoint(42);
        for compression in [
            CheckpointCompression::None,
            CheckpointCompression::Zstd,
            CheckpointCompression::Lz4,
        ] {
            let data = make_envelope_bytes(&checkpoint, compression, None);
            let decoded = decode_checkpoint_data(data).expect("envelope checkpoint should decode");
            assert_eq!(decoded, checkpoint);
        }
    }

    // Flip one payload byte while keeping the pre-corruption checksum: decode
    // must fail with a checksum error, not a decompression error.
    #[test]
    fn decode_checkpoint_data_returns_error_on_checksum_mismatch() {
        let checkpoint = make_checkpoint(42);
        let raw = checkpoint.encode_to_vec();
        let mut payload = compress_payload(CheckpointCompression::Zstd, &raw)
            .expect("compress checkpoint payload should succeed");
        let expected = xxhash64_checksum(&payload);
        payload[0] ^= 0x01;
        let envelope = PbHummockVersionCheckpointEnvelope {
            compression_algorithm: CheckpointCompression::Zstd as i32,
            payload,
            checksum: Some(expected),
        };
        let data = Bytes::from(envelope.encode_to_vec());
        let err = decode_checkpoint_data(data).expect_err("checksum mismatch should error");
        assert!(err.to_string().contains("checksum mismatch"), "{err:?}");
    }

    // An enum value outside the known algorithms (123) must be rejected after
    // the checksum passes.
    #[test]
    fn decode_checkpoint_data_returns_error_on_unknown_compression_algorithm() {
        let checkpoint = make_checkpoint(42);
        let payload = checkpoint.encode_to_vec();
        let checksum = xxhash64_checksum(&payload);
        let envelope = PbHummockVersionCheckpointEnvelope {
            compression_algorithm: 123,
            payload,
            checksum: Some(checksum),
        };
        let data = Bytes::from(envelope.encode_to_vec());
        let err =
            decode_checkpoint_data(data).expect_err("unknown compression algorithm should error");
        assert!(
            err.to_string()
                .contains("unknown checkpoint compression algorithm"),
            "{err:?}"
        );
    }

    #[test]
    fn decode_checkpoint_data_returns_error_on_legacy_missing_version() {
        let checkpoint = PbHummockVersionCheckpoint {
            version: None,
            stale_objects: Default::default(),
        };
        let data = Bytes::from(checkpoint.encode_to_vec());
        let err = decode_checkpoint_data(data).expect_err("missing version should error");
        assert!(
            err.to_string()
                .contains("legacy checkpoint missing required field `version`"),
            "{err:?}"
        );
    }

    // Valid envelope + checksum, but the payload is not a checkpoint proto.
    #[test]
    fn decode_checkpoint_data_returns_error_on_corrupt_envelope_payload() {
        let garbage = b"not a valid protobuf";
        let checksum = xxhash64_checksum(garbage);
        let envelope = PbHummockVersionCheckpointEnvelope {
            compression_algorithm: CheckpointCompression::None as i32,
            payload: garbage.to_vec(),
            checksum: Some(checksum),
        };
        let data = Bytes::from(envelope.encode_to_vec());
        let err = decode_checkpoint_data(data).expect_err("corrupt envelope payload should error");
        assert!(
            err.to_string()
                .contains("failed to decode checkpoint envelope payload"),
            "{err:?}"
        );
    }

    // Empty bytes decode as an all-default proto, which hits the legacy
    // missing-version check.
    #[test]
    fn decode_checkpoint_data_returns_error_on_empty_input() {
        let err = decode_checkpoint_data(Bytes::new()).expect_err("empty checkpoint should fail");
        assert!(
            err.to_string()
                .contains("legacy checkpoint missing required field `version`"),
            "{err:?}"
        );
    }

    #[test]
    fn decode_checkpoint_data_returns_error_on_envelope_missing_version() {
        let checkpoint = PbHummockVersionCheckpoint {
            version: None,
            stale_objects: Default::default(),
        };
        let raw = checkpoint.encode_to_vec();
        let checksum = xxhash64_checksum(&raw);
        let envelope = PbHummockVersionCheckpointEnvelope {
            compression_algorithm: CheckpointCompression::None as i32,
            payload: raw,
            checksum: Some(checksum),
        };
        let data = Bytes::from(envelope.encode_to_vec());
        let err =
            decode_checkpoint_data(data).expect_err("envelope with missing version should error");
        assert!(
            err.to_string()
                .contains("checkpoint missing required field `version`"),
            "{err:?}"
        );
    }

    // Tracks the peak number of concurrently running chunk reads via atomics
    // and checks the reassembled bytes match the source exactly.
    #[tokio::test]
    async fn read_bytes_in_chunks_respects_concurrency_limit_and_reassembles() {
        use std::sync::Arc;
        use std::sync::atomic::{AtomicUsize, Ordering};

        use tokio::time::{Duration, sleep};

        let total_size = 100usize;
        let chunk_size = 10usize;
        let max_in_flight = 3usize;

        let data: Arc<Vec<u8>> = Arc::new((0..total_size).map(|i| (i % 256) as u8).collect());
        let in_flight = Arc::new(AtomicUsize::new(0));
        let max_seen = Arc::new(AtomicUsize::new(0));

        let out = read_bytes_in_chunks(total_size, chunk_size, max_in_flight, {
            let data = data.clone();
            let in_flight = in_flight.clone();
            let max_seen = max_seen.clone();
            move |range: std::ops::Range<usize>| {
                let data = data.clone();
                let in_flight = in_flight.clone();
                let max_seen = max_seen.clone();
                async move {
                    let cur = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
                    max_seen.fetch_max(cur, Ordering::SeqCst);

                    // Keep reads overlapping so concurrency is observable.
                    sleep(Duration::from_millis(30)).await;

                    let bytes = Bytes::copy_from_slice(&data[range]);
                    in_flight.fetch_sub(1, Ordering::SeqCst);
                    Ok(bytes)
                }
            }
        })
        .await
        .expect("chunked read should succeed");

        assert_eq!(out.as_ref(), data.as_slice());
        let max_seen = max_seen.load(Ordering::SeqCst);
        assert!(max_seen <= max_in_flight, "max_seen={max_seen}");
        assert!(
            max_seen > 1,
            "expected some concurrency, max_seen={max_seen}"
        );
    }

    // Failing the second chunk must surface the "chunk i/n range a..b" context
    // with the original error in the chain.
    #[tokio::test]
    async fn read_bytes_in_chunks_adds_range_context_on_error() {
        let total_size = 30usize;
        let chunk_size = 10usize;
        let max_in_flight = 2usize;

        let err = read_bytes_in_chunks(total_size, chunk_size, max_in_flight, |range| async move {
            if range.start == 10 {
                anyhow::bail!("boom");
            }
            Ok(Bytes::copy_from_slice(&vec![0u8; range.len()]))
        })
        .await
        .expect_err("should fail");

        let msg = err.to_string();
        assert!(
            msg.contains("read checkpoint chunk 2/3 range 10..20"),
            "unexpected error message: {msg}"
        );
        let msg_with_chain = format!("{err:#}");
        assert!(
            msg_with_chain.contains("boom"),
            "unexpected error message: {msg_with_chain}"
        );
    }
}