1#![allow(clippy::derive_partial_eq_without_eq)]
16#![feature(error_generic_member_access)]
17#![cfg_attr(coverage, feature(coverage_attribute))]
18
19pub mod error;
20pub mod meta_snapshot;
21pub mod meta_snapshot_v1;
22pub mod meta_snapshot_v2;
23pub mod storage;
24
25use std::collections::{HashMap, HashSet};
26
27use itertools::Itertools;
28use risingwave_common::RW_VERSION;
29use risingwave_hummock_sdk::state_table_info::StateTableInfo;
30use risingwave_hummock_sdk::version::HummockVersion;
31use risingwave_hummock_sdk::{HummockRawObjectId, HummockVersionId};
32use risingwave_pb::backup_service::{PbMetaSnapshotManifest, PbMetaSnapshotMetadata};
33use risingwave_pb::id::TableId;
34use serde::{Deserialize, Serialize};
35
36use crate::error::{BackupError, BackupResult};
37
38pub type MetaSnapshotId = u64;
39pub type MetaBackupJobId = u64;
40
/// Metadata describing one meta snapshot.
///
/// The struct is (de)serialized with serde; the attributes on individual
/// fields control how missing or renamed keys are handled.
#[derive(Serialize, Deserialize, Clone)]
pub struct MetaSnapshotMetadata {
    /// Identifier of the snapshot.
    pub id: MetaSnapshotId,
    /// Id of the Hummock version the metadata was built from (see `new`).
    pub hummock_version_id: HummockVersionId,
    // The serialized key is `ssts`, not `objects`.
    // NOTE(review): presumably kept for compatibility with previously written
    // manifests — confirm before changing.
    #[serde(rename = "ssts")]
    /// Raw object ids recorded for the snapshot. `new` fills this with the
    /// object ids of the Hummock version plus the table change log object ids
    /// supplied by the caller.
    pub objects: HashSet<HummockRawObjectId>,
    /// Format version of the snapshot. Falls back to `u32::default()` when the
    /// key is missing from the serialized input.
    #[serde(default)]
    pub format_version: u32,
    /// Optional remarks, passed through unchanged from the caller of `new`.
    pub remarks: Option<String>,
    /// Per-table state info taken from the Hummock version. Falls back to an
    /// empty map when the key is missing from the serialized input.
    #[serde(default)]
    pub state_table_info: HashMap<TableId, StateTableInfo>,
    /// Value of `RW_VERSION` at the time `new` created this metadata.
    /// NOTE(review): presumably `None` only for metadata written before this
    /// field existed — confirm.
    pub rw_version: Option<String>,
}
56
57impl MetaSnapshotMetadata {
58 pub fn new(
59 id: MetaSnapshotId,
60 v: &HummockVersion,
61 format_version: u32,
62 remarks: Option<String>,
63 table_change_log_object_ids: impl Iterator<Item = HummockRawObjectId>,
64 ) -> Self {
65 Self {
66 id,
67 hummock_version_id: v.id,
68 objects: v
69 .get_object_ids()
70 .map(|object_id| object_id.as_raw())
71 .chain(table_change_log_object_ids)
72 .collect(),
73 format_version,
74 remarks,
75 state_table_info: v
76 .state_table_info
77 .info()
78 .iter()
79 .map(|(id, info)| (*id, info.into()))
80 .collect(),
81 rw_version: Some(RW_VERSION.to_owned()),
82 }
83 }
84}
85
/// A list of snapshot metadata entries together with a manifest id.
///
/// `Default` yields a manifest with id `0` and no snapshot entries.
#[derive(Serialize, Deserialize, Default, Clone)]
pub struct MetaSnapshotManifest {
    /// Identifier of this manifest.
    /// NOTE(review): presumably changes whenever the manifest is rewritten —
    /// confirm against the storage code.
    pub manifest_id: u64,
    /// Metadata of the snapshots listed in this manifest.
    pub snapshot_metadata: Vec<MetaSnapshotMetadata>,
}
92
93pub fn xxhash64_checksum(data: &[u8]) -> u64 {
94 twox_hash::XxHash64::oneshot(0, data)
95}
96
97pub fn xxhash64_verify(data: &[u8], checksum: u64) -> BackupResult<()> {
98 let data_checksum = xxhash64_checksum(data);
99 if data_checksum != checksum {
100 return Err(BackupError::ChecksumMismatch {
101 expected: checksum,
102 found: data_checksum,
103 });
104 }
105 Ok(())
106}
107
108impl From<&MetaSnapshotMetadata> for PbMetaSnapshotMetadata {
109 fn from(m: &MetaSnapshotMetadata) -> Self {
110 Self {
111 id: m.id,
112 hummock_version_id: m.hummock_version_id,
113 format_version: Some(m.format_version),
114 remarks: m.remarks.clone(),
115 state_table_info: m
116 .state_table_info
117 .iter()
118 .map(|(t, i)| ((*t), i.into()))
119 .collect(),
120 rw_version: m.rw_version.clone(),
121 }
122 }
123}
124
125impl From<&MetaSnapshotManifest> for PbMetaSnapshotManifest {
126 fn from(m: &MetaSnapshotManifest) -> Self {
127 Self {
128 manifest_id: m.manifest_id,
129 snapshot_metadata: m.snapshot_metadata.iter().map_into().collect_vec(),
130 }
131 }
132}