risingwave_backup/
meta_snapshot.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::fmt::{Display, Formatter};
16
17use bytes::{Buf, BufMut};
18use risingwave_hummock_sdk::version::HummockVersion;
19
20use crate::error::BackupResult;
21use crate::{MetaSnapshotId, xxhash64_checksum, xxhash64_verify};
22
23pub trait Metadata: Display + Send + Sync {
24    fn encode_to(&self, buf: &mut Vec<u8>) -> BackupResult<()>;
25
26    fn decode(buf: &[u8]) -> BackupResult<Self>
27    where
28        Self: Sized;
29
30    fn hummock_version_ref(&self) -> &HummockVersion;
31
32    fn hummock_version(self) -> HummockVersion;
33
34    fn storage_url(&self) -> BackupResult<String>;
35
36    fn storage_directory(&self) -> BackupResult<String>;
37}
38
39#[derive(Debug, Default, Clone, PartialEq)]
40pub struct MetaSnapshot<T: Metadata> {
41    pub format_version: u32,
42    pub id: MetaSnapshotId,
43    /// Snapshot of meta store.
44    pub metadata: T,
45}
46
47impl<T: Metadata> MetaSnapshot<T> {
48    pub fn encode(&self) -> BackupResult<Vec<u8>> {
49        let mut buf = vec![];
50        buf.put_u32_le(self.format_version);
51        buf.put_u64_le(self.id);
52        self.metadata.encode_to(&mut buf)?;
53        let checksum = xxhash64_checksum(&buf);
54        buf.put_u64_le(checksum);
55        Ok(buf)
56    }
57
58    pub fn decode(mut buf: &[u8]) -> BackupResult<Self> {
59        let checksum = (&buf[buf.len() - 8..]).get_u64_le();
60        xxhash64_verify(&buf[..buf.len() - 8], checksum)?;
61        let format_version = buf.get_u32_le();
62        let id = buf.get_u64_le();
63        let metadata = T::decode(buf)?;
64        Ok(Self {
65            format_version,
66            id,
67            metadata,
68        })
69    }
70
71    pub fn decode_format_version(mut buf: &[u8]) -> BackupResult<u32> {
72        let checksum = (&buf[buf.len() - 8..]).get_u64_le();
73        xxhash64_verify(&buf[..buf.len() - 8], checksum)?;
74        let format_version = buf.get_u32_le();
75        Ok(format_version)
76    }
77}
78
79impl<T: Metadata> Display for MetaSnapshot<T> {
80    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
81        writeln!(f, "format_version: {}", self.format_version)?;
82        writeln!(f, "id: {}", self.id)?;
83        writeln!(f, "{}", self.metadata)?;
84        Ok(())
85    }
86}