1use std::collections::HashMap;
16use std::fmt::{Display, Formatter};
17
18use bytes::{Buf, BufMut};
19use itertools::Itertools;
20use risingwave_common::util::iter_util::ZipEqFast;
21use risingwave_hummock_sdk::version::HummockVersion;
22use risingwave_pb::catalog::{
23 Connection, Database, Function, Index, Schema, Secret, Sink, Source, Subscription, Table, View,
24};
25use risingwave_pb::hummock::{CompactionGroup, HummockVersionStats, PbHummockVersion};
26use risingwave_pb::meta::{SystemParams, TableFragments};
27use risingwave_pb::user::UserInfo;
28
29use crate::error::{BackupError, BackupResult};
30use crate::meta_snapshot::{MetaSnapshot, Metadata};
31
/// Version-1 meta snapshot: a [`MetaSnapshot`] whose payload is [`ClusterMetadata`].
pub type MetaSnapshotV1 = MetaSnapshot<ClusterMetadata>;
34
35impl Display for ClusterMetadata {
36 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
37 writeln!(f, "default_cf:")?;
38 for (k, v) in &self.default_cf {
39 let key = String::from_utf8(k.clone()).unwrap();
40 writeln!(f, "{} {:x?}", key, v)?;
41 }
42 writeln!(f, "hummock_version:")?;
43 writeln!(f, "{:#?}", self.hummock_version)?;
44 writeln!(f, "version_stats:")?;
45 writeln!(f, "{:#?}", self.version_stats)?;
46 writeln!(f, "compaction_groups:")?;
47 writeln!(f, "{:#?}", self.compaction_groups)?;
48 writeln!(f, "database:")?;
49 writeln!(f, "{:#?}", self.database)?;
50 writeln!(f, "schema:")?;
51 writeln!(f, "{:#?}", self.schema)?;
52 writeln!(f, "table:")?;
53 writeln!(f, "{:#?}", self.table)?;
54 writeln!(f, "index:")?;
55 writeln!(f, "{:#?}", self.index)?;
56 writeln!(f, "sink:")?;
57 writeln!(f, "{:#?}", self.sink)?;
58 writeln!(f, "source:")?;
59 writeln!(f, "{:#?}", self.source)?;
60 writeln!(f, "view:")?;
61 writeln!(f, "{:#?}", self.view)?;
62 writeln!(f, "connection:")?;
63 writeln!(f, "{:#?}", self.connection)?;
64 writeln!(f, "table_fragments:")?;
65 writeln!(f, "{:#?}", self.table_fragments)?;
66 writeln!(f, "user_info:")?;
67 writeln!(f, "{:#?}", self.user_info)?;
68 writeln!(f, "function:")?;
69 writeln!(f, "{:#?}", self.function)?;
70 writeln!(f, "system_param:")?;
71 writeln!(f, "{:#?}", self.system_param)?;
72 writeln!(f, "cluster_id:")?;
73 writeln!(f, "{:#?}", self.cluster_id)?;
74 writeln!(f, "subscription:")?;
75 writeln!(f, "{:#?}", self.subscription)?;
76 writeln!(f, "secret:")?;
77 writeln!(f, "{:#?}", self.secret)?;
78 Ok(())
79 }
80}
81
82impl Metadata for ClusterMetadata {
83 fn encode_to(&self, buf: &mut Vec<u8>) -> BackupResult<()> {
84 self.encode_to(buf)
85 }
86
87 fn decode(buf: &[u8]) -> BackupResult<Self>
88 where
89 Self: Sized,
90 {
91 ClusterMetadata::decode(buf)
92 }
93
94 fn hummock_version_ref(&self) -> &HummockVersion {
95 &self.hummock_version
96 }
97
98 fn hummock_version(self) -> HummockVersion {
99 self.hummock_version
100 }
101}
102
/// Payload of a version-1 meta snapshot.
///
/// The inherent `encode_to` / `decode` methods serialize these fields in a fixed
/// order, and `decode` treats `subscription` and `secret` as optional trailing
/// sections (they default to empty when the buffer ends early).
/// NOTE(review): this suggests new fields must only ever be appended to the
/// encoded layout, never removed or reordered — confirm before changing the struct.
#[derive(Debug, Default, Clone, PartialEq)]
pub struct ClusterMetadata {
    /// Raw key/value byte pairs.
    /// NOTE(review): presumably the contents of a "default column family" of the
    /// meta store — confirm against the snapshot builder.
    pub default_cf: HashMap<Vec<u8>, Vec<u8>>,
    /// Hummock version captured in the snapshot (persisted as `PbHummockVersion`).
    pub hummock_version: HummockVersion,
    /// Statistics associated with the Hummock version.
    pub version_stats: HummockVersionStats,
    /// Compaction groups.
    pub compaction_groups: Vec<CompactionGroup>,
    /// Catalog: databases.
    pub database: Vec<Database>,
    /// Catalog: schemas.
    pub schema: Vec<Schema>,
    /// Catalog: tables.
    pub table: Vec<Table>,
    /// Catalog: indexes.
    pub index: Vec<Index>,
    /// Catalog: sinks.
    pub sink: Vec<Sink>,
    /// Catalog: sources.
    pub source: Vec<Source>,
    /// Catalog: views.
    pub view: Vec<View>,
    /// Table fragments.
    pub table_fragments: Vec<TableFragments>,
    /// User information records.
    pub user_info: Vec<UserInfo>,
    /// Catalog: functions.
    pub function: Vec<Function>,
    /// Catalog: connections.
    pub connection: Vec<Connection>,
    /// System parameters.
    pub system_param: SystemParams,
    /// Cluster identifier.
    pub cluster_id: String,
    /// Catalog: subscriptions. Optional trailing section in the encoded form.
    pub subscription: Vec<Subscription>,
    /// Catalog: secrets. Optional trailing section in the encoded form.
    pub secret: Vec<Secret>,
}
129
130impl ClusterMetadata {
131 pub fn encode_to(&self, buf: &mut Vec<u8>) -> BackupResult<()> {
132 let default_cf_keys = self.default_cf.keys().collect_vec();
133 let default_cf_values = self.default_cf.values().collect_vec();
134 Self::encode_prost_message_list(&default_cf_keys, buf);
135 Self::encode_prost_message_list(&default_cf_values, buf);
136 Self::encode_prost_message(&PbHummockVersion::from(&self.hummock_version), buf);
137 Self::encode_prost_message(&self.version_stats, buf);
138 Self::encode_prost_message_list(&self.compaction_groups.iter().collect_vec(), buf);
139 Self::encode_prost_message_list(&self.table_fragments.iter().collect_vec(), buf);
140 Self::encode_prost_message_list(&self.user_info.iter().collect_vec(), buf);
141 Self::encode_prost_message_list(&self.database.iter().collect_vec(), buf);
142 Self::encode_prost_message_list(&self.schema.iter().collect_vec(), buf);
143 Self::encode_prost_message_list(&self.table.iter().collect_vec(), buf);
144 Self::encode_prost_message_list(&self.index.iter().collect_vec(), buf);
145 Self::encode_prost_message_list(&self.sink.iter().collect_vec(), buf);
146 Self::encode_prost_message_list(&self.source.iter().collect_vec(), buf);
147 Self::encode_prost_message_list(&self.view.iter().collect_vec(), buf);
148 Self::encode_prost_message_list(&self.function.iter().collect_vec(), buf);
149 Self::encode_prost_message_list(&self.connection.iter().collect_vec(), buf);
150 Self::encode_prost_message(&self.system_param, buf);
151 Self::encode_prost_message(&self.cluster_id, buf);
152 Self::encode_prost_message_list(&self.subscription.iter().collect_vec(), buf);
153 Self::encode_prost_message_list(&self.secret.iter().collect_vec(), buf);
154 Ok(())
155 }
156
157 pub fn decode(mut buf: &[u8]) -> BackupResult<Self> {
158 let default_cf_keys: Vec<Vec<u8>> = Self::decode_prost_message_list(&mut buf)?;
159 let default_cf_values: Vec<Vec<u8>> = Self::decode_prost_message_list(&mut buf)?;
160 let default_cf = default_cf_keys
161 .into_iter()
162 .zip_eq_fast(default_cf_values.into_iter())
163 .collect();
164 let hummock_version =
165 HummockVersion::from_persisted_protobuf(&Self::decode_prost_message(&mut buf)?);
166 let version_stats = Self::decode_prost_message(&mut buf)?;
167 let compaction_groups: Vec<CompactionGroup> = Self::decode_prost_message_list(&mut buf)?;
168 let table_fragments: Vec<TableFragments> = Self::decode_prost_message_list(&mut buf)?;
169 let user_info: Vec<UserInfo> = Self::decode_prost_message_list(&mut buf)?;
170 let database: Vec<Database> = Self::decode_prost_message_list(&mut buf)?;
171 let schema: Vec<Schema> = Self::decode_prost_message_list(&mut buf)?;
172 let table: Vec<Table> = Self::decode_prost_message_list(&mut buf)?;
173 let index: Vec<Index> = Self::decode_prost_message_list(&mut buf)?;
174 let sink: Vec<Sink> = Self::decode_prost_message_list(&mut buf)?;
175 let source: Vec<Source> = Self::decode_prost_message_list(&mut buf)?;
176 let view: Vec<View> = Self::decode_prost_message_list(&mut buf)?;
177 let function: Vec<Function> = Self::decode_prost_message_list(&mut buf)?;
178 let connection: Vec<Connection> = Self::decode_prost_message_list(&mut buf)?;
179 let system_param: SystemParams = Self::decode_prost_message(&mut buf)?;
180 let cluster_id: String = Self::decode_prost_message(&mut buf)?;
181 let subscription: Vec<Subscription> =
182 Self::try_decode_prost_message_list(&mut buf).unwrap_or_else(|| Ok(vec![]))?;
183 let secret: Vec<Secret> =
184 Self::try_decode_prost_message_list(&mut buf).unwrap_or_else(|| Ok(vec![]))?;
185
186 Ok(Self {
187 default_cf,
188 hummock_version,
189 version_stats,
190 compaction_groups,
191 database,
192 schema,
193 table,
194 index,
195 sink,
196 source,
197 view,
198 table_fragments,
199 user_info,
200 function,
201 connection,
202 system_param,
203 cluster_id,
204 subscription,
205 secret,
206 })
207 }
208
209 fn encode_prost_message(message: &impl prost::Message, buf: &mut Vec<u8>) {
210 let encoded_message = message.encode_to_vec();
211 buf.put_u32_le(encoded_message.len() as u32);
212 buf.put_slice(&encoded_message);
213 }
214
215 fn decode_prost_message<T>(buf: &mut &[u8]) -> BackupResult<T>
216 where
217 T: prost::Message + Default,
218 {
219 let len = buf.get_u32_le() as usize;
220 let v = buf[..len].to_vec();
221 buf.advance(len);
222 T::decode(v.as_slice()).map_err(|e| BackupError::Decoding(e.into()))
223 }
224
225 fn encode_prost_message_list(messages: &[&impl prost::Message], buf: &mut Vec<u8>) {
226 buf.put_u32_le(messages.len() as u32);
227 for message in messages {
228 Self::encode_prost_message(*message, buf);
229 }
230 }
231
232 fn decode_prost_message_list<T>(buf: &mut &[u8]) -> BackupResult<Vec<T>>
233 where
234 T: prost::Message + Default,
235 {
236 let vec_len = buf.get_u32_le() as usize;
237 let mut result = vec![];
238 for _ in 0..vec_len {
239 let v: T = Self::decode_prost_message(buf)?;
240 result.push(v);
241 }
242 Ok(result)
243 }
244
245 fn try_decode_prost_message_list<T>(buf: &mut &[u8]) -> Option<BackupResult<Vec<T>>>
246 where
247 T: prost::Message + Default,
248 {
249 if buf.is_empty() {
250 return None;
251 }
252 Some(Self::decode_prost_message_list(buf))
253 }
254}
255
#[cfg(test)]
mod tests {
    use risingwave_hummock_sdk::HummockVersionId;
    use risingwave_pb::hummock::{CompactionGroup, TableStats};

    use crate::meta_snapshot_v1::{ClusterMetadata, MetaSnapshotV1};

    type MetaSnapshot = MetaSnapshotV1;

    /// A whole snapshot (header plus metadata) survives an encode/decode round trip.
    #[test]
    fn test_snapshot_encoding_decoding() {
        let mut metadata = ClusterMetadata::default();
        metadata.hummock_version.id = HummockVersionId::new(321);

        let snapshot = MetaSnapshot {
            format_version: 0,
            id: 123,
            metadata,
        };

        let bytes = snapshot.encode().unwrap();
        let round_tripped = MetaSnapshot::decode(&bytes).unwrap();
        assert_eq!(snapshot, round_tripped);
    }

    /// Metadata with a few populated sections survives an encode/decode round trip.
    #[test]
    fn test_metadata_encoding_decoding() {
        let mut original = ClusterMetadata::default();
        original.default_cf.insert(vec![0, 1, 2], vec![3, 4, 5]);
        original.hummock_version.id = HummockVersionId::new(1);
        original.version_stats.hummock_version_id = 10;

        let stats = TableStats {
            total_key_count: 1000,
            ..Default::default()
        };
        original.version_stats.table_stats.insert(200, stats);

        let group = CompactionGroup {
            id: 3000,
            ..Default::default()
        };
        original.compaction_groups.push(group);

        let mut bytes = vec![];
        original.encode_to(&mut bytes).unwrap();

        let round_tripped = ClusterMetadata::decode(bytes.as_slice()).unwrap();
        assert_eq!(original, round_tripped);
    }
}