1use std::collections::HashMap;
16use std::fmt::{Display, Formatter};
17
18use bytes::{Buf, BufMut};
19use itertools::Itertools;
20use risingwave_common::util::iter_util::ZipEqFast;
21use risingwave_hummock_sdk::version::HummockVersion;
22use risingwave_pb::catalog::{
23 Connection, Database, Function, Index, Schema, Secret, Sink, Source, Subscription, Table, View,
24};
25use risingwave_pb::hummock::{CompactionGroup, HummockVersionStats, PbHummockVersion};
26use risingwave_pb::meta::{SystemParams, TableFragments};
27use risingwave_pb::user::UserInfo;
28
29use crate::error::{BackupError, BackupResult};
30use crate::meta_snapshot::{MetaSnapshot, Metadata};
31
/// Meta snapshot whose payload is a [`ClusterMetadata`].
pub type MetaSnapshotV1 = MetaSnapshot<ClusterMetadata>;
34
35impl Display for ClusterMetadata {
36 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
37 writeln!(f, "default_cf:")?;
38 for (k, v) in &self.default_cf {
39 let key = String::from_utf8(k.clone()).unwrap();
40 writeln!(f, "{} {:x?}", key, v)?;
41 }
42 writeln!(f, "hummock_version:")?;
43 writeln!(f, "{:#?}", self.hummock_version)?;
44 writeln!(f, "version_stats:")?;
45 writeln!(f, "{:#?}", self.version_stats)?;
46 writeln!(f, "compaction_groups:")?;
47 writeln!(f, "{:#?}", self.compaction_groups)?;
48 writeln!(f, "database:")?;
49 writeln!(f, "{:#?}", self.database)?;
50 writeln!(f, "schema:")?;
51 writeln!(f, "{:#?}", self.schema)?;
52 writeln!(f, "table:")?;
53 writeln!(f, "{:#?}", self.table)?;
54 writeln!(f, "index:")?;
55 writeln!(f, "{:#?}", self.index)?;
56 writeln!(f, "sink:")?;
57 writeln!(f, "{:#?}", self.sink)?;
58 writeln!(f, "source:")?;
59 writeln!(f, "{:#?}", self.source)?;
60 writeln!(f, "view:")?;
61 writeln!(f, "{:#?}", self.view)?;
62 writeln!(f, "connection:")?;
63 writeln!(f, "{:#?}", self.connection)?;
64 writeln!(f, "table_fragments:")?;
65 writeln!(f, "{:#?}", self.table_fragments)?;
66 writeln!(f, "user_info:")?;
67 writeln!(f, "{:#?}", self.user_info)?;
68 writeln!(f, "function:")?;
69 writeln!(f, "{:#?}", self.function)?;
70 writeln!(f, "system_param:")?;
71 writeln!(f, "{:#?}", self.system_param)?;
72 writeln!(f, "cluster_id:")?;
73 writeln!(f, "{:#?}", self.cluster_id)?;
74 writeln!(f, "subscription:")?;
75 writeln!(f, "{:#?}", self.subscription)?;
76 writeln!(f, "secret:")?;
77 writeln!(f, "{:#?}", self.secret)?;
78 Ok(())
79 }
80}
81
82impl Metadata for ClusterMetadata {
83 fn encode_to(&self, buf: &mut Vec<u8>) -> BackupResult<()> {
84 self.encode_to(buf)
85 }
86
87 fn decode(buf: &[u8]) -> BackupResult<Self>
88 where
89 Self: Sized,
90 {
91 ClusterMetadata::decode(buf)
92 }
93
94 fn hummock_version_ref(&self) -> &HummockVersion {
95 &self.hummock_version
96 }
97
98 fn hummock_version(self) -> HummockVersion {
99 self.hummock_version
100 }
101
102 fn storage_url(&self) -> BackupResult<String> {
103 unreachable!("");
104 }
105
106 fn storage_directory(&self) -> BackupResult<String> {
107 unreachable!("");
108 }
109}
110
/// Payload of a v1 meta snapshot: the Hummock state plus the catalog, user, fragment and
/// system-parameter records captured together.
///
/// The field order here is NOT the order used on the wire; `encode_to` and `decode`
/// define their own fixed section order.
#[derive(Debug, Default, Clone, PartialEq)]
pub struct ClusterMetadata {
    /// Raw byte-keyed entries. `Display` prints each key as a string and each value as
    /// hex bytes.
    // NOTE(review): presumably the contents of the meta store's default column family —
    // confirm against the code that fills this map.
    pub default_cf: HashMap<Vec<u8>, Vec<u8>>,
    /// Hummock version; converted to and from `PbHummockVersion` when encoded/decoded.
    pub hummock_version: HummockVersion,
    /// Statistics message stored alongside the Hummock version.
    pub version_stats: HummockVersionStats,
    /// Compaction group records.
    pub compaction_groups: Vec<CompactionGroup>,
    /// Catalog records: databases.
    pub database: Vec<Database>,
    /// Catalog records: schemas.
    pub schema: Vec<Schema>,
    /// Catalog records: tables.
    pub table: Vec<Table>,
    /// Catalog records: indexes.
    pub index: Vec<Index>,
    /// Catalog records: sinks.
    pub sink: Vec<Sink>,
    /// Catalog records: sources.
    pub source: Vec<Source>,
    /// Catalog records: views.
    pub view: Vec<View>,
    /// Table fragment records.
    pub table_fragments: Vec<TableFragments>,
    /// User records.
    pub user_info: Vec<UserInfo>,
    /// Catalog records: functions.
    pub function: Vec<Function>,
    /// Catalog records: connections.
    pub connection: Vec<Connection>,
    /// System parameters message.
    pub system_param: SystemParams,
    /// Cluster identifier, stored as a plain string.
    pub cluster_id: String,
    /// Catalog records: subscriptions. `decode` treats this section as optional: when the
    /// buffer ends before it, the field is left empty.
    pub subscription: Vec<Subscription>,
    /// Catalog records: secrets. `decode` treats this section as optional: when the
    /// buffer ends before it, the field is left empty.
    pub secret: Vec<Secret>,
}
137
138impl ClusterMetadata {
139 pub fn encode_to(&self, buf: &mut Vec<u8>) -> BackupResult<()> {
140 let default_cf_keys = self.default_cf.keys().collect_vec();
141 let default_cf_values = self.default_cf.values().collect_vec();
142 Self::encode_prost_message_list(&default_cf_keys, buf);
143 Self::encode_prost_message_list(&default_cf_values, buf);
144 Self::encode_prost_message(&PbHummockVersion::from(&self.hummock_version), buf);
145 Self::encode_prost_message(&self.version_stats, buf);
146 Self::encode_prost_message_list(&self.compaction_groups.iter().collect_vec(), buf);
147 Self::encode_prost_message_list(&self.table_fragments.iter().collect_vec(), buf);
148 Self::encode_prost_message_list(&self.user_info.iter().collect_vec(), buf);
149 Self::encode_prost_message_list(&self.database.iter().collect_vec(), buf);
150 Self::encode_prost_message_list(&self.schema.iter().collect_vec(), buf);
151 Self::encode_prost_message_list(&self.table.iter().collect_vec(), buf);
152 Self::encode_prost_message_list(&self.index.iter().collect_vec(), buf);
153 Self::encode_prost_message_list(&self.sink.iter().collect_vec(), buf);
154 Self::encode_prost_message_list(&self.source.iter().collect_vec(), buf);
155 Self::encode_prost_message_list(&self.view.iter().collect_vec(), buf);
156 Self::encode_prost_message_list(&self.function.iter().collect_vec(), buf);
157 Self::encode_prost_message_list(&self.connection.iter().collect_vec(), buf);
158 Self::encode_prost_message(&self.system_param, buf);
159 Self::encode_prost_message(&self.cluster_id, buf);
160 Self::encode_prost_message_list(&self.subscription.iter().collect_vec(), buf);
161 Self::encode_prost_message_list(&self.secret.iter().collect_vec(), buf);
162 Ok(())
163 }
164
165 pub fn decode(mut buf: &[u8]) -> BackupResult<Self> {
166 let default_cf_keys: Vec<Vec<u8>> = Self::decode_prost_message_list(&mut buf)?;
167 let default_cf_values: Vec<Vec<u8>> = Self::decode_prost_message_list(&mut buf)?;
168 let default_cf = default_cf_keys
169 .into_iter()
170 .zip_eq_fast(default_cf_values.into_iter())
171 .collect();
172 let hummock_version =
173 HummockVersion::from_persisted_protobuf(&Self::decode_prost_message(&mut buf)?);
174 let version_stats = Self::decode_prost_message(&mut buf)?;
175 let compaction_groups: Vec<CompactionGroup> = Self::decode_prost_message_list(&mut buf)?;
176 let table_fragments: Vec<TableFragments> = Self::decode_prost_message_list(&mut buf)?;
177 let user_info: Vec<UserInfo> = Self::decode_prost_message_list(&mut buf)?;
178 let database: Vec<Database> = Self::decode_prost_message_list(&mut buf)?;
179 let schema: Vec<Schema> = Self::decode_prost_message_list(&mut buf)?;
180 let table: Vec<Table> = Self::decode_prost_message_list(&mut buf)?;
181 let index: Vec<Index> = Self::decode_prost_message_list(&mut buf)?;
182 let sink: Vec<Sink> = Self::decode_prost_message_list(&mut buf)?;
183 let source: Vec<Source> = Self::decode_prost_message_list(&mut buf)?;
184 let view: Vec<View> = Self::decode_prost_message_list(&mut buf)?;
185 let function: Vec<Function> = Self::decode_prost_message_list(&mut buf)?;
186 let connection: Vec<Connection> = Self::decode_prost_message_list(&mut buf)?;
187 let system_param: SystemParams = Self::decode_prost_message(&mut buf)?;
188 let cluster_id: String = Self::decode_prost_message(&mut buf)?;
189 let subscription: Vec<Subscription> =
190 Self::try_decode_prost_message_list(&mut buf).unwrap_or_else(|| Ok(vec![]))?;
191 let secret: Vec<Secret> =
192 Self::try_decode_prost_message_list(&mut buf).unwrap_or_else(|| Ok(vec![]))?;
193
194 Ok(Self {
195 default_cf,
196 hummock_version,
197 version_stats,
198 compaction_groups,
199 database,
200 schema,
201 table,
202 index,
203 sink,
204 source,
205 view,
206 table_fragments,
207 user_info,
208 function,
209 connection,
210 system_param,
211 cluster_id,
212 subscription,
213 secret,
214 })
215 }
216
217 fn encode_prost_message(message: &impl prost::Message, buf: &mut Vec<u8>) {
218 let encoded_message = message.encode_to_vec();
219 buf.put_u32_le(encoded_message.len() as u32);
220 buf.put_slice(&encoded_message);
221 }
222
223 fn decode_prost_message<T>(buf: &mut &[u8]) -> BackupResult<T>
224 where
225 T: prost::Message + Default,
226 {
227 let len = buf.get_u32_le() as usize;
228 let v = buf[..len].to_vec();
229 buf.advance(len);
230 T::decode(v.as_slice()).map_err(|e| BackupError::Decoding(e.into()))
231 }
232
233 fn encode_prost_message_list(messages: &[&impl prost::Message], buf: &mut Vec<u8>) {
234 buf.put_u32_le(messages.len() as u32);
235 for message in messages {
236 Self::encode_prost_message(*message, buf);
237 }
238 }
239
240 fn decode_prost_message_list<T>(buf: &mut &[u8]) -> BackupResult<Vec<T>>
241 where
242 T: prost::Message + Default,
243 {
244 let vec_len = buf.get_u32_le() as usize;
245 let mut result = vec![];
246 for _ in 0..vec_len {
247 let v: T = Self::decode_prost_message(buf)?;
248 result.push(v);
249 }
250 Ok(result)
251 }
252
253 fn try_decode_prost_message_list<T>(buf: &mut &[u8]) -> Option<BackupResult<Vec<T>>>
254 where
255 T: prost::Message + Default,
256 {
257 if buf.is_empty() {
258 return None;
259 }
260 Some(Self::decode_prost_message_list(buf))
261 }
262}
263
#[cfg(test)]
mod tests {
    use risingwave_hummock_sdk::HummockVersionId;
    use risingwave_pb::hummock::{CompactionGroup, TableStats};

    use crate::meta_snapshot_v1::{ClusterMetadata, MetaSnapshotV1};

    /// A whole snapshot must come back unchanged from an encode/decode round trip.
    #[test]
    fn test_snapshot_encoding_decoding() {
        let mut metadata = ClusterMetadata::default();
        metadata.hummock_version.id = HummockVersionId::new(321);

        let snapshot = MetaSnapshotV1 {
            format_version: 0,
            id: 123,
            metadata,
        };

        let bytes = snapshot.encode().unwrap();
        let restored = MetaSnapshotV1::decode(&bytes).unwrap();
        assert_eq!(snapshot, restored);
    }

    /// Metadata with a few populated sections must come back unchanged from an
    /// encode/decode round trip.
    #[test]
    fn test_metadata_encoding_decoding() {
        // Fixtures for the populated sections.
        let table_stats = TableStats {
            total_key_count: 1000,
            ..Default::default()
        };
        let compaction_group = CompactionGroup {
            id: 3000.into(),
            ..Default::default()
        };

        let mut expected = ClusterMetadata::default();
        expected.default_cf.insert(vec![0, 1, 2], vec![3, 4, 5]);
        expected.hummock_version.id = HummockVersionId::new(1);
        expected.version_stats.hummock_version_id = 10.into();
        expected
            .version_stats
            .table_stats
            .insert(200.into(), table_stats);
        expected.compaction_groups.push(compaction_group);

        let mut encoded = Vec::new();
        expected.encode_to(&mut encoded).unwrap();

        let actual = ClusterMetadata::decode(encoded.as_slice()).unwrap();
        assert_eq!(expected, actual);
    }
}
309}