risingwave_backup/
meta_snapshot_v1.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::fmt::{Display, Formatter};
17
18use bytes::{Buf, BufMut};
19use itertools::Itertools;
20use risingwave_common::util::iter_util::ZipEqFast;
21use risingwave_hummock_sdk::HummockRawObjectId;
22use risingwave_hummock_sdk::version::HummockVersion;
23use risingwave_pb::catalog::{
24    Connection, Database, Function, Index, Schema, Secret, Sink, Source, Subscription, Table, View,
25};
26use risingwave_pb::hummock::{CompactionGroup, HummockVersionStats, PbHummockVersion};
27use risingwave_pb::meta::{SystemParams, TableFragments};
28use risingwave_pb::user::UserInfo;
29
30use crate::error::{BackupError, BackupResult};
31use crate::meta_snapshot::{MetaSnapshot, Metadata};
32
// TODO: remove `ClusterMetadata` and even the trait, after applying model v2.
/// Meta snapshot whose metadata payload uses the V1 [`ClusterMetadata`] layout.
pub type MetaSnapshotV1 = MetaSnapshot<ClusterMetadata>;
35
36impl Display for ClusterMetadata {
37    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
38        writeln!(f, "default_cf:")?;
39        for (k, v) in &self.default_cf {
40            let key = String::from_utf8(k.clone()).unwrap();
41            writeln!(f, "{} {:x?}", key, v)?;
42        }
43        writeln!(f, "hummock_version:")?;
44        writeln!(f, "{:#?}", self.hummock_version)?;
45        writeln!(f, "version_stats:")?;
46        writeln!(f, "{:#?}", self.version_stats)?;
47        writeln!(f, "compaction_groups:")?;
48        writeln!(f, "{:#?}", self.compaction_groups)?;
49        writeln!(f, "database:")?;
50        writeln!(f, "{:#?}", self.database)?;
51        writeln!(f, "schema:")?;
52        writeln!(f, "{:#?}", self.schema)?;
53        writeln!(f, "table:")?;
54        writeln!(f, "{:#?}", self.table)?;
55        writeln!(f, "index:")?;
56        writeln!(f, "{:#?}", self.index)?;
57        writeln!(f, "sink:")?;
58        writeln!(f, "{:#?}", self.sink)?;
59        writeln!(f, "source:")?;
60        writeln!(f, "{:#?}", self.source)?;
61        writeln!(f, "view:")?;
62        writeln!(f, "{:#?}", self.view)?;
63        writeln!(f, "connection:")?;
64        writeln!(f, "{:#?}", self.connection)?;
65        writeln!(f, "table_fragments:")?;
66        writeln!(f, "{:#?}", self.table_fragments)?;
67        writeln!(f, "user_info:")?;
68        writeln!(f, "{:#?}", self.user_info)?;
69        writeln!(f, "function:")?;
70        writeln!(f, "{:#?}", self.function)?;
71        writeln!(f, "system_param:")?;
72        writeln!(f, "{:#?}", self.system_param)?;
73        writeln!(f, "cluster_id:")?;
74        writeln!(f, "{:#?}", self.cluster_id)?;
75        writeln!(f, "subscription:")?;
76        writeln!(f, "{:#?}", self.subscription)?;
77        writeln!(f, "secret:")?;
78        writeln!(f, "{:#?}", self.secret)?;
79        Ok(())
80    }
81}
82
83impl Metadata for ClusterMetadata {
84    fn encode_to(&self, buf: &mut Vec<u8>) -> BackupResult<()> {
85        self.encode_to(buf)
86    }
87
88    fn decode(buf: &[u8]) -> BackupResult<Self>
89    where
90        Self: Sized,
91    {
92        ClusterMetadata::decode(buf)
93    }
94
95    fn hummock_version_ref(&self) -> &HummockVersion {
96        &self.hummock_version
97    }
98
99    fn hummock_version(self) -> HummockVersion {
100        self.hummock_version
101    }
102
103    fn storage_url(&self) -> BackupResult<String> {
104        unreachable!("");
105    }
106
107    fn storage_directory(&self) -> BackupResult<String> {
108        unreachable!("");
109    }
110
111    fn table_change_log_object_ids(&self) -> HashSet<HummockRawObjectId> {
112        HashSet::default()
113    }
114}
115
/// Cluster metadata captured by a V1 meta snapshot.
///
/// For backward compatibility, never remove fields and only append new fields.
///
/// The serialized layout is defined by the order of calls in `ClusterMetadata::encode_to` and
/// `ClusterMetadata::decode`, not by the declaration order below. For example,
/// `table_fragments` is encoded right after `compaction_groups`. A new field must therefore be
/// appended at the end of both functions.
///
/// The most recently appended fields (`subscription`, `secret`) are read with
/// `try_decode_prost_message_list`, so a buffer that ends before them still decodes, and they
/// default to empty.
#[derive(Debug, Default, Clone, PartialEq)]
pub struct ClusterMetadata {
    /// Unlike other metadata that has implemented `MetadataModel`,
    /// `DEFAULT_COLUMN_FAMILY` stores various single-row metadata, e.g. id offset and epoch
    /// offset, so `default_cf` stores their raw KVs.
    pub default_cf: HashMap<Vec<u8>, Vec<u8>>,
    /// Hummock version; serialized through its `PbHummockVersion` protobuf form.
    pub hummock_version: HummockVersion,
    /// Hummock version statistics.
    pub version_stats: HummockVersionStats,
    /// Compaction group definitions.
    pub compaction_groups: Vec<CompactionGroup>,
    /// Catalog: databases.
    pub database: Vec<Database>,
    /// Catalog: schemas.
    pub schema: Vec<Schema>,
    /// Catalog: tables.
    pub table: Vec<Table>,
    /// Catalog: indexes.
    pub index: Vec<Index>,
    /// Catalog: sinks.
    pub sink: Vec<Sink>,
    /// Catalog: sources.
    pub source: Vec<Source>,
    /// Catalog: views.
    pub view: Vec<View>,
    /// Table fragments (`risingwave_pb::meta::TableFragments`).
    pub table_fragments: Vec<TableFragments>,
    /// User records (`risingwave_pb::user::UserInfo`).
    pub user_info: Vec<UserInfo>,
    /// Catalog: functions.
    pub function: Vec<Function>,
    /// Catalog: connections.
    pub connection: Vec<Connection>,
    /// System parameters.
    pub system_param: SystemParams,
    /// Cluster id, serialized as a protobuf `string` message.
    pub cluster_id: String,
    /// Catalog: subscriptions. Empty when absent from the encoded buffer.
    pub subscription: Vec<Subscription>,
    /// Catalog: secrets. Empty when absent from the encoded buffer.
    pub secret: Vec<Secret>,
}
142
143impl ClusterMetadata {
144    pub fn encode_to(&self, buf: &mut Vec<u8>) -> BackupResult<()> {
145        let default_cf_keys = self.default_cf.keys().collect_vec();
146        let default_cf_values = self.default_cf.values().collect_vec();
147        Self::encode_prost_message_list(&default_cf_keys, buf);
148        Self::encode_prost_message_list(&default_cf_values, buf);
149        Self::encode_prost_message(&PbHummockVersion::from(&self.hummock_version), buf);
150        Self::encode_prost_message(&self.version_stats, buf);
151        Self::encode_prost_message_list(&self.compaction_groups.iter().collect_vec(), buf);
152        Self::encode_prost_message_list(&self.table_fragments.iter().collect_vec(), buf);
153        Self::encode_prost_message_list(&self.user_info.iter().collect_vec(), buf);
154        Self::encode_prost_message_list(&self.database.iter().collect_vec(), buf);
155        Self::encode_prost_message_list(&self.schema.iter().collect_vec(), buf);
156        Self::encode_prost_message_list(&self.table.iter().collect_vec(), buf);
157        Self::encode_prost_message_list(&self.index.iter().collect_vec(), buf);
158        Self::encode_prost_message_list(&self.sink.iter().collect_vec(), buf);
159        Self::encode_prost_message_list(&self.source.iter().collect_vec(), buf);
160        Self::encode_prost_message_list(&self.view.iter().collect_vec(), buf);
161        Self::encode_prost_message_list(&self.function.iter().collect_vec(), buf);
162        Self::encode_prost_message_list(&self.connection.iter().collect_vec(), buf);
163        Self::encode_prost_message(&self.system_param, buf);
164        Self::encode_prost_message(&self.cluster_id, buf);
165        Self::encode_prost_message_list(&self.subscription.iter().collect_vec(), buf);
166        Self::encode_prost_message_list(&self.secret.iter().collect_vec(), buf);
167        Ok(())
168    }
169
170    pub fn decode(mut buf: &[u8]) -> BackupResult<Self> {
171        let default_cf_keys: Vec<Vec<u8>> = Self::decode_prost_message_list(&mut buf)?;
172        let default_cf_values: Vec<Vec<u8>> = Self::decode_prost_message_list(&mut buf)?;
173        let default_cf = default_cf_keys
174            .into_iter()
175            .zip_eq_fast(default_cf_values.into_iter())
176            .collect();
177        let hummock_version =
178            HummockVersion::from_persisted_protobuf_owned(Self::decode_prost_message(&mut buf)?);
179        let version_stats = Self::decode_prost_message(&mut buf)?;
180        let compaction_groups: Vec<CompactionGroup> = Self::decode_prost_message_list(&mut buf)?;
181        let table_fragments: Vec<TableFragments> = Self::decode_prost_message_list(&mut buf)?;
182        let user_info: Vec<UserInfo> = Self::decode_prost_message_list(&mut buf)?;
183        let database: Vec<Database> = Self::decode_prost_message_list(&mut buf)?;
184        let schema: Vec<Schema> = Self::decode_prost_message_list(&mut buf)?;
185        let table: Vec<Table> = Self::decode_prost_message_list(&mut buf)?;
186        let index: Vec<Index> = Self::decode_prost_message_list(&mut buf)?;
187        let sink: Vec<Sink> = Self::decode_prost_message_list(&mut buf)?;
188        let source: Vec<Source> = Self::decode_prost_message_list(&mut buf)?;
189        let view: Vec<View> = Self::decode_prost_message_list(&mut buf)?;
190        let function: Vec<Function> = Self::decode_prost_message_list(&mut buf)?;
191        let connection: Vec<Connection> = Self::decode_prost_message_list(&mut buf)?;
192        let system_param: SystemParams = Self::decode_prost_message(&mut buf)?;
193        let cluster_id: String = Self::decode_prost_message(&mut buf)?;
194        let subscription: Vec<Subscription> =
195            Self::try_decode_prost_message_list(&mut buf).unwrap_or_else(|| Ok(vec![]))?;
196        let secret: Vec<Secret> =
197            Self::try_decode_prost_message_list(&mut buf).unwrap_or_else(|| Ok(vec![]))?;
198
199        Ok(Self {
200            default_cf,
201            hummock_version,
202            version_stats,
203            compaction_groups,
204            database,
205            schema,
206            table,
207            index,
208            sink,
209            source,
210            view,
211            table_fragments,
212            user_info,
213            function,
214            connection,
215            system_param,
216            cluster_id,
217            subscription,
218            secret,
219        })
220    }
221
222    fn encode_prost_message(message: &impl prost::Message, buf: &mut Vec<u8>) {
223        let encoded_message = message.encode_to_vec();
224        buf.put_u32_le(encoded_message.len() as u32);
225        buf.put_slice(&encoded_message);
226    }
227
228    fn decode_prost_message<T>(buf: &mut &[u8]) -> BackupResult<T>
229    where
230        T: prost::Message + Default,
231    {
232        let len = buf.get_u32_le() as usize;
233        let v = buf[..len].to_vec();
234        buf.advance(len);
235        T::decode(v.as_slice()).map_err(|e| BackupError::Decoding(e.into()))
236    }
237
238    fn encode_prost_message_list(messages: &[&impl prost::Message], buf: &mut Vec<u8>) {
239        buf.put_u32_le(messages.len() as u32);
240        for message in messages {
241            Self::encode_prost_message(*message, buf);
242        }
243    }
244
245    fn decode_prost_message_list<T>(buf: &mut &[u8]) -> BackupResult<Vec<T>>
246    where
247        T: prost::Message + Default,
248    {
249        let vec_len = buf.get_u32_le() as usize;
250        let mut result = vec![];
251        for _ in 0..vec_len {
252            let v: T = Self::decode_prost_message(buf)?;
253            result.push(v);
254        }
255        Ok(result)
256    }
257
258    fn try_decode_prost_message_list<T>(buf: &mut &[u8]) -> Option<BackupResult<Vec<T>>>
259    where
260        T: prost::Message + Default,
261    {
262        if buf.is_empty() {
263            return None;
264        }
265        Some(Self::decode_prost_message_list(buf))
266    }
267}
268
#[cfg(test)]
mod tests {
    use risingwave_hummock_sdk::HummockVersionId;
    use risingwave_pb::hummock::{CompactionGroup, TableStats};

    use crate::meta_snapshot_v1::{ClusterMetadata, MetaSnapshotV1};

    type MetaSnapshot = MetaSnapshotV1;

    /// A whole snapshot (header fields plus metadata) survives an encode/decode round trip.
    #[test]
    fn test_snapshot_encoding_decoding() {
        let metadata = {
            let mut m = ClusterMetadata::default();
            m.hummock_version.id = HummockVersionId::new(321);
            m
        };
        let snapshot = MetaSnapshot {
            format_version: 0,
            id: 123,
            metadata,
        };
        let bytes = snapshot.encode().unwrap();
        let round_tripped = MetaSnapshot::decode(&bytes).unwrap();
        assert_eq!(snapshot, round_tripped);
    }

    /// Populated `ClusterMetadata` fields survive an encode/decode round trip.
    #[test]
    fn test_metadata_encoding_decoding() {
        let mut original = ClusterMetadata::default();
        original.default_cf.insert(vec![0, 1, 2], vec![3, 4, 5]);
        original.hummock_version.id = HummockVersionId::new(1);

        let stats = &mut original.version_stats;
        stats.hummock_version_id = 10.into();
        let table_stats = TableStats {
            total_key_count: 1000,
            ..Default::default()
        };
        stats.table_stats.insert(200.into(), table_stats);

        let group = CompactionGroup {
            id: 3000.into(),
            ..Default::default()
        };
        original.compaction_groups.push(group);

        let mut encoded = vec![];
        original.encode_to(&mut encoded).unwrap();
        let round_tripped = ClusterMetadata::decode(encoded.as_slice()).unwrap();
        assert_eq!(original, round_tripped);
    }
}