1use std::collections::{HashMap, HashSet};
16use std::fmt::{Display, Formatter};
17
18use bytes::{Buf, BufMut};
19use itertools::Itertools;
20use risingwave_common::util::iter_util::ZipEqFast;
21use risingwave_hummock_sdk::HummockRawObjectId;
22use risingwave_hummock_sdk::version::HummockVersion;
23use risingwave_pb::catalog::{
24 Connection, Database, Function, Index, Schema, Secret, Sink, Source, Subscription, Table, View,
25};
26use risingwave_pb::hummock::{CompactionGroup, HummockVersionStats, PbHummockVersion};
27use risingwave_pb::meta::{SystemParams, TableFragments};
28use risingwave_pb::user::UserInfo;
29
30use crate::error::{BackupError, BackupResult};
31use crate::meta_snapshot::{MetaSnapshot, Metadata};
32
33pub type MetaSnapshotV1 = MetaSnapshot<ClusterMetadata>;
35
36impl Display for ClusterMetadata {
37 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
38 writeln!(f, "default_cf:")?;
39 for (k, v) in &self.default_cf {
40 let key = String::from_utf8(k.clone()).unwrap();
41 writeln!(f, "{} {:x?}", key, v)?;
42 }
43 writeln!(f, "hummock_version:")?;
44 writeln!(f, "{:#?}", self.hummock_version)?;
45 writeln!(f, "version_stats:")?;
46 writeln!(f, "{:#?}", self.version_stats)?;
47 writeln!(f, "compaction_groups:")?;
48 writeln!(f, "{:#?}", self.compaction_groups)?;
49 writeln!(f, "database:")?;
50 writeln!(f, "{:#?}", self.database)?;
51 writeln!(f, "schema:")?;
52 writeln!(f, "{:#?}", self.schema)?;
53 writeln!(f, "table:")?;
54 writeln!(f, "{:#?}", self.table)?;
55 writeln!(f, "index:")?;
56 writeln!(f, "{:#?}", self.index)?;
57 writeln!(f, "sink:")?;
58 writeln!(f, "{:#?}", self.sink)?;
59 writeln!(f, "source:")?;
60 writeln!(f, "{:#?}", self.source)?;
61 writeln!(f, "view:")?;
62 writeln!(f, "{:#?}", self.view)?;
63 writeln!(f, "connection:")?;
64 writeln!(f, "{:#?}", self.connection)?;
65 writeln!(f, "table_fragments:")?;
66 writeln!(f, "{:#?}", self.table_fragments)?;
67 writeln!(f, "user_info:")?;
68 writeln!(f, "{:#?}", self.user_info)?;
69 writeln!(f, "function:")?;
70 writeln!(f, "{:#?}", self.function)?;
71 writeln!(f, "system_param:")?;
72 writeln!(f, "{:#?}", self.system_param)?;
73 writeln!(f, "cluster_id:")?;
74 writeln!(f, "{:#?}", self.cluster_id)?;
75 writeln!(f, "subscription:")?;
76 writeln!(f, "{:#?}", self.subscription)?;
77 writeln!(f, "secret:")?;
78 writeln!(f, "{:#?}", self.secret)?;
79 Ok(())
80 }
81}
82
83impl Metadata for ClusterMetadata {
84 fn encode_to(&self, buf: &mut Vec<u8>) -> BackupResult<()> {
85 self.encode_to(buf)
86 }
87
88 fn decode(buf: &[u8]) -> BackupResult<Self>
89 where
90 Self: Sized,
91 {
92 ClusterMetadata::decode(buf)
93 }
94
95 fn hummock_version_ref(&self) -> &HummockVersion {
96 &self.hummock_version
97 }
98
99 fn hummock_version(self) -> HummockVersion {
100 self.hummock_version
101 }
102
103 fn storage_url(&self) -> BackupResult<String> {
104 unreachable!("");
105 }
106
107 fn storage_directory(&self) -> BackupResult<String> {
108 unreachable!("");
109 }
110
111 fn table_change_log_object_ids(&self) -> HashSet<HummockRawObjectId> {
112 HashSet::default()
113 }
114}
115
116#[derive(Debug, Default, Clone, PartialEq)]
118pub struct ClusterMetadata {
119 pub default_cf: HashMap<Vec<u8>, Vec<u8>>,
123 pub hummock_version: HummockVersion,
124 pub version_stats: HummockVersionStats,
125 pub compaction_groups: Vec<CompactionGroup>,
126 pub database: Vec<Database>,
127 pub schema: Vec<Schema>,
128 pub table: Vec<Table>,
129 pub index: Vec<Index>,
130 pub sink: Vec<Sink>,
131 pub source: Vec<Source>,
132 pub view: Vec<View>,
133 pub table_fragments: Vec<TableFragments>,
134 pub user_info: Vec<UserInfo>,
135 pub function: Vec<Function>,
136 pub connection: Vec<Connection>,
137 pub system_param: SystemParams,
138 pub cluster_id: String,
139 pub subscription: Vec<Subscription>,
140 pub secret: Vec<Secret>,
141}
142
143impl ClusterMetadata {
144 pub fn encode_to(&self, buf: &mut Vec<u8>) -> BackupResult<()> {
145 let default_cf_keys = self.default_cf.keys().collect_vec();
146 let default_cf_values = self.default_cf.values().collect_vec();
147 Self::encode_prost_message_list(&default_cf_keys, buf);
148 Self::encode_prost_message_list(&default_cf_values, buf);
149 Self::encode_prost_message(&PbHummockVersion::from(&self.hummock_version), buf);
150 Self::encode_prost_message(&self.version_stats, buf);
151 Self::encode_prost_message_list(&self.compaction_groups.iter().collect_vec(), buf);
152 Self::encode_prost_message_list(&self.table_fragments.iter().collect_vec(), buf);
153 Self::encode_prost_message_list(&self.user_info.iter().collect_vec(), buf);
154 Self::encode_prost_message_list(&self.database.iter().collect_vec(), buf);
155 Self::encode_prost_message_list(&self.schema.iter().collect_vec(), buf);
156 Self::encode_prost_message_list(&self.table.iter().collect_vec(), buf);
157 Self::encode_prost_message_list(&self.index.iter().collect_vec(), buf);
158 Self::encode_prost_message_list(&self.sink.iter().collect_vec(), buf);
159 Self::encode_prost_message_list(&self.source.iter().collect_vec(), buf);
160 Self::encode_prost_message_list(&self.view.iter().collect_vec(), buf);
161 Self::encode_prost_message_list(&self.function.iter().collect_vec(), buf);
162 Self::encode_prost_message_list(&self.connection.iter().collect_vec(), buf);
163 Self::encode_prost_message(&self.system_param, buf);
164 Self::encode_prost_message(&self.cluster_id, buf);
165 Self::encode_prost_message_list(&self.subscription.iter().collect_vec(), buf);
166 Self::encode_prost_message_list(&self.secret.iter().collect_vec(), buf);
167 Ok(())
168 }
169
170 pub fn decode(mut buf: &[u8]) -> BackupResult<Self> {
171 let default_cf_keys: Vec<Vec<u8>> = Self::decode_prost_message_list(&mut buf)?;
172 let default_cf_values: Vec<Vec<u8>> = Self::decode_prost_message_list(&mut buf)?;
173 let default_cf = default_cf_keys
174 .into_iter()
175 .zip_eq_fast(default_cf_values.into_iter())
176 .collect();
177 let hummock_version =
178 HummockVersion::from_persisted_protobuf_owned(Self::decode_prost_message(&mut buf)?);
179 let version_stats = Self::decode_prost_message(&mut buf)?;
180 let compaction_groups: Vec<CompactionGroup> = Self::decode_prost_message_list(&mut buf)?;
181 let table_fragments: Vec<TableFragments> = Self::decode_prost_message_list(&mut buf)?;
182 let user_info: Vec<UserInfo> = Self::decode_prost_message_list(&mut buf)?;
183 let database: Vec<Database> = Self::decode_prost_message_list(&mut buf)?;
184 let schema: Vec<Schema> = Self::decode_prost_message_list(&mut buf)?;
185 let table: Vec<Table> = Self::decode_prost_message_list(&mut buf)?;
186 let index: Vec<Index> = Self::decode_prost_message_list(&mut buf)?;
187 let sink: Vec<Sink> = Self::decode_prost_message_list(&mut buf)?;
188 let source: Vec<Source> = Self::decode_prost_message_list(&mut buf)?;
189 let view: Vec<View> = Self::decode_prost_message_list(&mut buf)?;
190 let function: Vec<Function> = Self::decode_prost_message_list(&mut buf)?;
191 let connection: Vec<Connection> = Self::decode_prost_message_list(&mut buf)?;
192 let system_param: SystemParams = Self::decode_prost_message(&mut buf)?;
193 let cluster_id: String = Self::decode_prost_message(&mut buf)?;
194 let subscription: Vec<Subscription> =
195 Self::try_decode_prost_message_list(&mut buf).unwrap_or_else(|| Ok(vec![]))?;
196 let secret: Vec<Secret> =
197 Self::try_decode_prost_message_list(&mut buf).unwrap_or_else(|| Ok(vec![]))?;
198
199 Ok(Self {
200 default_cf,
201 hummock_version,
202 version_stats,
203 compaction_groups,
204 database,
205 schema,
206 table,
207 index,
208 sink,
209 source,
210 view,
211 table_fragments,
212 user_info,
213 function,
214 connection,
215 system_param,
216 cluster_id,
217 subscription,
218 secret,
219 })
220 }
221
222 fn encode_prost_message(message: &impl prost::Message, buf: &mut Vec<u8>) {
223 let encoded_message = message.encode_to_vec();
224 buf.put_u32_le(encoded_message.len() as u32);
225 buf.put_slice(&encoded_message);
226 }
227
228 fn decode_prost_message<T>(buf: &mut &[u8]) -> BackupResult<T>
229 where
230 T: prost::Message + Default,
231 {
232 let len = buf.get_u32_le() as usize;
233 let v = buf[..len].to_vec();
234 buf.advance(len);
235 T::decode(v.as_slice()).map_err(|e| BackupError::Decoding(e.into()))
236 }
237
238 fn encode_prost_message_list(messages: &[&impl prost::Message], buf: &mut Vec<u8>) {
239 buf.put_u32_le(messages.len() as u32);
240 for message in messages {
241 Self::encode_prost_message(*message, buf);
242 }
243 }
244
245 fn decode_prost_message_list<T>(buf: &mut &[u8]) -> BackupResult<Vec<T>>
246 where
247 T: prost::Message + Default,
248 {
249 let vec_len = buf.get_u32_le() as usize;
250 let mut result = vec![];
251 for _ in 0..vec_len {
252 let v: T = Self::decode_prost_message(buf)?;
253 result.push(v);
254 }
255 Ok(result)
256 }
257
258 fn try_decode_prost_message_list<T>(buf: &mut &[u8]) -> Option<BackupResult<Vec<T>>>
259 where
260 T: prost::Message + Default,
261 {
262 if buf.is_empty() {
263 return None;
264 }
265 Some(Self::decode_prost_message_list(buf))
266 }
267}
268
269#[cfg(test)]
270mod tests {
271 use risingwave_hummock_sdk::HummockVersionId;
272 use risingwave_pb::hummock::{CompactionGroup, TableStats};
273
274 use crate::meta_snapshot_v1::{ClusterMetadata, MetaSnapshotV1};
275
276 type MetaSnapshot = MetaSnapshotV1;
277
278 #[test]
279 fn test_snapshot_encoding_decoding() {
280 let mut metadata = ClusterMetadata::default();
281 metadata.hummock_version.id = HummockVersionId::new(321);
282 let raw = MetaSnapshot {
283 format_version: 0,
284 id: 123,
285 metadata,
286 };
287 let encoded = raw.encode().unwrap();
288 let decoded = MetaSnapshot::decode(&encoded).unwrap();
289 assert_eq!(raw, decoded);
290 }
291
292 #[test]
293 fn test_metadata_encoding_decoding() {
294 let mut buf = vec![];
295 let mut raw = ClusterMetadata::default();
296 raw.default_cf.insert(vec![0, 1, 2], vec![3, 4, 5]);
297 raw.hummock_version.id = HummockVersionId::new(1);
298 raw.version_stats.hummock_version_id = 10.into();
299 raw.version_stats.table_stats.insert(
300 200.into(),
301 TableStats {
302 total_key_count: 1000,
303 ..Default::default()
304 },
305 );
306 raw.compaction_groups.push(CompactionGroup {
307 id: 3000.into(),
308 ..Default::default()
309 });
310 raw.encode_to(&mut buf).unwrap();
311 let decoded = ClusterMetadata::decode(buf.as_slice()).unwrap();
312 assert_eq!(raw, decoded);
313 }
314}