risingwave_backup/
meta_snapshot.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::fmt::{Display, Formatter};
17
18use bytes::{Buf, BufMut};
19use risingwave_hummock_sdk::HummockRawObjectId;
20use risingwave_hummock_sdk::version::HummockVersion;
21
22use crate::error::BackupResult;
23use crate::{MetaSnapshotId, xxhash64_checksum, xxhash64_verify};
24
25pub trait Metadata: Display + Send + Sync {
26    fn encode_to(&self, buf: &mut Vec<u8>) -> BackupResult<()>;
27
28    fn decode(buf: &[u8]) -> BackupResult<Self>
29    where
30        Self: Sized;
31
32    fn hummock_version_ref(&self) -> &HummockVersion;
33
34    fn hummock_version(self) -> HummockVersion;
35
36    fn storage_url(&self) -> BackupResult<String>;
37
38    fn storage_directory(&self) -> BackupResult<String>;
39
40    fn table_change_log_object_ids(&self) -> HashSet<HummockRawObjectId>;
41}
42
43#[derive(Debug, Default, Clone, PartialEq)]
44pub struct MetaSnapshot<T: Metadata> {
45    pub format_version: u32,
46    pub id: MetaSnapshotId,
47    /// Snapshot of meta store.
48    pub metadata: T,
49}
50
51impl<T: Metadata> MetaSnapshot<T> {
52    pub fn encode(&self) -> BackupResult<Vec<u8>> {
53        let mut buf = vec![];
54        buf.put_u32_le(self.format_version);
55        buf.put_u64_le(self.id);
56        self.metadata.encode_to(&mut buf)?;
57        let checksum = xxhash64_checksum(&buf);
58        buf.put_u64_le(checksum);
59        Ok(buf)
60    }
61
62    pub fn decode(mut buf: &[u8]) -> BackupResult<Self> {
63        let checksum = (&buf[buf.len() - 8..]).get_u64_le();
64        xxhash64_verify(&buf[..buf.len() - 8], checksum)?;
65        let format_version = buf.get_u32_le();
66        let id = buf.get_u64_le();
67        let metadata = T::decode(buf)?;
68        Ok(Self {
69            format_version,
70            id,
71            metadata,
72        })
73    }
74
75    pub fn decode_format_version(mut buf: &[u8]) -> BackupResult<u32> {
76        let checksum = (&buf[buf.len() - 8..]).get_u64_le();
77        xxhash64_verify(&buf[..buf.len() - 8], checksum)?;
78        let format_version = buf.get_u32_le();
79        Ok(format_version)
80    }
81}
82
83impl<T: Metadata> Display for MetaSnapshot<T> {
84    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
85        writeln!(f, "format_version: {}", self.format_version)?;
86        writeln!(f, "id: {}", self.id)?;
87        writeln!(f, "{}", self.metadata)?;
88        Ok(())
89    }
90}