risingwave_backup/
meta_snapshot.rs1use std::collections::HashSet;
16use std::fmt::{Display, Formatter};
17
18use bytes::{Buf, BufMut};
19use risingwave_hummock_sdk::HummockRawObjectId;
20use risingwave_hummock_sdk::version::HummockVersion;
21
22use crate::error::BackupResult;
23use crate::{MetaSnapshotId, xxhash64_checksum, xxhash64_verify};
24
25pub trait Metadata: Display + Send + Sync {
26 fn encode_to(&self, buf: &mut Vec<u8>) -> BackupResult<()>;
27
28 fn decode(buf: &[u8]) -> BackupResult<Self>
29 where
30 Self: Sized;
31
32 fn hummock_version_ref(&self) -> &HummockVersion;
33
34 fn hummock_version(self) -> HummockVersion;
35
36 fn storage_url(&self) -> BackupResult<String>;
37
38 fn storage_directory(&self) -> BackupResult<String>;
39
40 fn table_change_log_object_ids(&self) -> HashSet<HummockRawObjectId>;
41}
42
43#[derive(Debug, Default, Clone, PartialEq)]
44pub struct MetaSnapshot<T: Metadata> {
45 pub format_version: u32,
46 pub id: MetaSnapshotId,
47 pub metadata: T,
49}
50
51impl<T: Metadata> MetaSnapshot<T> {
52 pub fn encode(&self) -> BackupResult<Vec<u8>> {
53 let mut buf = vec![];
54 buf.put_u32_le(self.format_version);
55 buf.put_u64_le(self.id);
56 self.metadata.encode_to(&mut buf)?;
57 let checksum = xxhash64_checksum(&buf);
58 buf.put_u64_le(checksum);
59 Ok(buf)
60 }
61
62 pub fn decode(mut buf: &[u8]) -> BackupResult<Self> {
63 let checksum = (&buf[buf.len() - 8..]).get_u64_le();
64 xxhash64_verify(&buf[..buf.len() - 8], checksum)?;
65 let format_version = buf.get_u32_le();
66 let id = buf.get_u64_le();
67 let metadata = T::decode(buf)?;
68 Ok(Self {
69 format_version,
70 id,
71 metadata,
72 })
73 }
74
75 pub fn decode_format_version(mut buf: &[u8]) -> BackupResult<u32> {
76 let checksum = (&buf[buf.len() - 8..]).get_u64_le();
77 xxhash64_verify(&buf[..buf.len() - 8], checksum)?;
78 let format_version = buf.get_u32_le();
79 Ok(format_version)
80 }
81}
82
83impl<T: Metadata> Display for MetaSnapshot<T> {
84 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
85 writeln!(f, "format_version: {}", self.format_version)?;
86 writeln!(f, "id: {}", self.id)?;
87 writeln!(f, "{}", self.metadata)?;
88 Ok(())
89 }
90}