1use std::fmt::{Display, Formatter};
16
17use bytes::{Buf, BufMut};
18use risingwave_hummock_sdk::version::HummockVersion;
19use serde::{Deserialize, Serialize};
20
21use crate::meta_snapshot::{MetaSnapshot, Metadata};
22use crate::{BackupError, BackupResult};
23pub type MetaSnapshotV2 = MetaSnapshot<MetadataV2>;
24
25impl From<serde_json::Error> for BackupError {
26 fn from(value: serde_json::Error) -> Self {
27 BackupError::Other(value.into())
28 }
29}
30
/// Invokes the given callback macro with every `{ field_name, model_module }` pair that
/// makes up the v2 metadata.
///
/// In this file the list drives the fields of `MetadataV2` as well as `encode_metadata`
/// and `decode_metadata`, which walk the entries in the order written here. Reordering
/// or inserting entries therefore changes the byte layout those two functions produce
/// and expect.
#[macro_export]
macro_rules! for_all_metadata_models_v2 {
    ($callback:ident) => {
        $callback! {
            { seaql_migrations, risingwave_meta_model::serde_seaql_migration },
            { version_stats, risingwave_meta_model::hummock_version_stats },
            { compaction_configs, risingwave_meta_model::compaction_config },
            { actors, risingwave_meta_model::actor },
            { clusters, risingwave_meta_model::cluster },
            { fragment_relation, risingwave_meta_model::fragment_relation },
            { catalog_versions, risingwave_meta_model::catalog_version },
            { connections, risingwave_meta_model::connection },
            { databases, risingwave_meta_model::database },
            { fragments, risingwave_meta_model::fragment },
            { functions, risingwave_meta_model::function },
            { indexes, risingwave_meta_model::index },
            { objects, risingwave_meta_model::object },
            { object_dependencies, risingwave_meta_model::object_dependency },
            { schemas, risingwave_meta_model::schema },
            { sinks, risingwave_meta_model::sink },
            { sources, risingwave_meta_model::source },
            { streaming_jobs, risingwave_meta_model::streaming_job },
            { subscriptions, risingwave_meta_model::subscription },
            { system_parameters, risingwave_meta_model::system_parameter },
            { tables, risingwave_meta_model::table },
            { users, risingwave_meta_model::user },
            { user_privileges, risingwave_meta_model::user_privilege },
            { views, risingwave_meta_model::view },
            { workers, risingwave_meta_model::worker },
            { worker_properties, risingwave_meta_model::worker_property },
            { hummock_sequences, risingwave_meta_model::hummock_sequence },
            { session_parameters, risingwave_meta_model::session_parameter },
            { secrets, risingwave_meta_model::secret }
        }
    };
}
68
69macro_rules! define_metadata_v2 {
70 ($({ $name:ident, $mod_path:ident::$mod_name:ident }),*) => {
71 #[derive(Default)]
72 pub struct MetadataV2 {
73 pub hummock_version: HummockVersion,
74 $(
75 pub $name: Vec<$mod_path::$mod_name::Model>,
76 )*
77 }
78 };
79}
80
81for_all_metadata_models_v2!(define_metadata_v2);
82
83macro_rules! define_encode_metadata {
84 ($( {$name:ident, $mod_path:ident::$mod_name:ident} ),*) => {
85 fn encode_metadata(
86 metadata: &MetadataV2,
87 buf: &mut Vec<u8>,
88 ) -> BackupResult<()> {
89 let mut _idx = 0;
90 $(
91 if _idx == 1 {
92 put_1(buf, &metadata.hummock_version.to_protobuf())?;
93 }
94 put_n(buf, &metadata.$name)?;
95 _idx += 1;
96 )*
97 Ok(())
98 }
99 };
100}
101
102for_all_metadata_models_v2!(define_encode_metadata);
103
104macro_rules! define_decode_metadata {
105 ($( {$name:ident, $mod_path:ident::$mod_name:ident} ),*) => {
106 fn decode_metadata(
107 metadata: &mut MetadataV2,
108 mut buf: &[u8],
109 ) -> BackupResult<()> {
110 let mut _idx = 0;
111 $(
112 if _idx == 1 {
113 metadata.hummock_version = HummockVersion::from_persisted_protobuf(&get_1(&mut buf)?);
114 }
115 metadata.$name = get_n(&mut buf)?;
116 _idx += 1;
117 )*
118 Ok(())
119 }
120 };
121}
122
123for_all_metadata_models_v2!(define_decode_metadata);
124
125impl Display for MetadataV2 {
126 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
127 writeln!(f, "clusters: {:#?}", self.clusters)?;
128 writeln!(
129 f,
130 "Hummock version: id {}, committed_epoch: {:?}",
131 self.hummock_version.id,
132 self.hummock_version.state_table_info.info(),
133 )?;
134 Ok(())
136 }
137}
138
139impl Metadata for MetadataV2 {
140 fn encode_to(&self, buf: &mut Vec<u8>) -> BackupResult<()> {
141 encode_metadata(self, buf)
142 }
143
144 fn decode(buf: &[u8]) -> BackupResult<Self>
145 where
146 Self: Sized,
147 {
148 let mut metadata = Self::default();
149 decode_metadata(&mut metadata, buf)?;
150 Ok(metadata)
151 }
152
153 fn hummock_version_ref(&self) -> &HummockVersion {
154 &self.hummock_version
155 }
156
157 fn hummock_version(self) -> HummockVersion {
158 self.hummock_version
159 }
160}
161
162fn put_n<T: Serialize>(buf: &mut Vec<u8>, data: &[T]) -> Result<(), serde_json::Error> {
163 buf.put_u32_le(
164 data.len()
165 .try_into()
166 .unwrap_or_else(|_| panic!("cannot convert {} into u32", data.len())),
167 );
168 for d in data {
169 put_with_len_prefix(buf, d)?;
170 }
171 Ok(())
172}
173
174fn put_1<T: Serialize>(buf: &mut Vec<u8>, data: &T) -> Result<(), serde_json::Error> {
175 put_n(buf, &[data])
176}
177
178fn get_n<'a, T: Deserialize<'a>>(buf: &mut &'a [u8]) -> Result<Vec<T>, serde_json::Error> {
179 let n = buf.get_u32_le() as usize;
180 let mut elements = Vec::with_capacity(n);
181 for _ in 0..n {
182 elements.push(get_with_len_prefix(buf)?);
183 }
184 Ok(elements)
185}
186
187fn get_1<'a, T: Deserialize<'a>>(buf: &mut &'a [u8]) -> Result<T, serde_json::Error> {
188 let elements = get_n(buf)?;
189 assert_eq!(elements.len(), 1);
190 Ok(elements.into_iter().next().unwrap())
191}
192
193fn put_with_len_prefix<T: Serialize>(buf: &mut Vec<u8>, data: &T) -> Result<(), serde_json::Error> {
194 let b = serde_json::to_vec(data)?;
195 buf.put_u32_le(
196 b.len()
197 .try_into()
198 .unwrap_or_else(|_| panic!("cannot convert {} into u32", b.len())),
199 );
200 buf.put_slice(&b);
201 Ok(())
202}
203
204fn get_with_len_prefix<'a, T: Deserialize<'a>>(buf: &mut &'a [u8]) -> Result<T, serde_json::Error> {
205 let len = buf.get_u32_le() as usize;
206 let d = serde_json::from_slice(&buf[..len])?;
207 buf.advance(len);
208 Ok(d)
209}