risingwave_backup/
meta_snapshot_v1.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt::{Display, Formatter};
17
18use bytes::{Buf, BufMut};
19use itertools::Itertools;
20use risingwave_common::util::iter_util::ZipEqFast;
21use risingwave_hummock_sdk::version::HummockVersion;
22use risingwave_pb::catalog::{
23    Connection, Database, Function, Index, Schema, Secret, Sink, Source, Subscription, Table, View,
24};
25use risingwave_pb::hummock::{CompactionGroup, HummockVersionStats, PbHummockVersion};
26use risingwave_pb::meta::{SystemParams, TableFragments};
27use risingwave_pb::user::UserInfo;
28
29use crate::error::{BackupError, BackupResult};
30use crate::meta_snapshot::{MetaSnapshot, Metadata};
31
// TODO: remove `ClusterMetadata` and even the trait, after applying model v2.
/// A [`MetaSnapshot`] whose metadata payload is a [`ClusterMetadata`].
pub type MetaSnapshotV1 = MetaSnapshot<ClusterMetadata>;
34
35impl Display for ClusterMetadata {
36    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
37        writeln!(f, "default_cf:")?;
38        for (k, v) in &self.default_cf {
39            let key = String::from_utf8(k.clone()).unwrap();
40            writeln!(f, "{} {:x?}", key, v)?;
41        }
42        writeln!(f, "hummock_version:")?;
43        writeln!(f, "{:#?}", self.hummock_version)?;
44        writeln!(f, "version_stats:")?;
45        writeln!(f, "{:#?}", self.version_stats)?;
46        writeln!(f, "compaction_groups:")?;
47        writeln!(f, "{:#?}", self.compaction_groups)?;
48        writeln!(f, "database:")?;
49        writeln!(f, "{:#?}", self.database)?;
50        writeln!(f, "schema:")?;
51        writeln!(f, "{:#?}", self.schema)?;
52        writeln!(f, "table:")?;
53        writeln!(f, "{:#?}", self.table)?;
54        writeln!(f, "index:")?;
55        writeln!(f, "{:#?}", self.index)?;
56        writeln!(f, "sink:")?;
57        writeln!(f, "{:#?}", self.sink)?;
58        writeln!(f, "source:")?;
59        writeln!(f, "{:#?}", self.source)?;
60        writeln!(f, "view:")?;
61        writeln!(f, "{:#?}", self.view)?;
62        writeln!(f, "connection:")?;
63        writeln!(f, "{:#?}", self.connection)?;
64        writeln!(f, "table_fragments:")?;
65        writeln!(f, "{:#?}", self.table_fragments)?;
66        writeln!(f, "user_info:")?;
67        writeln!(f, "{:#?}", self.user_info)?;
68        writeln!(f, "function:")?;
69        writeln!(f, "{:#?}", self.function)?;
70        writeln!(f, "system_param:")?;
71        writeln!(f, "{:#?}", self.system_param)?;
72        writeln!(f, "cluster_id:")?;
73        writeln!(f, "{:#?}", self.cluster_id)?;
74        writeln!(f, "subscription:")?;
75        writeln!(f, "{:#?}", self.subscription)?;
76        writeln!(f, "secret:")?;
77        writeln!(f, "{:#?}", self.secret)?;
78        Ok(())
79    }
80}
81
82impl Metadata for ClusterMetadata {
83    fn encode_to(&self, buf: &mut Vec<u8>) -> BackupResult<()> {
84        self.encode_to(buf)
85    }
86
87    fn decode(buf: &[u8]) -> BackupResult<Self>
88    where
89        Self: Sized,
90    {
91        ClusterMetadata::decode(buf)
92    }
93
94    fn hummock_version_ref(&self) -> &HummockVersion {
95        &self.hummock_version
96    }
97
98    fn hummock_version(self) -> HummockVersion {
99        self.hummock_version
100    }
101
102    fn storage_url(&self) -> BackupResult<String> {
103        unreachable!("");
104    }
105
106    fn storage_directory(&self) -> BackupResult<String> {
107        unreachable!("");
108    }
109}
110
/// Cluster metadata carried by a [`MetaSnapshotV1`].
///
/// For backward compatibility, never remove fields and only append new field.
///
/// NOTE: the declaration order below is not the serialized order; the byte layout is
/// defined by `encode_to` and `decode`.
#[derive(Debug, Default, Clone, PartialEq)]
pub struct ClusterMetadata {
    /// Unlike other metadata that has implemented `MetadataModel`,
    /// `DEFAULT_COLUMN_FAMILY` stores various single row metadata, e.g. id offset and epoch offset.
    /// So we use `default_cf` to store raw KVs for them.
    pub default_cf: HashMap<Vec<u8>, Vec<u8>>,
    /// Serialized through its `PbHummockVersion` representation.
    pub hummock_version: HummockVersion,
    pub version_stats: HummockVersionStats,
    pub compaction_groups: Vec<CompactionGroup>,
    pub database: Vec<Database>,
    pub schema: Vec<Schema>,
    pub table: Vec<Table>,
    pub index: Vec<Index>,
    pub sink: Vec<Sink>,
    pub source: Vec<Source>,
    pub view: Vec<View>,
    pub table_fragments: Vec<TableFragments>,
    pub user_info: Vec<UserInfo>,
    pub function: Vec<Function>,
    pub connection: Vec<Connection>,
    pub system_param: SystemParams,
    pub cluster_id: String,
    /// Optional when decoding: `decode` yields an empty list if the buffer is already
    /// exhausted when this field is reached.
    pub subscription: Vec<Subscription>,
    /// Optional when decoding: `decode` yields an empty list if the buffer is already
    /// exhausted when this field is reached.
    pub secret: Vec<Secret>,
}
137
138impl ClusterMetadata {
139    pub fn encode_to(&self, buf: &mut Vec<u8>) -> BackupResult<()> {
140        let default_cf_keys = self.default_cf.keys().collect_vec();
141        let default_cf_values = self.default_cf.values().collect_vec();
142        Self::encode_prost_message_list(&default_cf_keys, buf);
143        Self::encode_prost_message_list(&default_cf_values, buf);
144        Self::encode_prost_message(&PbHummockVersion::from(&self.hummock_version), buf);
145        Self::encode_prost_message(&self.version_stats, buf);
146        Self::encode_prost_message_list(&self.compaction_groups.iter().collect_vec(), buf);
147        Self::encode_prost_message_list(&self.table_fragments.iter().collect_vec(), buf);
148        Self::encode_prost_message_list(&self.user_info.iter().collect_vec(), buf);
149        Self::encode_prost_message_list(&self.database.iter().collect_vec(), buf);
150        Self::encode_prost_message_list(&self.schema.iter().collect_vec(), buf);
151        Self::encode_prost_message_list(&self.table.iter().collect_vec(), buf);
152        Self::encode_prost_message_list(&self.index.iter().collect_vec(), buf);
153        Self::encode_prost_message_list(&self.sink.iter().collect_vec(), buf);
154        Self::encode_prost_message_list(&self.source.iter().collect_vec(), buf);
155        Self::encode_prost_message_list(&self.view.iter().collect_vec(), buf);
156        Self::encode_prost_message_list(&self.function.iter().collect_vec(), buf);
157        Self::encode_prost_message_list(&self.connection.iter().collect_vec(), buf);
158        Self::encode_prost_message(&self.system_param, buf);
159        Self::encode_prost_message(&self.cluster_id, buf);
160        Self::encode_prost_message_list(&self.subscription.iter().collect_vec(), buf);
161        Self::encode_prost_message_list(&self.secret.iter().collect_vec(), buf);
162        Ok(())
163    }
164
165    pub fn decode(mut buf: &[u8]) -> BackupResult<Self> {
166        let default_cf_keys: Vec<Vec<u8>> = Self::decode_prost_message_list(&mut buf)?;
167        let default_cf_values: Vec<Vec<u8>> = Self::decode_prost_message_list(&mut buf)?;
168        let default_cf = default_cf_keys
169            .into_iter()
170            .zip_eq_fast(default_cf_values.into_iter())
171            .collect();
172        let hummock_version =
173            HummockVersion::from_persisted_protobuf(&Self::decode_prost_message(&mut buf)?);
174        let version_stats = Self::decode_prost_message(&mut buf)?;
175        let compaction_groups: Vec<CompactionGroup> = Self::decode_prost_message_list(&mut buf)?;
176        let table_fragments: Vec<TableFragments> = Self::decode_prost_message_list(&mut buf)?;
177        let user_info: Vec<UserInfo> = Self::decode_prost_message_list(&mut buf)?;
178        let database: Vec<Database> = Self::decode_prost_message_list(&mut buf)?;
179        let schema: Vec<Schema> = Self::decode_prost_message_list(&mut buf)?;
180        let table: Vec<Table> = Self::decode_prost_message_list(&mut buf)?;
181        let index: Vec<Index> = Self::decode_prost_message_list(&mut buf)?;
182        let sink: Vec<Sink> = Self::decode_prost_message_list(&mut buf)?;
183        let source: Vec<Source> = Self::decode_prost_message_list(&mut buf)?;
184        let view: Vec<View> = Self::decode_prost_message_list(&mut buf)?;
185        let function: Vec<Function> = Self::decode_prost_message_list(&mut buf)?;
186        let connection: Vec<Connection> = Self::decode_prost_message_list(&mut buf)?;
187        let system_param: SystemParams = Self::decode_prost_message(&mut buf)?;
188        let cluster_id: String = Self::decode_prost_message(&mut buf)?;
189        let subscription: Vec<Subscription> =
190            Self::try_decode_prost_message_list(&mut buf).unwrap_or_else(|| Ok(vec![]))?;
191        let secret: Vec<Secret> =
192            Self::try_decode_prost_message_list(&mut buf).unwrap_or_else(|| Ok(vec![]))?;
193
194        Ok(Self {
195            default_cf,
196            hummock_version,
197            version_stats,
198            compaction_groups,
199            database,
200            schema,
201            table,
202            index,
203            sink,
204            source,
205            view,
206            table_fragments,
207            user_info,
208            function,
209            connection,
210            system_param,
211            cluster_id,
212            subscription,
213            secret,
214        })
215    }
216
217    fn encode_prost_message(message: &impl prost::Message, buf: &mut Vec<u8>) {
218        let encoded_message = message.encode_to_vec();
219        buf.put_u32_le(encoded_message.len() as u32);
220        buf.put_slice(&encoded_message);
221    }
222
223    fn decode_prost_message<T>(buf: &mut &[u8]) -> BackupResult<T>
224    where
225        T: prost::Message + Default,
226    {
227        let len = buf.get_u32_le() as usize;
228        let v = buf[..len].to_vec();
229        buf.advance(len);
230        T::decode(v.as_slice()).map_err(|e| BackupError::Decoding(e.into()))
231    }
232
233    fn encode_prost_message_list(messages: &[&impl prost::Message], buf: &mut Vec<u8>) {
234        buf.put_u32_le(messages.len() as u32);
235        for message in messages {
236            Self::encode_prost_message(*message, buf);
237        }
238    }
239
240    fn decode_prost_message_list<T>(buf: &mut &[u8]) -> BackupResult<Vec<T>>
241    where
242        T: prost::Message + Default,
243    {
244        let vec_len = buf.get_u32_le() as usize;
245        let mut result = vec![];
246        for _ in 0..vec_len {
247            let v: T = Self::decode_prost_message(buf)?;
248            result.push(v);
249        }
250        Ok(result)
251    }
252
253    fn try_decode_prost_message_list<T>(buf: &mut &[u8]) -> Option<BackupResult<Vec<T>>>
254    where
255        T: prost::Message + Default,
256    {
257        if buf.is_empty() {
258            return None;
259        }
260        Some(Self::decode_prost_message_list(buf))
261    }
262}
263
#[cfg(test)]
mod tests {
    use risingwave_hummock_sdk::HummockVersionId;
    use risingwave_pb::hummock::{CompactionGroup, TableStats};

    use crate::meta_snapshot_v1::{ClusterMetadata, MetaSnapshotV1};

    type MetaSnapshot = MetaSnapshotV1;

    /// A whole snapshot must survive an encode/decode round trip unchanged.
    #[test]
    fn test_snapshot_encoding_decoding() {
        let mut metadata = ClusterMetadata::default();
        metadata.hummock_version.id = HummockVersionId::new(321);

        let snapshot = MetaSnapshot {
            format_version: 0,
            id: 123,
            metadata,
        };

        let bytes = snapshot.encode().unwrap();
        let round_tripped = MetaSnapshot::decode(&bytes).unwrap();
        assert_eq!(snapshot, round_tripped);
    }

    /// Metadata with a few populated sections must survive an encode/decode round trip.
    #[test]
    fn test_metadata_encoding_decoding() {
        let mut expected = ClusterMetadata::default();
        expected.default_cf.insert(vec![0, 1, 2], vec![3, 4, 5]);
        expected.hummock_version.id = HummockVersionId::new(1);
        expected.version_stats.hummock_version_id = 10.into();

        let table_stats = TableStats {
            total_key_count: 1000,
            ..Default::default()
        };
        expected
            .version_stats
            .table_stats
            .insert(200.into(), table_stats);

        let compaction_group = CompactionGroup {
            id: 3000.into(),
            ..Default::default()
        };
        expected.compaction_groups.push(compaction_group);

        let mut bytes = vec![];
        expected.encode_to(&mut bytes).unwrap();
        let round_tripped = ClusterMetadata::decode(bytes.as_slice()).unwrap();
        assert_eq!(expected, round_tripped);
    }
}